Skip to Content
SourcesOTLPExamples

OTLP Source Examples

Each example below includes a ClickHouse table and a complete pipeline configuration that maps every flattened OTLP field to the corresponding column.

The table schemas follow the ClickStack  format. This gives you out-of-the-box compatibility with ClickHouse observability tooling (HyperDX, Grafana ClickHouse plugin, etc.) without having to design your own schema.

After defining the configuration, create the pipeline with the Python SDK and point your OpenTelemetry Collector or SDK at the GlassFlow receiver (see Sending Data).

from glassflow.etl import Client client = Client(host="http://localhost:30180") pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")

Logs

ClickHouse Table

CREATE TABLE IF NOT EXISTS otel_logs ( `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), `TimestampTime` DateTime DEFAULT toDateTime(Timestamp), `TraceId` String CODEC(ZSTD(1)), `SpanId` String CODEC(ZSTD(1)), `TraceFlags` UInt8, `SeverityText` LowCardinality(String) CODEC(ZSTD(1)), `SeverityNumber` UInt8, `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), `Body` String CODEC(ZSTD(1)), `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `ScopeName` String CODEC(ZSTD(1)), `ScopeVersion` LowCardinality(String) CODEC(ZSTD(1)), `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8 ) ENGINE = MergeTree() PARTITION BY toDate(TimestampTime) PRIMARY KEY (ServiceName, TimestampTime) ORDER BY (ServiceName, TimestampTime, Timestamp);

Pipeline Configuration

version: v3 pipeline_id: otel-logs-pipeline sources: - type: otlp.logs source_id: logs sink: type: clickhouse connection_params: host: clickhouse.example.com port: "9000" database: default username: default password: mysecret secure: true table: otel_logs max_batch_size: 1000 max_delay_time: 1s mapping: - name: timestamp column_name: Timestamp column_type: DateTime64(9) - name: trace_id column_name: TraceId column_type: String - name: span_id column_name: SpanId column_type: String - name: flags column_name: TraceFlags column_type: UInt8 - name: severity_text column_name: SeverityText column_type: LowCardinality(String) - name: severity_number column_name: SeverityNumber column_type: UInt8 - name: resource_attributes.service.name column_name: ServiceName column_type: LowCardinality(String) - name: body column_name: Body column_type: String - name: resource_attributes column_name: ResourceAttributes column_type: Map(LowCardinality(String), String) - name: scope_name column_name: ScopeName column_type: String - name: scope_version column_name: ScopeVersion column_type: LowCardinality(String) - name: scope_attributes column_name: ScopeAttributes column_type: Map(LowCardinality(String), String) - name: attributes column_name: LogAttributes column_type: Map(LowCardinality(String), String)

TimestampTime is a DEFAULT column computed automatically by ClickHouse from Timestamp — no mapping is needed.

Traces

ClickHouse Table

