Kafka Source
Kafka source pipelines consume messages from one or more Kafka topics, apply optional transformations, and write the results to ClickHouse. This is the default source type and supports the full set of GlassFlow pipeline features including deduplication, temporal joins, filtering, and stateless transformations.
Configuration
Each Kafka source is an entry in the sources array with type set to "kafka":
{
  "type": "kafka",
  "source_id": "events",
  "connection_params": {
    "brokers": ["kafka:9092"],
    "protocol": "PLAINTEXT",
    "mechanism": "NO_AUTH"
  },
  "topic": "events",
  "consumer_group_initial_offset": "earliest",
  "schema_fields": [
    {"name": "event_id", "type": "string"},
    {"name": "timestamp", "type": "datetime"}
  ]
}
Source Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Must be "kafka" |
| source_id | string | Yes | Unique identifier, referenced by transforms, join, and sink mapping |
| connection_params | object | Yes | Kafka connection parameters |
| topic | string | Yes | Kafka topic name |
| consumer_group_initial_offset | string | No | Where to start reading: earliest or latest (default: latest) |
| schema_fields | array | Yes | Field definitions for this source. Each entry: {"name": "...", "type": "..."} |
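Because each source is one entry in the sources array, a pipeline that reads from more than one topic would presumably list several entries, each with its own source_id. The following is a minimal sketch under that assumption: the topic names (orders, users) and field names are hypothetical, and only the sources key is shown, not a complete pipeline definition.

```json
{
  "sources": [
    {
      "type": "kafka",
      "source_id": "orders",
      "connection_params": {
        "brokers": ["kafka:9092"],
        "protocol": "PLAINTEXT",
        "mechanism": "NO_AUTH"
      },
      "topic": "orders",
      "consumer_group_initial_offset": "earliest",
      "schema_fields": [
        {"name": "order_id", "type": "string"},
        {"name": "created_at", "type": "datetime"}
      ]
    },
    {
      "type": "kafka",
      "source_id": "users",
      "connection_params": {
        "brokers": ["kafka:9092"],
        "protocol": "PLAINTEXT",
        "mechanism": "NO_AUTH"
      },
      "topic": "users",
      "schema_fields": [
        {"name": "user_id", "type": "string"}
      ]
    }
  ]
}
```

The second entry omits consumer_group_initial_offset, so according to the table above it would start reading from latest.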
Connection Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| brokers | array | Yes | Kafka broker addresses (e.g., ["kafka:9092"]) |
| protocol | string | Yes | Security protocol: PLAINTEXT, SASL_PLAINTEXT, SSL, SASL_SSL |
| mechanism | string | Conditional | Auth mechanism: NO_AUTH, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI |
| username | string | Conditional | Kafka username (required when auth is enabled) |
| password | string | Conditional | Kafka password (required when auth is enabled) |
| root_ca | string | No | PEM-encoded CA certificate for TLS |
| skip_tls_verification | boolean | No | Skip TLS certificate verification (default: false) |
For detailed examples of each protocol and authentication method (including Kerberos), see Connections.
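To illustrate how the conditional fields fit together, here is a sketch of connection_params for a broker that uses SASL_SSL with SCRAM-SHA-512. The broker address, credentials, and certificate body are placeholders, and the exact combination of fields a given cluster needs is an assumption; the Connections page remains the authoritative reference.

```json
{
  "connection_params": {
    "brokers": ["broker-1.example.com:9093"],
    "protocol": "SASL_SSL",
    "mechanism": "SCRAM-SHA-512",
    "username": "example-user",
    "password": "example-password",
    "root_ca": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----",
    "skip_tls_verification": false
  }
}
```

Leaving skip_tls_verification at its default of false keeps certificate checks enabled; setting it to true is likely only appropriate for local testing.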
Features
| Feature | Supported | Details |
|---|---|---|
| Deduplication | Yes | Deduplication |
| Temporal Joins | Yes | Join |
| Filter | Yes | Filter |
| Stateless Transformation | Yes | Stateless Transformation |
Next Steps
- Examples — quick start and full pipeline examples
- Connections — protocol and auth examples
- Pipeline Configuration Reference — full configuration reference