Pipeline Configuration

The pipeline configuration is a JSON object that defines how data flows from Kafka topics to ClickHouse tables: it specifies the source, the sink, and any transformations in between. The web interface generates this configuration automatically, but understanding its structure is helpful for advanced users.

Root Configuration

| Field | Type | Required | Description |
|---|---|---|---|
| pipeline_id | string | Yes | Unique identifier for the pipeline. Must be non-empty. |
| source | object | Yes | Configuration for the Kafka source. See Source Configuration. |
| sink | object | Yes | Configuration for the ClickHouse sink. See Sink Configuration. |
| join | object | No | Configuration for joining multiple Kafka topics. See Join Configuration. |
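Assembled from the fields above, a minimal top-level skeleton looks like the following (all values are placeholders; the nested objects are described in the remaining sections):

```json
{
  "pipeline_id": "my-pipeline",
  "source": { "type": "kafka", "topics": [], "connection_params": {} },
  "join": { "enabled": false },
  "sink": { "type": "clickhouse" }
}
```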

Source Configuration

The source configuration defines how to connect to and consume from Kafka topics.

| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Source type. "kafka" is the only supported source. |
| provider | string | No | Kafka provider, e.g. "aiven". |
| topics | array | Yes | List of Kafka topics to consume from. See Topic Configuration. |
| connection_params | object | Yes | Kafka connection parameters. See Connection Parameters. |

Connection Parameters

| Field | Type | Required | Description |
|---|---|---|---|
| brokers | array | Yes | List of Kafka broker addresses (e.g., ["localhost:9092"]). |
| protocol | string | Yes | Security protocol for the Kafka connection (e.g., "SASL_SSL"). |
| mechanism | string | Yes | Authentication mechanism (e.g., "SCRAM-SHA-256"). |
| username | string | Yes | Username for Kafka authentication. |
| password | string | Yes | Password for Kafka authentication. |
| root_ca | string | No | Base64-encoded CA certificate for the Kafka connection. |
| skip_auth | boolean | No | Skip Kafka authentication. |
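For example, a connection_params object for a SASL_SSL-secured cluster might look like this (credentials and the CA value are placeholders):

```json
"connection_params": {
  "brokers": ["kafka-broker-0:9092", "kafka-broker-1:9092"],
  "protocol": "SASL_SSL",
  "mechanism": "SCRAM-SHA-256",
  "username": "<user>",
  "password": "<password>",
  "root_ca": "<base64 encoded ca>"
}
```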

Topic Configuration

Each topic in the topics array has the following configuration:

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Name of the Kafka topic. |
| consumer_group_initial_offset | string | Yes | Initial offset for the consumer group ("earliest" or "newest"). |
| schema | object | Yes | Event schema definition. See Schema Configuration. |
| deduplication | object | Yes | Deduplication settings. See Deduplication Configuration. |

Schema Configuration

| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Schema type (currently only "json" is supported). |
| fields | array | Yes | List of field definitions. See Field Configuration. |

Field Configuration

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Field name. |
| type | string | Yes | Field type (e.g., "string", "datetime"). |

Deduplication Configuration

| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | Yes | Whether deduplication is enabled. |
| id_field | string | Yes | Field name used for message deduplication. |
| id_field_type | string | Yes | Type of the ID field (e.g., "string"). |
| time_window | string | Yes | Time window for deduplication (e.g., "1h" for one hour). |
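Putting the schema and deduplication settings together, a single entry in the topics array might look like this (topic name, fields, and window are illustrative):

```json
{
  "name": "user_logins",
  "consumer_group_initial_offset": "earliest",
  "schema": {
    "type": "json",
    "fields": [
      { "name": "session_id", "type": "string" },
      { "name": "user_id", "type": "string" }
    ]
  },
  "deduplication": {
    "enabled": true,
    "id_field": "session_id",
    "id_field_type": "string",
    "time_window": "12h"
  }
}
```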

Sink Configuration

The sink configuration defines how to connect to and write to ClickHouse.

| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Must be "clickhouse". |
| host | string | Yes | ClickHouse server hostname. |
| port | integer | Yes | ClickHouse server port. |
| database | string | Yes | ClickHouse database name. |
| username | string | Yes | ClickHouse username. |
| password | string | Yes | ClickHouse password. |
| table | string | Yes | Target table name. |
| secure | boolean | No | Whether to use a secure connection. Defaults to false. |
| max_batch_size | integer | No | Maximum number of records to batch before writing. Defaults to 1000. |
| max_delay_time | string | No | Maximum delay before buffered messages are flushed to the sink. Defaults to "10m". |
| table_mapping | array | Yes | List of field-to-column mappings. See Table Mapping Configuration. |
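A sink object combining these fields might look like this (host, port, and credentials are placeholders; the mapping entry is illustrative):

```json
"sink": {
  "type": "clickhouse",
  "host": "<host>",
  "port": 12753,
  "database": "default",
  "username": "<user>",
  "password": "<password>",
  "secure": true,
  "table": "user_orders",
  "max_batch_size": 1000,
  "max_delay_time": "10m",
  "table_mapping": [
    { "source_id": "user_logins", "field_name": "user_id", "column_name": "user_id", "column_type": "UUID" }
  ]
}
```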

Table Mapping Configuration

Each mapping in the table_mapping array has the following configuration:

| Field | Type | Required | Description |
|---|---|---|---|
| source_id | string | Yes | Name of the source topic. |
| field_name | string | Yes | Source field name. |
| column_name | string | Yes | Target column name. |
| column_type | string | Yes | Target column type. |
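Mappings can rename fields on the way in. For example, these two entries write the timestamp field from two different topics into differently named DateTime columns (topic and column names are illustrative):

```json
"table_mapping": [
  { "source_id": "user_logins", "field_name": "timestamp", "column_name": "login_at", "column_type": "DateTime" },
  { "source_id": "orders", "field_name": "timestamp", "column_name": "order_placed_at", "column_type": "DateTime" }
]
```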

Join Configuration

The join configuration defines how to join data from multiple Kafka topics.

| Field | Type | Required | Description |
|---|---|---|---|
| enabled | boolean | Yes | Whether joining is enabled. |
| type | string | Yes | Join type (e.g., "temporal"). |
| sources | array | Yes | List of sources to join. See Join Source Configuration. |

Join Source Configuration

Each source in the sources array has the following configuration:

| Field | Type | Required | Description |
|---|---|---|---|
| source_id | string | Yes | Name of the Kafka topic to join. |
| join_key | string | Yes | Field name used for joining records. |
| time_window | string | Yes | Time window for joining records (e.g., "1h" for one hour). |
| orientation | string | Yes | Join orientation ("left" or "right"). |
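An enabled temporal join over two topics keyed on user_id might be configured like this (topic names and window are illustrative):

```json
"join": {
  "enabled": true,
  "type": "temporal",
  "sources": [
    { "source_id": "user_logins", "join_key": "user_id", "time_window": "1h", "orientation": "left" },
    { "source_id": "orders", "join_key": "user_id", "time_window": "1h", "orientation": "right" }
  ]
}
```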

Example Configuration

```json
{
  "pipeline_id": "kafka-to-clickhouse-pipeline",
  "source": {
    "type": "kafka",
    "provider": "aiven",
    "connection_params": {
      "brokers": [
        "kafka-broker-0:9092",
        "kafka-broker-1:9092"
      ],
      "protocol": "SASL_SSL",
      "mechanism": "SCRAM-SHA-256",
      "username": "<user>",
      "password": "<password>",
      "root_ca": "<base64 encoded ca>"
    },
    "topics": [
      {
        "consumer_group_initial_offset": "earliest",
        "name": "user_logins",
        "schema": {
          "type": "json",
          "fields": [
            { "name": "session_id", "type": "string" },
            { "name": "user_id", "type": "string" },
            { "name": "timestamp", "type": "datetime" }
          ]
        },
        "deduplication": {
          "enabled": true,
          "id_field": "session_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      },
      {
        "consumer_group_initial_offset": "earliest",
        "name": "orders",
        "schema": {
          "type": "json",
          "fields": [
            { "name": "user_id", "type": "string" },
            { "name": "order_id", "type": "string" },
            { "name": "timestamp", "type": "datetime" }
          ]
        },
        "deduplication": {
          "enabled": true,
          "id_field": "order_id",
          "id_field_type": "string",
          "time_window": "12h"
        }
      }
    ]
  },
  "join": {
    "enabled": false,
    "type": "temporal",
    "sources": [
      { "source_id": "user_logins", "join_key": "user_id", "time_window": "1h", "orientation": "left" },
      { "source_id": "orders", "join_key": "user_id", "time_window": "1h", "orientation": "right" }
    ]
  },
  "sink": {
    "type": "clickhouse",
    "provider": "aiven",
    "host": "<host>",
    "port": 12753,
    "database": "default",
    "username": "<user>",
    "password": "<password>",
    "secure": true,
    "max_batch_size": 1,
    "max_delay_time": "10m",
    "table": "user_orders",
    "table_mapping": [
      { "source_id": "user_logins", "field_name": "session_id", "column_name": "session_id", "column_type": "UUID" },
      { "source_id": "user_logins", "field_name": "user_id", "column_name": "user_id", "column_type": "UUID" },
      { "source_id": "orders", "field_name": "order_id", "column_name": "order_id", "column_type": "UUID" },
      { "source_id": "user_logins", "field_name": "timestamp", "column_name": "login_at", "column_type": "DateTime" },
      { "source_id": "orders", "field_name": "timestamp", "column_name": "order_placed_at", "column_type": "DateTime" }
    ]
  }
}
```

💡 Note: The web interface automatically generates this configuration based on user input, so manual editing is not required.
