OTLP Source Examples
Each example below includes a ClickHouse table and a complete pipeline configuration that maps every flattened OTLP field to the corresponding column.
The table schemas follow the ClickStack format. This gives you out-of-the-box compatibility with ClickHouse observability tooling (HyperDX, Grafana ClickHouse plugin, etc.) without having to design your own schema.
After defining the configuration, create the pipeline with the Python SDK and point your OpenTelemetry Collector or SDK at the GlassFlow receiver (see Sending Data).
from glassflow.etl import Client
client = Client(host="http://localhost:30180")
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")Logs
ClickHouse Table
CREATE TABLE IF NOT EXISTS otel_logs
(
`Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)),
`TimestampTime` DateTime DEFAULT toDateTime(Timestamp),
`TraceId` String CODEC(ZSTD(1)),
`SpanId` String CODEC(ZSTD(1)),
`TraceFlags` UInt8,
`SeverityText` LowCardinality(String) CODEC(ZSTD(1)),
`SeverityNumber` UInt8,
`ServiceName` LowCardinality(String) CODEC(ZSTD(1)),
`Body` String CODEC(ZSTD(1)),
`ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`ScopeName` String CODEC(ZSTD(1)),
`ScopeVersion` LowCardinality(String) CODEC(ZSTD(1)),
`ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8
)
ENGINE = MergeTree()
PARTITION BY toDate(TimestampTime)
PRIMARY KEY (ServiceName, TimestampTime)
ORDER BY (ServiceName, TimestampTime, Timestamp);Pipeline Configuration
YAML
version: v3
pipeline_id: otel-logs-pipeline
sources:
- type: otlp.logs
source_id: logs
sink:
type: clickhouse
connection_params:
host: clickhouse.example.com
port: "9000"
database: default
username: default
password: mysecret
secure: true
table: otel_logs
max_batch_size: 1000
max_delay_time: 1s
mapping:
- name: timestamp
column_name: Timestamp
column_type: DateTime64(9)
- name: trace_id
column_name: TraceId
column_type: String
- name: span_id
column_name: SpanId
column_type: String
- name: flags
column_name: TraceFlags
column_type: UInt8
- name: severity_text
column_name: SeverityText
column_type: LowCardinality(String)
- name: severity_number
column_name: SeverityNumber
column_type: UInt8
- name: resource_attributes.service.name
column_name: ServiceName
column_type: LowCardinality(String)
- name: body
column_name: Body
column_type: String
- name: resource_attributes
column_name: ResourceAttributes
column_type: Map(LowCardinality(String), String)
- name: scope_name
column_name: ScopeName
column_type: String
- name: scope_version
column_name: ScopeVersion
column_type: LowCardinality(String)
- name: scope_attributes
column_name: ScopeAttributes
column_type: Map(LowCardinality(String), String)
- name: attributes
column_name: LogAttributes
column_type: Map(LowCardinality(String), String)TimestampTime is a DEFAULT column computed automatically by ClickHouse from Timestamp — no mapping is needed.
Traces
ClickHouse Table
CREATE TABLE IF NOT EXISTS otel_traces
(
`Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)),
`TraceId` String CODEC(ZSTD(1)),
`SpanId` String CODEC(ZSTD(1)),
`ParentSpanId` String CODEC(ZSTD(1)),
`TraceState` String CODEC(ZSTD(1)),
`SpanName` LowCardinality(String) CODEC(ZSTD(1)),
`SpanKind` LowCardinality(String) CODEC(ZSTD(1)),
`ServiceName` LowCardinality(String) CODEC(ZSTD(1)),
`ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`ScopeName` String CODEC(ZSTD(1)),
`ScopeVersion` String CODEC(ZSTD(1)),
`SpanAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`Duration` UInt64 CODEC(ZSTD(1)),
`StatusCode` LowCardinality(String) CODEC(ZSTD(1)),
`StatusMessage` String CODEC(ZSTD(1)),
`Events.Timestamp` Array(DateTime64(9)) CODEC(ZSTD(1)),
`Events.Name` Array(LowCardinality(String)) CODEC(ZSTD(1)),
`Events.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)),
`Links.TraceId` Array(String) CODEC(ZSTD(1)),
`Links.SpanId` Array(String) CODEC(ZSTD(1)),
`Links.TraceState` Array(String) CODEC(ZSTD(1)),
`Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_span_id SpanId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_parent_span_id ParentSpanId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_service_name ServiceName TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_name SpanName TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_kind SpanKind TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_status_code StatusCode TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, Timestamp, TraceId, SpanId);Pipeline Configuration
YAML
version: v3
pipeline_id: otel-traces-pipeline
sources:
- type: otlp.traces
source_id: traces
sink:
type: clickhouse
connection_params:
host: clickhouse.example.com
port: "9000"
database: default
username: default
password: mysecret
secure: true
table: otel_traces
max_batch_size: 1000
max_delay_time: 1s
mapping:
- name: start_timestamp
column_name: Timestamp
column_type: DateTime64(9)
- name: trace_id
column_name: TraceId
column_type: String
- name: span_id
column_name: SpanId
column_type: String
- name: parent_span_id
column_name: ParentSpanId
column_type: String
- name: trace_state
column_name: TraceState
column_type: String
- name: name
column_name: SpanName
column_type: LowCardinality(String)
- name: kind
column_name: SpanKind
column_type: LowCardinality(String)
- name: resource_attributes.service.name
column_name: ServiceName
column_type: LowCardinality(String)
- name: resource_attributes
column_name: ResourceAttributes
column_type: Map(LowCardinality(String), String)
- name: scope_name
column_name: ScopeName
column_type: String
- name: scope_version
column_name: ScopeVersion
column_type: String
- name: attributes
column_name: SpanAttributes
column_type: Map(LowCardinality(String), String)
- name: duration_ns
column_name: Duration
column_type: UInt64
- name: status_code
column_name: StatusCode
column_type: LowCardinality(String)
- name: status_message
column_name: StatusMessage
column_type: String
- name: events
column_name: Events
column_type: "Nested(Timestamp DateTime64(9), Name LowCardinality(String), Attributes Map(LowCardinality(String), String))"
- name: links
column_name: Links
column_type: "Nested(TraceId String, SpanId String, TraceState String, Attributes Map(LowCardinality(String), String))"Metrics (Gauge)
This example uses a gauge metric table. Sum, histogram, and summary tables follow the same pattern with additional type-specific columns.
ClickHouse Table
CREATE TABLE IF NOT EXISTS otel_metrics_gauge
(
`ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`ScopeName` String CODEC(ZSTD(1)),
`ScopeVersion` String CODEC(ZSTD(1)),
`ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`ServiceName` LowCardinality(String) CODEC(ZSTD(1)),
`MetricName` String CODEC(ZSTD(1)),
`MetricDescription` String CODEC(ZSTD(1)),
`MetricUnit` String CODEC(ZSTD(1)),
`Attributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`StartTimeUnix` DateTime64(9) CODEC(Delta, ZSTD(1)),
`TimeUnix` DateTime64(9) CODEC(Delta, ZSTD(1)),
`Value` Float64 CODEC(ZSTD(1)),
`Flags` UInt32 CODEC(ZSTD(1)),
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_key mapKeys(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_value mapValues(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY toDate(TimeUnix)
ORDER BY (ServiceName, MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix));Pipeline Configuration
YAML
version: v3
pipeline_id: otel-metrics-gauge-pipeline
sources:
- type: otlp.metrics
source_id: metrics
sink:
type: clickhouse
connection_params:
host: clickhouse.example.com
port: "9000"
database: default
username: default
password: mysecret
secure: true
table: otel_metrics_gauge
max_batch_size: 1000
max_delay_time: 1s
mapping:
- name: resource_attributes.service.name
column_name: ServiceName
column_type: LowCardinality(String)
- name: resource_attributes
column_name: ResourceAttributes
column_type: Map(LowCardinality(String), String)
- name: scope_name
column_name: ScopeName
column_type: String
- name: scope_version
column_name: ScopeVersion
column_type: String
- name: scope_attributes
column_name: ScopeAttributes
column_type: Map(LowCardinality(String), String)
- name: metric_name
column_name: MetricName
column_type: String
- name: metric_description
column_name: MetricDescription
column_type: String
- name: metric_unit
column_name: MetricUnit
column_type: String
- name: attributes
column_name: Attributes
column_type: Map(LowCardinality(String), String)
- name: start_timestamp
column_name: StartTimeUnix
column_type: DateTime64(9)
- name: timestamp
column_name: TimeUnix
column_type: DateTime64(9)
- name: value_double
column_name: Value
column_type: Float64
- name: flags
column_name: Flags
column_type: UInt32