Batching
Overview
Batching can significantly improve a pipeline's throughput because it reduces the number of round trips between systems. In Conduit, this means that source connectors can read batches of records and destination connectors can write batches of records. Processors then act like map functions applied to collections (i.e., batches).
While batching can improve a pipeline's performance, it can also increase the connector's memory usage, because the connector has to hold multiple records in memory before flushing the batch. It can also increase the connector's latency, because records wait in the batch until it is full.
Batching support in connectors
Conduit facilitates batching, but the actual implementation of batching is the connector's responsibility. For example, Conduit can collect a batch of records and send it to a destination connector. Whether the destination writes those records in a single request depends on its implementation.
Similarly, Conduit can request a batch of N records from a source connector. The source connector might read N records in one request, but it might also issue N read requests.
Please consult a connector's documentation to find out if and how it supports batching.
Pipeline engines and batching
Batching can be effective regardless of the pipeline engine used, because the optimization occurs in the connector itself. However, pipeline architecture v2 takes better advantage of batching, since it's able to move the whole batch, as a single unit, through the pipeline.
Configuration
Batching can be configured in a source, destination, or both. Using batching in both is discouraged and will be removed in a future release. This is due to Conduit migrating to pipeline architecture v2.
By default, batching is disabled, i.e., Conduit reads and writes records one by one.
sdk.batch.size
The maximum number of records in a batch. A batch may contain fewer records, depending on how many records are available in the source system and on the value of sdk.batch.delay.
Default value: 0 (disabled).
sdk.batch.delay
The maximum time to wait for a batch to fill up before it is flushed, given as a duration (for example, "5s").
Default value: 0.
Both parameters need to be set for batching to work correctly. If
sdk.batch.size is set, but sdk.batch.delay isn't, Conduit might wait for the
batch size to be reached either for a long time or indefinitely.
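As an illustration of setting both parameters together, here is a minimal sketch of a source connector entry. It assumes that the chosen source connector honors the sdk.batch.* settings (check the connector's documentation), and the id, name, and values are placeholders rather than recommendations.

```yaml
# Hypothetical connector entry (part of a pipeline's "connectors" list).
# Assumption: this source connector supports the sdk.batch.* parameters.
connectors:
  - id: source1
    type: source
    plugin: builtin:generator
    name: source1
    settings:
      rate: 10
      operations: "create"
      format.type: "structured"
      format.options.name: "string"
      # Upper bound on the number of records in one batch.
      sdk.batch.size: 100
      # Upper bound on how long to wait for the batch to fill up.
      sdk.batch.delay: "5s"
```

Setting the delay alongside the size puts a bound on how long a partially filled batch can wait, which avoids the long or indefinite wait described above.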
Best practices
- Consult a connector's documentation to verify whether it uses batching.
- Monitor the pipeline's performance, which should include the message throughput and resource usage (CPU, memory, etc.). You can use Conduit's built-in metrics to monitor throughput.
- Monitor the latency in the destination system (batching might increase it, which isn't desirable if the pipeline is required to be real-time).
- Verify which batch size can work with the source or destination connector. Certain source and destination systems limit the batch size.
- Know the volume of the data being generated in the source. Determine how that affects the batch size and delay.
- Experiment with different batch sizes and delays. There's likely going to be a point of diminishing returns.
Examples
Example 1: Batch delay
The following pipeline is configured to collect records for 5 seconds before flushing the batch to the destination resource. Note that the source connector is generating records at a rate of 10 records per second, meaning that a batch will contain approximately 50 records.
version: 2.2
pipelines:
  - id: pipeline1
    status: running
    name: pipeline1
    description: 'A pipeline flushing a batch of records every 5 seconds.'
    connectors:
      - id: source1
        type: source
        plugin: builtin:generator
        name: source1
        settings:
          rate: 10
          operations: "create"
          format.type: "structured"
          format.options.name: "string"
          format.options.company: "string"
      - id: destination1
        type: destination
        plugin: "builtin:file"
        name: destination1
        settings:
          sdk.batch.delay: "5s"
          path: /tmp/file-destination.txt
Example 2: Batch size and delay
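The following pipeline is configured to collect batches of up to 100 records and to wait at most 5 seconds before flushing them to the destination resource. This means that records will be flushed at most every 5 seconds, or sooner if the batch collects 100 records. Because the source generates 10 records per second, the 5-second delay is expected to elapse first in this example, so a batch will contain approximately 50 records.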
version: 2.2
pipelines:
  - id: pipeline1
    status: running
    name: pipeline1
    description: 'A pipeline batching 100 records at a time.'
    connectors:
      - id: source1
        type: source
        plugin: builtin:generator
        name: source1
        settings:
          rate: 10
          operations: "create"
          format.type: "structured"
          format.options.name: "string"
          format.options.company: "string"
      - id: destination1
        type: destination
        plugin: "builtin:file"
        name: destination1
        settings:
          sdk.batch.size: 100
          sdk.batch.delay: "5s"
          path: /tmp/file-destination.txt
