Python SDK
The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.
Installation
Install the GlassFlow Python SDK using pip:
pip install "glassflow>=3.0.0"
Basic Usage
- Import Required Modules
from glassflow.etl import Client
- Create a Client
client = Client(host="http://localhost:8081")
Note: The host parameter specifies the GlassFlow API host (if GlassFlow is running locally with Docker Compose, it is http://localhost:8081).
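If GlassFlow is deployed elsewhere, point the client at that host instead. A minimal sketch, reading the host from an environment variable (the GLASSFLOW_HOST variable name and the local fallback are only illustrative assumptions, not an SDK convention):
import os
from glassflow.etl import Client
# Fall back to the local Docker Compose address when no host is configured.
host = os.environ.get("GLASSFLOW_HOST", "http://localhost:8081")
client = Client(host=host)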
- Create a Pipeline from JSON Config
Create a pipeline from a JSON pipeline configuration file:
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
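As a quick sanity check after creation, you can immediately call the health method shown later on this page (a minimal sketch; it assumes health() returns a dict shaped like the output in the Pipeline Management section below):
# Create the pipeline and verify that it came up.
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
health = pipeline.health()
print(health["overall_status"])  # e.g. "Running"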
Pipeline Management
Get a Pipeline
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
Get Pipeline Health
pipeline_health = pipeline.health()
print(pipeline_health)
Output:
{
"pipeline_id": "<pipeline-id>",
"pipeline_name": "<pipeline-name>",
"overall_status": "Running",
"created_at": "2025-09-05T10:21:57.945135078Z",
"updated_at": "2025-09-05T10:21:58.192448618Z"
}
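For scripted deployments it can be useful to wait until the pipeline reports a Running status. A minimal sketch, assuming health() returns a dict with the overall_status field shown above:
import time
# Poll the pipeline health until it reports Running (or give up after ~60 seconds).
for _ in range(30):
    if pipeline.health()["overall_status"] == "Running":
        break
    time.sleep(2)
else:
    raise TimeoutError("pipeline did not reach Running state in time")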
List Pipelines
You can list all pipelines available in your GlassFlow deployment:
pipelines = client.list_pipelines()
print(pipelines)
Output:
[
{
"pipeline_id": "<my-pipeline-id>",
"name": "<my-pipeline-name>",
"transformation_type": "Deduplication",
"created_at": "2025-09-04T15:20:24.327977208Z",
"status": "Running"
}
]
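Because the list entries carry both the pipeline ID and name, you can look up a specific pipeline by name and then fetch the full pipeline object by ID (a sketch, assuming list_pipelines() returns dicts shaped like the output above):
# Find a pipeline by name and fetch it by its ID.
target = next(p for p in client.list_pipelines() if p["name"] == "<my-pipeline-name>")
pipeline = client.get_pipeline(pipeline_id=target["pipeline_id"])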
Read Messages from a Pipeline's Dead-Letter Queue (DLQ)
Get the DLQ state summary:
dlq_state = pipeline.dlq.state()
print(dlq_state)
Output:
{
"last_consumed_at": "2025-09-05T10:32:38.113427621Z",
"last_received_at": "2025-09-05T10:23:26.530466989Z",
"total_messages": 2,
"unconsumed_messages": 1
}
Then read messages from the DLQ:
messages = pipeline.dlq.consume(batch_size=1)
print(messages)
Output:
[
{
"component": "ingestor",
"error": "failed to validate data",
"original_message": "<original kafka message>"
}
]
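To drain the DLQ in a script, the state and consume calls can be combined. A minimal sketch, assuming the unconsumed_messages counter and batch consumption behave as shown above:
# Consume DLQ messages in batches until nothing is left unconsumed.
while pipeline.dlq.state()["unconsumed_messages"] > 0:
    for message in pipeline.dlq.consume(batch_size=10):
        print(message["component"], message["error"])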
Delete a Pipeline
pipeline.delete()
or
client.delete_pipeline(pipeline_id="<my-pipeline-id>")
Import / Export Pipeline Configurations
Import
Import a pipeline configuration from a JSON file:
pipeline = client.create_pipeline(pipeline_config_json_path="<my-pipeline-config.json>")
Import a pipeline configuration from a YAML file:
pipeline = client.create_pipeline(pipeline_config_yaml_path="<my-pipeline-config.yaml>")
Export
Export a pipeline configuration to a JSON file:
pipeline.to_json(json_path="<my-pipeline-config.json>")
Export a pipeline configuration to a YAML file:
pipeline.to_yaml(yaml_path="<my-pipeline-config.yaml>")
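Export and import can be combined to move a pipeline between deployments. A sketch, assuming the exported YAML is accepted unchanged by create_pipeline on the target host (the target host URL below is hypothetical):
# Export the pipeline config from one deployment and recreate it on another.
pipeline.to_yaml(yaml_path="pipeline_backup.yaml")
target_client = Client(host="http://other-glassflow-host:8081")  # hypothetical target deployment
migrated = target_client.create_pipeline(pipeline_config_yaml_path="pipeline_backup.yaml")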
Next Steps
- Explore the Pipeline Configuration documentation for detailed configuration options
- Check out the demo scripts for more examples
- Learn about monitoring and observability for your pipelines