Python SDK
The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.
Installation
Install the GlassFlow Python SDK using pip:

```shell
pip install "glassflow>=3.6.0"
```

Pipeline version notice: The default pipeline version is now v3. If you have existing v2 configurations, see Migrating from V2 to V3 below. Pipeline version v2 is deprecated and will be removed in a future release.
Basic Usage
- Import Required Modules

```python
from glassflow.etl import Client
```

- Create a Client

```python
client = Client(host="http://localhost:30180")
```

Note: The host parameter specifies the GlassFlow API host (when GlassFlow is running locally via the GlassFlow CLI, this is http://localhost:30180).

- Create a Pipeline from JSON Config

Create a pipeline from a JSON pipeline configuration file:

```python
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
```

Pipeline Management
Get a Pipeline

```python
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
```

Get Pipeline Health

Access the pipeline health status:

```python
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
print(pipeline.status)
```

Output:

```
<PipelineStatus.RUNNING: 'Running'>
```

Or query the latest health status:

```python
pipeline_health = pipeline.health()
print(pipeline_health)
```

Output:

```json
{
  "pipeline_id": "<pipeline-id>",
  "pipeline_name": "<pipeline-name>",
  "overall_status": "Running",
  "created_at": "2025-09-05T10:21:57.945135078Z",
  "updated_at": "2025-09-05T10:21:58.192448618Z"
}
```

List Pipelines
You can list all pipelines available in your GlassFlow deployment:

```python
pipelines = client.list_pipelines()
print(pipelines)
```

Output:

```json
[
  {
    "pipeline_id": "<my-pipeline-id>",
    "name": "<my-pipeline-name>",
    "transformation_type": "Deduplication",
    "created_at": "2025-09-04T15:20:24.327977208Z",
    "updated_at": "2025-09-04T15:20:24.327977208Z",
    "status": "Running",
    "metadata": {
      "tags": ["demo"]
    }
  }
]
```

Stop a Pipeline
Stopping a pipeline stops the ingestion of new messages; once all messages already in the queue are processed, the pipeline stops.

```python
pipeline.stop()
```

Terminate a Pipeline

Terminating a pipeline stops the ingestion of new messages and shuts the pipeline down immediately, without waiting for queued messages to be processed.

```python
pipeline.stop(terminate=True)
```

Resume a Pipeline

Resuming brings a stopped pipeline back to the running state.

```python
pipeline.resume()
```

Delete a Pipeline

Only stopped (or terminated) pipelines can be deleted.

```python
pipeline.delete()
```

or

```python
client.delete_pipeline(pipeline_id="<my-pipeline-id>")
```

Update a Pipeline
Only stopped (or terminated) pipelines can be updated.

```python
pipeline.update(config_patch=<pipeline config patch>)
```

Example:

```python
pipeline.update(config_patch={"join": {"type": "join", "enabled": True}})
```

or pass a full pipeline config to replace the existing configuration:

```python
pipeline.update(config_patch=<new pipeline config>)
```

Dead-Letter Queue (DLQ)
Get DLQ State

```python
dlq_state = pipeline.dlq.state()
print(dlq_state)
```

Output:

```json
{
  "last_consumed_at": "2025-09-05T10:32:38.113427621Z",
  "last_received_at": "2025-09-05T10:23:26.530466989Z",
  "total_messages": 2,
  "unconsumed_messages": 1
}
```

Purge DLQ

```python
pipeline.dlq.purge()
```

Read Messages from DLQ
```python
messages = pipeline.dlq.consume(batch_size=1)
print(messages)
```

Output:

```json
[
  {
    "component": "ingestor",
    "error": "failed to validate data",
    "original_message": "<original kafka message>"
  }
]
```

Import / Export Pipeline Configurations
Import
Import a pipeline configuration from a JSON file:

```python
pipeline_config = client.create_pipeline(pipeline_config_json_path="<my-pipeline-config.json>")
```

Import a pipeline configuration from a YAML file:

```python
pipeline_config = client.create_pipeline(pipeline_config_yaml_path="<my-pipeline-config.yaml>")
```

Export

Export a pipeline configuration to a JSON file:

```python
pipeline.to_json(json_path="<my-pipeline-config.json>")
```

Export a pipeline configuration to a YAML file:

```python
pipeline.to_yaml(yaml_path="<my-pipeline-config.yaml>")
```

OTLP Source Pipelines
GlassFlow can ingest OpenTelemetry signals directly without a Kafka cluster. Add an OTLP source to the sources array — no schema_fields or connection_params needed. GlassFlow manages the OTLP receiver endpoint.
Join is not supported for OTLP source pipelines. Deduplication, filter, and stateless transforms are configured through the transforms array.
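Conceptually, a time-window dedup transform emits the first record seen for a given key and suppresses repeats of that key within the configured window. A pure-Python sketch of that semantics (illustrative only; the real transform runs inside GlassFlow, and `dedup` here is just a stand-in function mirroring the `key` and `time_window` config fields):

```python
from datetime import datetime, timedelta

def dedup(records, key, window):
    """Yield records whose key has not been emitted within `window`.

    records is an iterable of (timestamp, dict) pairs; key names the
    field to deduplicate on; window is a timedelta.
    """
    last_emitted = {}  # key value -> timestamp of last emitted record
    for ts, record in records:
        k = record[key]
        seen = last_emitted.get(k)
        if seen is None or ts - seen > window:
            last_emitted[k] = ts
            yield record

t0 = datetime(2025, 9, 5, 10, 0, 0)
records = [
    (t0, {"trace_id": "a", "body": "first"}),
    (t0 + timedelta(minutes=2), {"trace_id": "a", "body": "duplicate"}),
    (t0 + timedelta(minutes=6), {"trace_id": "a", "body": "outside window"}),
]
kept = list(dedup(records, key="trace_id", window=timedelta(minutes=5)))
# kept -> the "first" and "outside window" records; the 2-minute repeat is dropped
```

The configuration example that follows wires this behavior in declaratively via a transforms entry with type "dedup".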
```python
pipeline_config = {
    "version": "v3",
    "pipeline_id": "otel-logs-pipeline",
    "sources": [
        {
            "type": "otlp.logs",
            "source_id": "logs"
        }
    ],
    "transforms": [
        {
            "type": "dedup",
            "source_id": "logs",
            "config": {
                "key": "trace_id",
                "time_window": "5m"
            }
        }
    ],
    "sink": {
        "type": "clickhouse",
        "connection_params": {
            "host": "clickhouse.example.com",
            "port": "9000",
            "database": "otel",
            "username": "default",
            "password": "mysecret",
            "secure": True
        },
        "table": "otel_logs",
        "mapping": [
            {"name": "timestamp", "column_name": "Timestamp", "column_type": "DateTime64(9)"},
            {"name": "severity_text", "column_name": "SeverityText", "column_type": "LowCardinality(String)"},
            {"name": "body", "column_name": "Body", "column_type": "String"},
            {"name": "trace_id", "column_name": "TraceId", "column_type": "String"}
        ]
    }
}
pipeline = client.create_pipeline(pipeline_config)
```

For complete examples with full column mappings for logs, traces, and metrics, see OTLP Source Examples.
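Pipeline startup may not be instantaneous after create_pipeline returns. If you want to block until the pipeline reports Running, a small polling loop works; wait_for_status below is a hypothetical convenience helper, not part of the SDK, and assumes the health() payload shape shown earlier:

```python
import time

def wait_for_status(fetch_health, target="Running", timeout=30.0, interval=1.0):
    """Poll fetch_health() until overall_status equals target.

    fetch_health is any zero-argument callable returning a dict shaped
    like the pipeline.health() output above. Raises TimeoutError if the
    target status is not reached within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        health = fetch_health()
        if health.get("overall_status") == target:
            return health
        time.sleep(interval)
    raise TimeoutError(f"pipeline did not reach {target!r} within {timeout}s")

# Demonstrated with a stand-in fetcher; in practice pass pipeline.health.
statuses = iter(["Created", "Starting", "Running"])
health = wait_for_status(lambda: {"overall_status": next(statuses)}, interval=0.01)
```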
Pipeline Resources
Use resources to set CPU, memory, replica counts, and storage limits per component. This is optional — omit it to use GlassFlow’s defaults.
```python
pipeline_config = {
    "version": "v3",
    "pipeline_id": "my-pipeline-id",
    # ... sources, transforms, sink ...
    "resources": {
        "sources": [
            {
                "source_id": "orders",
                "replicas": 2,
                "requests": {"cpu": "250m", "memory": "256Mi"},
                "limits": {"cpu": "500m", "memory": "512Mi"}
            }
        ],
        "transform": [
            {
                "source_id": "orders",
                "replicas": 1,
                "storage": {"size": "10Gi"},
                "requests": {"cpu": "500m", "memory": "512Mi"},
                "limits": {"cpu": "1000m", "memory": "1Gi"}
            }
        ],
        "sink": {
            "replicas": 1,
            "requests": {"cpu": "250m", "memory": "256Mi"},
            "limits": {"cpu": "500m", "memory": "512Mi"}
        },
        "nats": {
            "stream": {
                "max_age": "24h",
                "max_bytes": "10Gi"
            }
        }
    }
}
```

The following resource fields are immutable; they must be set at creation time and cannot be changed by pipeline.update():
| Field | Reason |
|---|---|
| transform[].storage.size | Underlying volume cannot be resized |
| nats.stream.max_age | JetStream stream config is immutable |
| nats.stream.max_bytes | JetStream stream config is immutable |
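Updates touching these fields will be rejected. If you assemble resource patches programmatically, a client-side check can fail fast before calling pipeline.update(). find_immutable_edits below is a hypothetical helper sketch, not part of the SDK; the immutable paths mirror the table above:

```python
# Paths (relative to the "resources" block) that cannot change after creation.
IMMUTABLE_PATHS = {
    ("transform", "storage", "size"),
    ("nats", "stream", "max_age"),
    ("nats", "stream", "max_bytes"),
}

def find_immutable_edits(patch, prefix=()):
    """Return dotted paths in a resources patch that touch immutable fields."""
    hits = []
    for key, value in patch.items():
        path = prefix + (key,)
        if path in IMMUTABLE_PATHS:
            hits.append(".".join(path))
        elif isinstance(value, dict):
            hits.extend(find_immutable_edits(value, path))
        elif isinstance(value, list):
            # List entries (e.g. per-source transform settings) share the
            # same path prefix as the list itself.
            for item in value:
                if isinstance(item, dict):
                    hits.extend(find_immutable_edits(item, path))
    return hits

patch = {"resources": {"nats": {"stream": {"max_age": "48h"}}}}
conflicts = find_immutable_edits(patch["resources"])
# conflicts -> ["nats.stream.max_age"]
```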
Migrating from V2 to V3
Pipeline version v2 is deprecated. See the Pipeline Configuration Reference for the full V2 → V3 migration table.
Key structural changes:
| V2 | V3 |
|---|---|
| source (singular) with topics[] | sources[] array, one entry per topic |
| Dedup nested in topic | transforms[] with type: "dedup" |
| Top-level filter, stateless_transformation | transforms[] with type: "filter" / "stateless" |
| join.sources[] with orientation | join.left_source / join.right_source |
| schema.fields[] | sink.mapping[] |
| pipeline_resources | resources with per-source entries |
| Flat sink fields (host, port, …) | sink.connection_params nested object |
| Sink password base64-encoded | Sink password plain text |
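As a sketch of the first two rows of the table, a V2 source-with-topics block splits mechanically into V3 sources[] and transforms[] entries. The V2 field names used below (topics, deduplication) are indicative only; check them against your actual V2 config, and consult the Pipeline Configuration Reference for the full schema:

```python
def migrate_topics(v2_source):
    """Sketch: split a V2 source block into V3 sources[] and transforms[].

    Each topic becomes one sources[] entry; per-topic dedup settings move
    into a transforms[] entry referencing that source. Field names on the
    V2 side are assumptions for illustration.
    """
    sources, transforms = [], []
    for topic in v2_source["topics"]:
        sources.append({"source_id": topic["name"]})
        dedup = topic.get("deduplication")
        if dedup:
            transforms.append({
                "type": "dedup",
                "source_id": topic["name"],
                "config": dedup,
            })
    return sources, transforms

sources, transforms = migrate_topics({
    "topics": [
        {"name": "orders", "deduplication": {"key": "order_id", "time_window": "1h"}},
        {"name": "users"},
    ]
})
```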
Next Steps
- Explore the Pipeline Configuration Reference documentation for detailed configuration options
- Learn about monitoring and observability for your pipelines