Pipeline Configuration Reference
The pipeline configuration defines how data flows from sources through optional transforms and joins into ClickHouse. Both JSON and YAML formats are supported. The current format is V3, a structural redesign that introduces per-source configuration, an explicit transforms array, and sink-level column mapping.
While the web interface generates this configuration automatically, understanding its structure is essential for advanced use cases, CI/CD pipelines, and the Python SDK.
Full pipeline example (join pipeline with two Kafka sources)
```yaml
version: v3
pipeline_id: joined-orders-pipeline
name: Orders and Users Join
sources:
  - type: kafka
    source_id: orders
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: SASL_SSL
      mechanism: SCRAM-SHA-256
      username: "<user>"
      password: "<password>"
    topic: orders
    consumer_group_initial_offset: earliest
    schema_fields:
      - name: order_id
        type: string
      - name: customer_id
        type: string
      - name: amount
        type: float
      - name: timestamp
        type: datetime
  - type: kafka
    source_id: users
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: SASL_SSL
      mechanism: SCRAM-SHA-256
      username: "<user>"
      password: "<password>"
    topic: users
    consumer_group_initial_offset: earliest
    schema_fields:
      - name: user_id
        type: string
      - name: email
        type: string
transforms:
  - type: dedup
    source_id: orders
    config:
      key: order_id
      time_window: 1h
  - type: dedup
    source_id: users
    config:
      key: user_id
      time_window: 1h
  - type: filter
    source_id: orders
    config:
      expression: "amount > 0"
join:
  enabled: true
  type: temporal
  left_source:
    source_id: orders
    key: customer_id
    time_window: 30s
  right_source:
    source_id: users
    key: user_id
    time_window: 30s
  output_fields:
    - source_id: orders
      name: order_id
    - source_id: orders
      name: amount
    - source_id: orders
      name: timestamp
    - source_id: users
      name: user_id
    - source_id: users
      name: email
sink:
  type: clickhouse
  connection_params:
    host: clickhouse.example.com
    port: "9000"
    database: default
    username: default
    password: mysecret
    secure: true
  table: joined_orders
  max_batch_size: 1000
  max_delay_time: 1s
  mapping:
    - name: order_id
      column_name: order_id
      column_type: String
    - name: amount
      column_name: amount
      column_type: Float64
    - name: timestamp
      column_name: created_at
      column_type: DateTime
    - name: user_id
      column_name: user_id
      column_type: String
    - name: email
      column_name: email
      column_type: String
metadata:
  tags:
    - demo
resources:
  nats:
    stream:
      max_age: 24h
      max_bytes: 25GB
  sources:
    - source_id: orders
      replicas: 2
      requests:
        cpu: 1000m
        memory: 1Gi
      limits:
        cpu: 1500m
        memory: 1.5Gi
    - source_id: users
      replicas: 1
      requests:
        cpu: 1000m
        memory: 1Gi
      limits:
        cpu: 1500m
        memory: 1.5Gi
  transform:
    - source_id: orders
      replicas: 1
      storage:
        size: 10Gi
      requests:
        cpu: 500m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
    - source_id: users
      replicas: 1
      storage:
        size: 10Gi
      requests:
        cpu: 500m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
  sink:
    replicas: 1
    requests:
      cpu: 250m
      memory: 256Mi
    limits:
      cpu: 500m
      memory: 512Mi
```
Root Configuration
```yaml
version: v3
pipeline_id: my-pipeline-id
name: My Pipeline Name
sources: []
transforms: []
join: {}
sink: {}
metadata: {}
resources: {}
```
| Field | Type | Required | Description |
|---|---|---|---|
version | string | Yes | Must be "v3". |
pipeline_id | string | Yes | Unique identifier for the pipeline. |
name | string | No | Display name shown in the UI. |
sources | array | Yes | List of source configurations. |
transforms | array | No | List of transform steps applied to sources. |
join | object | No | Join configuration for combining data from two sources. |
sink | object | Yes | ClickHouse sink configuration. |
metadata | object | No | Pipeline metadata such as tags. |
resources | object | No | Kubernetes resource allocation for pipeline components. |
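To make the required-field rules above concrete, here is a minimal, hypothetical validator for the root object (not part of the product; only the field names and the `"v3"` version constraint come from the table above):

```python
import json

# Field names taken from the root configuration table; the validator itself
# is an illustrative sketch, not product code.
REQUIRED_TOP_LEVEL = ("version", "pipeline_id", "sources", "sink")

def validate_root(config: dict) -> list:
    """Return a list of error strings for the root config object."""
    errors = [f"missing required field: {f}" for f in REQUIRED_TOP_LEVEL
              if f not in config]
    if "version" in config and config["version"] != "v3":
        errors.append("version must be 'v3'")
    return errors

config = json.loads('{"version": "v3", "pipeline_id": "p1", "sources": [], "sink": {}}')
print(validate_root(config))  # -> []
```

The same check works for configs loaded from YAML, since both formats deserialize to the same structure.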
Sources Configuration
The sources array defines one or more data sources. Each entry has its own source_id, which you reference in transforms, join, and sink mapping.
Kafka Source
```yaml
type: kafka
source_id: orders
connection_params:
  brokers:
    - "kafka:9092"
  protocol: PLAINTEXT
  mechanism: NO_AUTH
topic: orders
consumer_group_initial_offset: earliest
schema_fields:
  - name: order_id
    type: string
  - name: amount
    type: float
  - name: timestamp
    type: datetime
```
OTLP Source
```yaml
type: otlp.traces
source_id: traces
```
Source Fields
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Source type. One of "kafka", "otlp.logs", "otlp.traces", or "otlp.metrics". See Sources. |
source_id | string | Yes | Unique identifier for this source. Referenced by transforms, join, and sink mapping. |
connection_params | object | Yes (Kafka) | Kafka connection parameters. See Connections. |
topic | string | Yes (Kafka) | Kafka topic name. |
consumer_group_initial_offset | string | No | Initial offset for the consumer group: "earliest" or "latest". Default: "latest". Kafka only. |
schema_fields | array | Conditional | Field definitions for this source. Required for Kafka sources. Not needed for OTLP sources (schema is predefined). |
Kafka Connection Parameters
| Field | Type | Required | Description |
|---|---|---|---|
brokers | array | Yes | List of Kafka broker addresses (e.g., ["kafka:9092"]). |
protocol | string | Yes | Security protocol: "PLAINTEXT", "SASL_PLAINTEXT", "SSL", or "SASL_SSL". |
mechanism | string | Conditional | Authentication mechanism (e.g., "SCRAM-SHA-256"). Required when authentication is enabled. |
username | string | Conditional | Kafka username. Required when authentication is enabled. |
password | string | Conditional | Kafka password. Required when authentication is enabled. |
root_ca | string | No | PEM-encoded CA certificate for TLS. |
skip_tls_verification | boolean | No | Skip TLS certificate verification. Default: false. |
kerberos_service_name | string | No | Kerberos service name. |
kerberos_keytab | string | No | Kerberos keytab file. |
kerberos_realm | string | No | Kerberos realm. |
kerberos_config | string | No | Kerberos configuration file. |
For detailed connection examples and supported protocol/mechanism combinations, see Supported Kafka Connections.
Schema Fields
Each entry in the schema_fields array defines a field from the source data.
| Field | Type | Required | Description |
|---|---|---|---|
name | string | Yes | Source field name. Dot notation is supported for nested fields (e.g., data.name). |
type | string | Yes | Field data type. See Supported Data Formats. |
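Dot notation in field names resolves against nested structures in the source record. A hypothetical helper illustrating the lookup (purely for intuition; not product code):

```python
def resolve_field(record: dict, name: str):
    """Resolve a dot-notation field name such as "data.name" against a nested
    record; return None when any path segment is missing. Hypothetical helper
    to illustrate the lookup semantics."""
    value = record
    for part in name.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value

resolve_field({"data": {"name": "alice"}}, "data.name")  # -> 'alice'
```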
Transforms Configuration
The transforms array defines processing steps applied to source data before it reaches the sink. Each transform targets a specific source via source_id. You can apply multiple transforms to the same source — they run in the order they appear.
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Transform type: "dedup", "filter", or "stateless". |
source_id | string | Yes | Which source this transform applies to. Must match a source_id from the sources array. |
config | object | Yes | Type-specific configuration. See below. |
Deduplication
Removes duplicate records based on a key field within a time window.
```yaml
type: dedup
source_id: orders
config:
  key: order_id
  time_window: 1h
```
Dedup config:
| Field | Type | Required | Description |
|---|---|---|---|
key | string | Yes | Field name to deduplicate on. |
time_window | string | Yes | Deduplication window. See Time windows. |
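The windowed-key semantics can be modeled in a few lines. This is an illustrative sketch of the behavior, not the engine's implementation, which may differ in how it stores and expires state:

```python
from datetime import datetime, timedelta

def dedup(records, key, time_window):
    """Illustrative model of dedup semantics: drop a record when the same key
    was already emitted within `time_window`. `records` is an iterable of
    (timestamp, record) pairs in arrival order."""
    last_emitted = {}
    for ts, record in records:
        k = record[key]
        prev = last_emitted.get(k)
        if prev is not None and ts - prev < time_window:
            continue  # duplicate inside the window: drop it
        last_emitted[k] = ts
        yield ts, record

base = datetime(2024, 1, 1, 12, 0)
records = [
    (base,                         {"order_id": "o1"}),
    (base + timedelta(minutes=30), {"order_id": "o1"}),  # dropped: within 1h
    (base + timedelta(hours=2),    {"order_id": "o1"}),  # kept: window expired
]
kept = list(dedup(records, "order_id", timedelta(hours=1)))  # 2 records survive
```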
Filter
Keeps records that match the expression and discards the rest.
```yaml
type: filter
source_id: orders
config:
  expression: "status == 'active'"
```
Filter config:
| Field | Type | Required | Description |
|---|---|---|---|
expression | string | Yes | Boolean expression. Records where the expression evaluates to true are kept. See Filter. |
Stateless Transformation
Computes new fields from existing data using expressions.
```yaml
type: stateless
source_id: orders
config:
  transforms:
    - expression: "int(amount) % 2 == 0"
      output_name: is_amount_even
      output_type: bool
```
Stateless config:
| Field | Type | Required | Description |
|---|---|---|---|
transforms | array | Yes | List of transform definitions. |
Each transform definition:
| Field | Type | Required | Description |
|---|---|---|---|
expression | string | Yes | Expression to evaluate against each record. See Stateless Transformation. |
output_name | string | Yes | Name of the new output field. |
output_type | string | Yes | Data type of the output field. |
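Conceptually, each transform definition evaluates its expression against the record and attaches the result under `output_name`. A sketch of that model, with a Python lambda standing in for the real expression engine (the callable is my stand-in, not the product's API):

```python
def apply_stateless(record: dict, transforms: list) -> dict:
    """Illustrative model: each entry adds a new field named `output_name`,
    computed by a Python callable standing in for the expression engine."""
    out = dict(record)
    for t in transforms:
        out[t["output_name"]] = t["fn"](record)
    return out

# Hypothetical stand-in for the expression "int(amount) % 2 == 0"
transforms = [{"output_name": "is_amount_even",
               "fn": lambda r: int(r["amount"]) % 2 == 0}]

apply_stateless({"order_id": "a1", "amount": 42.0}, transforms)
# -> {'order_id': 'a1', 'amount': 42.0, 'is_amount_even': True}
```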
Join Configuration
The join configuration combines records from two sources based on matching keys within a time window.
```yaml
enabled: true
type: temporal
left_source:
  source_id: orders
  key: customer_id
  time_window: 30s
right_source:
  source_id: users
  key: user_id
  time_window: 30s
output_fields:
  - source_id: orders
    name: order_id
    output_name: ORDER_ID
  - source_id: orders
    name: amount
  - source_id: users
    name: user_id
  - source_id: users
    name: email
```
Join Fields
| Field | Type | Required | Description |
|---|---|---|---|
enabled | boolean | Yes | Whether the join is enabled. |
type | string | Yes | Join type (e.g., "temporal"). |
left_source | object | Yes (when enabled) | Left side of the join. |
right_source | object | Yes (when enabled) | Right side of the join. |
output_fields | array | Yes (when enabled) | Fields to include in the joined output. |
Join Source
Each join source (left_source and right_source) has the same structure.
| Field | Type | Required | Description |
|---|---|---|---|
source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
key | string | Yes | Field to join on. |
time_window | string | Yes | Join time window. See Time windows. |
Join Output Fields
Each entry in output_fields selects a field from one of the joined sources.
| Field | Type | Required | Description |
|---|---|---|---|
source_id | string | Yes | Which source the field comes from. |
name | string | Yes | Field name from the source. |
output_name | string | No | Rename the field in the output. If omitted, the original name is used. |
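A temporal join pairs records whose keys match and whose timestamps fall within the window. The following is a simplified model of those semantics only; the real engine buffers streams incrementally rather than scanning both sides, and its exact window handling may differ:

```python
from datetime import datetime, timedelta

def temporal_join(left, right, left_key, right_key, window):
    """Illustrative model: emit the merged record for each left/right pair
    whose join keys match and whose timestamps are within `window`.
    `left` and `right` are iterables of (timestamp, record) pairs."""
    for lts, lrec in left:
        for rts, rrec in right:
            if lrec[left_key] == rrec[right_key] and abs(lts - rts) <= window:
                yield {**lrec, **rrec}

orders = [(datetime(2024, 1, 1, 12, 0, 0),
           {"order_id": "o1", "customer_id": "c1"})]
users = [(datetime(2024, 1, 1, 12, 0, 10),
          {"user_id": "c1", "email": "a@example.com"})]

list(temporal_join(orders, users, "customer_id", "user_id",
                   timedelta(seconds=30)))
# -> [{'order_id': 'o1', 'customer_id': 'c1', 'user_id': 'c1', 'email': 'a@example.com'}]
```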
Sink Configuration
The sink configuration defines the ClickHouse destination, including connection details, batching behavior, and column mapping.
```yaml
type: clickhouse
connection_params:
  host: clickhouse.example.com
  port: "9000"
  http_port: "8123"
  database: default
  username: default
  password: mysecret
  secure: true
  skip_certificate_verification: false
table: orders
max_batch_size: 1000
max_delay_time: 1s
mapping:
  - name: order_id
    column_name: order_id
    column_type: String
  - name: amount
    column_name: amount
    column_type: Float64
  - name: timestamp
    column_name: created_at
    column_type: DateTime
```
Sink Fields
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Must be "clickhouse". |
connection_params | object | Yes | ClickHouse connection parameters. |
table | string | Yes | Target table name in ClickHouse. |
max_batch_size | integer | No | Maximum number of records per batch. Default: 1000. |
max_delay_time | string | No | Maximum delay before flushing a batch. Default: "60s". |
mapping | array | Yes | Column mappings from source fields to ClickHouse columns. |
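The interaction of `max_batch_size` and `max_delay_time` can be pictured as "flush on whichever limit trips first." The sketch below is a simplified model under that assumption; presumably the real sink also flushes on a timer when no new records arrive, which this model omits by only checking the deadline on insert:

```python
import time

class Batcher:
    """Illustrative batching model: flush when `max_batch_size` records have
    accumulated or `max_delay` seconds have passed since the first buffered
    record, whichever comes first. Not product code."""

    def __init__(self, max_batch_size, max_delay, flush, clock=time.monotonic):
        self.max_batch_size = max_batch_size
        self.max_delay = max_delay
        self.flush = flush          # callable that receives the batch list
        self.clock = clock
        self.buffer = []
        self.first_at = None

    def add(self, record):
        if not self.buffer:
            self.first_at = self.clock()
        self.buffer.append(record)
        if (len(self.buffer) >= self.max_batch_size
                or self.clock() - self.first_at >= self.max_delay):
            self._flush()

    def _flush(self):
        if self.buffer:
            self.flush(self.buffer)
            self.buffer = []
            self.first_at = None

batches = []
b = Batcher(max_batch_size=3, max_delay=60.0, flush=batches.append)
for i in range(7):
    b.add(i)
print(batches)  # -> [[0, 1, 2], [3, 4, 5]]  (record 6 is still buffered)
```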
Sink Connection Parameters
| Field | Type | Required | Description |
|---|---|---|---|
host | string | Yes | ClickHouse hostname. |
port | string | Yes | ClickHouse native port. |
http_port | string | No | ClickHouse HTTP port (used by the UI for connectivity checks). |
database | string | Yes | Database name. |
username | string | Yes | Username. |
password | string | Yes | Password (plain text). |
secure | boolean | No | Use TLS. Default: false. |
skip_certificate_verification | boolean | No | Skip certificate verification. Default: false. |
Sink Column Mapping
Each entry in the mapping array maps a source field to a ClickHouse column.
| Field | Type | Required | Description |
|---|---|---|---|
name | string | Yes | Source field name. Dot notation is supported for nested fields. |
column_name | string | Yes | ClickHouse column name. |
column_type | string | Yes | ClickHouse column type (e.g., String, Float64, DateTime). |
For the full list of supported type mappings, see Supported Data Formats.
Metadata Configuration
| Field | Type | Required | Description |
|---|---|---|---|
tags | array | No | List of string tags for the pipeline. |
Resources Configuration
The resources object controls Kubernetes resource allocation for each pipeline component. If omitted, defaults from the Helm chart values are used.
```yaml
nats:
  stream:
    max_age: 24h
    max_bytes: 10Gi
sources:
  - source_id: orders
    replicas: 2
    requests:
      cpu: 250m
      memory: 256Mi
    limits:
      cpu: 500m
      memory: 512Mi
transform:
  - source_id: orders
    replicas: 1
    storage:
      size: 10Gi
    requests:
      cpu: 500m
      memory: 512Mi
    limits:
      cpu: 1000m
      memory: 1Gi
sink:
  replicas: 1
  requests:
    cpu: 250m
    memory: 256Mi
  limits:
    cpu: 500m
    memory: 512Mi
```
Top-Level Resources Fields
| Field | Type | Required | Description |
|---|---|---|---|
nats | object | No | NATS stream configuration. |
sources | array | No | Per-source resource allocation. |
transform | array | No | Per-source transform resource allocation. |
sink | object | No | Sink resource allocation. |
Source Resources
Each entry in the sources array configures resources for one source’s ingestion component.
| Field | Type | Required | Description |
|---|---|---|---|
source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
replicas | integer | No | Number of replicas. Default: 1. |
requests | object | No | CPU and memory requests: {"cpu": "250m", "memory": "256Mi"}. |
limits | object | No | CPU and memory limits: {"cpu": "500m", "memory": "512Mi"}. |
Transform Resources
Each entry in the transform array configures resources for one source’s transform component.
| Field | Type | Required | Description |
|---|---|---|---|
source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
replicas | integer | No | Number of replicas. Default: 1. |
storage | object | No | Storage configuration. Only applicable when deduplication is enabled: {"size": "10Gi"}. |
requests | object | No | CPU and memory requests: {"cpu": "500m", "memory": "512Mi"}. |
limits | object | No | CPU and memory limits: {"cpu": "1000m", "memory": "1Gi"}. |
Sink Resources
| Field | Type | Required | Description |
|---|---|---|---|
replicas | integer | No | Number of replicas. Default: 1. |
requests | object | No | CPU and memory requests. |
limits | object | No | CPU and memory limits. |
NATS Stream Configuration
| Field | Type | Required | Description |
|---|---|---|---|
max_age | string | No | Maximum message retention age. Default: "24h". Immutable after pipeline creation. |
max_bytes | string | No | Maximum stream size. Default: "0" (unlimited, no reserved memory). Immutable after pipeline creation. |
Other Configuration Notes
Time Windows
Time windows use a string format combining a number and a unit suffix:
"30s"— 30 seconds"1m"— 1 minute"1h"— 1 hour"12h"— 12 hours"24h"— 24 hours
V2 Reference (Deprecated)
Pipeline configuration version v2 is deprecated as of version 3.0.0 and will be removed in a future release. See the Python SDK migration guide for instructions on updating your configurations.
V2 Example
```json
{
  "version": "v2",
  "pipeline_id": "kafka-to-clickhouse-pipeline",
  "source": {
    "type": "kafka",
    "connection_params": {
      "brokers": [
        "kafka-broker-0:9092",
        "kafka-broker-1:9092"
      ],
      "protocol": "SASL_SSL",
      "mechanism": "SCRAM-SHA-256",
      "username": "<user>",
      "password": "<password>",
      "root_ca": "<base64 encoded ca>"
    },
    "topics": [
      {
        "consumer_group_initial_offset": "earliest",
        "name": "user_logins",
        "deduplication": {
          "enabled": true,
          "id_field": "session_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      },
      {
        "consumer_group_initial_offset": "earliest",
        "name": "orders",
        "deduplication": {
          "enabled": true,
          "id_field": "order_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      }
    ]
  },
  "join": {
    "enabled": false,
    "type": "temporal",
    "sources": [
      {
        "source_id": "user_logins",
        "join_key": "user_id",
        "time_window": "1h",
        "orientation": "left"
      },
      {
        "source_id": "orders",
        "join_key": "user_id",
        "time_window": "1h",
        "orientation": "right"
      }
    ]
  },
  "sink": {
    "type": "clickhouse",
    "host": "<host>",
    "port": "12753",
    "database": "default",
    "username": "<user>",
    "password": "<password>",
    "secure": true,
    "max_batch_size": 1000,
    "max_delay_time": "10m",
    "table": "user_orders"
  },
  "schema": {
    "fields": [
      {
        "source_id": "user_logins",
        "name": "session_id",
        "type": "string",
        "column_name": "session_id",
        "column_type": "UUID"
      },
      {
        "source_id": "user_logins",
        "name": "user_id",
        "type": "string",
        "column_name": "user_id",
        "column_type": "UUID"
      },
      {
        "source_id": "orders",
        "name": "user_id",
        "type": "string"
      },
      {
        "source_id": "orders",
        "name": "order_id",
        "type": "string",
        "column_name": "order_id",
        "column_type": "UUID"
      },
      {
        "source_id": "user_logins",
        "name": "timestamp",
        "type": "datetime",
        "column_name": "login_at",
        "column_type": "DateTime"
      },
      {
        "source_id": "orders",
        "name": "timestamp",
        "type": "datetime",
        "column_name": "order_placed_at",
        "column_type": "DateTime"
      }
    ]
  },
  "pipeline_resources": {
    "ingestor": {
      "left": {
        "replicas": 5,
        "requests": {
          "cpu": "1000m",
          "memory": "1Gi"
        },
        "limits": {
          "cpu": "1500m",
          "memory": "1.5Gi"
        }
      },
      "right": {
        "replicas": 2,
        "requests": {
          "cpu": "1000m",
          "memory": "1Gi"
        },
        "limits": {
          "cpu": "1500m",
          "memory": "1.5Gi"
        }
      }
    }
  }
}
```
V2 to V3 Migration
| V2 | V3 | Notes |
|---|---|---|
| `source` (singular object) | `sources[]` (array) | Each source has its own `source_id`. |
| `source.topics[]` with `name` | `sources[].topic` (singular) | One topic per source entry. |
| Dedup nested in topic | `transforms[]` with `type: "dedup"` | Separate transform entry per source. |
| Top-level filter | `transforms[]` with `type: "filter"` | Filter is now a transform step. |
| Top-level `stateless_transformation` | `transforms[]` with `type: "stateless"` | Stateless transforms are now a transform step. |
| `join.sources[]` with `orientation` | `join.left_source` / `join.right_source` | Explicit left and right sides instead of an array with orientation. |
| `join.sources[].join_key` | `join.left_source.key` / `join.right_source.key` | Renamed from `join_key` to `key`. |
| `schema.fields[]` with `source_id` | `sink.mapping[]` | Column mapping moves to the sink. |
| `schema.fields[].column_name` / `column_type` | `sink.mapping[].column_name` / `column_type` | Same fields, new location. |
| Source schema inferred | `sources[].schema_fields[]` | Explicit per-source schema definition. |
| `pipeline_resources.ingestor.left` / `right` / `base` | `resources.sources[]` per `source_id` | Resources are keyed by `source_id` instead of orientation. |
| `pipeline_resources.transform` | `resources.transform[]` per `source_id` | Now an array, keyed by `source_id`. |
| `pipeline_resources.sink` | `resources.sink` | Same shape. |
| `pipeline_resources.nats` | `resources.nats` | Same shape. |
| Sink password base64-encoded | Sink password plain text | Password encoding changed. |
See the Python SDK migration guide for step-by-step upgrade instructions.
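The structural moves in the table can be sketched mechanically. The function below covers only the source/topic split and the dedup row; it assumes the topic name becomes the `source_id` (my assumption, since V2 joins reference topic names as `source_id`). The Python SDK migration guide remains the authoritative reference:

```python
def migrate_sources(v2: dict) -> dict:
    """Sketch: split a V2 `source` with multiple topics into V3 `sources`
    entries plus dedup `transforms`. Partial and illustrative only."""
    src = v2["source"]
    sources, transforms = [], []
    for topic in src["topics"]:
        sources.append({
            "type": src["type"],
            "source_id": topic["name"],       # assumption: topic name as id
            "connection_params": src["connection_params"],
            "topic": topic["name"],
            "consumer_group_initial_offset": topic.get(
                "consumer_group_initial_offset", "latest"),
        })
        dedup = topic.get("deduplication") or {}
        if dedup.get("enabled"):
            transforms.append({
                "type": "dedup",
                "source_id": topic["name"],
                "config": {"key": dedup["id_field"],
                           "time_window": dedup["time_window"]},
            })
    return {"version": "v3", "pipeline_id": v2["pipeline_id"],
            "sources": sources, "transforms": transforms}
```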