Pipeline JSON Reference
The pipeline configuration is a JSON object that defines how data flows from Kafka topics to ClickHouse tables: it specifies the source, the sink, and any transformations. The web interface generates this configuration automatically, but understanding its structure is helpful for advanced users.
Root Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| version | string | No | Version of the pipeline configuration. The current supported version is "v2". |
| pipeline_id | string | Yes | Unique identifier for the pipeline. Must be non-empty. |
| name | string | No | Name of the pipeline, displayed in the UI. |
| source | object | Yes | Configuration for the Kafka source. See Source Configuration. |
| sink | object | Yes | Configuration for the ClickHouse sink. See Sink Configuration. |
| schema | object | Yes | Configuration for the schema of the pipeline. See Schema Configuration. |
| join | object | No | Configuration for joining multiple Kafka topics. See Join Configuration. |
| filter | object | No | Configuration for filtering the data. See Filter Configuration. |
| metadata | object | No | Metadata for the pipeline. See Metadata Configuration. |
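Putting these together, the top level of a pipeline file has the following shape. This is an abridged sketch: `...` stands for the nested objects documented in the sections below, and the identifier values are illustrative.

```json
{
  "version": "v2",
  "pipeline_id": "my-pipeline",
  "name": "My Pipeline",
  "source": { ... },
  "sink": { ... },
  "schema": { ... }
}
```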
Source Configuration
The source configuration defines how to connect to and consume from Kafka topics.
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Must be "kafka" (the only supported source type). |
| provider | string | No | Kafka provider, e.g. "aiven". |
| topics | array | Yes | List of Kafka topics to consume from. See Topic Configuration. |
| connection_params | object | Yes | Kafka connection parameters. See Kafka Connection Parameters. |
Kafka Connection Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| brokers | array | Yes | List of Kafka broker addresses (e.g., ["localhost:9092"]). |
| protocol | string | Yes | Security protocol for the Kafka connection (e.g., SASL_SSL). |
| mechanism | string | No | Authentication mechanism (e.g., SCRAM-SHA-256). Mandatory when skip_auth is false. |
| username | string | Yes | Username for Kafka authentication. |
| password | string | Yes | Password for Kafka authentication. |
| kerberos_service_name | string | No | Kerberos service name. |
| kerberos_keytab | string | No | Kerberos keytab file. |
| kerberos_realm | string | No | Kerberos realm. |
| kerberos_config | string | No | Kerberos configuration file. |
| root_ca | string | No | Root CA certificate for the Kafka connection (base64 encoded). |
| skip_tls_verification | boolean | No | Skip TLS verification. Defaults to false. |
Supported protocols:
- PLAINTEXT - Plaintext protocol.
- SSL - SSL protocol.
- SASL_SSL - SASL over SSL.
- SASL_PLAINTEXT - SASL over plaintext.
Supported mechanisms:
- NO_AUTH - No authentication mechanism.
- PLAIN - Plaintext authentication.
- SCRAM-SHA-256 - SCRAM-SHA-256 authentication.
- SCRAM-SHA-512 - SCRAM-SHA-512 authentication.
- GSSAPI - GSSAPI authentication.
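For example, a connection_params block for a SASL_SSL cluster might look like the following (placeholder values, following the same pattern as the full example at the end of this page):

```json
"connection_params": {
  "brokers": ["kafka-broker-0:9092", "kafka-broker-1:9092"],
  "protocol": "SASL_SSL",
  "mechanism": "SCRAM-SHA-256",
  "username": "<user>",
  "password": "<password>",
  "root_ca": "<base64 encoded ca>"
}
```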
Topic Configuration
Each topic in the topics array has the following configuration:
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Name of the Kafka topic. |
| consumer_group_initial_offset | string | No | Initial offset for the consumer group ("earliest" or "latest"). Defaults to "latest". |
| replicas | integer | No | Number of replicas for the Kafka topic. Defaults to 1. |
| deduplication | object | Yes | Deduplication settings. See Deduplication Configuration. |
Deduplication Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | Yes | Whether deduplication is enabled. |
| id_field | string | Yes | Field name used for message deduplication. |
| id_field_type | string | Yes | Type of the ID field (e.g., "string"). |
| time_window | string | Yes | Time window for deduplication (e.g., "1h" for one hour). See Time windows. |
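A complete topic entry, including its required deduplication block, might look like this (illustrative values, matching the full example below):

```json
{
  "name": "user_logins",
  "consumer_group_initial_offset": "earliest",
  "deduplication": {
    "enabled": true,
    "id_field": "session_id",
    "id_field_type": "string",
    "time_window": "12h"
  }
}
```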
Sink Configuration
The sink configuration defines how to connect to and write to ClickHouse.
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Must be "clickhouse". |
| host | string | Yes | ClickHouse server hostname. |
| port | string | Yes | ClickHouse server port. |
| http_port | string | No | ClickHouse HTTP port (used only for the UI connection). |
| database | string | Yes | ClickHouse database name. |
| username | string | Yes | ClickHouse username. |
| password | string | Yes | ClickHouse password. Must be base64 encoded. |
| table | string | Yes | Target table name. |
| secure | boolean | No | Whether to use a secure (TLS) connection. Defaults to false. |
| skip_certificate_verification | boolean | No | Whether to skip certificate verification. Defaults to false. |
| max_batch_size | integer | No | Maximum number of records to batch before writing. Defaults to 1000. |
| max_delay_time | string | No | Maximum delay before buffered messages are flushed to the sink. Defaults to "10m". |
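A typical sink block might look like the following (placeholder host, port, and credentials; remember that the password must be base64 encoded):

```json
"sink": {
  "type": "clickhouse",
  "host": "<host>",
  "port": "<port>",
  "database": "default",
  "username": "<user>",
  "password": "<base64 encoded password>",
  "table": "user_orders",
  "secure": true,
  "max_batch_size": 1000,
  "max_delay_time": "10m"
}
```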
Schema Configuration
The schema configuration defines how fields from the source topics map to columns in the ClickHouse table.
| Field | Type | Required | Description |
|---|---|---|---|
| fields | array | Yes | List of field definitions. See Field Configuration. |
Field Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| source_id | string | Yes | ID of the source topic the field comes from. |
| name | string | Yes | Source field name. Nested structures are supported using dot notation (e.g. data.name). |
| type | string | Yes | Source field type. |
| column_name | string | No | ClickHouse column name. |
| column_type | string | No | ClickHouse column type. |
The type parameter accepts the following Kafka data types:
- 'string' - For text data, including datetime strings
- 'int', 'int8', 'int16', 'int32', 'int64' - For integer values
- 'uint', 'uint8', 'uint16', 'uint32', 'uint64' - For unsigned integer values
- 'float', 'float32', 'float64' - For floating-point numbers
- 'bool' - For boolean values
- 'bytes' - For binary data
- 'array' - For array data
- 'map' - For key-value pair data
The column_type parameter accepts the following ClickHouse data types:
Basic Types:
- 'Int8', 'Int16', 'Int32', 'Int64' - For integer values
- 'UInt8', 'UInt16', 'UInt32', 'UInt64' - For unsigned integer values
- 'Float32', 'Float64' - For float values
- 'String' - For string values
- 'FixedString' - For fixed-length string values
- 'DateTime', 'DateTime64' - For datetime values
- 'Bool' - For boolean values
- 'UUID' - For UUID values
- 'Enum8', 'Enum16' - For enum values
LowCardinality Types:
- 'LowCardinality(Int8)', 'LowCardinality(Int16)', 'LowCardinality(Int32)', 'LowCardinality(Int64)' - For low cardinality int values
- 'LowCardinality(UInt8)', 'LowCardinality(UInt16)', 'LowCardinality(UInt32)', 'LowCardinality(UInt64)' - For low cardinality uint values
- 'LowCardinality(Float32)', 'LowCardinality(Float64)' - For low cardinality float values
- 'LowCardinality(String)' - For low cardinality string values
- 'LowCardinality(FixedString)' - For low cardinality fixed-length string values
- 'LowCardinality(DateTime)' - For low cardinality datetime values
Array Types:
- 'Array(String)' - For arrays of string values
- 'Array(Int8)', 'Array(Int16)', 'Array(Int32)', 'Array(Int64)' - For arrays of int values
- 'Array(UInt8)', 'Array(UInt16)', 'Array(UInt32)', 'Array(UInt64)' - For arrays of uint values
- 'Array(Float32)', 'Array(Float64)' - For arrays of float values
- 'Array(Bool)' - For arrays of bool values
- 'Array(Map(String, String))' - For arrays of maps
Map Types:
- 'Map(String, String)' - For key-value pairs
💡 Note: For comprehensive type mapping information, see the Supported Data Formats documentation.
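For instance, a field definition that maps a nested source field (using the dot notation described above) to a ClickHouse column might look like this. The field and column names are hypothetical:

```json
{
  "source_id": "user_logins",
  "name": "data.name",
  "type": "string",
  "column_name": "user_name",
  "column_type": "LowCardinality(String)"
}
```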
Join Configuration
The join configuration defines how to join data from multiple Kafka topics.
| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | Yes | Whether joining is enabled. |
| type | string | Yes | Join type (e.g., "temporal"). |
| sources | array | Yes | List of sources to join. See Join Source Configuration. |
Join Source Configuration
Each source in the sources array has the following configuration:
| Field | Type | Required | Description |
|---|---|---|---|
| source_id | string | Yes | Name of the Kafka topic to join. |
| join_key | string | Yes | Field name used for joining records. |
| time_window | string | Yes | Time window for joining records (e.g., "1h" for one hour). See Time windows. |
| orientation | string | Yes | Join orientation ("left" or "right"). |
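For example, a temporal join of two topics on a shared key could be configured as follows (this mirrors the join block in the full example below, with enabled set to true):

```json
"join": {
  "enabled": true,
  "type": "temporal",
  "sources": [
    { "source_id": "user_logins", "join_key": "user_id", "time_window": "1h", "orientation": "left" },
    { "source_id": "orders", "join_key": "user_id", "time_window": "1h", "orientation": "right" }
  ]
}
```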
Filter Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | Yes | Whether filtering is enabled. |
| expression | string | Yes | Filter expression. |
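A filter block has the following shape. Note that the expression syntax itself is not specified in this reference, so a placeholder is used below:

```json
"filter": {
  "enabled": true,
  "expression": "<filter expression>"
}
```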
Metadata Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| tags | array | No | List of tags for the pipeline. |
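For example (illustrative tag values):

```json
"metadata": {
  "tags": ["production", "orders"]
}
```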
Other configuration notes
Time windows
Time windows are specified as duration strings, for example:
- "30s" - 30 seconds
- "1m" - 1 minute
- "1h" - 1 hour
- "12h" - 12 hours
- "24h" - 24 hours
Example Configuration
```json
{
  "version": "v2",
  "pipeline_id": "kafka-to-clickhouse-pipeline",
  "source": {
    "type": "kafka",
    "connection_params": {
      "brokers": [
        "kafka-broker-0:9092",
        "kafka-broker-1:9092"
      ],
      "protocol": "SASL_SSL",
      "mechanism": "SCRAM-SHA-256",
      "username": "<user>",
      "password": "<password>",
      "root_ca": "<base64 encoded ca>"
    },
    "topics": [
      {
        "consumer_group_initial_offset": "earliest",
        "name": "user_logins",
        "deduplication": {
          "enabled": true,
          "id_field": "session_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      },
      {
        "consumer_group_initial_offset": "earliest",
        "name": "orders",
        "deduplication": {
          "enabled": true,
          "id_field": "order_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      }
    ]
  },
  "join": {
    "enabled": false,
    "type": "temporal",
    "sources": [
      {
        "source_id": "user_logins",
        "join_key": "user_id",
        "time_window": "1h",
        "orientation": "left"
      },
      {
        "source_id": "orders",
        "join_key": "user_id",
        "time_window": "1h",
        "orientation": "right"
      }
    ]
  },
  "sink": {
    "type": "clickhouse",
    "provider": "aiven",
    "host": "<host>",
    "port": "12753",
    "database": "default",
    "username": "<user>",
    "password": "<password>",
    "secure": true,
    "max_batch_size": 1,
    "max_delay_time": "10m",
    "table": "user_orders"
  },
  "schema": {
    "fields": [
      {
        "source_id": "user_logins",
        "name": "session_id",
        "type": "string",
        "column_name": "session_id",
        "column_type": "UUID"
      },
      {
        "source_id": "user_logins",
        "name": "user_id",
        "type": "string",
        "column_name": "user_id",
        "column_type": "UUID"
      },
      {
        "source_id": "orders",
        "name": "user_id",
        "type": "string"
      },
      {
        "source_id": "orders",
        "name": "order_id",
        "type": "string",
        "column_name": "order_id",
        "column_type": "UUID"
      },
      {
        "source_id": "user_logins",
        "name": "timestamp",
        "type": "datetime",
        "column_name": "login_at",
        "column_type": "DateTime"
      },
      {
        "source_id": "orders",
        "name": "timestamp",
        "type": "datetime",
        "column_name": "order_placed_at",
        "column_type": "DateTime"
      }
    ]
  }
}
```

💡 Note: The web interface automatically generates this configuration based on user input, so manual editing is not required.