
Python SDK

The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.

Installation

Install the GlassFlow Python SDK using pip:

pip install "glassflow>=3.6.0"

Basic Usage

1. Import Required Modules

from glassflow.etl import Client

2. Create a Client

client = Client(host="http://localhost:30180")

Note: The host parameter specifies the GlassFlow API host (if GlassFlow is running locally with GlassFlow CLI, it is http://localhost:30180).

3. Create a Pipeline from JSON Config

Create a pipeline from a JSON pipeline configuration file:

pipeline = client.create_pipeline(pipeline_config_json_path="pipeline_config.json")
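Putting the steps together, a minimal deployment script might look like the sketch below. The helper name `deploy_pipeline` and the injected-client pattern are illustrative, not part of the SDK; only `Client`, `create_pipeline`, and `pipeline.status` are documented calls.

```python
def deploy_pipeline(client, config_path: str):
    """Create a pipeline from a JSON config file and report its status.

    `client` is any object exposing `create_pipeline`, so the real
    GlassFlow Client can be swapped for a stub in tests.
    """
    pipeline = client.create_pipeline(pipeline_config_json_path=config_path)
    print(f"Created pipeline, status: {pipeline.status}")
    return pipeline


# Live usage (requires a running GlassFlow instance):
#   from glassflow.etl import Client
#   client = Client(host="http://localhost:30180")
#   deploy_pipeline(client, "pipeline_config.json")
```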

Pipeline Management

Get a Pipeline

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")

Get Pipeline Health

Access the pipeline health status:

pipeline = client.get_pipeline(pipeline_id="<my-pipeline-id>")
print(pipeline.status)

Output:

<PipelineStatus.RUNNING: 'Running'>

or query the latest health status:

pipeline_health = pipeline.health()
print(pipeline_health)

Output:

{
  "pipeline_id": "<pipeline-id>",
  "pipeline_name": "<pipeline-name>",
  "overall_status": "Running",
  "created_at": "2025-09-05T10:21:57.945135078Z",
  "updated_at": "2025-09-05T10:21:58.192448618Z"
}
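The timestamps in the health payload are RFC 3339 strings with nanosecond precision, which `datetime.fromisoformat` does not accept on older Python versions. A small normalizing helper (illustrative, not part of the SDK) makes them usable:

```python
import re
from datetime import datetime


def parse_rfc3339(ts: str) -> datetime:
    """Parse an RFC 3339 timestamp such as the created_at / updated_at
    fields above, truncating fractional seconds to the microsecond
    precision that datetime supports."""
    # Keep at most 6 fractional digits and map the trailing Z to +00:00
    ts = re.sub(r"\.(\d{6})\d*", r".\1", ts).replace("Z", "+00:00")
    return datetime.fromisoformat(ts)


created = parse_rfc3339("2025-09-05T10:21:57.945135078Z")
print(created.microsecond)  # 945135
```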

List Pipelines

You can list all pipelines available in your GlassFlow deployment:

pipelines = client.list_pipelines()
print(pipelines)

Output:

[
  {
    "pipeline_id": "<my-pipeline-id>",
    "name": "<my-pipeline-name>",
    "transformation_type": "Deduplication",
    "created_at": "2025-09-04T15:20:24.327977208Z",
    "updated_at": "2025-09-04T15:20:24.327977208Z",
    "status": "Running",
    "metadata": { "tags": ["demo"] }
  }
]
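With many pipelines, it can help to filter the returned list client-side. This helper is illustrative (not part of the SDK) and assumes the list shape shown above:

```python
def filter_pipelines(pipelines, status="Running", tag=None):
    """Return pipelines matching `status` and, if `tag` is given,
    carrying that tag in metadata.tags."""
    result = []
    for p in pipelines:
        if p.get("status") != status:
            continue
        if tag is not None and tag not in p.get("metadata", {}).get("tags", []):
            continue
        result.append(p)
    return result


pipelines = [
    {"pipeline_id": "p1", "status": "Running", "metadata": {"tags": ["demo"]}},
    {"pipeline_id": "p2", "status": "Stopped", "metadata": {"tags": ["demo"]}},
]
print([p["pipeline_id"] for p in filter_pipelines(pipelines, tag="demo")])  # ['p1']
```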

Stop a Pipeline

Stopping a pipeline stops the ingestion of new messages; once all messages already in the queue are processed, the pipeline stops.

pipeline.stop()

Terminate a Pipeline

Terminating a pipeline stops the ingestion of new messages and shuts the pipeline down immediately.

pipeline.stop(terminate=True)

Resume a Pipeline

Resuming brings a stopped pipeline back to the running state.

pipeline.resume()

Delete a Pipeline

Only stopped (or terminated) pipelines can be deleted.

pipeline.delete()

or

client.delete_pipeline(pipeline_id="<my-pipeline-id>")
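Since only stopped or terminated pipelines can be deleted, a small guard can combine the two steps. The helper name and the stop-before-delete flow are assumptions layered on the documented `stop()` and `delete()` calls:

```python
def stop_and_delete(pipeline, terminate: bool = False):
    """Stop (or terminate) a pipeline, then delete it.

    `pipeline` is any object exposing `stop()` and `delete()`,
    matching the pipeline interface shown above.
    """
    pipeline.stop(terminate=terminate)
    pipeline.delete()
```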

Update a Pipeline

Only stopped (or terminated) pipelines can be updated.

pipeline.update(config_patch=<pipeline config patch>)

Example:

pipeline.update(config_patch={"join": {"type": "join", "enabled": True}})

or update the full pipeline config:

pipeline.update(config_patch=<new pipeline config>)
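One way to think about a config patch is as a nested dictionary overlaid on the existing config. The sketch below illustrates that idea locally; it is an assumption about patch semantics for explanation only, not the SDK's implementation:

```python
def merge_patch(config: dict, patch: dict) -> dict:
    """Recursively overlay `patch` onto `config`, returning a new dict.
    Nested dicts are merged; all other values are replaced."""
    merged = dict(config)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_patch(merged[key], value)
        else:
            merged[key] = value
    return merged


config = {"join": {"type": "join", "enabled": False}, "name": "demo"}
patched = merge_patch(config, {"join": {"enabled": True}})
print(patched)  # {'join': {'type': 'join', 'enabled': True}, 'name': 'demo'}
```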

Dead-Letter Queue (DLQ)

Get DLQ State

dlq_state = pipeline.dlq.state()
print(dlq_state)

Output:

{
  "last_consumed_at": "2025-09-05T10:32:38.113427621Z",
  "last_received_at": "2025-09-05T10:23:26.530466989Z",
  "total_messages": 2,
  "unconsumed_messages": 1
}

Purge DLQ

pipeline.dlq.purge()
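A simple client-side check over the `dlq.state()` payload (illustrative helper, not part of the SDK) can tell whether unconsumed messages remain before you purge:

```python
def dlq_backlog(state: dict) -> int:
    """Number of DLQ messages not yet consumed, given a state
    payload as returned by pipeline.dlq.state()."""
    return int(state.get("unconsumed_messages", 0))


state = {
    "last_consumed_at": "2025-09-05T10:32:38.113427621Z",
    "last_received_at": "2025-09-05T10:23:26.530466989Z",
    "total_messages": 2,
    "unconsumed_messages": 1,
}
if dlq_backlog(state) > 0:
    print(f"{dlq_backlog(state)} message(s) still unconsumed")  # 1 message(s) still unconsumed
```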

Read Messages from DLQ

messages = pipeline.dlq.consume(batch_size=1)
print(messages)

Output:

[
  {
    "component": "ingestor",
    "error": "failed to validate data",
    "original_message": "<original kafka message>"
  }
]
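When triaging failures, tallying DLQ messages by the component that produced them gives a quick overview. This helper is illustrative and assumes the message shape shown above:

```python
from collections import Counter


def errors_by_component(messages) -> Counter:
    """Tally DLQ messages by component, given the list shape
    returned by pipeline.dlq.consume()."""
    return Counter(m.get("component", "unknown") for m in messages)


messages = [
    {"component": "ingestor", "error": "failed to validate data", "original_message": "..."},
]
print(errors_by_component(messages))  # Counter({'ingestor': 1})
```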

Import / Export Pipeline Configurations

Import

Import a pipeline configuration from a JSON file:

pipeline_config = client.create_pipeline(pipeline_config_json_path="<my-pipeline-config.json>")

Import a pipeline configuration from a YAML file:

pipeline_config = client.create_pipeline(pipeline_config_yaml_path="<my-pipeline-config.yaml>")

Export

Export a pipeline configuration to a JSON file:

pipeline.to_json(json_path="<my-pipeline-config.json>")

Export a pipeline configuration to a YAML file:

pipeline.to_yaml(yaml_path="<my-pipeline-config.yaml>")
