Pipeline JSON Reference
The pipeline configuration is a JSON object that defines how data flows from Kafka topics to ClickHouse tables. It specifies the source, the sink, and any transformations. While the web interface generates this configuration automatically, understanding its structure is helpful for advanced users.
Pipeline JSON Example
```json
{
  "version": "v2",
  "pipeline_id": "kafka-to-clickhouse-pipeline",
  "source": {
    "type": "kafka",
    "connection_params": {
      "brokers": [
        "kafka-broker-0:9092",
        "kafka-broker-1:9092"
      ],
      "protocol": "SASL_SSL",
      "mechanism": "SCRAM-SHA-256",
      "username": "<user>",
      "password": "<password>",
      "root_ca": "<base64 encoded ca>"
    },
    "topics": [
      {
        "consumer_group_initial_offset": "earliest",
        "name": "user_logins",
        "deduplication": {
          "enabled": true,
          "id_field": "session_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      },
      {
        "consumer_group_initial_offset": "earliest",
        "name": "orders",
        "deduplication": {
          "enabled": true,
          "id_field": "order_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      }
    ]
  },
  "join": {
    "enabled": false,
    "type": "temporal",
    "sources": [
      {
        "source_id": "user_logins",
        "join_key": "user_id",
        "time_window": "1h",
        "orientation": "left"
      },
      {
        "source_id": "orders",
        "join_key": "user_id",
        "time_window": "1h",
        "orientation": "right"
      }
    ]
  },
  "sink": {
    "type": "clickhouse",
    "provider": "aiven",
    "host": "<host>",
    "port": "12753",
    "database": "default",
    "username": "<user>",
    "password": "<password>",
    "secure": true,
    "max_batch_size": 1,
    "max_delay_time": "10m",
    "table": "user_orders"
  },
  "schema": {
    "fields": [
      {
        "source_id": "user_logins",
        "name": "session_id",
        "type": "string",
        "column_name": "session_id",
        "column_type": "UUID"
      },
      {
        "source_id": "user_logins",
        "name": "user_id",
        "type": "string",
        "column_name": "user_id",
        "column_type": "UUID"
      },
      {
        "source_id": "orders",
        "name": "user_id",
        "type": "string"
      },
      {
        "source_id": "orders",
        "name": "order_id",
        "type": "string",
        "column_name": "order_id",
        "column_type": "UUID"
      },
      {
        "source_id": "user_logins",
        "name": "timestamp",
        "type": "datetime",
        "column_name": "login_at",
        "column_type": "DateTime"
      },
      {
        "source_id": "orders",
        "name": "timestamp",
        "type": "datetime",
        "column_name": "order_placed_at",
        "column_type": "DateTime"
      }
    ]
  },
  "pipeline_resources": {
    "ingestor": {
      "left": {
        "replicas": 5,
        "requests": {
          "cpu": "1000m",
          "memory": "1Gi"
        },
        "limits": {
          "cpu": "1500m",
          "memory": "1.5Gi"
        }
      },
      "right": {
        "replicas": 2,
        "requests": {
          "cpu": "1000m",
          "memory": "1Gi"
        },
        "limits": {
          "cpu": "1500m",
          "memory": "1.5Gi"
        }
      }
    }
  }
}
```
💡 Note: The web interface automatically generates this configuration based on user input, so manual editing is not required.
Root Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| version | string | No | Version of the pipeline configuration. The current supported version is `v2`. |
| pipeline_id | string | Yes | Unique identifier for the pipeline. Must be non-empty. |
| name | string | No | Name of the pipeline displayed in the UI. |
| source | object | Yes | Configuration for the Kafka source. |
| sink | object | Yes | Configuration for the ClickHouse sink. |
| schema | object | Yes | Configuration for the schema of the pipeline. |
| join | object | No | Configuration for joining multiple Kafka topics. |
| filter | object | No | Configuration for filtering the data. |
| stateless_transformation | object | No | Configuration for stateless JSON transformations. |
| pipeline_resources | object | No | Configuration for the Kubernetes resources of the pipeline. |
| metadata | object | No | Metadata for the pipeline. |
Source Configuration
The source configuration defines how to connect to and consume from Kafka topics.
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Must be `kafka`, the only supported source type. |
| provider | string | No | Kafka provider (e.g., `aiven`). |
| topics | array | Yes | List of Kafka topics to consume from. |
| connection_params | object | Yes | Kafka connection parameters. |
Kafka Connection Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| brokers | array | Yes | List of Kafka broker addresses (e.g., `["localhost:9092"]`). |
| protocol | string | Yes | Security protocol for the Kafka connection (e.g., `SASL_SSL`). |
| mechanism | string | No | Authentication mechanism (e.g., `SCRAM-SHA-256`). Mandatory when skip_tls_verification is false. |
| username | string | No | Username for Kafka authentication. Mandatory when skip_tls_verification is false. |
| password | string | No | Password for Kafka authentication. Mandatory when skip_tls_verification is false. |
| kerberos_service_name | string | No | Kerberos service name. |
| kerberos_keytab | string | No | Kerberos keytab file. |
| kerberos_realm | string | No | Kerberos realm. |
| kerberos_config | string | No | Kerberos configuration file. |
| root_ca | string | No | Root CA certificate for the Kafka connection, base64 encoded. |
| skip_tls_verification | boolean | No | Skip TLS verification. Defaults to false. |
💡 Note: For a full list of supported protocols and mechanisms, see the Supported Kafka Connections documentation.
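Drawn from the full example above, a SASL_SSL connection with SCRAM authentication looks like this (credentials are placeholders):

```json
"connection_params": {
  "brokers": ["kafka-broker-0:9092", "kafka-broker-1:9092"],
  "protocol": "SASL_SSL",
  "mechanism": "SCRAM-SHA-256",
  "username": "<user>",
  "password": "<password>",
  "root_ca": "<base64 encoded ca>"
}
```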
Topic Configuration
Each topic in the topics array has the following configuration:
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Name of the Kafka topic. |
| consumer_group_initial_offset | string | No | Initial offset for the consumer group (`earliest` or `latest`). Defaults to `latest`. |
| replicas | integer | No | Number of replicas for the Kafka topic. Defaults to 1. Deprecated; use `pipeline_resources.ingestor.<base\|left\|right>.replicas` instead. |
| deduplication | object | Yes | Deduplication settings. |
Deduplication Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | Yes | Whether deduplication is enabled. |
| id_field | string | Yes | Field name used for message deduplication. |
| id_field_type | string | Yes | Type of the ID field (e.g., `string`). |
| time_window | string | Yes | Time window for deduplication (e.g., `1h` for one hour). See Time windows. |
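For instance, the example pipeline above deduplicates the user_logins topic on session_id over a 12-hour window:

```json
{
  "name": "user_logins",
  "consumer_group_initial_offset": "earliest",
  "deduplication": {
    "enabled": true,
    "id_field": "session_id",
    "id_field_type": "string",
    "time_window": "12h"
  }
}
```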
Sink Configuration
The sink configuration defines how to connect to and write to ClickHouse.
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Must be `clickhouse`. |
| host | string | Yes | ClickHouse server hostname. |
| port | string | Yes | ClickHouse server port. |
| http_port | string | No | ClickHouse server HTTP port (only used for the UI connection). |
| database | string | Yes | ClickHouse database name. |
| username | string | Yes | ClickHouse username. |
| password | string | Yes | ClickHouse password. Must be base64 encoded. |
| table | string | Yes | Target table name. |
| secure | boolean | No | Whether to use a secure (TLS) connection. Defaults to false. |
| skip_certificate_verification | boolean | No | Whether to skip certificate verification. Defaults to false. |
| max_batch_size | integer | No | Maximum number of records to batch before writing. Defaults to 1000. |
| max_delay_time | string | No | Maximum delay before buffered messages are flushed to the sink. Defaults to `10m`. |
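A minimal sink section might look like the following; the host and credentials are placeholders, and the batching values are illustrative rather than recommendations:

```json
"sink": {
  "type": "clickhouse",
  "host": "<host>",
  "port": "12753",
  "database": "default",
  "username": "<user>",
  "password": "<base64 encoded password>",
  "table": "user_orders",
  "secure": true,
  "max_batch_size": 1000,
  "max_delay_time": "10m"
}
```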
Schema Configuration
The schema configuration defines the schema of the pipeline.
| Field | Type | Required | Description |
|---|---|---|---|
| fields | array | Yes | List of field definitions. |
Field Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| source_id | string | Yes | ID of the source topic or transformation. |
| name | string | Yes | Source field name. Nested structures are supported using dot notation (e.g., data.name). |
| type | string | Yes | Source field type. |
| column_name | string | No | ClickHouse column name. |
| column_type | string | No | ClickHouse column type. |
💡 Note: For a comprehensive list of supported data types, see the Supported Data Formats documentation.
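As a sketch, the hypothetical field below reads a nested source value via dot notation and maps it to a ClickHouse String column (the field and column names are invented for illustration):

```json
{
  "source_id": "orders",
  "name": "data.customer_name",
  "type": "string",
  "column_name": "customer_name",
  "column_type": "String"
}
```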
Join Configuration
The join configuration defines how to join data from multiple Kafka topics.
| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | Yes | Whether joining is enabled. |
| type | string | Yes | Join type (e.g., `temporal`). |
| sources | array | Yes | List of sources to join. |
Join Source Configuration
Each source in the sources array has the following configuration:
| Field | Type | Required | Description |
|---|---|---|---|
| source_id | string | Yes | Name of the Kafka topic to join. |
| join_key | string | Yes | Field name used for joining records. |
| time_window | string | Yes | Time window for joining records (e.g., `1h` for one hour). See Time windows. |
| orientation | string | Yes | Join orientation (`left` or `right`). |
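Enabling the join from the example above, with user_logins as the left source and orders as the right, joined on user_id within a one-hour window, looks like:

```json
"join": {
  "enabled": true,
  "type": "temporal",
  "sources": [
    { "source_id": "user_logins", "join_key": "user_id", "time_window": "1h", "orientation": "left" },
    { "source_id": "orders", "join_key": "user_id", "time_window": "1h", "orientation": "right" }
  ]
}
```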
Filter Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | Yes | Whether filtering is enabled. |
| expression | string | Yes | Filter expression. See the Filters Transformation documentation. |
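A sketch of a filter section is shown below; the expression itself is hypothetical, and the exact expression syntax is described in the Filters Transformation documentation:

```json
"filter": {
  "enabled": true,
  "expression": "event_type == 'purchase'"
}
```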
Stateless Transformation Configuration
The stateless_transformation section defines expression-based, per-event JSON transformations that are applied before data is mapped to ClickHouse.
These transformations are stateless and operate on each event independently.
| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | No | Whether stateless transformations are enabled. Defaults to false when omitted. |
| id | string | No | Optional identifier for the transformation configuration. |
| type | string | No | Transformation type identifier. Reserved for future use. Defaults to `expr_lang_transform`. |
| config | object | Yes (when enabled is true) | Stateless transformation configuration. |
Stateless Transformations Config
| Field | Type | Required | Description |
|---|---|---|---|
| transform | array | Yes | List of individual stateless transformations. |
Transform Definition
Each entry in the transform array defines one derived field computed from the input JSON using an expression:
| Field | Type | Required | Description |
|---|---|---|---|
| expression | string | Yes | Expression evaluated against the input JSON event. See the Stateless Transformations documentation for details on the supported expressions. |
| output_name | string | Yes | Name of the field produced by this expression in the transformed payload. |
| output_type | string | Yes | Expected output type. |
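Putting the pieces together, a sketch of a stateless_transformation section might look like this; the expression and the id value are hypothetical, and the supported expression syntax is covered in the Stateless Transformations documentation:

```json
"stateless_transformation": {
  "enabled": true,
  "id": "derive-country-code",
  "type": "expr_lang_transform",
  "config": {
    "transform": [
      {
        "expression": "upper(country)",
        "output_name": "country_code",
        "output_type": "string"
      }
    ]
  }
}
```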
Pipeline Resources Configuration
The pipeline resources configuration defines the Kubernetes resources for the pipeline. If not specified, the defaults from the chart values are used.
| Field | Type | Required | Description |
|---|---|---|---|
| ingestor | object | No | Ingestor resources configuration. |
| join | object | No | Join resources configuration. |
| sink | object | No | Sink resources configuration. |
| transform | object | No | Transform resources configuration. |
| nats | object | No | NATS resources configuration. |
Ingestor Resources Configuration
The ingestor resources configuration defines the resources for the ingestor.
| Field | Type | Required | Description |
|---|---|---|---|
| base | object | No | Ingestor resources configuration when join is disabled. |
| left | object | No | Left ingestor resources configuration. |
| right | object | No | Right ingestor resources configuration. |
Base Ingestor Resources Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| replicas | integer | No | Number of replicas for the base ingestor. Defaults to 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |
Left Ingestor Resources Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| replicas | integer | No | Number of replicas for the left ingestor. Defaults to 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |
Right Ingestor Resources Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| replicas | integer | No | Number of replicas for the right ingestor. Defaults to 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |
Join Resources Configuration
The join resources configuration defines the resources for the join.
| Field | Type | Required | Description |
|---|---|---|---|
| replicas | integer | No | Number of replicas for the join. Defaults to 1. Currently, only a value of 1 is supported. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |
Sink Resources Configuration
The sink resources configuration defines the resources for the sink.
| Field | Type | Required | Description |
|---|---|---|---|
| replicas | integer | No | Number of replicas for the sink. Defaults to 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |
Transform Resources Configuration
The transform resources configuration defines the resources for the transform.
| Field | Type | Required | Description |
|---|---|---|---|
| replicas | integer | No | Number of replicas for the transform. Defaults to 1. This parameter is only mutable when deduplication is disabled. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |
| storage | object | No | Storage resources configuration; only used when deduplication is enabled. |
Resources Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| cpu | string | No | CPU request or limit. |
| memory | string | No | Memory request or limit. |
Storage Resources Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| size | string | No | Size of the storage. This parameter is immutable and cannot be changed after the pipeline is created. |
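For illustration, a transform resources section with deduplication storage might look like the following; all values here are examples, not recommendations:

```json
"pipeline_resources": {
  "transform": {
    "replicas": 1,
    "requests": { "cpu": "500m", "memory": "512Mi" },
    "limits": { "cpu": "1000m", "memory": "1Gi" },
    "storage": { "size": "10Gi" }
  }
}
```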
Nats Resources Configuration
The NATS resources configuration defines the resources for NATS.
| Field | Type | Required | Description |
|---|---|---|---|
| stream | object | No | Stream resources configuration. |
Stream Resources Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| maxAge | string | No | Maximum age of messages in the stream. Defaults to `24h`. This parameter is immutable and cannot be changed after the pipeline is created. |
| maxBytes | string | No | Maximum size of the stream in bytes. Defaults to `0`, which does not limit the stream size and does not reserve memory for the stream. This parameter is immutable and cannot be changed after the pipeline is created. |
💡 Note: You can find more information about the NATS stream configuration parameters in the NATS Stream Configuration documentation.
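As an illustrative sketch, a stream section that keeps messages for 12 hours and caps the stream at roughly 1 GiB could look like this; the byte count is written as a string, following the format of the default value "0":

```json
"pipeline_resources": {
  "nats": {
    "stream": {
      "maxAge": "12h",
      "maxBytes": "1073741824"
    }
  }
}
```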
Metadata Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| tags | array | No | List of tags for the pipeline. |
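For example, tagging a pipeline might look like the following (assuming tags are plain strings):

```json
"metadata": {
  "tags": ["production", "orders"]
}
```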
Other configuration notes
Time windows
Time windows use string format, for example:
"30s"- 30 seconds"1m"- 1 minute"1h"- 1 hour"12h"- 12 hours"24h"- 24 hours