Python SDK

The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.

Installation

Install the GlassFlow Python SDK using pip:

pip install "glassflow>=3.0.0"
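To confirm which version ended up in the active environment, a small check along these lines may help. It is a minimal sketch that uses only the standard library; the helper name installed_major is made up for illustration, and the distribution name is assumed to match the pip command above.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def installed_major(dist_name: str = "glassflow"):
    """Return the installed major version of a distribution, or None if it is absent."""
    try:
        match = re.match(r"\d+", version(dist_name))
    except PackageNotFoundError:
        return None
    return int(match.group()) if match else None


major = installed_major()
if major is None:
    print("glassflow is not installed in this environment")
elif major < 3:
    print(f"glassflow {major}.x found; this guide expects 3.0.0 or newer")
else:
    print(f"glassflow {major}.x is installed")
```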

Basic Usage

  1. Import Required Modules
from glassflow.etl import Client
  2. Create a Client
client = Client(host="http://localhost:8081")

Note: The host parameter is the URL of the GlassFlow API. When GlassFlow runs locally with Docker Compose, it is http://localhost:8081.

  3. Create a Pipeline from JSON Config

Create a pipeline from a JSON pipeline configuration file:

pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
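The host passed to Client does not have to be hard-coded. One possible pattern, sketched below, reads it from an environment variable and falls back to the local Docker Compose address. The variable name GLASSFLOW_HOST and the helper resolve_host are assumptions made for this example; the SDK is not known to read that variable itself.

```python
import os

# Local Docker Compose address mentioned in the note on the host parameter.
DEFAULT_HOST = "http://localhost:8081"


def resolve_host(env=None) -> str:
    """Return the API host from GLASSFLOW_HOST (hypothetical name) or the local default."""
    env = os.environ if env is None else env
    return env.get("GLASSFLOW_HOST", DEFAULT_HOST)


# With no variable set, the local default is used.
print(resolve_host({}))
# With the variable set, its value wins.
print(resolve_host({"GLASSFLOW_HOST": "http://glassflow.internal:8081"}))
```

The result can then be passed straight to the constructor, for example client = Client(host=resolve_host()).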

Pipeline Management

Get a Pipeline

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")

Get Pipeline Health

Access the pipeline health status:

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
print(pipeline.status)

Output:

<PipelineStatus.RUNNING: 'Running'>

Alternatively, query the latest health status:

pipeline_health = pipeline.health()
print(pipeline_health)

Output:

{
  "pipeline_id": "<pipeline-id>",
  "pipeline_name": "<pipeline-name>",
  "overall_status": "Running",
  "created_at": "2025-09-05T10:21:57.945135078Z",
  "updated_at": "2025-09-05T10:21:58.192448618Z"
}
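The timestamps in this output carry nanosecond precision, which Python's datetime cannot store. The sketch below shows one way to work with such a result, assuming it behaves like the dictionary printed above; parse_timestamp is a helper written for this example, not part of the SDK, and it assumes UTC timestamps ending in Z.

```python
from datetime import datetime, timezone


def parse_timestamp(value: str) -> datetime:
    """Parse a UTC timestamp such as 2025-09-05T10:21:57.945135078Z."""
    base, _, fraction = value.rstrip("Z").partition(".")
    # datetime keeps microseconds only, so pad or truncate the fraction to 6 digits.
    micros = (fraction + "000000")[:6]
    parsed = datetime.strptime(f"{base}.{micros}", "%Y-%m-%dT%H:%M:%S.%f")
    return parsed.replace(tzinfo=timezone.utc)


# Sample values copied from the output above.
health = {
    "overall_status": "Running",
    "created_at": "2025-09-05T10:21:57.945135078Z",
    "updated_at": "2025-09-05T10:21:58.192448618Z",
}

is_running = health["overall_status"] == "Running"
gap = parse_timestamp(health["updated_at"]) - parse_timestamp(health["created_at"])
print(is_running, gap.total_seconds())
```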

List Pipelines

You can list all pipelines available in your GlassFlow deployment:

pipelines = client.list_pipelines()
print(pipelines)

Output:

[
  {
    "pipeline_id": "<my-pipeline-id>",
    "name": "<my-pipeline-name>",
    "transformation_type": "Deduplication",
    "created_at": "2025-09-04T15:20:24.327977208Z",
    "status": "Running"
  }
]
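A listing like this is easy to filter in plain Python. The following sketch assumes the result is a list of dictionaries shaped like the output above; the second entry and its "Paused" status value are invented purely to make the filter visible.

```python
# The first entry mirrors the sample output; the second is made up for illustration.
pipelines = [
    {
        "pipeline_id": "<my-pipeline-id>",
        "name": "<my-pipeline-name>",
        "transformation_type": "Deduplication",
        "created_at": "2025-09-04T15:20:24.327977208Z",
        "status": "Running",
    },
    {
        "pipeline_id": "<another-pipeline-id>",
        "name": "<another-pipeline-name>",
        "transformation_type": "Deduplication",
        "created_at": "2025-09-04T16:00:00.000000000Z",
        "status": "Paused",  # hypothetical status value
    },
]

# Keep only the IDs of pipelines that report a Running status.
running_ids = [p["pipeline_id"] for p in pipelines if p["status"] == "Running"]
print(running_ids)
```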

Read Messages from a Pipeline's Dead-Letter Queue (DLQ)

Get a summary of the DLQ state:

dlq_state = pipeline.dlq.state()
print(dlq_state)

Output:

{
  "last_consumed_at": "2025-09-05T10:32:38.113427621Z",
  "last_received_at": "2025-09-05T10:23:26.530466989Z",
  "total_messages": 2,
  "unconsumed_messages": 1
}

Then read messages from the DLQ:

messages = pipeline.dlq.consume(batch_size=1)
print(messages)

Output:

[
  {
    "component": "ingestor",
    "error": "failed to validate data",
    "original_message": "<original kafka message>"
  }
]
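The two calls can be combined to drain the queue in batches and summarise failures by component. This is only a sketch: it assumes state() returns a mapping with an unconsumed_messages key and that consume() returns a list of dictionaries, as the sample outputs suggest. FakeDLQ is an in-memory stand-in so the example runs without a GlassFlow deployment, and drain is a helper written for this example.

```python
from collections import Counter


class FakeDLQ:
    """In-memory stand-in for pipeline.dlq, used only to make this sketch runnable."""

    def __init__(self, messages):
        self._pending = list(messages)

    def state(self):
        return {"unconsumed_messages": len(self._pending)}

    def consume(self, batch_size):
        batch, self._pending = self._pending[:batch_size], self._pending[batch_size:]
        return batch


def drain(dlq, batch_size=10, max_batches=100):
    """Consume batches until state() reports no unconsumed messages."""
    collected = []
    for _ in range(max_batches):  # hard cap so the loop always terminates
        if dlq.state()["unconsumed_messages"] == 0:
            break
        collected.extend(dlq.consume(batch_size=batch_size))
    return collected


sample = {
    "component": "ingestor",
    "error": "failed to validate data",
    "original_message": "<original kafka message>",
}
dlq = FakeDLQ([dict(sample), dict(sample)])

messages = drain(dlq, batch_size=1)
by_component = Counter(m["component"] for m in messages)
print(by_component)
```

With a real pipeline, drain(pipeline.dlq) would take the place of the stand-in, provided the assumptions above hold for the installed SDK version.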

Delete a Pipeline

pipeline.delete()

or

client.delete_pipeline(pipeline_id="<my-pipeline-id>")

Pipeline Pausing and Resuming

Pause a Pipeline

pipeline.pause()

Resume a Pipeline

pipeline.resume()
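When a pipeline should be paused only for the duration of some maintenance work, pairing the two calls in a context manager guards against forgetting to resume. The sketch below relies only on the pause() and resume() methods shown above; the paused helper and the RecordingPipeline stand-in are illustrative, and whether pause() blocks until the pipeline has actually stopped is not established here.

```python
from contextlib import contextmanager


@contextmanager
def paused(pipeline):
    """Pause the pipeline for the duration of a with-block, then resume it."""
    pipeline.pause()
    try:
        yield pipeline
    finally:
        pipeline.resume()  # runs even if the block raises


class RecordingPipeline:
    """Stand-in that records calls; a real pipeline object would replace it."""

    def __init__(self):
        self.calls = []

    def pause(self):
        self.calls.append("pause")

    def resume(self):
        self.calls.append("resume")


demo = RecordingPipeline()
with paused(demo):
    demo.calls.append("maintenance")
print(demo.calls)
```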

Import / Export Pipeline Configurations

Import

Import a pipeline configuration from a JSON file:

pipeline = client.create_pipeline(pipeline_config_json_path="<my-pipeline-config.json>")

Import a pipeline configuration from a YAML file:

pipeline = client.create_pipeline(pipeline_config_yaml_path="<my-pipeline-config.yaml>")
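If configuration files arrive in either format, the matching keyword argument can be chosen from the file extension. The helper below, config_kwargs, is a hypothetical convenience rather than part of the SDK; it only builds the argument dictionary for the two parameters shown above.

```python
from pathlib import Path


def config_kwargs(path: str) -> dict:
    """Map a config file path to the matching create_pipeline keyword argument."""
    suffix = Path(path).suffix.lower()
    if suffix == ".json":
        return {"pipeline_config_json_path": path}
    if suffix in (".yaml", ".yml"):
        return {"pipeline_config_yaml_path": path}
    raise ValueError(f"unsupported config extension: {suffix!r}")


print(config_kwargs("pipeline_config.json"))
print(config_kwargs("pipeline_config.yaml"))
```

The result can be unpacked into the call, for example client.create_pipeline(**config_kwargs("pipeline_config.yaml")).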

Export

Export a pipeline configuration to a JSON file:

pipeline.to_json(json_path="<my-pipeline-config.json>")

Export a pipeline configuration to a YAML file:

pipeline.to_yaml(yaml_path="<my-pipeline-config.yaml>")
