Topic Configuration
Each Kafka source in the sources array defines a single topic to consume from. A pipeline can have multiple Kafka sources (e.g., two sources are required for temporal joins).
Source Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | Yes | Must be `"kafka"` |
| `source_id` | string | Yes | Unique identifier, referenced by transforms, join, and sink mapping |
| `connection_params` | object | Yes | Kafka connection parameters. See Connections. |
| `topic` | string | Yes | Kafka topic name |
| `consumer_group_initial_offset` | string | No | Where to start reading: `earliest` or `latest` (default: `latest`) |
| `schema_fields` | array | Yes | Field definitions for this source |
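The table can be read as a set of validation rules. The sketch below applies them to a config before it is submitted. `validate_kafka_source` and `validate_sources` are hypothetical helpers and are not part of GlassFlow, which does its own validation. The sketch checks only what the table states: which fields are required, their JSON types, the fixed `"kafka"` type, the two allowed offsets, and that `source_id` values are unique.

```python
# Hypothetical helpers (not GlassFlow code) that mirror the Source Parameters table.
REQUIRED_FIELDS = {
    "type": str,
    "source_id": str,
    "connection_params": dict,
    "topic": str,
    "schema_fields": list,
}
VALID_OFFSETS = ("earliest", "latest")


def validate_kafka_source(source: dict) -> list:
    """Return a list of problems found in one entry of the sources array."""
    problems = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in source:
            problems.append(f"missing required field: {field}")
        elif not isinstance(source[field], expected_type):
            problems.append(f"{field} must be a {expected_type.__name__}")
    if source.get("type", "kafka") != "kafka":
        problems.append('type must be "kafka"')
    # Optional field: when absent, the documented default "latest" applies.
    offset = source.get("consumer_group_initial_offset", "latest")
    if offset not in VALID_OFFSETS:
        problems.append(f"consumer_group_initial_offset must be earliest or latest, got {offset!r}")
    return problems


def validate_sources(sources: list) -> list:
    """Validate every source and check that source_id values are unique."""
    problems = []
    seen_ids = set()
    for index, source in enumerate(sources):
        problems.extend(f"sources[{index}]: {p}" for p in validate_kafka_source(source))
        source_id = source.get("source_id")
        if source_id is not None:
            if source_id in seen_ids:
                problems.append(f"sources[{index}]: duplicate source_id {source_id!r}")
            seen_ids.add(source_id)
    return problems


orders = {"type": "kafka", "source_id": "orders", "connection_params": {},
          "topic": "orders", "schema_fields": []}
print(validate_sources([orders, dict(orders)]))
# ["sources[1]: duplicate source_id 'orders'"]
```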
Consumer Group Offset
The `consumer_group_initial_offset` parameter controls where GlassFlow starts reading when a consumer group is created for the first time:
- `earliest`: start from the oldest message still retained in the topic. Use this when you need to process historical data.
- `latest`: start from the end of the topic, so only messages produced after the consumer group is created are read. Use this when you only care about new data arriving after the pipeline starts.
Once the consumer group exists, GlassFlow tracks its offsets automatically and the initial offset setting no longer applies. Stopping and resuming a pipeline continues from where it left off.
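The rule above can be modelled in a few lines. The sketch below is a simplified, single-partition model: in Kafka, offsets are tracked per partition. `starting_offset` and its parameters are hypothetical names and do not describe GlassFlow's implementation. The initial offset is used only when the group has no stored offset. In every other case, the stored offset wins.

```python
from typing import Optional

# Simplified single-partition model of how a consumer group picks its starting
# position. Hypothetical helper, not GlassFlow code.


def starting_offset(committed: Optional[int], initial_offset: str,
                    log_start: int, log_end: int) -> int:
    """Return the offset the consumer group reads next.

    committed      -- offset stored for the group, or None if the group is new
    initial_offset -- value of consumer_group_initial_offset
    log_start      -- oldest offset still retained in the partition
    log_end        -- offset the next produced message will receive
    """
    if committed is not None:
        # Existing group (e.g. a stopped and resumed pipeline): the setting is ignored.
        return committed
    if initial_offset == "earliest":
        return log_start
    if initial_offset == "latest":
        return log_end
    raise ValueError(f"unsupported initial offset: {initial_offset!r}")


# A new group on a partition holding offsets 0..119:
print(starting_offset(None, "earliest", 0, 120))  # 0   (replays history)
print(starting_offset(None, "latest", 0, 120))    # 120 (only new messages)
# The same group after a restart, with offset 75 committed:
print(starting_offset(75, "latest", 0, 120))      # 75
```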
Schema Fields
Each entry in schema_fields declares a field from the source data:
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Field name. Dot notation is supported for nested fields (e.g., `data.user_id`). |
| `type` | string | Yes | Field type. See Supported Data Formats. |
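Dot notation is a path into nested objects: `data.user_id` means "the `user_id` key inside the `data` object". The sketch below resolves such names against a message that has already been decoded from JSON, which is an assumption. `get_field` and `extract` are hypothetical helpers. Returning `None` for a missing path is also an assumption, since the behaviour for absent fields is not documented here. Keys that themselves contain a literal dot are not handled.

```python
from typing import Any


def get_field(message: dict, name: str, default: Any = None) -> Any:
    """Resolve a dot-notation field name such as "data.user_id" in a decoded message."""
    current: Any = message
    for part in name.split("."):
        if not isinstance(current, dict) or part not in current:
            return default  # path does not exist in this message
        current = current[part]
    return current


def extract(message: dict, schema_fields: list) -> dict:
    """Project a message onto the names declared in schema_fields."""
    return {field["name"]: get_field(message, field["name"]) for field in schema_fields}


message = {"event": "login", "data": {"user_id": "u-42", "plan": "pro"}}
fields = [{"name": "event", "type": "string"}, {"name": "data.user_id", "type": "string"}]
print(extract(message, fields))  # {'event': 'login', 'data.user_id': 'u-42'}
```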
Deduplication
Deduplication is configured as a transform step in the transforms array, not inside the source. See Deduplication for details.
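Conceptually, a `dedup` step with a `key` and a `time_window` forwards the first message for each key value and drops repeats that arrive inside the window. The sketch below models that idea under two stated assumptions. First, the window starts at the first occurrence of a key. Second, `"1h"` means one hour. GlassFlow's exact semantics may differ, so see Deduplication for the authoritative description. `TimeWindowDeduplicator` is a hypothetical class, and a production version would also need to evict expired keys to bound memory.

```python
from datetime import datetime, timedelta


class TimeWindowDeduplicator:
    """Forward the first message per key; drop repeats that arrive within `window`.

    Hypothetical sketch: assumes the window starts at the first occurrence of a
    key, and never evicts old keys (a real implementation would need to).
    """

    def __init__(self, key: str, window: timedelta) -> None:
        self.key = key
        self.window = window
        self._first_seen: dict = {}

    def accept(self, message: dict, seen_at: datetime) -> bool:
        """Return True to forward the message, False if it is a duplicate."""
        value = message[self.key]
        first = self._first_seen.get(value)
        if first is not None and seen_at - first < self.window:
            return False  # same key already forwarded inside the current window
        self._first_seen[value] = seen_at  # new key or expired window: open a new one
        return True


# Mirrors "key": "order_id", "time_window": "1h"
dedup = TimeWindowDeduplicator("order_id", timedelta(hours=1))
t0 = datetime(2024, 1, 1, 12, 0)
print(dedup.accept({"order_id": "o-1"}, t0))                          # True
print(dedup.accept({"order_id": "o-1"}, t0 + timedelta(minutes=30)))  # False
print(dedup.accept({"order_id": "o-2"}, t0 + timedelta(minutes=30)))  # True
print(dedup.accept({"order_id": "o-1"}, t0 + timedelta(minutes=90)))  # True
```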
```json
{
  "transforms": [
    {
      "type": "dedup",
      "source_id": "orders",
      "config": {
        "key": "order_id",
        "time_window": "1h"
      }
    }
  ]
}
```

Example: Two Sources for a Join
```json
{
  "sources": [
    {
      "type": "kafka",
      "source_id": "user_logins",
      "connection_params": {
        "brokers": ["kafka:9092"],
        "protocol": "PLAINTEXT",
        "mechanism": "NO_AUTH"
      },
      "topic": "user_logins",
      "consumer_group_initial_offset": "earliest",
      "schema_fields": [
        {"name": "session_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "timestamp", "type": "datetime"}
      ]
    },
    {
      "type": "kafka",
      "source_id": "orders",
      "connection_params": {
        "brokers": ["kafka:9092"],
        "protocol": "PLAINTEXT",
        "mechanism": "NO_AUTH"
      },
      "topic": "orders",
      "consumer_group_initial_offset": "earliest",
      "schema_fields": [
        {"name": "order_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "amount", "type": "float"}
      ]
    }
  ]
}
```
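Because `source_id` values are referenced from other parts of the pipeline, a quick consistency check can catch typos before deployment. The sketch below loads the two sources from this example together with the `dedup` transform from the Deduplication section, with `connection_params` omitted for brevity. It then checks three things: that there are two Kafka sources, that every transform's `source_id` points at a declared source, and which schema fields the two sources share. In this example, `user_id` is the only shared field. `unknown_source_refs` and `shared_fields` are hypothetical helpers. The join itself is configured separately, and this sketch does not model its syntax.

```python
import json

# The two-source example in pipeline form (connection_params omitted for
# brevity), plus the dedup transform from the Deduplication section.
PIPELINE = json.loads("""
{
  "sources": [
    {"type": "kafka", "source_id": "user_logins", "topic": "user_logins",
     "schema_fields": [{"name": "session_id", "type": "string"},
                       {"name": "user_id", "type": "string"},
                       {"name": "timestamp", "type": "datetime"}]},
    {"type": "kafka", "source_id": "orders", "topic": "orders",
     "schema_fields": [{"name": "order_id", "type": "string"},
                       {"name": "user_id", "type": "string"},
                       {"name": "amount", "type": "float"}]}
  ],
  "transforms": [
    {"type": "dedup", "source_id": "orders",
     "config": {"key": "order_id", "time_window": "1h"}}
  ]
}
""")


def unknown_source_refs(pipeline: dict) -> list:
    """List transform source_id values that do not match any declared source."""
    declared = {source["source_id"] for source in pipeline.get("sources", [])}
    return [t.get("source_id") for t in pipeline.get("transforms", [])
            if t.get("source_id") not in declared]


def shared_fields(pipeline: dict, left: str, right: str) -> set:
    """Field names declared by both sources (where a join key would usually come from)."""
    names = {source["source_id"]: {f["name"] for f in source["schema_fields"]}
             for source in pipeline["sources"]}
    return names[left] & names[right]


kafka_sources = [s for s in PIPELINE["sources"] if s["type"] == "kafka"]
print(len(kafka_sources))                                # 2
print(unknown_source_refs(PIPELINE))                     # []
print(shared_fields(PIPELINE, "user_logins", "orders"))  # {'user_id'}
```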