Python SDK
The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.
Installation
Install the GlassFlow Python SDK using pip:
pip install "glassflow>=4.0.0"
Pipeline version notice: The default and only supported pipeline version is v3. Pipeline version v2 has been removed in glassflow 4.0.0. If you have existing v2 configurations, use the migrate_pipeline_v2_to_v3() helper; see Migrating from V2 to V3 below.
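To confirm which side of the 4.0.0 boundary an environment is on, the installed version can be read from package metadata. The following is a minimal sketch using only the standard library; the helper names `major_version` and `installed_glassflow_version` are hypothetical, and it assumes the distribution is published under the name `glassflow` (as in the pip command above) with a plain dotted version string.

```python
from importlib.metadata import PackageNotFoundError, version


def major_version(version_string: str) -> int:
    """Return the leading integer of a dotted version string such as '4.0.1'."""
    return int(version_string.split(".")[0])


def installed_glassflow_version():
    """Return the installed glassflow version string, or None if it is absent."""
    try:
        # Assumption: the distribution name matches the pip package name.
        return version("glassflow")
    except PackageNotFoundError:
        return None


installed = installed_glassflow_version()
if installed is None:
    print("glassflow is not installed")
elif major_version(installed) >= 4:
    print(f"glassflow {installed}: only pipeline version v3 is supported")
else:
    print(f"glassflow {installed}: older than 4.0.0")
```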
Basic Usage
- Import Required Modules
from glassflow.etl import Client
- Create a Client
client = Client(host="http://localhost:30180")
Note: The host parameter specifies the GlassFlow API host (if GlassFlow is running locally with the GlassFlow CLI, it is http://localhost:30180).
- Create a Pipeline from JSON Config
Create a pipeline from a JSON pipeline configuration file:
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
Pipeline Management
Get a Pipeline
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
Get Pipeline Health
Access the pipeline health status:
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
print(pipeline.status)
Output:
<PipelineStatus.RUNNING: 'Running'>
Or query the latest health status:
pipeline_health = pipeline.health()
print(pipeline_health)
Output:
{
    "pipeline_id": "<pipeline-id>",
    "pipeline_name": "<pipeline-name>",
    "overall_status": "Running",
    "created_at": "2025-09-05T10:21:57.945135078Z",
    "updated_at": "2025-09-05T10:21:58.192448618Z"
}
List Pipelines
You can list all pipelines available in your GlassFlow deployment:
pipelines = client.list_pipelines()
print(pipelines)
Output:
[
    {
        "pipeline_id": "<my-pipeline-id>",
        "name": "<my-pipeline-name>",
        "transformation_type": "Deduplication",
        "created_at": "2025-09-04T15:20:24.327977208Z",
        "updated_at": "2025-09-04T15:20:24.327977208Z",
        "status": "Running",
        "metadata": {
            "tags": ["demo"]
        }
    }
]
Stop a Pipeline
Stopping a pipeline stops ingestion of new messages; the pipeline then shuts down once all messages already in the queue have been processed.
pipeline.stop()
Terminate a Pipeline
Terminating a pipeline stops ingestion of new messages and terminates the pipeline immediately.
pipeline.stop(terminate=True)
Resume a Pipeline
Resuming brings a stopped pipeline back to the running state.
pipeline.resume()
Delete a Pipeline
Only stopped (or terminated) pipelines can be deleted.
pipeline.delete()
or
client.delete_pipeline(pipeline_id="<my-pipeline-id>")
Update a Pipeline
Only stopped (or terminated) pipelines can be updated.
pipeline.update(config_patch=<pipeline config patch>)
Example:
pipeline.update(config_patch={"join": {"type": "join", "enabled": True}})
or update the full pipeline config:
pipeline.update(config_patch=<new pipeline config>)
Rename a Pipeline
Rename a pipeline without stopping it — the new name is reflected in the UI and API immediately.
pipeline.rename("My renamed pipeline")
Dead-Letter Queue (DLQ)
Get DLQ State
dlq_state = pipeline.dlq.state()
print(dlq_state)
Output:
{
    "last_consumed_at": "2026-05-20T10:32:38.113427621Z",
    "last_received_at": "2026-05-20T10:23:26.530466989Z",
    "total_messages": 2,
    "unconsumed_messages": 1
}
Purge DLQ
Removes all messages from the DLQ. Returns None.
pipeline.dlq.purge()
Read Messages from DLQ
messages = pipeline.dlq.consume(batch_size=1)
print(messages)
Output:
[
    {
        "component": "ingestor",
        "error": "failed to validate data",
        "original_message": "<original kafka message>"
    }
]
Import / Export Pipeline Configurations
Import
Import a pipeline configuration from a JSON file:
pipeline_config = client.create_pipeline(pipeline_config_json_path="<my-pipeline-config.json>")
Import a pipeline configuration from a YAML file:
pipeline_config = client.create_pipeline(pipeline_config_yaml_path="<my-pipeline-config.yaml>")
Export
Export a pipeline configuration to a JSON file:
pipeline.to_json(json_path="<my-pipeline-config.json>")
Export a pipeline configuration to a YAML file:
pipeline.to_yaml(yaml_path="<my-pipeline-config.yaml>")
OTLP Source Pipelines
GlassFlow can ingest OpenTelemetry signals directly without a Kafka cluster. Add an OTLP source to the sources array — no schema_fields or connection_params needed. GlassFlow manages the OTLP receiver endpoint.
Join is not supported for OTLP source pipelines. Deduplication, filter, and stateless transforms are configured through the transforms array.
pipeline_config = {
    "version": "v3",
    "pipeline_id": "otel-logs-pipeline",
    "sources": [
        {
            "type": "otlp.logs",
            "source_id": "logs"
        }
    ],
    "transforms": [
        {
            "type": "dedup",
            "source_id": "logs",
            "config": {
                "key": "trace_id",
                "time_window": "5m"
            }
        }
    ],
    "sink": {
        "type": "clickhouse",
        "connection_params": {
            "host": "clickhouse.example.com",
            "port": "9000",
            "database": "otel",
            "username": "default",
            "password": "mysecret",
            "secure": True
        },
        "table": "otel_logs",
        "mapping": [
            {"name": "timestamp", "column_name": "Timestamp", "column_type": "DateTime64(9)"},
            {"name": "severity_text", "column_name": "SeverityText", "column_type": "LowCardinality(String)"},
            {"name": "body", "column_name": "Body", "column_type": "String"},
            {"name": "trace_id", "column_name": "TraceId", "column_type": "String"}
        ]
    }
}
pipeline = client.create_pipeline(pipeline_config)
For complete examples with full column mappings for logs, traces, and metrics, see OTLP Source Examples.
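The OTLP constraints described in this section can be checked on a config dict before it is sent to the API. The following is a rough sketch, not part of the SDK: `check_otlp_config` is a hypothetical helper, and it assumes that OTLP source types share the `otlp.` prefix seen in `otlp.logs`, that a join is switched on through a top-level `join.enabled` flag, and that each transform's `source_id` should match a declared source.

```python
def check_otlp_config(config: dict) -> list[str]:
    """Return a list of detected problems; an empty list means none were found."""
    problems = []
    sources = config.get("sources", [])
    source_ids = {s.get("source_id") for s in sources}
    # Assumption: every OTLP source type starts with "otlp." (e.g. "otlp.logs").
    otlp_sources = [s for s in sources if str(s.get("type", "")).startswith("otlp.")]

    # Join is not supported for OTLP source pipelines.
    if otlp_sources and config.get("join", {}).get("enabled"):
        problems.append("join is enabled but the pipeline has an OTLP source")

    # OTLP sources need neither schema_fields nor connection_params.
    for source in otlp_sources:
        for key in ("schema_fields", "connection_params"):
            if key in source:
                problems.append(f"source {source.get('source_id')!r}: {key} is not needed")

    # Assumption: each transform should reference a declared source_id.
    for transform in config.get("transforms", []):
        if transform.get("source_id") not in source_ids:
            problems.append(f"transform references unknown source {transform.get('source_id')!r}")

    return problems


example = {
    "sources": [{"type": "otlp.logs", "source_id": "logs"}],
    "transforms": [{"type": "dedup", "source_id": "logs"}],
}
print(check_otlp_config(example))
```

A check like this only catches structural slips early; the GlassFlow API remains the authority on whether a configuration is valid.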
Pipeline Resources
Use resources to set CPU, memory, replica counts, and storage limits per component. This is optional — omit it to use GlassFlow’s defaults.
pipeline_config = {
    "version": "v3",
    "pipeline_id": "my-pipeline-id",
    # ... sources, transforms, sink ...
    "resources": {
        "sources": [
            {
                "source_id": "orders",
                "replicas": 2,
                "requests": {"cpu": "250m", "memory": "256Mi"},
                "limits": {"cpu": "500m", "memory": "512Mi"}
            }
        ],
        "transform": [
            {
                "source_id": "orders",
                "replicas": 1,
                "storage": {"size": "10Gi"},
                "requests": {"cpu": "500m", "memory": "512Mi"},
                "limits": {"cpu": "1000m", "memory": "1Gi"}
            }
        ],
        "sink": {
            "replicas": 1,
            "requests": {"cpu": "250m", "memory": "256Mi"},
            "limits": {"cpu": "500m", "memory": "512Mi"}
        },
        "nats": {
            "stream": {
                "max_age": "24h",
                "max_bytes": "10Gi"
            }
        }
    }
}
The following resource fields are immutable; they must be set at creation time and cannot be changed by pipeline.update():
| Field | Reason |
|---|---|
| transform[].storage.size | Underlying volume cannot be resized |
| nats.stream.max_age | JetStream stream config is immutable |
| nats.stream.max_bytes | JetStream stream config is immutable |
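Because these fields cannot be changed after creation, it can help to scan a config patch for them before calling pipeline.update(). The following is a minimal sketch under stated assumptions: `immutable_fields_in_patch` is a hypothetical helper, not an SDK function, and it assumes the patch mirrors the layout of the `resources` block shown above. It flags the presence of an immutable field, not an actual change in value, so a patch that repeats the existing value would also be reported.

```python
def immutable_fields_in_patch(patch: dict) -> list[str]:
    """List the immutable resource fields that a config patch sets."""
    resources = patch.get("resources", {})
    found = []

    # transform[].storage.size: the underlying volume cannot be resized.
    for index, entry in enumerate(resources.get("transform", [])):
        if "size" in entry.get("storage", {}):
            found.append(f"transform[{index}].storage.size")

    # nats.stream.max_age / max_bytes: JetStream stream config is immutable.
    stream = resources.get("nats", {}).get("stream", {})
    for key in ("max_age", "max_bytes"):
        if key in stream:
            found.append(f"nats.stream.{key}")

    return found


patch = {
    "resources": {
        "transform": [{"source_id": "orders", "storage": {"size": "20Gi"}}],
        "nats": {"stream": {"max_age": "48h"}},
    }
}
print(immutable_fields_in_patch(patch))
```

A stricter variant could compare each flagged value against the current pipeline configuration and report only real differences.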
Migrating from V2 to V3
Pipeline version v2 has been removed in glassflow 4.0.0. Existing v2 configurations must be migrated to v3 before they can be deployed.
The SDK ships a migrate_pipeline_v2_to_v3() helper that does the migration server-side and returns a validated v3 configuration. It calls the GlassFlow API’s POST /api/v1/pipeline/migrate-preview endpoint — no pipeline is created and no state is touched, so it’s safe to call repeatedly.
from glassflow.etl import Client
client = Client(host="http://localhost:30180")
v3_config = client.migrate_pipeline_v2_to_v3(v2_config)
pipeline = client.create_pipeline(v3_config)
If you prefer to migrate manually, see the Pipeline Configuration Reference for the full migration table.
Key structural changes:
| V2 | V3 |
|---|---|
| source (singular) with topics[] | sources[] array, one entry per topic |
| Dedup nested in topic | transforms[] with type: "dedup" |
| Top-level filter, stateless_transformation | transforms[] with type: "filter" / "stateless" |
| join.sources[] with orientation | join.left_source / join.right_source |
| schema.fields[] | sink.mapping[] |
| pipeline_resources | resources with per-source entries |
| Flat sink fields (host, port, …) | sink.connection_params nested object |
| Sink password base64-encoded | Sink password plain text |
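To make the last two rows of the table concrete, the sketch below reshapes a flat v2-style sink into the nested v3 layout and decodes the password. It is an illustration only, not the SDK's migration logic, and migrate_pipeline_v2_to_v3() remains the supported route. The helper name `nest_sink_connection_params` is hypothetical, and the set of connection fields is assumed from the v3 sink example earlier on this page; real v2 sinks may carry other fields.

```python
import base64

# Assumption: these are the connection fields, taken from the v3 sink example.
CONNECTION_FIELDS = ("host", "port", "database", "username", "password", "secure")


def nest_sink_connection_params(v2_sink: dict) -> dict:
    """Move flat connection fields under connection_params and decode the password."""
    # Everything that is not a connection field stays at the top level of the sink.
    v3_sink = {k: v for k, v in v2_sink.items() if k not in CONNECTION_FIELDS}
    params = {k: v2_sink[k] for k in CONNECTION_FIELDS if k in v2_sink}

    # V2 stored the password base64-encoded; V3 expects plain text.
    if "password" in params:
        params["password"] = base64.b64decode(params["password"]).decode("utf-8")

    v3_sink["connection_params"] = params
    return v3_sink


v2_sink = {
    "type": "clickhouse",
    "host": "clickhouse.example.com",
    "port": "9000",
    "password": base64.b64encode(b"mysecret").decode("ascii"),
    "table": "otel_logs",
}
print(nest_sink_connection_params(v2_sink))
```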
Next Steps
- Explore the Pipeline Configuration Reference documentation for detailed configuration options
- Learn about monitoring and observability for your pipelines