
Pipeline Configuration Reference (Enterprise)

Enterprise

The pipeline configuration defines how data flows from sources through optional transforms and joins into ClickHouse. Both JSON and YAML formats are supported. The current format is V3, a structural redesign that introduces per-source configuration, an explicit transforms array, and sink-level column mapping.

This page is the complete configuration reference for the Enterprise Edition. Fields available only in Enterprise are marked with the Enterprise badge; everything else is shared with the Open Source Edition. If you run Open Source, use the Pipeline Configuration Reference instead.

While the web interface generates this configuration automatically, understanding its structure is essential for advanced use cases, CI/CD pipelines, and the Python SDK.

Full pipeline example (join pipeline with two Kafka sources)

```yaml
version: v3
pipeline_id: joined-orders-pipeline
name: Orders and Users Join
sources:
  - type: kafka
    source_id: orders
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: SASL_SSL
      mechanism: SCRAM-SHA-256
      username: "<user>"
      password: "<password>"
    topic: orders
    consumer_group_initial_offset: earliest
    schema:
      fields:
        - name: order_id
          type: string
        - name: customer_id
          type: string
        - name: amount
          type: float
        - name: timestamp
          type: datetime
  - type: kafka
    source_id: users
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: SASL_SSL
      mechanism: SCRAM-SHA-256
      username: "<user>"
      password: "<password>"
    topic: users
    consumer_group_initial_offset: earliest
    schema:
      fields:
        - name: user_id
          type: string
        - name: email
          type: string
transforms:
  - type: dedup
    source_id: orders
    config:
      key: order_id
      time_window: 1h
  - type: dedup
    source_id: users
    config:
      key: user_id
      time_window: 1h
  - type: filter
    source_id: orders
    config:
      expression: "amount > 0"
join:
  enabled: true
  type: temporal
  left_source:
    source_id: orders
    key: customer_id
    time_window: 30s
  right_source:
    source_id: users
    key: user_id
    time_window: 30s
  output_fields:
    - source_id: orders
      name: order_id
    - source_id: orders
      name: amount
    - source_id: orders
      name: timestamp
    - source_id: users
      name: user_id
    - source_id: users
      name: email
sink:
  type: clickhouse
  connection_params:
    host: clickhouse.example.com
    port: "9000"
    database: default
    username: default
    password: mysecret
    secure: true
  table: joined_orders
  max_batch_size: 1000
  max_delay_time: 1s
  mapping:
    - name: order_id
      column_name: order_id
      column_type: String
    - name: amount
      column_name: amount
      column_type: Float64
    - name: timestamp
      column_name: created_at
      column_type: DateTime
    - name: user_id
      column_name: user_id
      column_type: String
    - name: email
      column_name: email
      column_type: String
metadata:
  tags:
    - demo
resources:
  nats:
    stream:
      max_age: 24h
      max_bytes: 25GB
  sources:
    - source_id: orders
      replicas: 2
      requests:
        cpu: 1000m
        memory: 1Gi
      limits:
        cpu: 1500m
        memory: 1.5Gi
    - source_id: users
      replicas: 1
      requests:
        cpu: 1000m
        memory: 1Gi
      limits:
        cpu: 1500m
        memory: 1.5Gi
  transform:
    - source_id: orders
      replicas: 1
      storage:
        size: 10Gi
      requests:
        cpu: 500m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
    - source_id: users
      replicas: 1
      storage:
        size: 10Gi
      requests:
        cpu: 500m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
  sink:
    replicas: 1
    requests:
      cpu: 250m
      memory: 256Mi
    limits:
      cpu: 500m
      memory: 512Mi
```

Root Configuration

```yaml
version: v3
pipeline_id: my-pipeline-id
name: My Pipeline Name
sources: []
transforms: []
join: {}
sink: {}
metadata: {}
resources: {}
```

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| version | string | Yes | Must be "v3". |
| pipeline_id | string | Yes | Unique identifier for the pipeline. |
| name | string | No | Display name shown in the UI. |
| sources | array | Yes | List of source configurations. |
| transforms | array | No | List of transform steps applied to sources. |
| join | object | No | Join configuration for combining data from two sources. |
| sink | object | Yes | ClickHouse sink configuration. |
| metadata | object | No | Pipeline metadata such as tags. |
| resources | object | No | Kubernetes resource allocation for pipeline components. |
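To make the required/optional split concrete, here is a minimal Python sketch that checks the root of a config already loaded into a dict (for example with json.load). The helper check_root is hypothetical, not part of GlassFlow or its Python SDK, and it encodes only the rules in the table above; the server may validate more.

```python
import json

# Root keys marked "Yes" in the table above.
REQUIRED_ROOT_KEYS = ("version", "pipeline_id", "sources", "sink")


def check_root(config: dict) -> list[str]:
    """Return problems found at the root level (hypothetical helper)."""
    problems = [f"missing required field: {key}" for key in REQUIRED_ROOT_KEYS if key not in config]
    if config.get("version") != "v3":
        problems.append('version must be "v3"')
    if "sources" in config and not isinstance(config["sources"], list):
        problems.append("sources must be an array")
    return problems


raw = '{"version": "v3", "pipeline_id": "my-pipeline-id", "sources": [], "sink": {}}'
print(check_root(json.loads(raw)))  # []
print(check_root({"version": "v2"}))
```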

Sources Configuration

The sources array defines one or more data sources. Each entry has its own source_id, which you reference in transforms, join, and sink mapping.

Kafka Source

```yaml
type: kafka
source_id: orders
connection_params:
  brokers:
    - "kafka:9092"
  protocol: PLAINTEXT
  mechanism: NO_AUTH
topic: orders
consumer_group_initial_offset: earliest
schema:
  fields:
    - name: order_id
      type: string
    - name: amount
      type: float
    - name: timestamp
      type: datetime
```

OTLP Source

```yaml
type: otlp.traces
source_id: traces
```

Source Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | string | Yes | Source type. One of "kafka", "otlp.logs", "otlp.traces", or "otlp.metrics". See Sources. |
| source_id | string | Yes | Unique identifier for this source. Referenced by transforms, join, and sink mapping. |
| connection_params | object | Yes (Kafka) | Kafka connection parameters. See Connections. |
| topic | string | Yes (Kafka) | Kafka topic name. |
| consumer_group_initial_offset | string | No | Initial offset for the consumer group: "earliest" or "latest". Default: "latest". Kafka only. |
| format | string | No | Wire format of Kafka messages: "json" (default), "avro", or "protobuf". Avro and Protobuf are Enterprise only. See Data Formats. |
| schema | object | Yes (Kafka) | Source schema. For JSON, fields declares the fields to ingest. For Avro and Protobuf (Enterprise), file holds the schema text, plus message_type for Protobuf. Not needed for OTLP sources, whose schema is predefined. |
| schema_registry | object | No | Confluent Schema Registry connection (Enterprise). |
| schema_version | string | Conditional | Identifier of the registry schema version used as the base version. Required when schema_registry is provided (Enterprise). |
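The conditional rules in this table can be expressed as a small check. This is a sketch, assuming each source entry is a plain Python dict; check_source is a hypothetical helper, not a GlassFlow API, and it encodes only the rules stated above: Kafka sources need connection_params, topic, and schema; OTLP sources need none of them; schema_version must accompany schema_registry.

```python
OTLP_TYPES = {"otlp.logs", "otlp.traces", "otlp.metrics"}
KAFKA_REQUIRED = ("connection_params", "topic", "schema")


def check_source(source: dict) -> list[str]:
    """Check one entry of the sources array (hypothetical helper)."""
    problems = []
    if not source.get("source_id"):
        problems.append("source_id is required")
    source_type = source.get("type")
    if source_type == "kafka":
        problems += [f"kafka source needs {key}" for key in KAFKA_REQUIRED if key not in source]
        # "latest" is the documented default when the field is omitted.
        if source.get("consumer_group_initial_offset", "latest") not in ("earliest", "latest"):
            problems.append('consumer_group_initial_offset must be "earliest" or "latest"')
        if "schema_registry" in source and not source.get("schema_version"):
            problems.append("schema_version is required when schema_registry is set")
    elif source_type not in OTLP_TYPES:
        problems.append(f"unknown source type: {source_type!r}")
    return problems


print(check_source({"type": "otlp.traces", "source_id": "traces"}))  # []
print(check_source({"type": "kafka", "source_id": "orders", "topic": "orders"}))
```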

Kafka Connection Parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| brokers | array | Yes | List of Kafka broker addresses (e.g., ["kafka:9092"]). |
| protocol | string | Yes | Security protocol: "PLAINTEXT", "SASL_PLAINTEXT", "SSL", or "SASL_SSL". |
| mechanism | string | Conditional | Authentication mechanism (e.g., "SCRAM-SHA-256"). Required when authentication is enabled. |
| username | string | Conditional | Kafka username. Required when authentication is enabled. |
| password | string | Conditional | Kafka password. Required when authentication is enabled. |
| root_ca | string | No | PEM-encoded CA certificate for TLS. |
| skip_tls_verification | boolean | No | Skip TLS certificate verification. Default: false. |
| kerberos_service_name | string | No | Kerberos service name. |
| kerberos_keytab | string | No | Kerberos keytab file. |
| kerberos_realm | string | No | Kerberos realm. |
| kerberos_config | string | No | Kerberos configuration file. |

For detailed connection examples and supported protocol/mechanism combinations, see Supported Kafka Connections.
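Which credential fields are needed depends on the protocol and mechanism. The sketch below goes beyond what this page states and labels its assumptions: it treats the SASL_* protocols as "authentication enabled", and it assumes only the PLAIN and SCRAM mechanisms authenticate with username/password (Kerberos uses the kerberos_* fields instead). check_kafka_connection is hypothetical; the combinations GlassFlow actually accepts are listed in Supported Kafka Connections.

```python
PROTOCOLS = {"PLAINTEXT", "SASL_PLAINTEXT", "SSL", "SASL_SSL"}
# Assumption: these SASL mechanisms authenticate with a username/password pair.
PASSWORD_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def check_kafka_connection(params: dict) -> list[str]:
    """Check connection_params for a Kafka source (hypothetical helper)."""
    problems = []
    if not params.get("brokers"):
        problems.append("brokers must list at least one address")
    protocol = params.get("protocol")
    if protocol not in PROTOCOLS:
        problems.append(f"unsupported protocol: {protocol!r}")
    elif protocol.startswith("SASL_"):
        # Assumption: SASL_* protocols mean authentication is enabled.
        mechanism = params.get("mechanism")
        if not mechanism:
            problems.append("mechanism is required for SASL protocols")
        elif mechanism in PASSWORD_MECHANISMS:
            problems += [
                f"{key} is required for {mechanism}"
                for key in ("username", "password")
                if not params.get(key)
            ]
    return problems


print(check_kafka_connection({"brokers": ["kafka:9092"], "protocol": "PLAINTEXT"}))  # []
print(check_kafka_connection({
    "brokers": ["kafka:9092"],
    "protocol": "SASL_SSL",
    "mechanism": "SCRAM-SHA-256",
    "username": "<user>",
}))  # ['password is required for SCRAM-SHA-256']
```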

Source Schema

Every Kafka source declares its schema in the schema object. For JSON, list the fields to ingest under schema.fields. For Avro and Protobuf (Enterprise), provide the schema text in schema.file (and the root message in schema.message_type for Protobuf), and GlassFlow derives the field list automatically. For the full format guide (field type mappings, nested records, schema evolution, and troubleshooting), see Data Formats.

JSON source:

```json
{
  "type": "kafka",
  "source_id": "events",
  "connection_params": {
    "brokers": ["kafka:9092"],
    "protocol": "PLAINTEXT"
  },
  "topic": "json_events",
  "schema": {
    "fields": [
      { "name": "event_id", "type": "string" },
      { "name": "amount", "type": "float64" }
    ]
  },
  "consumer_group_initial_offset": "earliest"
}
```

Avro source (Enterprise):

```json
{
  "type": "kafka",
  "source_id": "events",
  "connection_params": {
    "brokers": ["kafka:9092"],
    "protocol": "PLAINTEXT"
  },
  "topic": "avro_events",
  "format": "avro",
  "schema": {
    "file": "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"
  },
  "consumer_group_initial_offset": "earliest"
}
```

Protobuf source (Enterprise):

```json
{
  "type": "kafka",
  "source_id": "events",
  "connection_params": {
    "brokers": ["kafka:9092"],
    "protocol": "PLAINTEXT"
  },
  "topic": "proto_events",
  "format": "protobuf",
  "schema": {
    "file": "syntax = \"proto3\";\nmessage Event { string id = 1; }",
    "message_type": "Event"
  },
  "consumer_group_initial_offset": "earliest"
}
```
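Writing the escaped file string by hand is error-prone. A minimal sketch, assuming you assemble the source entry in Python and serialize it with the standard json module: the Avro schema is dumped to a compact string first, and the Protobuf source text is stored as-is. The variable names are illustrative only; json.dumps adds the backslash escapes seen in the examples above when the whole config is written out.

```python
import json

# The example .avsc document as a Python dict, and the example .proto source text.
avsc = {"type": "record", "name": "Event", "fields": [{"name": "id", "type": "string"}]}
proto_text = 'syntax = "proto3";\nmessage Event { string id = 1; }'

avro_source = {
    "type": "kafka",
    "source_id": "events",
    "connection_params": {"brokers": ["kafka:9092"], "protocol": "PLAINTEXT"},
    "topic": "avro_events",
    "format": "avro",
    # schema.file is the .avsc document serialized as a string, not a nested object.
    "schema": {"file": json.dumps(avsc, separators=(",", ":"))},
    "consumer_group_initial_offset": "earliest",
}
proto_schema = {"file": proto_text, "message_type": "Event"}

# Serializing escapes the inner quotes and the newline automatically.
print(json.dumps(avro_source["schema"]))
print(json.dumps(proto_schema))
```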

schema object:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| file | string | Conditional | Inline schema text: the .avsc document (Avro) or .proto source (Protobuf), serialized as a string. Required for avro and protobuf. |
| message_type | string | Conditional | Protobuf root message to decode. Required for protobuf. |
| fields | array | Conditional | Field declarations for the source, each { "name": ..., "type": ... } (see below). Required for JSON; not used for Avro or Protobuf. |
| parsed_fields | array | No | Read-only. The field list GlassFlow parsed from file, returned on GET /pipeline/{id}. Ignored on create and edit. |
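Which schema fields are required depends on format. The following sketch encodes that rule from the table above; check_schema is a hypothetical helper, not a GlassFlow API. It leaves parsed_fields alone because that field is read-only and ignored on create and edit.

```python
def check_schema(source: dict) -> list[str]:
    """Check a Kafka source's schema object against its format (hypothetical helper)."""
    fmt = source.get("format", "json")  # "json" is the documented default
    schema = source.get("schema", {})
    problems = []
    if fmt == "json":
        if not schema.get("fields"):
            problems.append("json sources need schema.fields")
    elif fmt in ("avro", "protobuf"):
        if not schema.get("file"):
            problems.append(f"{fmt} sources need schema.file")
        if fmt == "protobuf" and not schema.get("message_type"):
            problems.append("protobuf sources need schema.message_type")
    else:
        problems.append(f"unsupported format: {fmt!r}")
    return problems


print(check_schema({"format": "protobuf", "schema": {"file": 'syntax = "proto3";'}}))
# ['protobuf sources need schema.message_type']
```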

Each entry in fields defines a source field:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Source field name. Dot notation is supported for nested fields (e.g., data.name). |
| type | string | Yes | Field data type. See Data Formats. |

schema_registry object:

To fetch schemas from a Confluent-compatible Schema Registry, add a schema_registry object. You must still provide schema as the seed (base) version and set schema_version, a field of the source itself next to schema_registry, to the registry version that the seed corresponds to. The same registry connection works for JSON, Avro, and Protobuf sources.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| url | string | Yes | Base URL of the Confluent-compatible Schema Registry. |
| api_key | string | No | API key for authentication. |
| api_secret | string | No | API secret for authentication. |

A complete registry-backed Avro source, showing where schema_registry, the seed schema, and the source-level schema_version sit relative to each other:

```json
{
  "type": "kafka",
  "source_id": "events",
  "connection_params": {
    "brokers": ["kafka:9092"],
    "protocol": "PLAINTEXT"
  },
  "topic": "avro_events",
  "format": "avro",
  "schema_registry": {
    "url": "https://<sr-host>",
    "api_key": "<sr-api-key>",
    "api_secret": "<sr-api-secret>"
  },
  "schema_version": "<registry-version-id>",
  "schema": {
    "file": "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"
  },
  "consumer_group_initial_offset": "earliest"
}
```

Transforms Configuration

The transforms array defines processing steps applied to source data before it reaches the sink. Each transform targets a specific source via source_id. You can apply multiple transforms to the same source — they run in the order they appear.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | string | Yes | Transform type: "dedup", "filter", or "stateless". |
| source_id | string | Yes | Which source this transform applies to. Must match a source_id from the sources array. |
| config | object | Yes | Type-specific configuration. See below. |
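Because each transform names its source and transforms on the same source run in array order, the per-source chain can be read straight off the array. A sketch with a hypothetical helper, transforms_by_source, that also rejects unknown types and dangling source_id references; it mirrors the rules above and is not GlassFlow's validator.

```python
from collections import defaultdict

TRANSFORM_TYPES = {"dedup", "filter", "stateless"}


def transforms_by_source(config: dict) -> dict[str, list[str]]:
    """Group transform types by source_id, in the order they appear (hypothetical helper)."""
    known = {source["source_id"] for source in config.get("sources", [])}
    chains = defaultdict(list)
    for step in config.get("transforms", []):
        if step["type"] not in TRANSFORM_TYPES:
            raise ValueError(f"unknown transform type: {step['type']!r}")
        if step["source_id"] not in known:
            raise ValueError(f"transform references unknown source_id: {step['source_id']!r}")
        chains[step["source_id"]].append(step["type"])
    return dict(chains)


config = {
    "sources": [{"source_id": "orders"}, {"source_id": "users"}],
    "transforms": [
        {"type": "dedup", "source_id": "orders", "config": {"key": "order_id", "time_window": "1h"}},
        {"type": "dedup", "source_id": "users", "config": {"key": "user_id", "time_window": "1h"}},
        {"type": "filter", "source_id": "orders", "config": {"expression": "amount > 0"}},
    ],
}
print(transforms_by_source(config))  # {'orders': ['dedup', 'filter'], 'users': ['dedup']}
```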

Deduplication

Removes duplicate records based on a key field within a time window.

```yaml
type: dedup
source_id: orders
config:
  key: order_id
  time_window: 1h
```

Dedup config:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| key | string | Yes | Field name to deduplicate on. |
| time_window | string | Yes | Deduplication window. See Time windows. |
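The idea behind key-plus-window deduplication can be illustrated in a few lines. This is a conceptual sketch only, assuming the window is measured from the first kept occurrence of a key and that every record carries an event timestamp in seconds under a hypothetical ts field; GlassFlow's actual windowing and state eviction may differ.

```python
def dedup(records, key: str, window_seconds: float):
    """Yield records whose key was not already kept within the window (illustrative only)."""
    first_seen = {}  # key value -> ts of the record that was kept
    for record in records:
        value, ts = record[key], record["ts"]
        kept_at = first_seen.get(value)
        if kept_at is not None and ts - kept_at < window_seconds:
            continue  # duplicate inside the window: drop it
        first_seen[value] = ts  # a real implementation would also evict expired keys
        yield record


events = [
    {"order_id": "a", "ts": 0},
    {"order_id": "a", "ts": 1800},  # 30 minutes later: duplicate within 1h
    {"order_id": "b", "ts": 1900},
    {"order_id": "a", "ts": 3600},  # exactly 1h after the first "a": window has passed
]
print([e["ts"] for e in dedup(events, "order_id", 3600)])  # [0, 1900, 3600]
```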

Filter

Keeps records that match the expression and discards the rest.

```yaml
type: filter
source_id: orders
config:
  expression: "status == 'active'"
```

Filter config:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| expression | string | Yes | Boolean expression. Records where the expression evaluates to true are kept. See Filter. |
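The filter expression is written in GlassFlow's own expression syntax (see Filter). To show only the keep/discard rule, this sketch uses a Python predicate as a stand-in for "status == 'active'"; the keep function and sample records are illustrative, not how GlassFlow evaluates expressions.

```python
records = [
    {"order_id": "1", "status": "active"},
    {"order_id": "2", "status": "cancelled"},
    {"order_id": "3", "status": "active"},
]


def keep(record: dict) -> bool:
    """Python stand-in for the filter expression "status == 'active'"."""
    return record["status"] == "active"


# Records for which the predicate is true pass through; the rest are discarded.
kept = [r for r in records if keep(r)]
print([r["order_id"] for r in kept])  # ['1', '3']
```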

Stateless Transformation

Computes new fields from existing data using expressions.

```yaml
type: stateless
source_id: orders
config:
  transforms:
    - expression: "int(amount) % 2 == 0"
      output_name: is_amount_even
      output_type: bool
```

Stateless config:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| transforms | array | Yes | List of transform definitions. |

Each transform definition:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| expression | string | Yes | Expression to evaluate against each record. See Stateless Transformation. |
| output_name | string | Yes | Name of the new output field. |
| output_type | string | Yes | Data type of the output field. |
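As a rough model of what the example definition produces per record, this Python stand-in computes is_amount_even from amount. It assumes, without confirmation from this page, that the original fields are kept alongside the new one; the function name is hypothetical and the real expression is evaluated by GlassFlow, not by Python.

```python
records = [{"order_id": "1", "amount": 41.5}, {"order_id": "2", "amount": 12.0}]


def add_is_amount_even(record: dict) -> dict:
    """Stand-in for expression "int(amount) % 2 == 0" with output_name is_amount_even (bool)."""
    # Assumption: existing fields are preserved and the new field is added.
    return {**record, "is_amount_even": int(record["amount"]) % 2 == 0}


for out in map(add_is_amount_even, records):
    print(out)
```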

Join Configuration

The join configuration combines records from two sources based on matching keys within a time window.

```yaml
enabled: true
type: temporal
left_source:
  source_id: orders
  key: customer_id
  time_window: 30s
right_source:
  source_id: users
  key: user_id
  time_window: 30s
output_fields:
  - source_id: orders
    name: order_id
    output_name: ORDER_ID
  - source_id: orders
    name: amount
  - source_id: users
    name: user_id
  - source_id: users
    name: email
```

Join Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| enabled | boolean | Yes | Whether the join is enabled. |
| type | string | Yes | Join type (e.g., "temporal"). |
| left_source | object | Yes (when enabled) | Left side of the join. |
| right_source | object | Yes (when enabled) | Right side of the join. |
| output_fields | array | Yes (when enabled) | Fields to include in the joined output. |

Join Source

Each join source (left_source and right_source) has the same structure.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
| key | string | Yes | Field to join on. |
| time_window | string | Yes | Join time window. See Time windows. |

Join Output Fields

Each entry in output_fields selects a field from one of the joined sources.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| source_id | string | Yes | Which source the field comes from. |
| name | string | Yes | Field name from the source. |
| output_name | string | No | Rename the field in the output. If omitted, the original name is used. |
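To show how keys, the time window, and output_fields fit together, here is a naive nested-loop sketch of a temporal join. It simplifies heavily: it uses one symmetric window instead of a separate time_window per side, and it reads event time from a hypothetical ts field (seconds). GlassFlow's real join buffers each side and may match differently; temporal_join is illustrative only.

```python
def temporal_join(left: dict, right: dict, window_seconds: float, output_fields: list) -> list:
    """Naive temporal join (illustrative only).

    left/right look like {"source_id": ..., "key": ..., "records": [...]}.
    """
    rows = []
    for left_rec in left["records"]:
        for right_rec in right["records"]:
            if left_rec[left["key"]] != right_rec[right["key"]]:
                continue  # join keys differ
            if abs(left_rec["ts"] - right_rec["ts"]) > window_seconds:
                continue  # same key, but too far apart in time
            sides = {left["source_id"]: left_rec, right["source_id"]: right_rec}
            # Project output_fields, renaming a field when output_name is given.
            rows.append({
                f.get("output_name", f["name"]): sides[f["source_id"]][f["name"]]
                for f in output_fields
            })
    return rows


orders = {"source_id": "orders", "key": "customer_id", "records": [
    {"order_id": "o1", "customer_id": "u1", "amount": 9.5, "ts": 100},
    {"order_id": "o2", "customer_id": "u2", "amount": 3.0, "ts": 100},
]}
users = {"source_id": "users", "key": "user_id", "records": [
    {"user_id": "u1", "email": "a@example.com", "ts": 120},
    {"user_id": "u2", "email": "b@example.com", "ts": 200},  # 100 s after o2: outside 30 s
]}
output_fields = [
    {"source_id": "orders", "name": "order_id", "output_name": "ORDER_ID"},
    {"source_id": "orders", "name": "amount"},
    {"source_id": "users", "name": "user_id"},
    {"source_id": "users", "name": "email"},
]
print(temporal_join(orders, users, 30, output_fields))
# [{'ORDER_ID': 'o1', 'amount': 9.5, 'user_id': 'u1', 'email': 'a@example.com'}]
```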

Sink Configuration

The sink configuration defines the ClickHouse destination, including connection details, batching behavior, and column mapping.

```yaml
type: clickhouse
connection_params:
  host: clickhouse.example.com
  port: "9000"
  http_port: "8123"
  database: default
  username: default
  password: mysecret
  secure: true
  skip_certificate_verification: false
table: orders
max_batch_size: 1000
max_delay_time: 1s
mapping:
  - name: order_id
    column_name: order_id
    column_type: String
  - name: amount
    column_name: amount
    column_type: Float64
  - name: timestamp
    column_name: created_at
    column_type: DateTime
```

Sink Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | string | Yes | Must be "clickhouse". |
| connection_params | object | Yes | ClickHouse connection parameters. |
| table | string | Yes | Target table name in ClickHouse. |
| max_batch_size | integer | No | Maximum number of records per batch. Default: 1000. |
| max_delay_time | string | No | Maximum delay before flushing a batch. Default: "60s". |
| mapping | array | Yes | Column mappings from source fields to ClickHouse columns. |
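max_batch_size and max_delay_time describe a flush-on-size-or-age policy. The class below is a minimal sketch of that policy, assuming the delay is measured from the first record in the current batch; Batcher, its tick method, and the injectable clock are hypothetical, and GlassFlow's sink may schedule flushes differently.

```python
import time


class Batcher:
    """Flush when the batch is full or its oldest record is too old (illustrative only)."""

    def __init__(self, flush, max_batch_size=1000, max_delay_seconds=60.0, clock=time.monotonic):
        self.flush = flush                # callable that receives a list of records
        self.max_batch_size = max_batch_size
        self.max_delay_seconds = max_delay_seconds
        self.clock = clock
        self.batch = []
        self.started = None               # clock value when the batch got its first record

    def add(self, record):
        if not self.batch:
            self.started = self.clock()
        self.batch.append(record)
        if len(self.batch) >= self.max_batch_size:
            self._emit()

    def tick(self):
        """Call periodically; flushes a partial batch once it has waited long enough."""
        if self.batch and self.clock() - self.started >= self.max_delay_seconds:
            self._emit()

    def _emit(self):
        self.flush(self.batch)
        self.batch, self.started = [], None


now = [0.0]
flushed = []
batcher = Batcher(flushed.append, max_batch_size=3, max_delay_seconds=1.0, clock=lambda: now[0])
for i in range(4):
    batcher.add(i)   # the third add triggers a size-based flush
now[0] = 1.5
batcher.tick()       # the leftover record is flushed because it is older than 1 s
print(flushed)       # [[0, 1, 2], [3]]
```

Injecting the clock keeps the example deterministic; in a real loop, tick would be called on a timer.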

Sink Connection Parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| host | string | Yes | ClickHouse hostname. |
| port | string | Yes | ClickHouse native port. |
| http_port | string | No | ClickHouse HTTP port (used by the UI for connectivity checks). |
| database | string | Yes | Database name. |
| username | string | Yes | Username. |
| password | string | Yes | Password (plain text). |
| secure | boolean | No | Use TLS. Default: false. |
| skip_certificate_verification | boolean | No | Skip certificate verification. Default: false. |

Sink Column Mapping

Each entry in the mapping array maps a source field to a ClickHouse column.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Source field name. Dot notation is supported for nested fields. |
| column_name | string | Yes | ClickHouse column name. |
| column_type | string | Yes | ClickHouse column type (e.g., String, Float64, DateTime). |

For the full list of supported type mappings, see Data Formats.
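A sketch of how mapping entries, including dot-notation names, turn one record into a row keyed by ClickHouse column. The helpers get_path and to_row and the customer_name column are hypothetical; the sketch assumes every mapped field is present and does no conversion to column_type.

```python
def get_path(record: dict, path: str):
    """Resolve a dot-notation field name such as "data.name" (raises KeyError if absent)."""
    value = record
    for part in path.split("."):
        value = value[part]
    return value


def to_row(record: dict, mapping: list) -> dict:
    """Build {column_name: value} from sink mapping entries (no type conversion)."""
    return {m["column_name"]: get_path(record, m["name"]) for m in mapping}


mapping = [
    {"name": "order_id", "column_name": "order_id", "column_type": "String"},
    {"name": "data.name", "column_name": "customer_name", "column_type": "String"},
    {"name": "timestamp", "column_name": "created_at", "column_type": "DateTime"},
]
record = {"order_id": "o1", "data": {"name": "Ada"}, "timestamp": "2024-01-01T00:00:00Z"}
print(to_row(record, mapping))
# {'order_id': 'o1', 'customer_name': 'Ada', 'created_at': '2024-01-01T00:00:00Z'}
```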

Metadata Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| tags | array | No | List of string tags for the pipeline. |

Resources Configuration

The resources object controls Kubernetes resource allocation for each pipeline component. If omitted, defaults from the Helm chart values are used.

```yaml
nats:
  stream:
    max_age: 24h
    max_bytes: 10Gi
sources:
  - source_id: orders
    replicas: 2
    requests:
      cpu: 250m
      memory: 256Mi
    limits:
      cpu: 500m
      memory: 512Mi
transform:
  - source_id: orders
    replicas: 1
    storage:
      size: 10Gi
    requests:
      cpu: 500m
      memory: 512Mi
    limits:
      cpu: 1000m
      memory: 1Gi
sink:
  replicas: 1
  requests:
    cpu: 250m
    memory: 256Mi
  limits:
    cpu: 500m
    memory: 512Mi
```

Top-Level Resources Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| nats | object | No | NATS stream configuration. |
| sources | array | No | Per-source resource allocation. |
| transform | array | No | Per-source transform resource allocation. |
| sink | object | No | Sink resource allocation. |

Source Resources

Each entry in the sources array configures resources for one source’s ingestion component.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
| replicas | integer | No | Number of replicas. Default: 1. |
| requests | object | No | CPU and memory requests: {"cpu": "250m", "memory": "256Mi"}. |
| limits | object | No | CPU and memory limits: {"cpu": "500m", "memory": "512Mi"}. |

Transform Resources

Each entry in the transform array configures resources for one source’s transform component.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| source_id | string | Yes | Source identifier. Must match a source_id from the sources array. |
| replicas | integer | No | Number of replicas. Default: 1. |
| storage | object | No | Storage configuration, e.g. {"size": "10Gi"}. Only applicable when deduplication is enabled. |
| requests | object | No | CPU and memory requests: {"cpu": "500m", "memory": "512Mi"}. |
| limits | object | No | CPU and memory limits: {"cpu": "1000m", "memory": "1Gi"}. |
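Two easy mistakes here are a resources entry whose source_id matches no source, and transform storage on a source that has no dedup step. A sketch of a hypothetical pre-flight check for both, based only on the rules in these tables:

```python
def resource_warnings(config: dict) -> list[str]:
    """Cross-check resources against sources and transforms (hypothetical helper)."""
    source_ids = {s["source_id"] for s in config.get("sources", [])}
    dedup_ids = {t["source_id"] for t in config.get("transforms", []) if t["type"] == "dedup"}
    resources = config.get("resources", {})
    warnings = []
    for section in ("sources", "transform"):
        for entry in resources.get(section, []):
            sid = entry["source_id"]
            if sid not in source_ids:
                warnings.append(f"resources.{section}: unknown source_id {sid!r}")
            if section == "transform" and "storage" in entry and sid not in dedup_ids:
                warnings.append(f"resources.transform: storage only applies when {sid!r} has a dedup transform")
    return warnings


config = {
    "sources": [{"source_id": "orders"}, {"source_id": "users"}],
    "transforms": [{"type": "dedup", "source_id": "orders", "config": {"key": "order_id", "time_window": "1h"}}],
    "resources": {
        "sources": [{"source_id": "orders", "replicas": 2}, {"source_id": "events", "replicas": 1}],
        "transform": [
            {"source_id": "orders", "storage": {"size": "10Gi"}},
            {"source_id": "users", "storage": {"size": "10Gi"}},
        ],
    },
}
for warning in resource_warnings(config):
    print(warning)
```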

Sink Resources

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| replicas | integer | No | Number of replicas. Default: 1. |
| requests | object | No | CPU and memory requests. |
| limits | object | No | CPU and memory limits. |

NATS Stream Configuration

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| max_age | string | No | Maximum message retention age. Default: "24h". Immutable after pipeline creation. |
| max_bytes | string | No | Maximum stream size. Default: "0" (unlimited, no reserved memory). Immutable after pipeline creation. |
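Since both stream settings are immutable, an edit that changes them is worth catching before it is submitted. A sketch of a hypothetical pre-flight comparison between the deployed and the edited config, filling omitted values with the documented defaults. It compares strings literally, so an equivalent value written differently (for example "1440m" instead of "24h") would still be reported.

```python
IMMUTABLE_STREAM_FIELDS = ("max_age", "max_bytes")
DEFAULTS = {"max_age": "24h", "max_bytes": "0"}  # documented defaults


def stream_settings(config: dict) -> dict:
    return config.get("resources", {}).get("nats", {}).get("stream", {})


def immutable_changes(old: dict, new: dict) -> list[str]:
    """List NATS stream fields an edit would change (hypothetical pre-flight check)."""
    old_s, new_s = stream_settings(old), stream_settings(new)
    return [
        field for field in IMMUTABLE_STREAM_FIELDS
        if old_s.get(field, DEFAULTS[field]) != new_s.get(field, DEFAULTS[field])
    ]


deployed = {"resources": {"nats": {"stream": {"max_age": "24h", "max_bytes": "10Gi"}}}}
edited = {"resources": {"nats": {"stream": {"max_bytes": "25Gi"}}}}
print(immutable_changes(deployed, edited))  # ['max_bytes']
```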

Other Configuration Notes

Time Windows

Time windows use a string format combining a number and a unit suffix:

  • "30s" — 30 seconds
  • "1m" — 1 minute
  • "1h" — 1 hour
  • "12h" — 12 hours
  • "24h" — 24 hours
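If you generate or validate configs in code, a small parser for the formats listed above can help. This is a sketch that accepts only an integer followed by s, m, or h; GlassFlow may accept other forms, and parse_time_window is a hypothetical helper, not part of the Python SDK.

```python
import re
from datetime import timedelta

UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}
WINDOW_PATTERN = re.compile(r"(\d+)([smh])")


def parse_time_window(text: str) -> timedelta:
    """Parse windows such as "30s", "1m", or "12h" (only the units listed above)."""
    match = WINDOW_PATTERN.fullmatch(text)
    if not match:
        raise ValueError(f"unsupported time window: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{UNITS[unit]: int(amount)})


print(parse_time_window("30s").total_seconds())  # 30.0
print(parse_time_window("12h").total_seconds())  # 43200.0
```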