
Python SDK

The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.

Installation

Install the GlassFlow Python SDK using pip:

pip install "glassflow>=3.6.0"

Pipeline version notice: The default pipeline version is now v3. If you have existing v2 configurations, see Migrating from V2 to V3 below. Pipeline version v2 is deprecated and will be removed in a future release.

Basic Usage

1. Import Required Modules

from glassflow.etl import Client

2. Create a Client

client = Client(host="http://localhost:30180")

Note: The host parameter specifies the GlassFlow API host. When GlassFlow runs locally via the GlassFlow CLI, this is http://localhost:30180.

3. Create a Pipeline from JSON Config

Create a pipeline from a JSON pipeline configuration file:

pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
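
Putting the steps together, a minimal end-to-end sketch (the config file path is illustrative):

from glassflow.etl import Client

# Connect to a locally running GlassFlow instance
client = Client(host="http://localhost:30180")

# Create the pipeline from a JSON configuration file
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")

# The returned pipeline object exposes its current status
print(pipeline.status)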

Pipeline Management

Get a Pipeline

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")

Get Pipeline Health

Access the pipeline health status:

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
print(pipeline.status)

Output:

<PipelineStatus.RUNNING: 'Running'>

or query the latest health status:

pipeline_health = pipeline.health()
print(pipeline_health)

Output:

{ "pipeline_id": "<pipeline-id>", "pipeline_name": "<pipeline-name>", "overall_status": "Running", "created_at": "2025-09-05T10:21:57.945135078Z", "updated_at": "2025-09-05T10:21:58.192448618Z" }

List Pipelines

You can list all pipelines available in your GlassFlow deployment:

pipelines = client.list_pipelines()
print(pipelines)

Output:

[ { "pipeline_id": "<my-pipeline-id>", "name": "<my-pipeline-name>", "transformation_type": "Deduplication", "created_at": "2025-09-04T15:20:24.327977208Z", "updated_at": "2025-09-04T15:20:24.327977208Z", "status": "Running", "metadata": { "tags": ["demo"] } } ]

Stop a Pipeline

Stopping a pipeline stops the ingestion of new messages; once all messages already in the queue have been processed, the pipeline stops.

pipeline.stop()

Terminate a Pipeline

Terminating a pipeline stops the ingestion of new messages and shuts the pipeline down immediately, without waiting for queued messages to be processed.

pipeline.stop(terminate=True)

Resume a Pipeline

Resuming brings a stopped pipeline back to the running state.

pipeline.resume()

Delete a Pipeline

Only stopped (or terminated) pipelines can be deleted.

pipeline.delete()

or

client.delete_pipeline(pipeline_id="<my-pipeline-id>")
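
Since only stopped (or terminated) pipelines can be deleted, a typical teardown sketch looks like this (whether stop() blocks until the queue is drained is deployment-specific, so you may want to poll health first):

# Stop the pipeline, then delete it once it is no longer running
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
pipeline.stop()
pipeline.delete()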

Update a Pipeline

Only stopped (or terminated) pipelines can be updated.

pipeline.update(config_patch=<pipeline config patch>)

Example:

pipeline.update(config_patch={"join": {"type": "join", "enabled": true}})

or update the full pipeline config:

pipeline.update(config_patch=<new pipeline config>)
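
Because updates require a stopped pipeline, a common edit cycle is stop, patch, resume (a sketch using the join patch from the example above):

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
pipeline.stop()  # updates are only allowed on stopped pipelines
pipeline.update(config_patch={"join": {"type": "join", "enabled": True}})
pipeline.resume()  # bring the pipeline back to the running state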

Dead-Letter Queue (DLQ)

Get DLQ State

dlq_state = pipeline.dlq.state()
print(dlq_state)

Output:

{
  "last_consumed_at": "2025-09-05T10:32:38.113427621Z",
  "last_received_at": "2025-09-05T10:23:26.530466989Z",
  "total_messages": 2,
  "unconsumed_messages": 1
}

Purge DLQ

pipeline.dlq.purge()

Read Messages from DLQ

messages = pipeline.dlq.consume(batch_size=1)
print(messages)

Output:

[ { "component": "ingestor", "error": "failed to validate data", "original_message": "<original kafka message>" } ]

Import / Export Pipeline Configurations

Import

Import a pipeline configuration from a JSON file:

pipeline = client.create_pipeline(pipeline_config_json_path="<my-pipeline-config.json>")

Import a pipeline configuration from a YAML file:

pipeline = client.create_pipeline(pipeline_config_yaml_path="<my-pipeline-config.yaml>")

Export

Export a pipeline configuration to a JSON file:

pipeline.to_json(json_path="<my-pipeline-config.json>")

Export a pipeline configuration to a YAML file:

pipeline.to_yaml(yaml_path="<my-pipeline-config.yaml>")
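
Export and import can be combined to back up and re-create a pipeline (a sketch; the file name is illustrative, and the exported config would presumably need a fresh pipeline_id before re-creation to avoid a collision):

# Back up an existing pipeline's configuration
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
pipeline.to_yaml(yaml_path="pipeline-backup.yaml")

# Later, re-create it from the exported file
restored = client.create_pipeline(pipeline_config_yaml_path="pipeline-backup.yaml")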

OTLP Source Pipelines

GlassFlow can ingest OpenTelemetry signals directly without a Kafka cluster. Add an OTLP source to the sources array — no schema_fields or connection_params needed. GlassFlow manages the OTLP receiver endpoint.

Join is not supported for OTLP source pipelines. Deduplication, filter, and stateless transforms are configured through the transforms array.

pipeline_config = { "version": "v3", "pipeline_id": "otel-logs-pipeline", "sources": [ { "type": "otlp.logs", "source_id": "logs" } ], "transforms": [ { "type": "dedup", "source_id": "logs", "config": { "key": "trace_id", "time_window": "5m" } } ], "sink": { "type": "clickhouse", "connection_params": { "host": "clickhouse.example.com", "port": "9000", "database": "otel", "username": "default", "password": "mysecret", "secure": True }, "table": "otel_logs", "mapping": [ {"name": "timestamp", "column_name": "Timestamp", "column_type": "DateTime64(9)"}, {"name": "severity_text", "column_name": "SeverityText", "column_type": "LowCardinality(String)"}, {"name": "body", "column_name": "Body", "column_type": "String"}, {"name": "trace_id", "column_name": "TraceId", "column_type": "String"} ] } } pipeline = client.create_pipeline(pipeline_config)

For complete examples with full column mappings for logs, traces, and metrics, see OTLP Source Examples.

Pipeline Resources

Use resources to set CPU, memory, replica counts, and storage limits per component. This is optional — omit it to use GlassFlow’s defaults.

pipeline_config = { "version": "v3", "pipeline_id": "my-pipeline-id", # ... sources, transforms, sink ... "resources": { "sources": [ { "source_id": "orders", "replicas": 2, "requests": {"cpu": "250m", "memory": "256Mi"}, "limits": {"cpu": "500m", "memory": "512Mi"} } ], "transform": [ { "source_id": "orders", "replicas": 1, "storage": {"size": "10Gi"}, "requests": {"cpu": "500m", "memory": "512Mi"}, "limits": {"cpu": "1000m", "memory": "1Gi"} } ], "sink": { "replicas": 1, "requests": {"cpu": "250m", "memory": "256Mi"}, "limits": {"cpu": "500m", "memory": "512Mi"} }, "nats": { "stream": { "max_age": "24h", "max_bytes": "10Gi" } } } }

The following resource fields are immutable — they must be set at creation time and cannot be changed by pipeline.update():

Field | Reason
transform[].storage.size | Underlying volume cannot be resized
nats.stream.max_age | JetStream stream config is immutable
nats.stream.max_bytes | JetStream stream config is immutable
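
Mutable resource fields, such as replica counts, can still be changed on a stopped pipeline through pipeline.update(). An illustrative patch following the resources schema above:

# Scale the sink component to two replicas; storage sizes and NATS
# stream settings are immutable and cannot be patched this way
pipeline.update(config_patch={"resources": {"sink": {"replicas": 2}}})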

Migrating from V2 to V3

Pipeline version v2 is deprecated. See the Pipeline Configuration Reference for the full V2 → V3 migration table.

Key structural changes:

V2 | V3
source (singular) with topics[] | sources[] array, one entry per topic
Dedup nested in topic | transforms[] with type: "dedup"
Top-level filter, stateless_transformation | transforms[] with type: "filter" / "stateless"
join.sources[] with orientation | join.left_source / join.right_source
schema.fields[] | sink.mapping[]
pipeline_resources | resources with per-source entries
Flat sink fields (host, port, …) | sink.connection_params nested object
Sink password base64-encoded | Sink password plain text
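
As a rough illustration of the new shape (field names taken from the table above; the Kafka source type name here is illustrative and the exact V2-era details may differ in your deployment):

v3_config = {
    "version": "v3",
    "pipeline_id": "orders-pipeline",
    "sources": [
        # One entry per topic instead of a single source with topics[]
        {"type": "kafka", "source_id": "orders"}  # source type name illustrative
    ],
    "transforms": [
        # Dedup moved out of the topic definition into transforms[]
        {"type": "dedup", "source_id": "orders", "config": {"key": "order_id", "time_window": "5m"}}
    ],
    "sink": {
        "type": "clickhouse",
        # Connection details are nested under connection_params; password is plain text
        "connection_params": {"host": "clickhouse.example.com", "password": "plain-text-password"},
        # mapping[] replaces the V2 schema.fields[]
        "mapping": []
    }
}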
