
Pipeline JSON Reference

The pipeline configuration is defined in a JSON file that specifies the source, sink, and any transformations. While the web interface automatically generates this configuration, understanding its structure can be helpful for advanced users.

The pipeline configuration is a JSON object that defines how data flows from Kafka topics to ClickHouse tables.

Pipeline JSON Example

    {
      "version": "v2",
      "pipeline_id": "kafka-to-clickhouse-pipeline",
      "source": {
        "type": "kafka",
        "connection_params": {
          "brokers": ["kafka-broker-0:9092", "kafka-broker-1:9092"],
          "protocol": "SASL_SSL",
          "mechanism": "SCRAM-SHA-256",
          "username": "<user>",
          "password": "<password>",
          "root_ca": "<base64 encoded ca>"
        },
        "topics": [
          {
            "consumer_group_initial_offset": "earliest",
            "name": "user_logins",
            "deduplication": {
              "enabled": true,
              "id_field": "session_id",
              "id_field_type": "string",
              "time_window": "12h"
            }
          },
          {
            "consumer_group_initial_offset": "earliest",
            "name": "orders",
            "deduplication": {
              "enabled": true,
              "id_field": "order_id",
              "id_field_type": "string",
              "time_window": "12h"
            }
          }
        ]
      },
      "join": {
        "enabled": false,
        "type": "temporal",
        "sources": [
          {
            "source_id": "user_logins",
            "join_key": "user_id",
            "time_window": "1h",
            "orientation": "left"
          },
          {
            "source_id": "orders",
            "join_key": "user_id",
            "time_window": "1h",
            "orientation": "right"
          }
        ]
      },
      "sink": {
        "type": "clickhouse",
        "provider": "aiven",
        "host": "<host>",
        "port": "12753",
        "database": "default",
        "username": "<user>",
        "password": "<password>",
        "secure": true,
        "max_batch_size": 1,
        "max_delay_time": "10m",
        "table": "user_orders"
      },
      "schema": {
        "fields": [
          { "source_id": "user_logins", "name": "session_id", "type": "string", "column_name": "session_id", "column_type": "UUID" },
          { "source_id": "user_logins", "name": "user_id", "type": "string", "column_name": "user_id", "column_type": "UUID" },
          { "source_id": "orders", "name": "user_id", "type": "string" },
          { "source_id": "orders", "name": "order_id", "type": "string", "column_name": "order_id", "column_type": "UUID" },
          { "source_id": "user_logins", "name": "timestamp", "type": "datetime", "column_name": "login_at", "column_type": "DateTime" },
          { "source_id": "orders", "name": "timestamp", "type": "datetime", "column_name": "order_placed_at", "column_type": "DateTime" }
        ]
      },
      "pipeline_resources": {
        "ingestor": {
          "left": {
            "replicas": 5,
            "requests": { "cpu": "1000m", "memory": "1Gi" },
            "limits": { "cpu": "1500m", "memory": "1.5Gi" }
          },
          "right": {
            "replicas": 2,
            "requests": { "cpu": "1000m", "memory": "1Gi" },
            "limits": { "cpu": "1500m", "memory": "1.5Gi" }
          }
        }
      }
    }

💡 Note: The web interface automatically generates this configuration based on user input, so manual editing is not required.

Root Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| version | string | No | Version of the pipeline configuration. The currently supported version is “v2”. |
| pipeline_id | string | Yes | Unique identifier for the pipeline. Must be non-empty. |
| name | string | No | Name of the pipeline that will be displayed in the UI. |
| source | object | Yes | Configuration for the Kafka source. |
| sink | object | Yes | Configuration for the ClickHouse sink. |
| schema | object | Yes | Configuration for the schema of the pipeline. |
| join | object | No | Configuration for joining multiple Kafka topics. |
| filter | object | No | Configuration for filtering the data. |
| stateless_transformation | object | No | Configuration for stateless JSON transformations. |
| pipeline_resources | object | No | Configuration for the Kubernetes resources for the pipeline. |
| metadata | object | No | Metadata for the pipeline. |
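
The Required column above can be turned into a quick pre-flight check for a hand-written configuration. The sketch below is illustrative only: `missing_root_fields` is a hypothetical helper, not part of any official tooling, and it checks nothing beyond the presence of the four required keys and a non-empty `pipeline_id`.

```python
# Required top-level keys, taken from the "Required" column of the table above.
REQUIRED_ROOT_FIELDS = ("pipeline_id", "source", "sink", "schema")


def missing_root_fields(config: dict) -> list:
    """Return the required root fields that are absent (hypothetical helper).

    An empty pipeline_id counts as missing, because the field must be non-empty.
    """
    missing = []
    for key in REQUIRED_ROOT_FIELDS:
        value = config.get(key)
        if value is None or (key == "pipeline_id" and value == ""):
            missing.append(key)
    return missing


if __name__ == "__main__":
    # A draft with an empty pipeline_id and no schema section.
    draft = {"pipeline_id": "", "source": {}, "sink": {}}
    print(missing_root_fields(draft))
```

A check like this only catches structural omissions; the nested sections still need to follow the tables that come later in this reference.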

Source Configuration

The source configuration defines how to connect to and consume from Kafka topics.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | string | Yes | “kafka” is the only supported source. |
| provider | string | No | Kafka provider, e.g. “aiven”. |
| topics | array | Yes | List of Kafka topics to consume from. |
| connection_params | object | Yes | Kafka connection parameters. |

Kafka Connection Parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| brokers | array | Yes | List of Kafka broker addresses (e.g., ["localhost:9092"]). |
| protocol | string | Yes | Security protocol for Kafka connection (e.g., SASL_SSL). |
| mechanism | string | No | Authentication mechanism (e.g., SCRAM-SHA-256). Mandatory when skip_tls_verification is false. |
| username | string | No | Username for Kafka authentication. Mandatory when skip_tls_verification is false. |
| password | string | No | Password for Kafka authentication. Mandatory when skip_tls_verification is false. |
| kerberos_service_name | string | No | Kerberos service name. |
| kerberos_keytab | string | No | Kerberos keytab file. |
| kerberos_realm | string | No | Kerberos realm. |
| kerberos_config | string | No | Kerberos configuration file. |
| root_ca | string | No | Certificate file for Kafka authentication. |
| skip_tls_verification | boolean | No | Skip TLS verification. Default is false. |

💡 Note: For a full list of supported protocols and mechanisms, see the Supported Kafka Connections documentation.
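
The example at the top of this page shows `root_ca` as `<base64 encoded ca>`. A minimal sketch of producing such a value follows. It assumes that the full PEM text of the CA certificate is what gets base64-encoded, which this reference does not spell out; `encode_root_ca` and `EXAMPLE_PEM` are hypothetical names used only for illustration.

```python
import base64


def encode_root_ca(pem_text: str) -> str:
    """Base64-encode certificate text for the root_ca field (hypothetical helper)."""
    return base64.b64encode(pem_text.encode("utf-8")).decode("ascii")


# Placeholder certificate; substitute the real CA certificate text.
EXAMPLE_PEM = (
    "-----BEGIN CERTIFICATE-----\n"
    "<certificate body>\n"
    "-----END CERTIFICATE-----\n"
)

# Connection parameters mirroring the example configuration on this page.
connection_params = {
    "brokers": ["kafka-broker-0:9092", "kafka-broker-1:9092"],
    "protocol": "SASL_SSL",
    "mechanism": "SCRAM-SHA-256",
    "username": "<user>",
    "password": "<password>",
    "root_ca": encode_root_ca(EXAMPLE_PEM),
}
```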

Topic Configuration

Each topic in the topics array has the following configuration:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Name of the Kafka topic. |
| consumer_group_initial_offset | string | No | Initial offset for the consumer group (earliest or latest). Defaults to latest. |
| replicas | integer | No | Number of replicas for the Kafka topic. Defaults to 1. Deprecated field, use pipeline_resources.ingestor.<base\|left\|right>.replicas instead. |
| deduplication | object | Yes | Deduplication settings. |

Deduplication Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| enabled | boolean | Yes | Whether deduplication is enabled. |
| id_field | string | Yes | Field name used for message deduplication. |
| id_field_type | string | Yes | Type of the ID field (e.g., “string”). |
| time_window | string | Yes | Time window for deduplication (e.g., “1h” for one hour). See Time windows. |
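
To make the roles of `id_field` and `time_window` concrete, here is a conceptual sketch of time-windowed deduplication. It is not the pipeline's actual implementation. It assumes each event carries a numeric `ts` field in seconds (a hypothetical field) and that an ID becomes eligible again once the window has elapsed since the last kept event with that ID.

```python
def deduplicate(events, id_field, window_seconds):
    """Yield events whose ID was not kept within the last window_seconds.

    Conceptual illustration only; "ts" is an assumed per-event timestamp in seconds.
    """
    last_kept = {}  # ID value -> timestamp of the last event kept for that ID
    for event in events:
        key = event[id_field]
        ts = event["ts"]
        previous = last_kept.get(key)
        if previous is None or ts - previous >= window_seconds:
            last_kept[key] = ts
            yield event


if __name__ == "__main__":
    logins = [
        {"session_id": "a", "ts": 0},
        {"session_id": "a", "ts": 10},      # duplicate inside the 12h window
        {"session_id": "b", "ts": 20},
        {"session_id": "a", "ts": 50_000},  # more than 12h after the first "a"
    ]
    for kept in deduplicate(logins, "session_id", 12 * 3600):
        print(kept)
```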

Sink Configuration

The sink configuration defines how to connect to and write to ClickHouse.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | string | Yes | Must be “clickhouse”. |
| host | string | Yes | ClickHouse server hostname. |
| port | string | Yes | ClickHouse server port. |
| http_port | string | No | ClickHouse server HTTP port (only used for UI connection). |
| database | string | Yes | ClickHouse database name. |
| username | string | Yes | ClickHouse username. |
| password | string | Yes | ClickHouse password. It must be base64 encoded. |
| table | string | Yes | Target table name. |
| secure | boolean | No | Whether to use secure connection (TLS). Defaults to false. |
| skip_certificate_verification | boolean | No | Whether to skip certificate verification. Defaults to false. |
| max_batch_size | integer | No | Maximum number of records to batch before writing. Defaults to 1000. |
| max_delay_time | string | No | Maximum delay time before the messages are flushed into the sink. Defaults to “10m”. |
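
Two details in the table above are easy to miss when writing the sink section by hand: `port` is a string, and `password` must be base64 encoded. The sketch below builds the required fields accordingly. `build_sink` is a hypothetical helper, and the sample password is a placeholder.

```python
import base64


def build_sink(host, port, database, username, password, table) -> dict:
    """Assemble the required sink fields (hypothetical helper).

    The port is stored as a string and the password is base64 encoded,
    as the sink table requires.
    """
    return {
        "type": "clickhouse",
        "host": host,
        "port": str(port),
        "database": database,
        "username": username,
        "password": base64.b64encode(password.encode("utf-8")).decode("ascii"),
        "table": table,
    }


# Placeholder values; "s3cret" stands in for the real plain-text password.
sink = build_sink("<host>", 12753, "default", "<user>", "s3cret", "user_orders")
```

Optional fields such as `secure`, `max_batch_size`, and `max_delay_time` can be added to the returned dictionary as needed.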

Schema Configuration

The schema configuration defines the schema of the pipeline.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| fields | array | Yes | List of field definitions. |

Field Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| source_id | string | Yes | Source ID. This is the ID of the source topic or transformation. |
| name | string | Yes | Source field name. Nested structures are supported by using dot notation (e.g. data.name). |
| type | string | Yes | Source field type. |
| column_name | string | No | ClickHouse column name. |
| column_type | string | No | ClickHouse column type. |

💡 Note: For a comprehensive list of supported data types, see the Supported Data Formats documentation.
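
The dot notation used in the field `name` (for example `data.name`) addresses a value inside a nested JSON event. The sketch below shows one way such a path could be resolved against a parsed event. `resolve_path` is a hypothetical helper for illustration and says nothing about how the pipeline resolves paths internally.

```python
def resolve_path(event: dict, dotted_name: str):
    """Follow a dot-separated path through nested dictionaries (hypothetical helper).

    Raises KeyError if any segment of the path is missing.
    """
    current = event
    for part in dotted_name.split("."):
        current = current[part]
    return current


if __name__ == "__main__":
    event = {"data": {"name": "Ada"}, "timestamp": "2024-01-01T00:00:00Z"}
    print(resolve_path(event, "data.name"))
```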

Join Configuration

The join configuration defines how to join data from multiple Kafka topics.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| enabled | boolean | Yes | Whether joining is enabled. |
| type | string | Yes | Join type (e.g., “temporal”). |
| sources | array | Yes | List of sources to join. |

Join Source Configuration

Each source in the sources array has the following configuration:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| source_id | string | Yes | Name of the Kafka topic to join. |
| join_key | string | Yes | Field name used for joining records. |
| time_window | string | Yes | Time window for joining records (e.g., “1h” for one hour). See Time windows. |
| orientation | string | Yes | Join orientation (“left” or “right”). |
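
Because each join `source_id` names a Kafka topic, a hand-edited configuration can be checked for mismatches between the join sources and the topics list. The sketch below is a hypothetical consistency check, not official validation. It also assumes that an enabled join uses exactly one “left” and one “right” source, as in the example at the top of this page.

```python
def join_problems(config: dict) -> list:
    """Return a list of problems found in the join section (hypothetical helper)."""
    join = config.get("join") or {}
    if not join.get("enabled"):
        return []  # nothing to check when the join is absent or disabled

    sources = join.get("sources", [])
    topic_names = {topic["name"] for topic in config["source"]["topics"]}

    problems = []
    for source in sources:
        if source["source_id"] not in topic_names:
            problems.append("unknown source_id: " + source["source_id"])

    # Assumption: one left and one right source, mirroring the example config.
    orientations = sorted(source["orientation"] for source in sources)
    if orientations != ["left", "right"]:
        problems.append("expected one left and one right source")
    return problems
```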

Filter Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| enabled | boolean | Yes | Whether filtering is enabled. |
| expression | string | Yes | Filter expression. See Filters Transformation documentation. |

Stateless Transformation Configuration

The stateless_transformation section defines expression-based, per-event JSON transformations that are applied before data is mapped to ClickHouse. These transformations are stateless and operate on each event independently.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| enabled | boolean | No | Whether stateless transformations are enabled. Defaults to false when omitted. |
| id | string | Yes | Optional identifier for the transformation configuration. |
| type | string | No | Transformation type identifier. Reserved for future use. Default is expr_lang_transform. |
| config | object | Yes (when enabled is true) | Stateless transformation configuration. |

Stateless Transformations Config

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| transform | array | Yes | List of individual stateless transformations. |

Transform Definition

Each entry in the transform array defines one derived field computed from the input JSON using an expression:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| expression | string | Yes | Expression evaluated against the input JSON event. See Stateless Transformations documentation for more details on the supported transformations. |
| output_name | string | Yes | Name of the field in the transformed payload that will be produced by this expression. |
| output_type | string | Yes | Expected output type. |
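
The three tables above nest as `stateless_transformation` → `config` → `transform`. The sketch below assembles that structure and serializes it to JSON. The identifier, output name, and output type are hypothetical values, and `<expression>` is a placeholder because the expression syntax is covered in the Stateless Transformations documentation rather than here.

```python
import json

stateless_transformation = {
    "enabled": True,
    "id": "example-transform",      # hypothetical identifier
    "type": "expr_lang_transform",  # documented default value
    "config": {
        "transform": [
            {
                "expression": "<expression>",  # placeholder, not real syntax
                "output_name": "full_name",    # hypothetical derived field
                "output_type": "string",       # assumed type name
            }
        ]
    },
}

# JSON fragment as it would sit at the root of the pipeline configuration.
fragment_json = json.dumps(
    {"stateless_transformation": stateless_transformation}, indent=2
)
```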

Pipeline Resources Configuration

The pipeline resources configuration defines the Kubernetes resources for the pipeline. If a value is not specified, the default from the chart values is used.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| ingestor | object | No | Ingestor resources configuration. |
| join | object | No | Join resources configuration. |
| sink | object | No | Sink resources configuration. |
| transform | object | No | Transform resources configuration. |
| nats | object | No | NATS resources configuration. |

Ingestor Resources Configuration

The ingestor resources configuration defines the resources for the ingestor.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| base | object | No | Ingestor resources configuration when join is disabled. |
| left | object | No | Left ingestor resources configuration. |
| right | object | No | Right ingestor resources configuration. |

Base Ingestor Resources Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| replicas | integer | No | Number of replicas for the base ingestor. Defaults to 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |

Left Ingestor Resources Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| replicas | integer | No | Number of replicas for the left ingestor. Defaults to 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |

Right Ingestor Resources Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| replicas | integer | No | Number of replicas for the right ingestor. Defaults to 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |

Join Resources Configuration

The join resources configuration defines the resources for the join.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| replicas | integer | No | Number of replicas for the join. Defaults to 1. Currently, the only supported value is 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |

Sink Resources Configuration

The sink resources configuration defines the resources for the sink.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| replicas | integer | No | Number of replicas for the sink. Defaults to 1. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |

Transform Resources Configuration

The transform resources configuration defines the resources for the transform.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| replicas | integer | No | Number of replicas for the transform. Defaults to 1. This parameter is only mutable when deduplication is disabled. |
| requests | object | No | Requests resources configuration. |
| limits | object | No | Limits resources configuration. |
| storage | object | No | Storage resources configuration, only used when deduplication is enabled. |

Resources Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| cpu | string | No | CPU request or limit. |
| memory | string | No | Memory request or limit. |

Storage Resources Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| size | string | No | Size of the storage. This parameter is immutable and cannot be changed after the pipeline is created. |

NATS Resources Configuration

The NATS resources configuration defines the resources for NATS.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| stream | object | No | Stream resources configuration. |

Stream Resources Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| maxAge | string | No | Maximum age of the stream. Defaults to “24h”. This parameter is immutable and cannot be changed after the pipeline is created. |
| maxBytes | string | No | Maximum bytes of the stream. Defaults to “0”, which does not limit the stream size and does not reserve memory for the stream. This parameter is immutable and cannot be changed after the pipeline is created. |

💡 Note: You can find more information about the NATS stream configuration parameters in the NATS Stream Configuration documentation.
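
The resource tables above combine into one nested `pipeline_resources` object. The sketch below shows a possible layout for a pipeline with join disabled, so it uses the `base` ingestor. The CPU and memory quantities mirror the example at the top of this page, the NATS stream values are the documented defaults, and the replica counts and storage size are hypothetical.

```python
import json

# CPU/memory blocks reused for every component; quantities taken from the page example.
requests = {"cpu": "1000m", "memory": "1Gi"}
limits = {"cpu": "1500m", "memory": "1.5Gi"}

pipeline_resources = {
    # With join disabled, the ingestor is configured under "base".
    "ingestor": {"base": {"replicas": 2, "requests": requests, "limits": limits}},
    "sink": {"replicas": 1, "requests": requests, "limits": limits},
    "transform": {
        "replicas": 1,
        "requests": requests,
        "limits": limits,
        # Only used when deduplication is enabled; "10Gi" is an assumed size.
        "storage": {"size": "10Gi"},
    },
    # Documented defaults; both are immutable after the pipeline is created.
    "nats": {"stream": {"maxAge": "24h", "maxBytes": "0"}},
}

fragment_json = json.dumps({"pipeline_resources": pipeline_resources}, indent=2)
```

Since `storage.size`, `maxAge`, and `maxBytes` cannot be changed after creation, it is worth settling on those values before the pipeline is first deployed.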

Metadata Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| tags | array | No | List of tags for the pipeline. |

Other configuration notes

Time windows

Time windows use string format, for example:

  • "30s" - 30 seconds
  • "1m" - 1 minute
  • "1h" - 1 hour
  • "12h" - 12 hours
  • "24h" - 24 hours
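
For tooling that needs to compare or validate these values, a time window string can be converted to seconds. The sketch below handles only the `s`, `m`, and `h` suffixes that appear in the list above. Whether other units or combined forms are accepted is not stated here, so the helper rejects them. `window_to_seconds` is a hypothetical name.

```python
import re

# Seconds per unit, limited to the suffixes shown in the examples above.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def window_to_seconds(window: str) -> int:
    """Convert a time window such as "30s", "1m", or "12h" to seconds."""
    match = re.fullmatch(r"(\d+)([smh])", window)
    if match is None:
        raise ValueError("unsupported time window: " + repr(window))
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


if __name__ == "__main__":
    for example in ("30s", "1m", "1h", "12h", "24h"):
        print(example, window_to_seconds(example))
```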