CREATE TABLE IF NOT EXISTS otel_traces ( `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), `TraceId` String CODEC(ZSTD(1)), `SpanId` String CODEC(ZSTD(1)), `ParentSpanId` String CODEC(ZSTD(1)), `TraceState` String CODEC(ZSTD(1)), `SpanName` LowCardinality(String) CODEC(ZSTD(1)), `SpanKind` LowCardinality(String) CODEC(ZSTD(1)), `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `ScopeName` String CODEC(ZSTD(1)), `ScopeVersion` String CODEC(ZSTD(1)), `SpanAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `Duration` UInt64 CODEC(ZSTD(1)), `StatusCode` LowCardinality(String) CODEC(ZSTD(1)), `StatusMessage` String CODEC(ZSTD(1)), `Events.Timestamp` Array(DateTime64(9)) CODEC(ZSTD(1)), `Events.Name` Array(LowCardinality(String)) CODEC(ZSTD(1)), `Events.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), `Links.TraceId` Array(String) CODEC(ZSTD(1)), `Links.SpanId` Array(String) CODEC(ZSTD(1)), `Links.TraceState` Array(String) CODEC(ZSTD(1)), `Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_span_id SpanId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_parent_span_id ParentSpanId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_service_name ServiceName TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_span_name SpanName TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_span_kind SpanKind TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_status_code StatusCode TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1 ) ENGINE = MergeTree() PARTITION BY toDate(Timestamp) ORDER BY (ServiceName, Timestamp, TraceId, SpanId);

Pipeline Configuration

version: v3 pipeline_id: otel-traces-pipeline sources: - type: otlp.traces source_id: traces sink: type: clickhouse connection_params: host: clickhouse.example.com port: "9000" database: default username: default password: mysecret secure: true table: otel_traces max_batch_size: 1000 max_delay_time: 1s mapping: - name: start_timestamp column_name: Timestamp column_type: DateTime64(9) - name: trace_id column_name: TraceId column_type: String - name: span_id column_name: SpanId column_type: String - name: parent_span_id column_name: ParentSpanId column_type: String - name: trace_state column_name: TraceState column_type: String - name: name column_name: SpanName column_type: LowCardinality(String) - name: kind column_name: SpanKind column_type: LowCardinality(String) - name: resource_attributes.service.name column_name: ServiceName column_type: LowCardinality(String) - name: resource_attributes column_name: ResourceAttributes column_type: Map(LowCardinality(String), String) - name: scope_name column_name: ScopeName column_type: String - name: scope_version column_name: ScopeVersion column_type: String - name: attributes column_name: SpanAttributes column_type: Map(LowCardinality(String), String) - name: duration_ns column_name: Duration column_type: UInt64 - name: status_code column_name: StatusCode column_type: LowCardinality(String) - name: status_message column_name: StatusMessage column_type: String - name: events column_name: Events column_type: "Nested(Timestamp DateTime64(9), Name LowCardinality(String), Attributes Map(LowCardinality(String), String))" - name: links column_name: Links column_type: "Nested(TraceId String, SpanId String, TraceState String, Attributes Map(LowCardinality(String), String))"

Metrics (Gauge)

This example uses a gauge metric table. Sum, histogram, and summary tables follow the same pattern with additional type-specific columns.

ClickHouse Table

CREATE TABLE IF NOT EXISTS otel_metrics_gauge ( `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `ScopeName` String CODEC(ZSTD(1)), `ScopeVersion` String CODEC(ZSTD(1)), `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), `MetricName` String CODEC(ZSTD(1)), `MetricDescription` String CODEC(ZSTD(1)), `MetricUnit` String CODEC(ZSTD(1)), `Attributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `StartTimeUnix` DateTime64(9) CODEC(Delta, ZSTD(1)), `TimeUnix` DateTime64(9) CODEC(Delta, ZSTD(1)), `Value` Float64 CODEC(ZSTD(1)), `Flags` UInt32 CODEC(ZSTD(1)), INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_attr_key mapKeys(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_attr_value mapValues(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1 ) ENGINE = MergeTree() PARTITION BY toDate(TimeUnix) ORDER BY (ServiceName, MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix));

Pipeline Configuration

version: v3 pipeline_id: otel-metrics-gauge-pipeline sources: - type: otlp.metrics source_id: metrics sink: type: clickhouse connection_params: host: clickhouse.example.com port: "9000" database: default username: default password: mysecret secure: true table: otel_metrics_gauge max_batch_size: 1000 max_delay_time: 1s mapping: - name: resource_attributes.service.name column_name: ServiceName column_type: LowCardinality(String) - name: resource_attributes column_name: ResourceAttributes column_type: Map(LowCardinality(String), String) - name: scope_name column_name: ScopeName column_type: String - name: scope_version column_name: ScopeVersion column_type: String - name: scope_attributes column_name: ScopeAttributes column_type: Map(LowCardinality(String), String) - name: metric_name column_name: MetricName column_type: String - name: metric_description column_name: MetricDescription column_type: String - name: metric_unit column_name: MetricUnit column_type: String - name: attributes column_name: Attributes column_type: Map(LowCardinality(String), String) - name: start_timestamp column_name: StartTimeUnix column_type: DateTime64(9) - name: timestamp column_name: TimeUnix column_type: DateTime64(9) - name: value_double column_name: Value column_type: Float64 - name: flags column_name: Flags column_type: UInt32
Last updated on