Python SDK

The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.

Installation

Install the GlassFlow Python SDK using pip:

pip install "glassflow>=4.0.0"

Pipeline version notice: The default and only supported pipeline version is v3. Pipeline version v2 has been removed in glassflow 4.0.0. If you have existing v2 configurations, use the migrate_pipeline_v2_to_v3() helper — see Migrating from V2 to V3 below.

Basic Usage

  1. Import Required Modules
from glassflow.etl import Client
  2. Create a Client
client = Client(host="http://localhost:30180")

Note: The host parameter specifies the GlassFlow API host. If GlassFlow is running locally via the GlassFlow CLI, the host is http://localhost:30180.

  3. Create a Pipeline from JSON Config

Create a pipeline from a JSON pipeline configuration file:

pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
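If the configuration is assembled in Python rather than kept on disk, one option is to serialize it with the standard library first. The sketch below covers only the file-writing step. The helper name write_pipeline_config and the placeholder config values are illustrative assumptions, not part of the SDK.

```python
import json
import tempfile
from pathlib import Path


def write_pipeline_config(config: dict, path: Path) -> Path:
    """Serialize a pipeline config dict to a JSON file and return its path.

    Hypothetical helper, not part of the GlassFlow SDK.
    """
    path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    return path


# Placeholder config; a real one also needs sources and a sink.
example_config = {"version": "v3", "pipeline_id": "my-pipeline-id"}

config_path = write_pipeline_config(
    example_config, Path(tempfile.gettempdir()) / "pipeline_config.json"
)

# Reading the file back yields the same dict.
round_tripped = json.loads(config_path.read_text(encoding="utf-8"))
```

The resulting path can then be passed as pipeline_config_json_path=str(config_path) to client.create_pipeline().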

Pipeline Management

Get a Pipeline

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")

Get Pipeline Health

Access the pipeline health status:

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
print(pipeline.status)

Output:

<PipelineStatus.RUNNING: 'Running'>

or query the latest health status:

pipeline_health = pipeline.health()
print(pipeline_health)

Output:

{
  "pipeline_id": "<pipeline-id>",
  "pipeline_name": "<pipeline-name>",
  "overall_status": "Running",
  "created_at": "2025-09-05T10:21:57.945135078Z",
  "updated_at": "2025-09-05T10:21:58.192448618Z"
}
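Because the health status is a point-in-time snapshot, scripts often need to poll it until the pipeline reaches a given state. The following is a minimal sketch of such a loop. The helper wait_for_status is hypothetical and not part of the SDK, and it assumes the health payload behaves like a mapping with an "overall_status" key, as the output above suggests.

```python
import time
from typing import Callable, Mapping


def wait_for_status(
    get_health: Callable[[], Mapping[str, object]],
    target: str,
    timeout_s: float = 60.0,
    poll_interval_s: float = 2.0,
) -> bool:
    """Poll get_health() until overall_status equals target or time runs out.

    Hypothetical helper, not part of the SDK. Assumes the health payload
    behaves like a mapping with an "overall_status" key.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if get_health().get("overall_status") == target:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)
```

Under those assumptions it could be called as wait_for_status(pipeline.health, "Running"). Passing the health getter as a callable keeps the helper independent of the SDK object, which also makes it easy to exercise with a fake in tests.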

List Pipelines

You can list all pipelines available in your GlassFlow deployment:

pipelines = client.list_pipelines()
print(pipelines)

Output:

[
  {
    "pipeline_id": "<my-pipeline-id>",
    "name": "<my-pipeline-name>",
    "transformation_type": "Deduplication",
    "created_at": "2025-09-04T15:20:24.327977208Z",
    "updated_at": "2025-09-04T15:20:24.327977208Z",
    "status": "Running",
    "metadata": { "tags": ["demo"] }
  }
]
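With many pipelines, it can help to narrow the list by status or tag on the client side. The sketch below assumes that list_pipelines() returns dict-like entries shaped like the output above. The helper filter_pipelines is hypothetical, and the "Stopped" status string in the sample data is an assumption.

```python
from typing import Iterable, Mapping, Optional


def filter_pipelines(
    pipelines: Iterable[Mapping],
    status: Optional[str] = None,
    tag: Optional[str] = None,
) -> list:
    """Return entries matching the given status and/or metadata tag."""
    selected = []
    for entry in pipelines:
        if status is not None and entry.get("status") != status:
            continue
        # Tolerate entries with missing or empty metadata.
        tags = (entry.get("metadata") or {}).get("tags") or []
        if tag is not None and tag not in tags:
            continue
        selected.append(entry)
    return selected


# Sample data mirroring the documented output shape ("Stopped" is assumed).
sample = [
    {"pipeline_id": "a", "status": "Running", "metadata": {"tags": ["demo"]}},
    {"pipeline_id": "b", "status": "Stopped", "metadata": {"tags": []}},
]
running_demo = filter_pipelines(sample, status="Running", tag="demo")
```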

Stop a Pipeline

Stopping a pipeline halts ingestion of new messages. The pipeline then shuts down once all messages already in the queue have been processed.

pipeline.stop()

Terminate a Pipeline

Terminating a pipeline stops ingestion of new messages and shuts the pipeline down immediately.

pipeline.stop(terminate=True)

Resume a Pipeline

Resuming a pipeline returns it from the stopped state to the running state.

pipeline.resume()

Delete a Pipeline

Only stopped (or terminated) pipelines can be deleted.

pipeline.delete()

or

client.delete_pipeline(pipeline_id="<my-pipeline-id>")

Update a Pipeline

Only stopped (or terminated) pipelines can be updated.

pipeline.update(config_patch=<pipeline config patch>)

Example:

pipeline.update(config_patch={"join": {"type": "join", "enabled": True}})

or update the full pipeline config:

pipeline.update(config_patch=<new pipeline config>)

Rename a Pipeline

Rename a pipeline without stopping it — the new name is reflected in the UI and API immediately.

pipeline.rename("My renamed pipeline")

Dead-Letter Queue (DLQ)

Get DLQ State

dlq_state = pipeline.dlq.state()
print(dlq_state)

Output:

{
  "last_consumed_at": "2026-05-20T10:32:38.113427621Z",
  "last_received_at": "2026-05-20T10:23:26.530466989Z",
  "total_messages": 2,
  "unconsumed_messages": 1
}

Purge DLQ

Removes all messages from the DLQ. Returns None.

pipeline.dlq.purge()

Read Messages from DLQ

messages = pipeline.dlq.consume(batch_size=1)
print(messages)

Output:

[
  {
    "component": "ingestor",
    "error": "failed to validate data",
    "original_message": "<original kafka message>"
  }
]
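When a batch contains many messages, grouping them by component and error gives a quick view of what is failing. This is a minimal sketch that assumes each consumed message is dict-like with the "component" and "error" keys shown above. The helper summarize_dlq is hypothetical and not part of the SDK.

```python
from collections import Counter


def summarize_dlq(messages):
    """Count DLQ messages per (component, error) pair."""
    return Counter((m.get("component"), m.get("error")) for m in messages)


# Sample batch mirroring the documented message shape.
sample_messages = [
    {
        "component": "ingestor",
        "error": "failed to validate data",
        "original_message": "<original kafka message>",
    },
    {
        "component": "ingestor",
        "error": "failed to validate data",
        "original_message": "<original kafka message>",
    },
]
summary = summarize_dlq(sample_messages)
```

The same function could be applied to the result of pipeline.dlq.consume(batch_size=...), provided the returned items support .get().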

Import / Export Pipeline Configurations

Import

Import a pipeline configuration from a JSON file:

pipeline_config = client.create_pipeline(pipeline_config_json_path="<my-pipeline-config.json>")

Import a pipeline configuration from a YAML file:

pipeline_config = client.create_pipeline(pipeline_config_yaml_path="<my-pipeline-config.yaml>")

Export

Export a pipeline configuration to a JSON file:

pipeline.to_json(json_path="<my-pipeline-config.json>")

Export a pipeline configuration to a YAML file:

pipeline.to_yaml(yaml_path="<my-pipeline-config.yaml>")

OTLP Source Pipelines

GlassFlow can ingest OpenTelemetry signals directly without a Kafka cluster. Add an OTLP source to the sources array — no schema_fields or connection_params needed. GlassFlow manages the OTLP receiver endpoint.

Join is not supported for OTLP source pipelines. Deduplication, filter, and stateless transforms are configured through the transforms array.

pipeline_config = {
    "version": "v3",
    "pipeline_id": "otel-logs-pipeline",
    "sources": [
        {"type": "otlp.logs", "source_id": "logs"}
    ],
    "transforms": [
        {
            "type": "dedup",
            "source_id": "logs",
            "config": {"key": "trace_id", "time_window": "5m"}
        }
    ],
    "sink": {
        "type": "clickhouse",
        "connection_params": {
            "host": "clickhouse.example.com",
            "port": "9000",
            "database": "otel",
            "username": "default",
            "password": "mysecret",
            "secure": True
        },
        "table": "otel_logs",
        "mapping": [
            {"name": "timestamp", "column_name": "Timestamp", "column_type": "DateTime64(9)"},
            {"name": "severity_text", "column_name": "SeverityText", "column_type": "LowCardinality(String)"},
            {"name": "body", "column_name": "Body", "column_type": "String"},
            {"name": "trace_id", "column_name": "TraceId", "column_type": "String"}
        ]
    }
}

pipeline = client.create_pipeline(pipeline_config)

For complete examples with full column mappings for logs, traces, and metrics, see OTLP Source Examples.
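The OTLP constraints described in this section can be checked locally before a config is submitted. The sketch below is a hypothetical pre-flight check, not an SDK feature, and the API performs its own validation regardless. It assumes that OTLP source types start with "otlp." (as in "otlp.logs") and that a join is configured through a top-level "join" key with an "enabled" flag, as in the update example earlier on this page.

```python
def check_otlp_config(config: dict) -> list:
    """Return a list of problems found in an OTLP source pipeline config.

    Hypothetical pre-flight check. Treats any source whose type starts
    with "otlp." as an OTLP source (an assumption).
    """
    problems = []
    otlp_sources = [
        s for s in config.get("sources", [])
        if str(s.get("type", "")).startswith("otlp.")
    ]
    if not otlp_sources:
        return problems  # Nothing OTLP-specific to check.

    # Join is not supported for OTLP source pipelines.
    join = config.get("join") or {}
    if join.get("enabled"):
        problems.append("join is not supported for OTLP source pipelines")

    # OTLP sources need neither schema_fields nor connection_params.
    for source in otlp_sources:
        for key in ("schema_fields", "connection_params"):
            if key in source:
                problems.append(
                    f"source {source.get('source_id')!r}: {key} is not needed"
                )
    return problems
```

An empty list means none of these specific issues were found. It does not guarantee that the API will accept the config.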

Pipeline Resources

Use resources to set CPU, memory, replica counts, and storage limits per component. This is optional — omit it to use GlassFlow’s defaults.

pipeline_config = {
    "version": "v3",
    "pipeline_id": "my-pipeline-id",
    # ... sources, transforms, sink ...
    "resources": {
        "sources": [
            {
                "source_id": "orders",
                "replicas": 2,
                "requests": {"cpu": "250m", "memory": "256Mi"},
                "limits": {"cpu": "500m", "memory": "512Mi"}
            }
        ],
        "transform": [
            {
                "source_id": "orders",
                "replicas": 1,
                "storage": {"size": "10Gi"},
                "requests": {"cpu": "500m", "memory": "512Mi"},
                "limits": {"cpu": "1000m", "memory": "1Gi"}
            }
        ],
        "sink": {
            "replicas": 1,
            "requests": {"cpu": "250m", "memory": "256Mi"},
            "limits": {"cpu": "500m", "memory": "512Mi"}
        },
        "nats": {
            "stream": {"max_age": "24h", "max_bytes": "10Gi"}
        }
    }
}

The following resource fields are immutable — they must be set at creation time and cannot be changed by pipeline.update():

| Field | Reason |
| --- | --- |
| transform[].storage.size | Underlying volume cannot be resized |
| nats.stream.max_age | JetStream stream config is immutable |
| nats.stream.max_bytes | JetStream stream config is immutable |
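Since these fields cannot be changed after creation, it may be useful to check a patch for them before calling pipeline.update(). The following is a minimal sketch. The helper immutable_fields_in_patch is hypothetical, and it assumes the patch uses the same nested "resources" layout as the full config. It flags a field whenever it is present in the patch, even if the value is unchanged.

```python
def immutable_fields_in_patch(patch: dict) -> list:
    """List immutable resource fields that a config patch tries to set."""
    resources = patch.get("resources") or {}
    found = []

    # transform[].storage.size: the underlying volume cannot be resized.
    for i, entry in enumerate(resources.get("transform") or []):
        if "size" in (entry.get("storage") or {}):
            found.append(f"transform[{i}].storage.size")

    # nats.stream.max_age / max_bytes: the stream config is immutable.
    stream = (resources.get("nats") or {}).get("stream") or {}
    for key in ("max_age", "max_bytes"):
        if key in stream:
            found.append(f"nats.stream.{key}")
    return found
```

A caller could refuse to send the patch, or strip the offending keys, when the returned list is non-empty.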

Migrating from V2 to V3

Pipeline version v2 has been removed in glassflow 4.0.0. Existing v2 configurations must be migrated to v3 before they can be deployed.

The SDK ships a migrate_pipeline_v2_to_v3() helper that does the migration server-side and returns a validated v3 configuration. It calls the GlassFlow API’s POST /api/v1/pipeline/migrate-preview endpoint — no pipeline is created and no state is touched, so it’s safe to call repeatedly.

from glassflow.etl import Client

client = Client(host="http://localhost:30180")

# v2_config holds your existing v2 pipeline configuration
v3_config = client.migrate_pipeline_v2_to_v3(v2_config)
pipeline = client.create_pipeline(v3_config)

If you prefer to migrate manually, see the Pipeline Configuration Reference for the full migration table.

Key structural changes:

| V2 | V3 |
| --- | --- |
| source (singular) with topics[] | sources[] array, one entry per topic |
| Dedup nested in topic | transforms[] with type: "dedup" |
| Top-level filter, stateless_transformation | transforms[] with type: "filter" / "stateless" |
| join.sources[] with orientation | join.left_source / join.right_source |
| schema.fields[] | sink.mapping[] |
| pipeline_resources | resources with per-source entries |
| Flat sink fields (host, port, …) | sink.connection_params nested object |
| Sink password base64-encoded | Sink password plain text |
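To make the last two rows concrete, the sketch below nests flat sink fields under connection_params and decodes a base64 password. It is an illustration only and covers none of the other changes, so migrate_pipeline_v2_to_v3() remains the recommended route for real migrations. The helper name and the choice of which keys count as connection fields are assumptions, the latter borrowed from the connection_params shown in the OTLP example.

```python
import base64

# Assumed set of connection fields, taken from the OTLP sink example.
CONNECTION_KEYS = ("host", "port", "database", "username", "password", "secure")


def nest_sink_connection_params(v2_sink: dict, connection_keys=CONNECTION_KEYS) -> dict:
    """Move flat connection fields under connection_params and decode the password.

    Illustration only: covers two rows of the table, not the full migration.
    """
    # Keep every non-connection field at the top level of the sink.
    v3_sink = {k: v for k, v in v2_sink.items() if k not in connection_keys}
    params = {k: v2_sink[k] for k in connection_keys if k in v2_sink}
    if "password" in params:
        # V2 stored the password base64-encoded; V3 expects plain text.
        params["password"] = base64.b64decode(params["password"]).decode("utf-8")
    v3_sink["connection_params"] = params
    return v3_sink
```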
