Python SDK
The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.
Installation
Install the GlassFlow Python SDK using pip:
pip install "glassflow>=3.0.0"
Basic Usage
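Before working through the steps below, you can check that the installed SDK meets the 3.0.0 minimum using the standard library's importlib.metadata. This is a sketch. parse_release, meets_minimum and glassflow_version_ok are illustrative helpers, not part of the SDK. The comparison is also simplified: it ignores pre-release suffixes such as rc1, which the packaging library's Version class handles under the full PEP 440 rules.

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Tuple

MINIMUM = (3, 0, 0)  # from the "glassflow>=3.0.0" requirement in the install command


def parse_release(text: str) -> Tuple[int, ...]:
    """Return the leading numeric release, e.g. "3.1.0" -> (3, 1, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", text)
    if match is None:
        return ()
    return tuple(int(part) for part in match.group(0).split("."))


def meets_minimum(installed: str, minimum: Tuple[int, ...] = MINIMUM) -> bool:
    """Compare release numbers only; pre-release suffixes such as "rc1" are ignored."""
    release = parse_release(installed)
    # Pad with zeros so that "3.0" compares equal to (3, 0, 0).
    release += (0,) * (len(minimum) - len(release))
    return release >= minimum


def glassflow_version_ok() -> bool:
    """True if the glassflow distribution is installed and at least MINIMUM."""
    try:
        return meets_minimum(version("glassflow"))
    except PackageNotFoundError:
        return False


if __name__ == "__main__":
    print("glassflow >= 3.0.0 installed:", glassflow_version_ok())
```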
- Import Required Modules
from glassflow.etl import Client
- Create a Client
client = Client(host="http://localhost:8081")
Note: The host parameter is the URL of the GlassFlow API. If GlassFlow is running locally with Docker Compose, it is http://localhost:8081.
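Hard-coding the local host is convenient for a first run. When the same code has to target different deployments, one option is to read the host from the environment. This sketch assumes a variable named GLASSFLOW_HOST, a name chosen for this example only, and falls back to http://localhost:8081. resolve_host and make_client are illustrative helpers, not SDK functions.

```python
import os
from typing import Mapping, Optional

# Hypothetical variable name chosen for this example.
HOST_ENV_VAR = "GLASSFLOW_HOST"
# Local Docker Compose default from the note on the host parameter.
DEFAULT_HOST = "http://localhost:8081"


def resolve_host(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the API host from the environment, or the local default."""
    env = os.environ if env is None else env
    host = env.get(HOST_ENV_VAR, "").strip()
    return host or DEFAULT_HOST


def make_client():
    """Create a Client for the resolved host (requires the glassflow package)."""
    # Imported here so resolve_host() can be used and tested without the SDK.
    from glassflow.etl import Client

    return Client(host=resolve_host())
```

Passing env explicitly is only there to make the lookup easy to test; by default it reads os.environ.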
- Create a Pipeline from JSON Config
Create a pipeline from a JSON pipeline configuration file:
pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
Pipeline Management
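The management calls in this section identify a pipeline by its ID. If you only know its name, a minimal sketch can look the ID up with client.list_pipelines() and then call client.get_pipeline(). It assumes each listed entry can be indexed like the dictionaries in the List Pipelines output, with name and pipeline_id keys. Because this page does not say whether pipeline names are unique, the sketch refuses to guess when several pipelines share a name. find_pipeline_id and get_pipeline_by_name are illustrative helpers, not SDK functions.

```python
from typing import Any, Iterable, Mapping, Optional


def find_pipeline_id(pipelines: Iterable[Mapping[str, Any]], name: str) -> Optional[str]:
    """Return the pipeline_id of the single pipeline called name, or None.

    Raises ValueError if more than one pipeline has that name.
    """
    matches = [p["pipeline_id"] for p in pipelines if p.get("name") == name]
    if len(matches) > 1:
        raise ValueError(f"{len(matches)} pipelines are named {name!r}")
    return matches[0] if matches else None


def get_pipeline_by_name(client, name: str):
    """Fetch a pipeline by name using client.list_pipelines() and client.get_pipeline()."""
    pipeline_id = find_pipeline_id(client.list_pipelines(), name)
    if pipeline_id is None:
        raise LookupError(f"no pipeline named {name!r}")
    return client.get_pipeline(pipeline_id=pipeline_id)
```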
Get a Pipeline
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
Get Pipeline Health
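The examples in this section read the status once. When a script has just created a pipeline and needs to wait until it is running, you can poll pipeline.health() in a loop. This is a sketch: wait_until_running is an illustrative helper, and it assumes the health result can be indexed by overall_status, with "Running" as the value shown in the health output in this section. The clock and sleep parameters exist only so the loop can be tested without real waiting.

```python
import time
from typing import Any, Callable, Mapping


def wait_until_running(
    pipeline,
    timeout: float = 60.0,
    interval: float = 2.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Mapping[str, Any]:
    """Poll pipeline.health() until overall_status is "Running".

    Returns the last health result; raises TimeoutError once timeout seconds pass.
    """
    deadline = clock() + timeout
    while True:
        health = pipeline.health()
        status = health["overall_status"]
        if status == "Running":
            return health
        if clock() >= deadline:
            raise TimeoutError(f"pipeline status is still {status!r} after {timeout}s")
        sleep(interval)
```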
Access the pipeline health status:
pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
print(pipeline.status)
Output:
<PipelineStatus.RUNNING: 'Running'>
Or query the latest health status:
pipeline_health = pipeline.health()
print(pipeline_health)
Output:
{
    "pipeline_id": "<pipeline-id>",
    "pipeline_name": "<pipeline-name>",
    "overall_status": "Running",
    "created_at": "2025-09-05T10:21:57.945135078Z",
    "updated_at": "2025-09-05T10:21:58.192448618Z"
}
List Pipelines
You can list all pipelines available in your GlassFlow deployment:
pipelines = client.list_pipelines()
print(pipelines)
Output:
[
    {
        "pipeline_id": "<my-pipeline-id>",
        "name": "<my-pipeline-name>",
        "transformation_type": "Deduplication",
        "created_at": "2025-09-04T15:20:24.327977208Z",
        "status": "Running"
    }
]
Read Messages from a Pipeline’s Dead-Letter Queue (DLQ)
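When the DLQ has built up a backlog, the state and consume calls in this section can be combined to drain it in batches. A minimal sketch, assuming that dlq.state() returns the unconsumed_messages count shown in the state output below, that dlq.consume() returns a list of message dictionaries like the consume output below, and that consumed messages no longer count as unconsumed. drain_dlq and summarize_dlq are illustrative helpers, not SDK functions.

```python
from collections import Counter
from typing import Any, Dict, List

Message = Dict[str, Any]


def drain_dlq(pipeline, batch_size: int = 100, max_batches: int = 1000) -> List[Message]:
    """Consume DLQ messages in batches until none are reported as unconsumed.

    max_batches is a safety cap so a misbehaving queue cannot loop forever.
    """
    drained: List[Message] = []
    for _ in range(max_batches):
        if pipeline.dlq.state()["unconsumed_messages"] == 0:
            break
        batch = pipeline.dlq.consume(batch_size=batch_size)
        if not batch:
            # Stop if the state and the consume result disagree.
            break
        drained.extend(batch)
    return drained


def summarize_dlq(messages: List[Message]) -> Counter:
    """Count messages per (component, error) pair."""
    return Counter((m["component"], m["error"]) for m in messages)
```

summarize_dlq makes it easier to see whether failures cluster in one component, for example the ingestor rejecting messages that fail validation.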
Get a summary of the DLQ state:
dlq_state = pipeline.dlq.state()
print(dlq_state)
Output:
{
    "last_consumed_at": "2025-09-05T10:32:38.113427621Z",
    "last_received_at": "2025-09-05T10:23:26.530466989Z",
    "total_messages": 2,
    "unconsumed_messages": 1
}
Then read messages from the DLQ:
messages = pipeline.dlq.consume(batch_size=1)
print(messages)
Output:
[
    {
        "component": "ingestor",
        "error": "failed to validate data",
        "original_message": "<original kafka message>"
    }
]
Delete a Pipeline
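The calls in this subsection delete a single pipeline. To clean up several at once, for example test pipelines that share a name prefix, you can combine client.list_pipelines() with client.delete_pipeline(). This sketch assumes the list entries expose name and pipeline_id keys as in the List Pipelines output. delete_pipelines_with_prefix is an illustrative helper, and it defaults to a dry run that only returns the matching IDs.

```python
from typing import List


def delete_pipelines_with_prefix(client, prefix: str, dry_run: bool = True) -> List[str]:
    """Delete every pipeline whose name starts with prefix and return their IDs.

    With dry_run=True (the default) nothing is deleted, so the IDs can be reviewed first.
    """
    if not prefix:
        raise ValueError("refusing to match every pipeline with an empty prefix")
    matching = [
        p["pipeline_id"]
        for p in client.list_pipelines()
        if p.get("name", "").startswith(prefix)
    ]
    if not dry_run:
        for pipeline_id in matching:
            client.delete_pipeline(pipeline_id=pipeline_id)
    return matching
```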
pipeline.delete()
Alternatively, delete it by ID through the client:
client.delete_pipeline(pipeline_id="<my-pipeline-id>")
Pipeline Pausing and Resuming
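Pausing is usually paired with a later resume, for example around maintenance on a downstream system. A context manager keeps the two calls together so the pipeline is resumed even when the work in between fails. This is a minimal sketch built only on the pipeline.pause() and pipeline.resume() calls in this section; paused is an illustrative helper, not an SDK function.

```python
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def paused(pipeline) -> Iterator[None]:
    """Pause pipeline for the duration of the with-block, then resume it.

    resume() runs in a finally clause, so it is called even if the block raises.
    """
    pipeline.pause()
    try:
        yield
    finally:
        pipeline.resume()
```

Wrap the work in "with paused(pipeline):". If pause() itself raises, the block does not run and resume() is not called.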
Pause a Pipeline
pipeline.pause()
Resume a Pipeline
pipeline.resume()
Import / Export Pipeline Configurations
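Export is also a simple way to back up every pipeline in a deployment. The sketch below combines client.list_pipelines(), client.get_pipeline() and pipeline.to_json() from this page, writing one JSON file per pipeline. backup_all_pipelines is an illustrative helper and assumes the list entries expose a pipeline_id key as in the List Pipelines output. As a precaution, characters other than letters, digits, dot, underscore and hyphen are replaced in file names.

```python
import re
from pathlib import Path
from typing import List


def backup_all_pipelines(client, directory: str) -> List[Path]:
    """Export every pipeline's configuration to one JSON file per pipeline in directory."""
    target = Path(directory)
    target.mkdir(parents=True, exist_ok=True)
    written: List[Path] = []
    for summary in client.list_pipelines():
        pipeline_id = summary["pipeline_id"]
        # Keep the file name filesystem-safe even if an ID contains unusual characters.
        safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", pipeline_id)
        path = target / f"{safe_name}.json"
        client.get_pipeline(pipeline_id=pipeline_id).to_json(json_path=str(path))
        written.append(path)
    return written
```

A saved file can later be passed back to client.create_pipeline(pipeline_config_json_path=...) as described under Import.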
Import
Create a pipeline by importing its configuration from a JSON file:
pipeline = client.create_pipeline(pipeline_config_json_path="<my-pipeline-config.json>")
Create a pipeline by importing its configuration from a YAML file:
pipeline = client.create_pipeline(pipeline_config_yaml_path="<my-pipeline-config.yaml>")
Export
Export a pipeline configuration to a JSON file:
pipeline.to_json(json_path="<my-pipeline-config.json>")
Export a pipeline configuration to a YAML file:
pipeline.to_yaml(yaml_path="<my-pipeline-config.yaml>")
Next Steps
- Explore the Pipeline JSON Reference documentation for detailed configuration options
- Check out the demo scripts for more examples
- Learn about monitoring and observability for your pipelines