Kafka Source Examples
Quick Start
Define the pipeline configuration
Create a pipeline configuration file with your Kafka source, ClickHouse sink, and column mapping:
```yaml
version: v3
pipeline_id: my-kafka-pipeline
sources:
  - type: kafka
    source_id: events
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: PLAINTEXT
      mechanism: NO_AUTH
    topic: events
    consumer_group_initial_offset: earliest
    schema_fields:
      - name: event_id
        type: string
      - name: timestamp
        type: datetime
      - name: payload
        type: string
sink:
  type: clickhouse
  connection_params:
    host: clickhouse.example.com
    port: "9000"
    database: default
    username: default
    password: mysecret
    secure: false
  table: events
  max_batch_size: 1000
  max_delay_time: 1s
  mapping:
    - name: event_id
      column_name: event_id
      column_type: String
    - name: timestamp
      column_name: timestamp
      column_type: DateTime
    - name: payload
      column_name: payload
      column_type: String
```
Create the pipeline
Using the Python SDK:
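```python
from glassflow.etl import Client

# Connect to the GlassFlow API (adjust host to match your deployment)
client = Client(host="http://localhost:30180")

# Create the pipeline from a JSON file that holds the configuration defined above
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
```
Or use the Web UI wizard, which walks you through each step visually.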
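The SDK call above loads its configuration from a JSON file, but the configuration in the first step is written in YAML. You can bridge that gap without extra dependencies by keeping the same configuration in a Python dict and writing it out with the standard-library `json` module. This sketch assumes the JSON file uses the same keys and nesting as the YAML. The helper `unmapped_names` is hypothetical and not part of the SDK. It only checks that every sink `mapping` entry names a field declared in `schema_fields`.

```python
import json
from pathlib import Path

# The quick-start configuration from above, as a Python dict
# (assumes the JSON format mirrors the YAML structure one-to-one).
config = {
    "version": "v3",
    "pipeline_id": "my-kafka-pipeline",
    "sources": [
        {
            "type": "kafka",
            "source_id": "events",
            "connection_params": {
                "brokers": ["kafka:9092"],
                "protocol": "PLAINTEXT",
                "mechanism": "NO_AUTH",
            },
            "topic": "events",
            "consumer_group_initial_offset": "earliest",
            "schema_fields": [
                {"name": "event_id", "type": "string"},
                {"name": "timestamp", "type": "datetime"},
                {"name": "payload", "type": "string"},
            ],
        }
    ],
    "sink": {
        "type": "clickhouse",
        "connection_params": {
            "host": "clickhouse.example.com",
            "port": "9000",
            "database": "default",
            "username": "default",
            "password": "mysecret",
            "secure": False,
        },
        "table": "events",
        "max_batch_size": 1000,
        "max_delay_time": "1s",
        "mapping": [
            {"name": "event_id", "column_name": "event_id", "column_type": "String"},
            {"name": "timestamp", "column_name": "timestamp", "column_type": "DateTime"},
            {"name": "payload", "column_name": "payload", "column_type": "String"},
        ],
    },
}


def unmapped_names(cfg: dict) -> list[str]:
    """Hypothetical sanity check: return mapping names not declared in any source's schema_fields."""
    declared = {
        field["name"]
        for source in cfg["sources"]
        for field in source["schema_fields"]
    }
    return [m["name"] for m in cfg["sink"]["mapping"] if m["name"] not in declared]


missing = unmapped_names(config)
if missing:
    raise ValueError(f"mapping refers to undeclared fields: {missing}")

# Write the file that create_pipeline(pipeline_config_json_path=...) reads.
Path("pipeline_config.json").write_text(json.dumps(config, indent=2))
```

Once the file is written, you can pass its path to `client.create_pipeline(pipeline_config_json_path="pipeline_config.json")` exactly as in the SDK example.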
Verify data is flowing
Check that records are arriving in your ClickHouse table:
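```sql
SELECT count() FROM events;
```
Single Topic with Deduplication
This example adds a dedup transform that drops orders sharing the same order_id within a one-hour window before they reach ClickHouse. Its sink mapping also writes the source field timestamp to a column with a different name, created_at.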
```yaml
version: v3
pipeline_id: dedup-pipeline
sources:
  - type: kafka
    source_id: orders
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: PLAINTEXT
      mechanism: NO_AUTH
    topic: orders
    consumer_group_initial_offset: earliest
    schema_fields:
      - name: order_id
        type: string
      - name: user_id
        type: string
      - name: amount
        type: float
      - name: timestamp
        type: datetime
transforms:
  - type: dedup
    source_id: orders
    config:
      key: order_id
      time_window: 1h
sink:
  type: clickhouse
  connection_params:
    host: clickhouse.example.com
    port: "9000"
    database: default
    username: default
    password: mysecret
    secure: false
  table: orders
  max_batch_size: 1000
  max_delay_time: 1s
  mapping:
    - name: order_id
      column_name: order_id
      column_type: String
    - name: user_id
      column_name: user_id
      column_type: String
    - name: amount
      column_name: amount
      column_type: Float64
    - name: timestamp
      column_name: created_at
      column_type: DateTime
```
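The dedup transform above is configured with `key: order_id` and `time_window: 1h`. The sketch below is a rough mental model of that behavior, not GlassFlow's implementation. It keeps the first event for each `order_id` and drops later events with the same key that arrive within one hour of it. It makes two assumptions, both noted in the code: the window starts when a key is first accepted, and it is measured on each event's `timestamp` field. The real transform may track time differently. The function name `dedup` is illustrative.

```python
from datetime import datetime, timedelta


def dedup(events: list[dict], key: str, window: timedelta) -> list[dict]:
    """Keep the first event per key; drop repeats seen within `window` of it.

    Illustrative model only: assumes the window is measured from the first
    accepted event for a key, using each event's "timestamp" field, and that
    events arrive in timestamp order.
    """
    first_seen: dict = {}  # key value -> timestamp of the last accepted event
    kept = []
    for event in events:
        k, ts = event[key], event["timestamp"]
        accepted_at = first_seen.get(k)
        if accepted_at is not None and ts - accepted_at < window:
            continue  # duplicate inside the window: drop it
        first_seen[k] = ts  # new key or expired window: accept and restart the window
        kept.append(event)
    return kept


t0 = datetime(2024, 1, 1, 12, 0)
orders = [
    {"order_id": "o1", "user_id": "u1", "amount": 9.5, "timestamp": t0},
    {"order_id": "o1", "user_id": "u1", "amount": 9.5, "timestamp": t0 + timedelta(minutes=10)},
    {"order_id": "o2", "user_id": "u2", "amount": 3.0, "timestamp": t0 + timedelta(minutes=20)},
    {"order_id": "o1", "user_id": "u1", "amount": 9.5, "timestamp": t0 + timedelta(hours=2)},
]
kept = dedup(orders, key="order_id", window=timedelta(hours=1))
print([(e["order_id"], e["timestamp"].strftime("%H:%M")) for e in kept])
# [('o1', '12:00'), ('o2', '12:20'), ('o1', '14:00')]
```

Under this model, the repeat of `o1` ten minutes later is dropped. The one two hours later falls outside the window and is written again.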