Pipeline Configuration Reference (Enterprise)
EnterpriseThe pipeline configuration defines how data flows from sources through optional transforms and joins into ClickHouse. Both JSON and YAML formats are supported. The current format is V3, a structural redesign that introduces per-source configuration, an explicit transforms array, and sink-level column mapping.
This page is the complete configuration reference for the Enterprise Edition. Fields available only in Enterprise are marked with the Enterprise badge; everything else is shared with the Open Source Edition. If you run Open Source, use the Pipeline Configuration Reference instead.
While the web interface generates this configuration automatically, understanding its structure is essential for advanced use cases, CI/CD pipelines, and the Python SDK.
Full pipeline example (join pipeline with two Kafka sources)
YAML
version: v3
pipeline_id: joined-orders-pipeline
name: Orders and Users Join
sources:
- type: kafka
source_id: orders
connection_params:
brokers:
- "kafka:9092"
protocol: SASL_SSL
mechanism: SCRAM-SHA-256
username: "<user>"
password: "<password>"
topic: orders
consumer_group_initial_offset: earliest
schema:
fields:
- name: order_id
type: string
- name: customer_id
type: string
- name: amount
type: float
- name: timestamp
type: datetime
- type: kafka
source_id: users
connection_params:
brokers:
- "kafka:9092"
protocol: SASL_SSL
mechanism: SCRAM-SHA-256
username: "<user>"
password: "<password>"
topic: users
consumer_group_initial_offset: earliest
schema:
fields:
- name: user_id
type: string
- name: email
type: string
transforms:
- type: dedup
source_id: orders
config:
key: order_id
time_window: 1h
- type: dedup
source_id: users
config:
key: user_id
time_window: 1h
- type: filter
source_id: orders
config:
expression: "amount > 0"
join:
enabled: true
type: temporal
left_source:
source_id: orders
key: customer_id
time_window: 30s
right_source:
source_id: users
key: user_id
time_window: 30s
output_fields:
- source_id: orders
name: order_id
- source_id: orders
name: amount
- source_id: orders
name: timestamp
- source_id: users
name: user_id
- source_id: users
name: email
sink:
type: clickhouse
connection_params:
host: clickhouse.example.com
port: "9000"
database: default
username: default
password: mysecret
secure: true
table: joined_orders
max_batch_size: 1000
max_delay_time: 1s
mapping:
- name: order_id
column_name: order_id
column_type: String
- name: amount
column_name: amount
column_type: Float64
- name: timestamp
column_name: created_at
column_type: DateTime
- name: user_id
column_name: user_id
column_type: String
- name: email
column_name: email
column_type: String
metadata:
tags:
- demo
resources:
nats:
stream:
max_age: 24h
max_bytes: 25GB
sources:
- source_id: orders
replicas: 2
requests:
cpu: 1000m
memory: 1Gi
limits:
cpu: 1500m
memory: 1.5Gi
- source_id: users
replicas: 1
requests:
cpu: 1000m
memory: 1Gi
limits:
cpu: 1500m
memory: 1.5Gi
transform:
- source_id: orders
replicas: 1
storage:
size: 10Gi
requests:
cpu: 500m
memory: 512Mi
limits:
cpu: 1000m
memory: 1Gi
- source_id: users
replicas: 1
storage:
size: 10Gi
requests:
cpu: 500m
memory: 512Mi
limits:
cpu: 1000m
memory: 1Gi
sink:
replicas: 1
requests:
cpu: 250m
memory: 256Mi
limits:
cpu: 500m
memory: 512MiRoot Configuration
YAML
version: v3
pipeline_id: my-pipeline-id
name: My Pipeline Name
sources: []
transforms: []
join: {}
sink: {}
metadata: {}
resources: {}| Field | Type | Required | Description |
|---|---|---|---|
version | string | Yes | Must be "v3". |
pipeline_id | string | Yes | Unique identifier for the pipeline. |
name | string | No | Display name shown in the UI. |
sources | array | Yes | List of source configurations. |
transforms | array | No | List of transform steps applied to sources. |
join | object | No | Join configuration for combining data from two sources. |
sink | object | Yes | ClickHouse sink configuration. |
metadata | object | No | Pipeline metadata such as tags. |
resources | object | No | Kubernetes resource allocation for pipeline components. |
Sources Configuration
The sources array defines one or more data sources. Each entry has its own source_id, which you reference in transforms, join, and sink mapping.
Kafka Source
YAML
type: kafka
source_id: orders
connection_params:
brokers:
- "kafka:9092"
protocol: PLAINTEXT
mechanism: NO_AUTH
topic: orders
consumer_group_initial_offset: earliest
schema:
fields:
- name: order_id
type: string
- name: amount
type: float
- name: timestamp
type: datetimeOTLP Source
YAML
type: otlp.traces
source_id: tracesSource Fields
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Source type. One of "kafka", "otlp.logs", "otlp.traces", or "otlp.metrics". See Sources. |
source_id | string | Yes | Unique identifier for this source. Referenced by transforms, join, and sink mapping. |
connection_params | object | Yes (Kafka) | Kafka connection parameters. See Connections. |
topic | string | Yes (Kafka) | Kafka topic name. |
consumer_group_initial_offset | string | No | Initial offset for the consumer group: "earliest" or "latest". Default: "latest". Kafka only. |
format | string | No | Wire format of Kafka messages: "json" (default), "avro", or "protobuf". Avro and Protobuf are Enterprise only Enterprise. See Data Formats. |
schema | object | Yes (Kafka) | Source schema. For JSON, fields declares the fields to ingest. For Avro and Protobuf Enterprise, file (and message_type for Protobuf). Not needed for OTLP sources (schema is predefined). |
schema_registry | object | No | Confluent Schema Registry connection Enterprise. |
schema_version | string | Conditional | Identifier of the registry schema version used as the base version. Required when schema_registry is provided Enterprise. |
Kafka Connection Parameters
| Field | Type | Required | Description |
|---|---|---|---|
brokers | array | Yes | List of Kafka broker addresses (e.g., ["kafka:9092"]). |
protocol | string | Yes | Security protocol: "PLAINTEXT", "SASL_PLAINTEXT", "SSL", or "SASL_SSL". |
mechanism | string | Conditional | Authentication mechanism (e.g., "SCRAM-SHA-256"). Required when authentication is enabled. |
username | string | Conditional | Kafka username. Required when authentication is enabled. |
password | string | Conditional | Kafka password. Required when authentication is enabled. |
root_ca | string | No | PEM-encoded CA certificate for TLS. |
skip_tls_verification | boolean | No | Skip TLS certificate verification. Default: false. |
kerberos_service_name | string | No | Kerberos service name. |
kerberos_keytab | string | No | Kerberos keytab file. |
kerberos_realm | string | No | Kerberos realm. |
kerberos_config | string | No | Kerberos configuration file. |
For detailed connection examples and supported protocol/mechanism combinations, see Supported Kafka Connections.
Source Schema
Every Kafka source declares its schema in the schema object. For JSON, list the fields to ingest under schema.fields. For Avro and Protobuf Enterprise, provide the schema text in schema.file (and the root message in schema.message_type for Protobuf) and GlassFlow derives the field list automatically. For the full format guide (field type mappings, nested records, schema evolution, and troubleshooting), see Data Formats.
JSON source:
{
"type": "kafka",
"source_id": "events",
"connection_params": { "brokers": ["kafka:9092"], "protocol": "PLAINTEXT" },
"topic": "json_events",
"schema": {
"fields": [
{ "name": "event_id", "type": "string" },
{ "name": "amount", "type": "float64" }
]
},
"consumer_group_initial_offset": "earliest"
}Avro source: Enterprise
{
"type": "kafka",
"source_id": "events",
"connection_params": { "brokers": ["kafka:9092"], "protocol": "PLAINTEXT" },
"topic": "avro_events",
"format": "avro",
"schema": {
"file": "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"
},
"consumer_group_initial_offset": "earliest"
}Protobuf source: Enterprise
{
"type": "kafka",
"source_id": "events",
"connection_params": { "brokers": ["kafka:9092"], "protocol": "PLAINTEXT" },
"topic": "proto_events",
"format": "protobuf",
"schema": {
"file": "syntax = \"proto3\";\nmessage Event { string id = 1; }",
"message_type": "Event"
},
"consumer_group_initial_offset": "earliest"
}schema object:
| Field | Type | Required | Description |
|---|---|---|---|
file | string | Conditional | Inline schema text: the .avsc document (Avro) or .proto source (Protobuf), serialized as a string. Required for avro and protobuf. |
message_type | string | Conditional | Protobuf root message to decode. Required for protobuf. |
fields | array | Conditional | Field declarations for the source, each { "name": ..., "type": ... } (see below). Required for JSON; not used for Avro or Protobuf. |
parsed_fields | array | No | Read-only. The field list GlassFlow parsed from file, returned on GET /pipeline/{id}. Ignored on create and edit. |
Each entry in fields defines a source field:
| Field | Type | Required | Description |
|---|---|---|---|
name | string | Yes | Source field name. Dot notation is supported for nested fields (e.g., data.name). |
type | string | Yes | Field data type. See Data Formats. |
schema_registry object:
To fetch schemas from a Confluent-compatible Schema Registry, add a schema_registry object. You must still provide schema as the seed (base) version and set the top-level schema_version to the registry version that seed corresponds to. The same registry connection works for JSON, Avro, and Protobuf sources.
| Field | Type | Required | Description |
|---|---|---|---|
url | string | Yes | Base URL of the Confluent-compatible Schema Registry. |
api_key | string | No | API key for authentication. |
api_secret | string | No | API secret for authentication. |
A complete registry-backed Avro source, showing where schema_registry, the seed schema, and the top-level schema_version sit relative to each other:
{
"type": "kafka",
"source_id": "events",
"connection_params": { "brokers": ["kafka:9092"], "protocol": "PLAINTEXT" },
"topic": "avro_events",
"format": "avro",
"schema_registry": {
"url": "https://<sr-host>",
"api_key": "<sr-api-key>",
"api_secret": "<sr-api-secret>"
},
"schema_version": "<registry-version-id>",
"schema": {
"file": "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"
},
"consumer_group_initial_offset": "earliest"
}Transforms Configuration
The transforms array defines processing steps applied to source data before it reaches the sink. Each transform targets a specific source via source_id. You can apply multiple transforms to the same source — they run in the order they appear.
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Transform type: "dedup", "filter", or "stateless". |
source_id | string | Yes | Which source this transform applies to. Must match a source_id from the sources array. |
config | object | Yes | Type-specific configuration. See below. |
Deduplication
Removes duplicate records based on a key field within a time window.
YAML
type: dedup
source_id: orders
config:
key: order_id
time_window: 1hDedup config:
| Field | Type | Required | Description |
|---|---|---|---|
key | string | Yes | Field name to deduplicate on. |
time_window | string | Yes | Deduplication window. See Time windows. |
Filter
Keeps records that match the expression and discards the rest.
YAML
type: filter
source_id: orders
config:
expression: "status == 'active'"Filter config:
| Field | Type | Required | Description |
|---|---|---|---|
expression | string | Yes | Boolean expression. Records where the expression evaluates to true are kept. See Filter. |
Stateless Transformation
Computes new fields from existing data using expressions.
YAML
type: stateless
source_id: orders
config:
transforms:
- expression: "int(amount) % 2 == 0"
output_name: is_amount_even
output_type: boolStateless config:
| Field | Type | Required | Description |
|---|---|---|---|
transforms | array | Yes | List of transform definitions. |
Each transform definition:
| Field | Type | Required | Description |
|---|---|---|---|
expression | string | Yes | Expression to evaluate against each record. See Stateless Transformation. |
output_name | string | Yes | Name of the new output field. |
output_type | string | Yes | Data type of the output field. |
Join Configuration
The join configuration combines records from two sources based on matching keys within a time window.
YAML
enabled: true
type: temporal
left_source:
source_id: orders
key: customer_id
time_window: 30s
right_source:
source_id: users
key: user_id
time_window: 30s
output_fields:
- source_id: orders
name: order_id
output_name: ORDER_ID
- source_id: orders
name: amount
- source_id: users
name: user_id
- source_id: users
name: emailJoin Fields
| Field | Type | Required | Description |
|---|---|---|---|
enabled | boolean | Yes | Whether the join is enabled. |
type | string | Yes | Join type (e.g., "temporal"). |
left_source | object | Yes (when enabled) | Left side of the join. |
right_source | object | Yes (when enabled) | Right side of the join. |
output_fields | array | Yes (when enabled) | Fields to include in the joined output. |
Join Source
Each join source (left_source and right_source) has the same structure.
| Field | Type | Required | Description |
|---|---|---|---|
source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
key | string | Yes | Field to join on. |
time_window | string | Yes | Join time window. See Time windows. |
Join Output Fields
Each entry in output_fields selects a field from one of the joined sources.
| Field | Type | Required | Description |
|---|---|---|---|
source_id | string | Yes | Which source the field comes from. |
name | string | Yes | Field name from the source. |
output_name | string | No | Rename the field in the output. If omitted, the original name is used. |
Sink Configuration
The sink configuration defines the ClickHouse destination, including connection details, batching behavior, and column mapping.
YAML
type: clickhouse
connection_params:
host: clickhouse.example.com
port: "9000"
http_port: "8123"
database: default
username: default
password: mysecret
secure: true
skip_certificate_verification: false
table: orders
max_batch_size: 1000
max_delay_time: 1s
mapping:
- name: order_id
column_name: order_id
column_type: String
- name: amount
column_name: amount
column_type: Float64
- name: timestamp
column_name: created_at
column_type: DateTimeSink Fields
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Must be "clickhouse". |
connection_params | object | Yes | ClickHouse connection parameters. |
table | string | Yes | Target table name in ClickHouse. |
max_batch_size | integer | No | Maximum number of records per batch. Default: 1000. |
max_delay_time | string | No | Maximum delay before flushing a batch. Default: "60s". |
mapping | array | Yes | Column mappings from source fields to ClickHouse columns. |
Sink Connection Parameters
| Field | Type | Required | Description |
|---|---|---|---|
host | string | Yes | ClickHouse hostname. |
port | string | Yes | ClickHouse native port. |
http_port | string | No | ClickHouse HTTP port (used by the UI for connectivity checks). |
database | string | Yes | Database name. |
username | string | Yes | Username. |
password | string | Yes | Password (plain text). |
secure | boolean | No | Use TLS. Default: false. |
skip_certificate_verification | boolean | No | Skip certificate verification. Default: false. |
Sink Column Mapping
Each entry in the mapping array maps a source field to a ClickHouse column.
| Field | Type | Required | Description |
|---|---|---|---|
name | string | Yes | Source field name. Dot notation is supported for nested fields. |
column_name | string | Yes | ClickHouse column name. |
column_type | string | Yes | ClickHouse column type (e.g., String, Float64, DateTime). |
For the full list of supported type mappings, see Data Formats.
Metadata Configuration
| Field | Type | Required | Description |
|---|---|---|---|
tags | array | No | List of string tags for the pipeline. |
Resources Configuration
The resources object controls Kubernetes resource allocation for each pipeline component. If omitted, defaults from the Helm chart values are used.
YAML
nats:
stream:
max_age: 24h
max_bytes: 10Gi
sources:
- source_id: orders
replicas: 2
requests:
cpu: 250m
memory: 256Mi
limits:
cpu: 500m
memory: 512Mi
transform:
- source_id: orders
replicas: 1
storage:
size: 10Gi
requests:
cpu: 500m
memory: 512Mi
limits:
cpu: 1000m
memory: 1Gi
sink:
replicas: 1
requests:
cpu: 250m
memory: 256Mi
limits:
cpu: 500m
memory: 512MiTop-Level Resources Fields
| Field | Type | Required | Description |
|---|---|---|---|
nats | object | No | NATS stream configuration. |
sources | array | No | Per-source resource allocation. |
transform | array | No | Per-source transform resource allocation. |
sink | object | No | Sink resource allocation. |
Source Resources
Each entry in the sources array configures resources for one source’s ingestion component.
| Field | Type | Required | Description |
|---|---|---|---|
source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
replicas | integer | No | Number of replicas. Default: 1. |
requests | object | No | CPU and memory requests: {"cpu": "250m", "memory": "256Mi"}. |
limits | object | No | CPU and memory limits: {"cpu": "500m", "memory": "512Mi"}. |
Transform Resources
Each entry in the transform array configures resources for one source’s transform component.
| Field | Type | Required | Description |
|---|---|---|---|
source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
replicas | integer | No | Number of replicas. Default: 1. |
storage | object | No | Storage configuration. Only applicable when deduplication is enabled: {"size": "10Gi"}. |
requests | object | No | CPU and memory requests: {"cpu": "500m", "memory": "512Mi"}. |
limits | object | No | CPU and memory limits: {"cpu": "1000m", "memory": "1Gi"}. |
Sink Resources
| Field | Type | Required | Description |
|---|---|---|---|
replicas | integer | No | Number of replicas. Default: 1. |
requests | object | No | CPU and memory requests. |
limits | object | No | CPU and memory limits. |
NATS Stream Configuration
| Field | Type | Required | Description |
|---|---|---|---|
max_age | string | No | Maximum message retention age. Default: "24h". Immutable after pipeline creation. |
max_bytes | string | No | Maximum stream size. Default: "0" (unlimited, no reserved memory). Immutable after pipeline creation. |
Other Configuration Notes
Time Windows
Time windows use a string format combining a number and a unit suffix:
"30s"— 30 seconds"1m"— 1 minute"1h"— 1 hour"12h"— 12 hours"24h"— 24 hours