
Pipeline Configuration Reference

The pipeline configuration defines how data flows from sources through optional transforms and joins into ClickHouse. Both JSON and YAML formats are supported. The current format is V3, a structural redesign that introduces per-source configuration, an explicit transforms array, and sink-level column mapping.

While the web interface generates this configuration automatically, understanding its structure is essential for advanced use cases, CI/CD pipelines, and the Python SDK.
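In CI/CD it can be useful to sanity-check a configuration before submitting it. The sketch below uses plain Python (no SDK) to verify a few of the structural rules described in this reference; `validate_v3` is a hypothetical helper for illustration, not part of the product.

```python
# Illustrative pre-submission checks for a v3 pipeline configuration,
# loaded as a dict (e.g. json.load for JSON, yaml.safe_load for YAML).
# These mirror rules from this reference; they are not the product's
# own validation logic.

def validate_v3(config: dict) -> list[str]:
    """Return a list of problems; an empty list means the basics pass."""
    problems = []
    if config.get("version") != "v3":
        problems.append('version must be "v3"')
    source_ids = [s.get("source_id") for s in config.get("sources", [])]
    if not source_ids:
        problems.append("at least one source is required")
    if len(source_ids) != len(set(source_ids)):
        problems.append("source_id values must be unique")
    for t in config.get("transforms", []):
        if t.get("source_id") not in source_ids:
            problems.append(f"transform references unknown source_id {t.get('source_id')!r}")
    if config.get("sink", {}).get("type") != "clickhouse":
        problems.append('sink.type must be "clickhouse"')
    return problems
```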

Full pipeline example (join pipeline with two Kafka sources)

```yaml
version: v3
pipeline_id: joined-orders-pipeline
name: Orders and Users Join
sources:
  - type: kafka
    source_id: orders
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: SASL_SSL
      mechanism: SCRAM-SHA-256
      username: "<user>"
      password: "<password>"
    topic: orders
    consumer_group_initial_offset: earliest
    schema_fields:
      - name: order_id
        type: string
      - name: customer_id
        type: string
      - name: amount
        type: float
      - name: timestamp
        type: datetime
  - type: kafka
    source_id: users
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: SASL_SSL
      mechanism: SCRAM-SHA-256
      username: "<user>"
      password: "<password>"
    topic: users
    consumer_group_initial_offset: earliest
    schema_fields:
      - name: user_id
        type: string
      - name: email
        type: string
transforms:
  - type: dedup
    source_id: orders
    config:
      key: order_id
      time_window: 1h
  - type: dedup
    source_id: users
    config:
      key: user_id
      time_window: 1h
  - type: filter
    source_id: orders
    config:
      expression: "amount > 0"
join:
  enabled: true
  type: temporal
  left_source:
    source_id: orders
    key: customer_id
    time_window: 30s
  right_source:
    source_id: users
    key: user_id
    time_window: 30s
  output_fields:
    - source_id: orders
      name: order_id
    - source_id: orders
      name: amount
    - source_id: orders
      name: timestamp
    - source_id: users
      name: user_id
    - source_id: users
      name: email
sink:
  type: clickhouse
  connection_params:
    host: clickhouse.example.com
    port: "9000"
    database: default
    username: default
    password: mysecret
    secure: true
  table: joined_orders
  max_batch_size: 1000
  max_delay_time: 1s
  mapping:
    - name: order_id
      column_name: order_id
      column_type: String
    - name: amount
      column_name: amount
      column_type: Float64
    - name: timestamp
      column_name: created_at
      column_type: DateTime
    - name: user_id
      column_name: user_id
      column_type: String
    - name: email
      column_name: email
      column_type: String
metadata:
  tags:
    - demo
resources:
  nats:
    stream:
      max_age: 24h
      max_bytes: 25GB
  sources:
    - source_id: orders
      replicas: 2
      requests:
        cpu: 1000m
        memory: 1Gi
      limits:
        cpu: 1500m
        memory: 1.5Gi
    - source_id: users
      replicas: 1
      requests:
        cpu: 1000m
        memory: 1Gi
      limits:
        cpu: 1500m
        memory: 1.5Gi
  transform:
    - source_id: orders
      replicas: 1
      storage:
        size: 10Gi
      requests:
        cpu: 500m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
    - source_id: users
      replicas: 1
      storage:
        size: 10Gi
      requests:
        cpu: 500m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
  sink:
    replicas: 1
    requests:
      cpu: 250m
      memory: 256Mi
    limits:
      cpu: 500m
      memory: 512Mi
```

Root Configuration

```yaml
version: v3
pipeline_id: my-pipeline-id
name: My Pipeline Name
sources: []
transforms: []
join: {}
sink: {}
metadata: {}
resources: {}
```
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `version` | string | Yes | Must be `"v3"`. |
| `pipeline_id` | string | Yes | Unique identifier for the pipeline. |
| `name` | string | No | Display name shown in the UI. |
| `sources` | array | Yes | List of source configurations. |
| `transforms` | array | No | List of transform steps applied to sources. |
| `join` | object | No | Join configuration for combining data from two sources. |
| `sink` | object | Yes | ClickHouse sink configuration. |
| `metadata` | object | No | Pipeline metadata such as tags. |
| `resources` | object | No | Kubernetes resource allocation for pipeline components. |

Sources Configuration

The sources array defines one or more data sources. Each entry has its own source_id, which you reference in transforms, join, and sink mapping.

Kafka Source

```yaml
type: kafka
source_id: orders
connection_params:
  brokers:
    - "kafka:9092"
  protocol: PLAINTEXT
  mechanism: NO_AUTH
topic: orders
consumer_group_initial_offset: earliest
schema_fields:
  - name: order_id
    type: string
  - name: amount
    type: float
  - name: timestamp
    type: datetime
```

OTLP Source

```yaml
type: otlp.traces
source_id: traces
```

Source Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `type` | string | Yes | Source type. One of `"kafka"`, `"otlp.logs"`, `"otlp.traces"`, or `"otlp.metrics"`. See Sources. |
| `source_id` | string | Yes | Unique identifier for this source. Referenced by transforms, join, and sink mapping. |
| `connection_params` | object | Yes (Kafka) | Kafka connection parameters. See Connections. |
| `topic` | string | Yes (Kafka) | Kafka topic name. |
| `consumer_group_initial_offset` | string | No | Initial offset for the consumer group: `"earliest"` or `"latest"`. Default: `"latest"`. Kafka only. |
| `schema_fields` | array | Conditional | Field definitions for this source. Required for Kafka sources. Not needed for OTLP sources (schema is predefined). |

Kafka Connection Parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `brokers` | array | Yes | List of Kafka broker addresses (e.g., `["kafka:9092"]`). |
| `protocol` | string | Yes | Security protocol: `"PLAINTEXT"`, `"SASL_PLAINTEXT"`, `"SSL"`, or `"SASL_SSL"`. |
| `mechanism` | string | Conditional | Authentication mechanism (e.g., `"SCRAM-SHA-256"`). Required when authentication is enabled. |
| `username` | string | Conditional | Kafka username. Required when authentication is enabled. |
| `password` | string | Conditional | Kafka password. Required when authentication is enabled. |
| `root_ca` | string | No | PEM-encoded CA certificate for TLS. |
| `skip_tls_verification` | boolean | No | Skip TLS certificate verification. Default: `false`. |
| `kerberos_service_name` | string | No | Kerberos service name. |
| `kerberos_keytab` | string | No | Kerberos keytab file. |
| `kerberos_realm` | string | No | Kerberos realm. |
| `kerberos_config` | string | No | Kerberos configuration file. |

For detailed connection examples and supported protocol/mechanism combinations, see Supported Kafka Connections.

Schema Fields

Each entry in the schema_fields array defines a field from the source data.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | string | Yes | Source field name. Dot notation is supported for nested fields (e.g., `data.name`). |
| `type` | string | Yes | Field data type. See Supported Data Formats. |
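Dot notation addresses fields inside nested records by walking one path segment at a time. A minimal sketch of that addressing scheme (illustrative only, not the engine's implementation):

```python
# Resolve a dot-notation field name (e.g. "data.name") against a nested
# record, as used by schema_fields and sink mapping. A sketch of the
# addressing scheme, not the product's code.

def resolve_field(record: dict, name: str):
    """Walk nested dicts following dot-separated path segments."""
    value = record
    for segment in name.split("."):
        if not isinstance(value, dict) or segment not in value:
            return None  # missing field
        value = value[segment]
    return value

record = {"data": {"name": "alice", "score": 42}}
resolve_field(record, "data.name")   # "alice"
resolve_field(record, "data.score")  # 42
```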

Transforms Configuration

The transforms array defines processing steps applied to source data before it reaches the sink. Each transform targets a specific source via source_id. You can apply multiple transforms to the same source — they run in the order they appear.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `type` | string | Yes | Transform type: `"dedup"`, `"filter"`, or `"stateless"`. |
| `source_id` | string | Yes | Which source this transform applies to. Must match a `source_id` from the `sources` array. |
| `config` | object | Yes | Type-specific configuration. See below. |

Deduplication

Removes duplicate records based on a key field within a time window.

```yaml
type: dedup
source_id: orders
config:
  key: order_id
  time_window: 1h
```

Dedup config:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `key` | string | Yes | Field name to deduplicate on. |
| `time_window` | string | Yes | Deduplication window. See Time windows. |
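The semantics can be sketched as: a record is kept only if its key value has not been seen within the time window. The sketch below assumes records arrive as `(timestamp, dict)` pairs; it models the rule, not the engine's storage-backed implementation.

```python
# Illustrative time-window deduplication over an in-order record stream.
from datetime import datetime, timedelta

def dedup(records, key: str, window: timedelta):
    """Yield records whose key value was not seen within the time window."""
    last_seen: dict[str, datetime] = {}
    for ts, record in records:  # records are (timestamp, dict) pairs
        k = record[key]
        prev = last_seen.get(k)
        if prev is None or ts - prev > window:
            last_seen[k] = ts
            yield record
        # else: duplicate within the window, dropped
```

For example, with `time_window: 1h`, two records sharing an `order_id` five minutes apart collapse to one, while a repeat two hours later passes through again.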

Filter

Keeps records that match the expression and discards the rest.

```yaml
type: filter
source_id: orders
config:
  expression: "status == 'active'"
```

Filter config:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `expression` | string | Yes | Boolean expression. Records where the expression evaluates to `true` are kept. See Filter. |

Stateless Transformation

Computes new fields from existing data using expressions.

```yaml
type: stateless
source_id: orders
config:
  transforms:
    - expression: "int(amount) % 2 == 0"
      output_name: is_amount_even
      output_type: bool
```

Stateless config:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `transforms` | array | Yes | List of transform definitions. |

Each transform definition:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `expression` | string | Yes | Expression to evaluate against each record. See Stateless Transformation. |
| `output_name` | string | Yes | Name of the new output field. |
| `output_type` | string | Yes | Data type of the output field. |

Join Configuration

The join configuration combines records from two sources based on matching keys within a time window.

```yaml
enabled: true
type: temporal
left_source:
  source_id: orders
  key: customer_id
  time_window: 30s
right_source:
  source_id: users
  key: user_id
  time_window: 30s
output_fields:
  - source_id: orders
    name: order_id
    output_name: ORDER_ID
  - source_id: orders
    name: amount
  - source_id: users
    name: user_id
  - source_id: users
    name: email
```

Join Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `enabled` | boolean | Yes | Whether the join is enabled. |
| `type` | string | Yes | Join type (e.g., `"temporal"`). |
| `left_source` | object | Yes (when enabled) | Left side of the join. |
| `right_source` | object | Yes (when enabled) | Right side of the join. |
| `output_fields` | array | Yes (when enabled) | Fields to include in the joined output. |

Join Source

Each join source (left_source and right_source) has the same structure.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `source_id` | string | Yes | Source identifier. Must match a `source_id` from the `sources` array. |
| `key` | string | Yes | Field to join on. |
| `time_window` | string | Yes | Join time window. See Time windows. |

Join Output Fields

Each entry in output_fields selects a field from one of the joined sources.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `source_id` | string | Yes | Which source the field comes from. |
| `name` | string | Yes | Field name from the source. |
| `output_name` | string | No | Rename the field in the output. If omitted, the original name is used. |
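The temporal matching rule can be sketched as: pair a left record with a right record when the join keys are equal and the timestamps fall within the window. The sketch assumes a symmetric window and in-memory lists of `(timestamp, dict)` pairs; the engine's buffering, ordering, and state handling are not modeled.

```python
# Illustrative temporal-join matching rule (not the engine's implementation).
from datetime import datetime, timedelta

def temporal_join(left, right, left_key, right_key, window: timedelta):
    """left/right are lists of (timestamp, record); return merged records."""
    joined = []
    for lt, lrec in left:
        for rt, rrec in right:
            if lrec[left_key] == rrec[right_key] and abs(lt - rt) <= window:
                joined.append({**lrec, **rrec})  # right fields win on name clash
    return joined
```

With a 30s window, an order at t and a matching user record 10 seconds later join into one output record; the same pair 5 minutes apart would not.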

Sink Configuration

The sink configuration defines the ClickHouse destination, including connection details, batching behavior, and column mapping.

```yaml
type: clickhouse
connection_params:
  host: clickhouse.example.com
  port: "9000"
  http_port: "8123"
  database: default
  username: default
  password: mysecret
  secure: true
  skip_certificate_verification: false
table: orders
max_batch_size: 1000
max_delay_time: 1s
mapping:
  - name: order_id
    column_name: order_id
    column_type: String
  - name: amount
    column_name: amount
    column_type: Float64
  - name: timestamp
    column_name: created_at
    column_type: DateTime
```

Sink Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `type` | string | Yes | Must be `"clickhouse"`. |
| `connection_params` | object | Yes | ClickHouse connection parameters. |
| `table` | string | Yes | Target table name in ClickHouse. |
| `max_batch_size` | integer | No | Maximum number of records per batch. Default: `1000`. |
| `max_delay_time` | string | No | Maximum delay before flushing a batch. Default: `"60s"`. |
| `mapping` | array | Yes | Column mappings from source fields to ClickHouse columns. |
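The interplay of `max_batch_size` and `max_delay_time` amounts to: flush when the batch is full, or when the oldest buffered record has waited longer than the delay. The sketch below models only those two flush conditions (checked on each insert); `Batcher` is a hypothetical name, not the sink's API.

```python
# Illustrative batching semantics for max_batch_size / max_delay_time.
from datetime import datetime, timedelta

class Batcher:
    def __init__(self, max_batch_size: int = 1000,
                 max_delay: timedelta = timedelta(seconds=60)):
        self.max_batch_size = max_batch_size
        self.max_delay = max_delay
        self.batch: list = []
        self.first_at: datetime | None = None  # arrival time of oldest record

    def add(self, record, now: datetime):
        """Buffer a record; return the batch when a flush condition is met."""
        if self.first_at is None:
            self.first_at = now
        self.batch.append(record)
        if (len(self.batch) >= self.max_batch_size
                or now - self.first_at >= self.max_delay):
            flushed, self.batch, self.first_at = self.batch, [], None
            return flushed
        return None
```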

Sink Connection Parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `host` | string | Yes | ClickHouse hostname. |
| `port` | string | Yes | ClickHouse native port. |
| `http_port` | string | No | ClickHouse HTTP port (used by the UI for connectivity checks). |
| `database` | string | Yes | Database name. |
| `username` | string | Yes | Username. |
| `password` | string | Yes | Password (plain text). |
| `secure` | boolean | No | Use TLS. Default: `false`. |
| `skip_certificate_verification` | boolean | No | Skip certificate verification. Default: `false`. |

Sink Column Mapping

Each entry in the mapping array maps a source field to a ClickHouse column.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | string | Yes | Source field name. Dot notation is supported for nested fields. |
| `column_name` | string | Yes | ClickHouse column name. |
| `column_type` | string | Yes | ClickHouse column type (e.g., `String`, `Float64`, `DateTime`). |

For the full list of supported type mappings, see Supported Data Formats.

Metadata Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `tags` | array | No | List of string tags for the pipeline. |

Resources Configuration

The resources object controls Kubernetes resource allocation for each pipeline component. If omitted, defaults from the Helm chart values are used.

```yaml
nats:
  stream:
    max_age: 24h
    max_bytes: 10Gi
sources:
  - source_id: orders
    replicas: 2
    requests:
      cpu: 250m
      memory: 256Mi
    limits:
      cpu: 500m
      memory: 512Mi
transform:
  - source_id: orders
    replicas: 1
    storage:
      size: 10Gi
    requests:
      cpu: 500m
      memory: 512Mi
    limits:
      cpu: 1000m
      memory: 1Gi
sink:
  replicas: 1
  requests:
    cpu: 250m
    memory: 256Mi
  limits:
    cpu: 500m
    memory: 512Mi
```

Top-Level Resources Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `nats` | object | No | NATS stream configuration. |
| `sources` | array | No | Per-source resource allocation. |
| `transform` | array | No | Per-source transform resource allocation. |
| `sink` | object | No | Sink resource allocation. |

Source Resources

Each entry in the sources array configures resources for one source’s ingestion component.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `source_id` | string | Yes | Source identifier. Must match a `source_id` from the `sources` array. |
| `replicas` | integer | No | Number of replicas. Default: `1`. |
| `requests` | object | No | CPU and memory requests: `{"cpu": "250m", "memory": "256Mi"}`. |
| `limits` | object | No | CPU and memory limits: `{"cpu": "500m", "memory": "512Mi"}`. |

Transform Resources

Each entry in the transform array configures resources for one source’s transform component.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `source_id` | string | Yes | Source identifier. Must match a `source_id` from the `sources` array. |
| `replicas` | integer | No | Number of replicas. Default: `1`. |
| `storage` | object | No | Storage configuration. Only applicable when deduplication is enabled: `{"size": "10Gi"}`. |
| `requests` | object | No | CPU and memory requests: `{"cpu": "500m", "memory": "512Mi"}`. |
| `limits` | object | No | CPU and memory limits: `{"cpu": "1000m", "memory": "1Gi"}`. |

Sink Resources

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `replicas` | integer | No | Number of replicas. Default: `1`. |
| `requests` | object | No | CPU and memory requests. |
| `limits` | object | No | CPU and memory limits. |

NATS Stream Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `max_age` | string | No | Maximum message retention age. Default: `"24h"`. Immutable after pipeline creation. |
| `max_bytes` | string | No | Maximum stream size. Default: `"0"` (unlimited, no reserved memory). Immutable after pipeline creation. |

Other Configuration Notes

Time Windows

Time windows use a string format combining a number and a unit suffix:

  • "30s" — 30 seconds
  • "1m" — 1 minute
  • "1h" — 1 hour
  • "12h" — 12 hours
  • "24h" — 24 hours
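A small parser makes the format concrete: a positive integer followed by a unit suffix, converted here to a `datetime.timedelta`. This is a convenience sketch, not part of the pipeline configuration API, and it assumes only the `s`, `m`, and `h` suffixes shown above are valid.

```python
# Illustrative parser for the time-window string format ("30s", "1m", "12h").
import re
from datetime import timedelta

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}

def parse_time_window(value: str) -> timedelta:
    match = re.fullmatch(r"(\d+)([smh])", value)
    if match is None:
        raise ValueError(f"invalid time window: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})

parse_time_window("30s")  # timedelta(seconds=30)
parse_time_window("12h")  # timedelta(hours=12)
```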

V2 Reference (Deprecated)

V2 is deprecated as of version 3.0.0.

Pipeline version v2 is deprecated and will be removed in a future release. See the Python SDK migration guide for instructions on updating your configurations.

V2 Example

```json
{
  "version": "v2",
  "pipeline_id": "kafka-to-clickhouse-pipeline",
  "source": {
    "type": "kafka",
    "connection_params": {
      "brokers": ["kafka-broker-0:9092", "kafka-broker-1:9092"],
      "protocol": "SASL_SSL",
      "mechanism": "SCRAM-SHA-256",
      "username": "<user>",
      "password": "<password>",
      "root_ca": "<base64 encoded ca>"
    },
    "topics": [
      {
        "consumer_group_initial_offset": "earliest",
        "name": "user_logins",
        "deduplication": {
          "enabled": true,
          "id_field": "session_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      },
      {
        "consumer_group_initial_offset": "earliest",
        "name": "orders",
        "deduplication": {
          "enabled": true,
          "id_field": "order_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      }
    ]
  },
  "join": {
    "enabled": false,
    "type": "temporal",
    "sources": [
      {
        "source_id": "user_logins",
        "join_key": "user_id",
        "time_window": "1h",
        "orientation": "left"
      },
      {
        "source_id": "orders",
        "join_key": "user_id",
        "time_window": "1h",
        "orientation": "right"
      }
    ]
  },
  "sink": {
    "type": "clickhouse",
    "host": "<host>",
    "port": "12753",
    "database": "default",
    "username": "<user>",
    "password": "<password>",
    "secure": true,
    "max_batch_size": 1000,
    "max_delay_time": "10m",
    "table": "user_orders"
  },
  "schema": {
    "fields": [
      { "source_id": "user_logins", "name": "session_id", "type": "string", "column_name": "session_id", "column_type": "UUID" },
      { "source_id": "user_logins", "name": "user_id", "type": "string", "column_name": "user_id", "column_type": "UUID" },
      { "source_id": "orders", "name": "user_id", "type": "string" },
      { "source_id": "orders", "name": "order_id", "type": "string", "column_name": "order_id", "column_type": "UUID" },
      { "source_id": "user_logins", "name": "timestamp", "type": "datetime", "column_name": "login_at", "column_type": "DateTime" },
      { "source_id": "orders", "name": "timestamp", "type": "datetime", "column_name": "order_placed_at", "column_type": "DateTime" }
    ]
  },
  "pipeline_resources": {
    "ingestor": {
      "left": {
        "replicas": 5,
        "requests": { "cpu": "1000m", "memory": "1Gi" },
        "limits": { "cpu": "1500m", "memory": "1.5Gi" }
      },
      "right": {
        "replicas": 2,
        "requests": { "cpu": "1000m", "memory": "1Gi" },
        "limits": { "cpu": "1500m", "memory": "1.5Gi" }
      }
    }
  }
}
```

V2 to V3 Migration

| V2 | V3 | Notes |
| --- | --- | --- |
| `source` (singular object) | `sources[]` (array) | Each source has its own `source_id`. |
| `source.topics[]` with `name` | `sources[].topic` (singular) | One topic per source entry. |
| Dedup nested in topic | `transforms[]` with `type: "dedup"` | Separate transform entry per source. |
| Top-level `filter` | `transforms[]` with `type: "filter"` | Filter is now a transform step. |
| Top-level `stateless_transformation` | `transforms[]` with `type: "stateless"` | Stateless transforms are now a transform step. |
| `join.sources[]` with `orientation` | `join.left_source` / `join.right_source` | Explicit left and right sides instead of an array with orientation. |
| `join.sources[].join_key` | `join.left_source.key` / `join.right_source.key` | Renamed from `join_key` to `key`. |
| `schema.fields[]` with `source_id` | `sink.mapping[]` | Column mapping moves to the sink. |
| `schema.fields[].column_name` / `column_type` | `sink.mapping[].column_name` / `column_type` | Same fields, new location. |
| Source schema inferred | `sources[].schema_fields[]` | Explicit per-source schema definition. |
| `pipeline_resources.ingestor.left` / `right` / `base` | `resources.sources[]` per `source_id` | Resources are keyed by `source_id` instead of orientation. |
| `pipeline_resources.transform` | `resources.transform[]` per `source_id` | Now an array, keyed by `source_id`. |
| `pipeline_resources.sink` | `resources.sink` | Same shape. |
| `pipeline_resources.nats` | `resources.nats` | Same shape. |
| Sink password base64-encoded | Sink password plain text | Password encoding changed. |

See the Python SDK migration guide for step-by-step upgrade instructions.
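One of these mappings can be applied mechanically: lifting per-topic deduplication settings out of V2's `source.topics` into the V3 `transforms` array. The sketch below covers only that step; `migrate_dedup` is a hypothetical helper, and it assumes (as in the examples above) that V2 topic names carry over as V3 `source_id` values.

```python
# Illustrative single V2→V3 migration step: topic-level dedup settings
# become entries in the V3 transforms array. Not a full migration tool.

def migrate_dedup(v2: dict) -> list[dict]:
    """Build V3 dedup transform entries from a V2 config dict."""
    transforms = []
    for topic in v2["source"]["topics"]:
        dedup = topic.get("deduplication", {})
        if dedup.get("enabled"):
            transforms.append({
                "type": "dedup",
                "source_id": topic["name"],  # topic name assumed as source_id
                "config": {
                    "key": dedup["id_field"],
                    "time_window": dedup["time_window"],
                },
            })
    return transforms
```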
