
Topic Configuration

Each Kafka source in the sources array defines a single topic to consume from. A pipeline can have multiple Kafka sources; a temporal join, for example, requires two.

Source Parameters

| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Must be "kafka" |
| source_id | string | Yes | Unique identifier, referenced by transforms, join, and sink mapping |
| connection_params | object | Yes | Kafka connection parameters. See Connections. |
| topic | string | Yes | Kafka topic name |
| consumer_group_initial_offset | string | No | Where to start reading: earliest or latest (default: latest) |
| schema_fields | array | Yes | Field definitions for this source |
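
As an illustration of the parameters above, a source that sets only the required fields might look like the following fragment. The source_id, topic, and field names are hypothetical, and the connection values are borrowed from the join example on this page. Because consumer_group_initial_offset is omitted, the documented default of latest would apply.

```json
{
  "sources": [
    {
      "type": "kafka",
      "source_id": "page_views",
      "connection_params": {
        "brokers": ["kafka:9092"],
        "protocol": "PLAINTEXT",
        "mechanism": "NO_AUTH"
      },
      "topic": "page_views",
      "schema_fields": [
        {"name": "view_id", "type": "string"},
        {"name": "url", "type": "string"}
      ]
    }
  ]
}
```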

Consumer Group Offset

The consumer_group_initial_offset controls where GlassFlow starts reading when a consumer group is created for the first time:

  • earliest — start from the beginning of the topic. Use this when you need to process historical data.
  • latest — start from the most recent messages. Use this when you only care about new data arriving after the pipeline starts.

After the initial offset is set, GlassFlow tracks consumer group offsets automatically. Stopping and resuming a pipeline continues from where it left off.

Schema Fields

Each entry in schema_fields declares a field from the source data:

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Field name. Dot notation supported for nested fields (e.g., data.user_id). |
| type | string | Yes | Field type. See Supported Data Formats. |
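
To illustrate dot notation, suppose messages on a hypothetical topic carry a nested object, such as {"event": "login", "data": {"user_id": "u-42"}}. A schema_fields fragment that declares the nested value alongside a top-level one could be sketched as follows; the field names are assumptions made for this example, not part of any real schema.

```json
{
  "schema_fields": [
    {"name": "event", "type": "string"},
    {"name": "data.user_id", "type": "string"}
  ]
}
```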

Deduplication

Deduplication is configured as a transform step in the transforms array, not inside the source. See Deduplication for details.

"transforms": [
  {
    "type": "dedup",
    "source_id": "orders",
    "config": {
      "key": "order_id",
      "time_window": "1h"
    }
  }
]

Example: Two Sources for a Join

"sources": [
  {
    "type": "kafka",
    "source_id": "user_logins",
    "connection_params": {
      "brokers": ["kafka:9092"],
      "protocol": "PLAINTEXT",
      "mechanism": "NO_AUTH"
    },
    "topic": "user_logins",
    "consumer_group_initial_offset": "earliest",
    "schema_fields": [
      {"name": "session_id", "type": "string"},
      {"name": "user_id", "type": "string"},
      {"name": "timestamp", "type": "datetime"}
    ]
  },
  {
    "type": "kafka",
    "source_id": "orders",
    "connection_params": {
      "brokers": ["kafka:9092"],
      "protocol": "PLAINTEXT",
      "mechanism": "NO_AUTH"
    },
    "topic": "orders",
    "consumer_group_initial_offset": "earliest",
    "schema_fields": [
      {"name": "order_id", "type": "string"},
      {"name": "user_id", "type": "string"},
      {"name": "amount", "type": "float"}
    ]
  }
]