Kafka Source Examples

Quick Start

Define the pipeline configuration

Create a pipeline configuration file with your Kafka source, ClickHouse sink, and column mapping:

version: v3
pipeline_id: my-kafka-pipeline
sources:
  - type: kafka
    source_id: events
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: PLAINTEXT
      mechanism: NO_AUTH
    topic: events
    consumer_group_initial_offset: earliest
    schema_fields:
      - name: event_id
        type: string
      - name: timestamp
        type: datetime
      - name: payload
        type: string
sink:
  type: clickhouse
  connection_params:
    host: clickhouse.example.com
    port: "9000"
    database: default
    username: default
    password: mysecret
    secure: false
  table: events
  max_batch_size: 1000
  max_delay_time: 1s
  mapping:
    - name: event_id
      column_name: event_id
      column_type: String
    - name: timestamp
      column_name: timestamp
      column_type: DateTime
    - name: payload
      column_name: payload
      column_type: String
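Before submitting a configuration like the one above, it can help to confirm that every field declared under schema_fields also has an entry in the sink mapping. The following is a minimal sketch, not part of the GlassFlow SDK: the helper name unmapped_fields is hypothetical, and it assumes schema_fields is nested under each source and mapping under sink. The config is written as a trimmed Python dict so the example needs no YAML parser.

```python
# Hypothetical pre-flight check; not a GlassFlow API.
# Assumes `schema_fields` sits under each source and `mapping` under `sink`.

def unmapped_fields(config: dict) -> dict[str, list[str]]:
    """Return, per source_id, the schema fields that have no sink mapping entry."""
    mapped = {entry["name"] for entry in config["sink"]["mapping"]}
    missing: dict[str, list[str]] = {}
    for source in config["sources"]:
        names = [field["name"] for field in source.get("schema_fields", [])]
        gaps = [name for name in names if name not in mapped]
        if gaps:
            missing[source["source_id"]] = gaps
    return missing


# Trimmed version of the Quick Start configuration (connection details omitted).
config = {
    "version": "v3",
    "pipeline_id": "my-kafka-pipeline",
    "sources": [
        {
            "type": "kafka",
            "source_id": "events",
            "topic": "events",
            "schema_fields": [
                {"name": "event_id", "type": "string"},
                {"name": "timestamp", "type": "datetime"},
                {"name": "payload", "type": "string"},
            ],
        }
    ],
    "sink": {
        "type": "clickhouse",
        "table": "events",
        "mapping": [
            {"name": "event_id", "column_name": "event_id", "column_type": "String"},
            {"name": "timestamp", "column_name": "timestamp", "column_type": "DateTime"},
            {"name": "payload", "column_name": "payload", "column_type": "String"},
        ],
    },
}

if __name__ == "__main__":
    problems = unmapped_fields(config)
    print(problems or "all schema fields are mapped")
```

An empty result means each declared field has a target column; any non-empty entry names the fields that the sink mapping leaves out.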

Create the pipeline

Using the Python SDK:

from glassflow.etl import Client

client = Client(host="http://localhost:30180")
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")

Or use the Web UI wizard, which walks you through each step visually.

Verify data is flowing

Check that records are arriving in your ClickHouse table:

SELECT count() FROM events;
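Rows may not be visible the instant the pipeline is created, so a scripted check might poll the count for a short period rather than query once. The sketch below is an illustration under stated assumptions: wait_for_rows is a hypothetical helper, and fetch_count stands in for whatever function runs the SELECT count() query through your ClickHouse client of choice. A stub is used here so the example runs without a database.

```python
import time
from typing import Callable


def wait_for_rows(
    fetch_count: Callable[[], int],
    min_rows: int = 1,
    timeout_s: float = 30.0,
    interval_s: float = 1.0,
) -> int:
    """Poll fetch_count() until it reports at least min_rows, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    while True:
        count = fetch_count()
        if count >= min_rows:
            return count
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {count} rows after {timeout_s}s")
        time.sleep(interval_s)


if __name__ == "__main__":
    # Stub standing in for a real "SELECT count() FROM events" call (assumption).
    fake_counts = iter([0, 0, 5])
    rows = wait_for_rows(lambda: next(fake_counts), interval_s=0.0)
    print(f"pipeline delivered {rows} rows")
```

Passing the query as a callable keeps the polling logic independent of any particular ClickHouse driver.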

Single Topic with Deduplication

version: v3
pipeline_id: dedup-pipeline
sources:
  - type: kafka
    source_id: orders
    connection_params:
      brokers:
        - "kafka:9092"
      protocol: PLAINTEXT
      mechanism: NO_AUTH
    topic: orders
    consumer_group_initial_offset: earliest
    schema_fields:
      - name: order_id
        type: string
      - name: user_id
        type: string
      - name: amount
        type: float
      - name: timestamp
        type: datetime
transforms:
  - type: dedup
    source_id: orders
    config:
      key: order_id
      time_window: 1h
sink:
  type: clickhouse
  connection_params:
    host: clickhouse.example.com
    port: "9000"
    database: default
    username: default
    password: mysecret
    secure: false
  table: orders
  max_batch_size: 1000
  max_delay_time: 1s
  mapping:
    - name: order_id
      column_name: order_id
      column_type: String
    - name: user_id
      column_name: user_id
      column_type: String
    - name: amount
      column_name: amount
      column_type: Float64
    - name: timestamp
      column_name: created_at
      column_type: DateTime
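To make the intent of key: order_id and time_window: 1h concrete, here is a rough in-memory model of key-based deduplication over a time window. It is only a sketch of the general technique, not GlassFlow's implementation: the function name dedup_by_key is hypothetical, events are assumed to arrive in timestamp order, and the window is assumed to start at the first accepted event for each key. The engine's actual window semantics may differ.

```python
from datetime import datetime, timedelta


def dedup_by_key(events: list[dict], key: str, window: timedelta) -> list[dict]:
    """Keep the first event per key; drop repeats seen within `window` of it."""
    accepted_at: dict[str, datetime] = {}  # key value -> timestamp of last accepted event
    kept = []
    for event in events:
        value, ts = event[key], event["timestamp"]
        first = accepted_at.get(value)
        if first is not None and ts - first < window:
            continue  # duplicate inside the window: discard
        accepted_at[value] = ts
        kept.append(event)
    return kept


# Illustrative order events (made-up sample data).
orders = [
    {"order_id": "o1", "user_id": "u1", "amount": 10.0, "timestamp": datetime(2024, 1, 1, 10, 0)},
    {"order_id": "o1", "user_id": "u1", "amount": 10.0, "timestamp": datetime(2024, 1, 1, 10, 30)},
    {"order_id": "o2", "user_id": "u2", "amount": 25.5, "timestamp": datetime(2024, 1, 1, 10, 45)},
    {"order_id": "o1", "user_id": "u1", "amount": 10.0, "timestamp": datetime(2024, 1, 1, 11, 30)},
]

if __name__ == "__main__":
    for order in dedup_by_key(orders, key="order_id", window=timedelta(hours=1)):
        print(order["order_id"], order["timestamp"].isoformat())
```

In this model the second o1 event is dropped because it falls within an hour of the first, while the last o1 event is kept because the window has elapsed.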