Pipeline JSON Reference
The pipeline configuration is a JSON object that defines how data flows from Kafka topics to ClickHouse tables: it specifies the source, the sink, and any transformations. The web interface generates this configuration automatically, but understanding its structure is helpful for advanced users.
Root Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| `pipeline_id` | string | Yes | Unique identifier for the pipeline. Must be non-empty. |
| `source` | object | Yes | Configuration for the Kafka source. See Source Configuration. |
| `sink` | object | Yes | Configuration for the ClickHouse sink. See Sink Configuration. |
| `join` | object | No | Configuration for joining multiple Kafka topics. See Join Configuration. |
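Putting the required root fields together, a minimal skeleton looks like this (section contents elided with `...`; the `pipeline_id` value is illustrative):

```json
{
  "pipeline_id": "my-pipeline",
  "source": { ... },
  "join": { ... },
  "sink": { ... }
}
```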
Source Configuration
The source configuration defines how to connect to and consume from Kafka topics.
| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | Yes | Must be "kafka" (the only supported source type). |
| `provider` | string | No | Kafka provider, e.g. "aiven". |
| `topics` | array | Yes | List of Kafka topics to consume from. See Topic Configuration. |
| `connection_params` | object | Yes | Kafka connection parameters. See Kafka Connection Parameters. |
Kafka Connection Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| `brokers` | array | Yes | List of Kafka broker addresses (e.g., `["localhost:9092"]`). |
| `protocol` | string | Yes | Security protocol for the Kafka connection (e.g., `SASL_SSL`). |
| `mechanism` | string | No | Authentication mechanism (e.g., `SCRAM-SHA-256`). Required when `skip_auth` is `false`. |
| `username` | string | Yes | Username for Kafka authentication. |
| `password` | string | Yes | Password for Kafka authentication. |
| `root_ca` | string | No | Root CA certificate for Kafka authentication. |
| `skip_auth` | boolean | No | Skip Kafka authentication. Defaults to `false`. |
Supported protocols:
- `PLAINTEXT` - Plaintext protocol.
- `SSL` - SSL protocol.
- `SASL_SSL` - SASL over SSL protocol.
- `SASL_PLAINTEXT` - SASL over plaintext protocol.
Supported mechanisms:
- `SCRAM-SHA-256` - SCRAM-SHA-256 authentication.
- `SCRAM-SHA-512` - SCRAM-SHA-512 authentication.
- `PLAIN` - Plaintext authentication.
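For example, a `connection_params` object for a cluster using `SASL_SSL` with `SCRAM-SHA-256` (broker addresses and credentials are placeholders):

```json
"connection_params": {
  "brokers": ["kafka-broker-0:9092", "kafka-broker-1:9092"],
  "protocol": "SASL_SSL",
  "mechanism": "SCRAM-SHA-256",
  "username": "<user>",
  "password": "<password>",
  "root_ca": "<base64 encoded ca>"
}
```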
Topic Configuration
Each topic in the topics array has the following configuration:
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Name of the Kafka topic. |
| `consumer_group_initial_offset` | string | No | Initial offset for the consumer group (`earliest` or `latest`). Defaults to `latest`. |
| `replicas` | integer | No | Number of replicas for the Kafka topic. Defaults to 1. |
| `schema` | object | Yes | Event schema definition. See Schema Configuration. |
| `deduplication` | object | Yes | Deduplication settings. See Deduplication Configuration. |
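A single entry in the `topics` array combines these fields. The sketch below abbreviates the `schema` and `deduplication` objects with `...`; the topic name and offset are illustrative:

```json
{
  "name": "user_logins",
  "consumer_group_initial_offset": "earliest",
  "schema": { "type": "json", "fields": [ ... ] },
  "deduplication": { ... }
}
```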
Schema Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | Yes | Schema type (currently only "json" is supported). |
| `fields` | array | Yes | List of field definitions. See Field Configuration. |
Field Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Field name. Nested structures are supported using dot notation (e.g. `data.name`). |
| `type` | string | Yes | Field type. |
The `type` parameter accepts the following Kafka data types:
- `string` - For text data, including datetime strings
- `int`, `int8`, `int16`, `int32`, `int64` - For integer values
- `uint`, `uint8`, `uint16`, `uint32`, `uint64` - For unsigned integer values
- `float`, `float32`, `float64` - For floating-point numbers
- `bool` - For boolean values
- `bytes` - For binary data
- `array` - For array data
- `map` - For key-value pair data
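For instance, a `schema` object describing a flat event with one nested field (all field names are illustrative; the datetime string is typed as `string` per the list above):

```json
"schema": {
  "type": "json",
  "fields": [
    { "name": "session_id", "type": "string" },
    { "name": "data.name", "type": "string" },
    { "name": "timestamp", "type": "string" }
  ]
}
```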
Deduplication Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| `enabled` | boolean | Yes | Whether deduplication is enabled. |
| `id_field` | string | Yes | Field name used for message deduplication. |
| `id_field_type` | string | Yes | Type of the ID field (e.g., "string"). |
| `time_window` | string | Yes | Time window for deduplication (e.g., "1h" for one hour). See Time windows. |
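For example, deduplicating on an ID field over a 12-hour window (the field name is illustrative):

```json
"deduplication": {
  "enabled": true,
  "id_field": "order_id",
  "id_field_type": "string",
  "time_window": "12h"
}
```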
Sink Configuration
The sink configuration defines how to connect to and write to ClickHouse.
| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | Yes | Must be "clickhouse". |
| `host` | string | Yes | ClickHouse server hostname. |
| `port` | string | Yes | ClickHouse server port. |
| `http_port` | string | No | ClickHouse server HTTP port (only used for the UI connection). |
| `database` | string | Yes | ClickHouse database name. |
| `username` | string | Yes | ClickHouse username. |
| `password` | string | Yes | ClickHouse password. Must be base64-encoded. |
| `table` | string | Yes | Target table name. |
| `secure` | boolean | No | Whether to use a secure (TLS) connection. Defaults to `false`. |
| `skip_certificate_verification` | boolean | No | Whether to skip certificate verification. Defaults to `false`. |
| `max_batch_size` | integer | No | Maximum number of records to batch before writing. Defaults to 1000. |
| `max_delay_time` | string | No | Maximum delay before buffered messages are flushed to the sink. Defaults to "10m". |
| `table_mapping` | array | Yes | List of field-to-column mappings. See Table Mapping Configuration. |
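A minimal `sink` object that relies on the defaults for the optional fields might look like this (the host, port, and credentials are placeholders, and the mappings are elided with `...`):

```json
"sink": {
  "type": "clickhouse",
  "host": "<host>",
  "port": "<port>",
  "database": "default",
  "username": "<user>",
  "password": "<base64 encoded password>",
  "table": "user_orders",
  "secure": true,
  "table_mapping": [ ... ]
}
```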
Table Mapping Configuration
Each mapping in the table_mapping array has the following configuration:
| Field | Type | Required | Description |
|---|---|---|---|
| `source_id` | string | Yes | Name of the source topic. |
| `field_name` | string | Yes | Source field name. |
| `column_name` | string | Yes | Target column name. |
| `column_type` | string | Yes | Target column type. |
The `column_type` parameter accepts the following ClickHouse data types:

Basic types:
- `Int8`, `Int16`, `Int32`, `Int64` - For integer values
- `UInt8`, `UInt16`, `UInt32`, `UInt64` - For unsigned integer values
- `Float32`, `Float64` - For float values
- `String` - For string values
- `FixedString` - For fixed-length string values
- `DateTime`, `DateTime64` - For datetime values
- `Bool` - For boolean values
- `UUID` - For UUID values
- `Enum8`, `Enum16` - For enum values

LowCardinality types:
- `LowCardinality(Int8)`, `LowCardinality(Int16)`, `LowCardinality(Int32)`, `LowCardinality(Int64)` - For low-cardinality integer values
- `LowCardinality(UInt8)`, `LowCardinality(UInt16)`, `LowCardinality(UInt32)`, `LowCardinality(UInt64)` - For low-cardinality unsigned integer values
- `LowCardinality(Float32)`, `LowCardinality(Float64)` - For low-cardinality float values
- `LowCardinality(String)` - For low-cardinality string values
- `LowCardinality(FixedString)` - For low-cardinality fixed-length string values
- `LowCardinality(DateTime)` - For low-cardinality datetime values

Array types:
- `Array(String)` - For arrays of string values
- `Array(Int8)`, `Array(Int16)`, `Array(Int32)`, `Array(Int64)` - For arrays of integer values
- `Array(UInt8)`, `Array(UInt16)`, `Array(UInt32)`, `Array(UInt64)` - For arrays of unsigned integer values
- `Array(Float32)`, `Array(Float64)` - For arrays of float values
- `Array(Bool)` - For arrays of boolean values
- `Array(Map(String, String))` - For arrays of maps

Map types:
- `Map(String, String)` - For key-value pairs
💡 Note: For comprehensive type mapping information, see the Supported Data Formats documentation.
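For example, two mappings that route fields from different source topics into columns of the target table (topic, field, and column names are illustrative):

```json
"table_mapping": [
  {
    "source_id": "user_logins",
    "field_name": "session_id",
    "column_name": "session_id",
    "column_type": "UUID"
  },
  {
    "source_id": "orders",
    "field_name": "timestamp",
    "column_name": "order_placed_at",
    "column_type": "DateTime"
  }
]
```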
Join Configuration
The join configuration defines how to join data from multiple Kafka topics.
| Field | Type | Required | Description |
|---|---|---|---|
| `enabled` | boolean | Yes | Whether joining is enabled. |
| `type` | string | Yes | Join type (e.g., "temporal"). |
| `sources` | array | Yes | List of sources to join. See Join Source Configuration. |
Join Source Configuration
Each source in the sources array has the following configuration:
| Field | Type | Required | Description |
|---|---|---|---|
| `source_id` | string | Yes | Name of the Kafka topic to join. |
| `join_key` | string | Yes | Field name used for joining records. |
| `time_window` | string | Yes | Time window for joining records (e.g., "1h" for one hour). See Time windows. |
| `orientation` | string | Yes | Join orientation ("left" or "right"). |
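A complete `join` object pairing two topics on a shared key looks like this (topic names and the join key are illustrative):

```json
"join": {
  "enabled": true,
  "type": "temporal",
  "sources": [
    { "source_id": "user_logins", "join_key": "user_id", "time_window": "1h", "orientation": "left" },
    { "source_id": "orders", "join_key": "user_id", "time_window": "1h", "orientation": "right" }
  ]
}
```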
Other configuration notes
Time windows
Time windows use string format, for example:
- `"30s"` - 30 seconds
- `"1m"` - 1 minute
- `"1h"` - 1 hour
- `"12h"` - 12 hours
- `"24h"` - 24 hours
Example Configuration
{
"pipeline_id": "kafka-to-clickhouse-pipeline",
"source": {
"type": "kafka",
"provider": "aiven",
"connection_params": {
"brokers": [
"kafka-broker-0:9092",
"kafka-broker-1:9092"
],
"protocol": "SASL_SSL",
"mechanism": "SCRAM-SHA-256",
"username": "<user>",
"password": "<password>",
"root_ca": "<base64 encoded ca>"
},
"topics": [
{
"consumer_group_initial_offset": "earliest",
"name": "user_logins",
"schema": {
"type": "json",
"fields": [
{
"name": "session_id",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "timestamp",
"type": "datetime"
}
]
},
"deduplication": {
"enabled": true,
"id_field": "session_id",
"id_field_type": "string",
"time_window": "12h"
}
},
{
"consumer_group_initial_offset": "earliest",
"name": "orders",
"schema": {
"type": "json",
"fields": [
{
"name": "user_id",
"type": "string"
},
{
"name": "order_id",
"type": "string"
},
{
"name": "timestamp",
"type": "datetime"
}
]
},
"deduplication": {
"enabled": true,
"id_field": "order_id",
"id_field_type": "string",
"time_window": "12h"
}
}
]
},
"join": {
"enabled": false,
"type": "temporal",
"sources": [
{
"source_id": "user_logins",
"join_key": "user_id",
"time_window": "1h",
"orientation": "left"
},
{
"source_id": "orders",
"join_key": "user_id",
"time_window": "1h",
"orientation": "right"
}
]
},
"sink": {
"type": "clickhouse",
"provider": "aiven",
"host": "<host>",
"port": "12753",
"database": "default",
"username": "<user>",
"password": "<password>",
"secure": true,
"max_batch_size": 1,
"max_delay_time": "10m",
"table": "user_orders",
"table_mapping": [
{
"source_id": "user_logins",
"field_name": "session_id",
"column_name": "session_id",
"column_type": "UUID"
},
{
"source_id": "user_logins",
"field_name": "user_id",
"column_name": "user_id",
"column_type": "UUID"
},
{
"source_id": "orders",
"field_name": "order_id",
"column_name": "order_id",
"column_type": "UUID"
},
{
"source_id": "user_logins",
"field_name": "timestamp",
"column_name": "login_at",
          "column_type": "DateTime"
},
{
"source_id": "orders",
"field_name": "timestamp",
"column_name": "order_placed_at",
"column_type": "DateTime"
}
]
}
}

💡 Note: The web interface automatically generates this configuration based on user input, so manual editing is not required.