
Python SDK

The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.

Installation

Install the GlassFlow Python SDK using pip:

pip install glassflow-clickhouse-etl

Basic Usage

  1. Import Required Modules
from glassflow_clickhouse_etl import Pipeline, models, errors
import json
  2. Define Pipeline Configuration

Create a pipeline configuration using the models.PipelineConfig class:

# Define source configuration
source_config = models.SourceConfig(
    type="kafka",
    provider="confluent",
    connection_params=models.KafkaConnectionParams(
        brokers=["localhost:9092"],
        protocol="PLAINTEXT",
        skip_auth=True
    ),
    topics=[
        models.TopicConfig(
            name="user_events",
            consumer_group_initial_offset="earliest",
            schema=models.Schema(
                type="json",
                fields=[
                    models.SchemaField(name="user_id", type="string"),
                    models.SchemaField(name="name", type="string"),
                    models.SchemaField(name="email", type="string"),
                    models.SchemaField(name="created_at", type="string")
                ]
            ),
            deduplication=models.DeduplicationConfig(
                enabled=True,
                id_field="user_id",
                id_field_type="string",
                time_window="1h"
            )
        )
    ]
)

# Define sink configuration
sink_config = models.SinkConfig(
    type="clickhouse",
    host="localhost",
    port="8123",
    username="default",
    password="secret",
    database="default",
    table="user_events",
    max_batch_size=1000,
    max_delay_time="1s",
    table_mapping=[
        models.TableMapping(
            source_id="user_events",
            field_name="user_id",
            column_name="user_id",
            column_type="String"
        ),
        models.TableMapping(
            source_id="user_events",
            field_name="name",
            column_name="name",
            column_type="String"
        ),
        models.TableMapping(
            source_id="user_events",
            field_name="email",
            column_name="email",
            column_type="String"
        ),
        models.TableMapping(
            source_id="user_events",
            field_name="created_at",
            column_name="created_at",
            column_type="DateTime"
        )
    ]
)

# Create pipeline configuration
pipeline_config = models.PipelineConfig(
    pipeline_id="user-events-pipeline",
    source=source_config,
    sink=sink_config
)
  3. Create and Deploy the Pipeline
# Create pipeline instance
pipeline = Pipeline(pipeline_config, url="http://localhost:8080")

# Check if a pipeline already exists
try:
    existing_pipeline_id = Pipeline(url="http://localhost:8080").get_running_pipeline()
    if existing_pipeline_id == pipeline_config.pipeline_id:
        print(f"Pipeline {pipeline_config.pipeline_id} already exists")
    else:
        # A different pipeline is running; delete it, then create this one
        pipeline.delete()
        pipeline.create()
except errors.PipelineNotFoundError:
    # No pipeline is running; create a new one
    pipeline.create()
    print(f"Pipeline {pipeline_config.pipeline_id} created successfully")

Note: The url parameter specifies the GlassFlow API endpoint. By default it is http://localhost:8080, which assumes a local Docker deployment. For production deployments, specify the appropriate URL:

# Local development
pipeline = Pipeline(pipeline_config, url="http://localhost:8080")

# Production deployment
pipeline = Pipeline(pipeline_config, url="https://glassflow.yourdomain.com")

# Custom port
pipeline = Pipeline(pipeline_config, url="http://localhost:9090")

Pipeline Configuration Options

Kafka Connection Options

Plain Text Connection (Local Development)

connection_params = models.KafkaConnectionParams(
    brokers=["localhost:9092"],
    protocol="PLAINTEXT",
    skip_auth=True
)

SASL SSL Connection (Production)

connection_params = models.KafkaConnectionParams(
    brokers=["kafka-cluster.example.com:9093"],
    protocol="SASL_SSL",
    mechanism="SCRAM-SHA-256",
    username="your_username",
    password="base64_encoded_password",
    root_ca="base64_encoded_certificate",
    skip_auth=False
)

Kafka Data Types

The SchemaField type parameter accepts the following Kafka data types; a combined example follows the list:

  • 'string' - For text data, including datetime strings
  • 'int8', 'int16', 'int32', 'int64' - For integer values
  • 'float32', 'float64' - For floating-point numbers
  • 'bool' - For boolean values
  • 'bytes' - For binary data
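
For reference, a schema that combines several of these types might look like the following sketch (the field names here are illustrative, not part of the SDK):

# Illustrative schema mixing the supported Kafka field types
schema = models.Schema(
    type="json",
    fields=[
        models.SchemaField(name="event_id", type="string"),    # text, including datetime strings
        models.SchemaField(name="retry_count", type="int32"),  # integer value
        models.SchemaField(name="score", type="float64"),      # floating-point number
        models.SchemaField(name="is_active", type="bool"),     # boolean flag
        models.SchemaField(name="payload", type="bytes")       # raw binary data
    ]
)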

ClickHouse Connection Options

Basic Connection

sink_config = models.SinkConfig(
    type="clickhouse",
    host="localhost",
    port="8123",
    username="default",
    password="secret",
    database="default",
    table="user_events",
    secure=False
)

Secure Connection (TLS)

sink_config = models.SinkConfig(
    type="clickhouse",
    host="clickhouse-cluster.example.com",
    port="8443",
    username="default",
    password="base64_encoded_password",
    database="default",
    table="user_events",
    secure=True
)

Deduplication Configuration

deduplication = models.DeduplicationConfig(
    enabled=True,
    id_field="user_id",       # Field to use for deduplication
    id_field_type="string",   # Data type of the ID field
    time_window="1h"          # Time window for deduplication
)

Time Window Options:

  • "30s" - 30 seconds
  • "1m" - 1 minute
  • "1h" - 1 hour
  • "12h" - 12 hours
  • "24h" - 24 hours

Sink Configuration

sink_config = models.SinkConfig(
    # ... other config
    max_batch_size=1000,   # Maximum records per batch
    max_delay_time="1s"    # Maximum time to wait before flushing a batch
)

Configuration Format Requirements

  • Port fields: Must be strings (e.g., port="8123", not port=8123)
  • Time windows: Use string format (e.g., "1h", "30s", "12h")
  • Passwords: Should be base64 encoded strings (see the sketch below)
  • Batch sizes: Must be integers
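
For example, a plaintext password can be base64 encoded with Python's standard library before it is placed in a configuration (the plaintext value and surrounding config are illustrative):

import base64

# Encode the plaintext password as base64 before using it in a config
plaintext = "secret"  # illustrative value
encoded_password = base64.b64encode(plaintext.encode("utf-8")).decode("utf-8")

sink_config = models.SinkConfig(
    # ... other config
    password=encoded_password
)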

Pipeline Management

Creating a Pipeline

def create_pipeline():
    # Define your pipeline configuration
    pipeline_config = models.PipelineConfig(
        pipeline_id="my-pipeline",
        source=source_config,
        sink=sink_config
    )

    # Create pipeline instance
    pipeline = Pipeline(pipeline_config, url="http://localhost:8080")

    # Check if a pipeline already exists
    try:
        existing_pipeline_id = Pipeline(url="http://localhost:8080").get_running_pipeline()
        if existing_pipeline_id == pipeline_config.pipeline_id:
            print(f"Pipeline {pipeline_config.pipeline_id} already exists")
        else:
            # A different pipeline is running; delete it, then create this one
            pipeline.delete()
            pipeline.create()
    except errors.PipelineNotFoundError:
        # No pipeline is running; create a new one
        pipeline.create()
        print(f"Pipeline {pipeline_config.pipeline_id} created successfully")

Deleting a Pipeline

Important: Only one pipeline can be active at a time in the current version. Deleting a pipeline will stop the currently running pipeline.

def delete_pipeline():
    """Delete the currently running pipeline"""
    try:
        pipeline = Pipeline(url="http://localhost:8080")
        pipeline.delete()
        print("Pipeline deleted successfully")
    except errors.PipelineNotFoundError:
        print("No running pipeline found")
    except Exception as e:
        print(f"Error deleting pipeline: {e}")

# Simple deletion at script end
if __name__ == "__main__":
    Pipeline(url="http://localhost:8080").delete()

Checking Pipeline Status

def check_pipeline_status():
    """Check if a pipeline is currently running"""
    try:
        pipeline_id = Pipeline(url="http://localhost:8080").get_running_pipeline()
        print(f"Pipeline {pipeline_id} is currently running")
        return pipeline_id
    except errors.PipelineNotFoundError:
        print("No pipeline is currently running")
        return None

Advanced Configuration

Joining Multiple Topics

To create a pipeline that joins multiple Kafka topics:

# Define join configuration
join_config = models.JoinConfig(
    enabled=True,
    type="temporal",
    sources=[
        models.JoinSourceConfig(
            source_id="user_events",
            join_key="user_id",
            time_window="1h",
            orientation="left"
        ),
        models.JoinSourceConfig(
            source_id="order_events",
            join_key="user_id",
            time_window="1h",
            orientation="right"
        )
    ]
)

# Add join config to pipeline
pipeline_config.join = join_config
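
Each source_id referenced in the join should correspond to a topic defined in the source configuration, following the pattern of the earlier examples. A second topic entry backing order_events might look like this sketch (the schema fields are illustrative):

# Illustrative second topic to back the order_events join source;
# add it to the topics list of the SourceConfig
models.TopicConfig(
    name="order_events",
    consumer_group_initial_offset="earliest",
    schema=models.Schema(
        type="json",
        fields=[
            models.SchemaField(name="order_id", type="string"),
            models.SchemaField(name="user_id", type="string"),
            models.SchemaField(name="amount", type="float64")
        ]
    )
)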

Loading Configuration from JSON

You can also load pipeline configuration from a JSON file:

import json

def load_pipeline_config(config_path: str) -> models.PipelineConfig:
    """Load pipeline configuration from a JSON file"""
    with open(config_path, 'r') as f:
        config_data = json.load(f)
    return models.PipelineConfig(**config_data)

# Load configuration
pipeline_config = load_pipeline_config('pipeline_config.json')
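
Because the file's contents are unpacked directly into models.PipelineConfig, the JSON keys mirror the keyword arguments shown earlier. A minimal illustrative sketch of pipeline_config.json (not a complete, validated configuration) might look like:

{
  "pipeline_id": "user-events-pipeline",
  "source": {
    "type": "kafka",
    "connection_params": {
      "brokers": ["localhost:9092"],
      "protocol": "PLAINTEXT",
      "skip_auth": true
    },
    "topics": [
      {
        "name": "user_events",
        "consumer_group_initial_offset": "earliest",
        "schema": {
          "type": "json",
          "fields": [
            {"name": "user_id", "type": "string"}
          ]
        }
      }
    ]
  },
  "sink": {
    "type": "clickhouse",
    "host": "localhost",
    "port": "8123",
    "username": "default",
    "password": "secret",
    "database": "default",
    "table": "user_events",
    "table_mapping": [
      {
        "source_id": "user_events",
        "field_name": "user_id",
        "column_name": "user_id",
        "column_type": "String"
      }
    ]
  }
}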

Error Handling

The SDK provides typed exception classes so that failures can be handled precisely:

try:
    pipeline.create()
except errors.PipelineAlreadyExistsError:
    print("Pipeline already exists")
except errors.ConnectionError:
    print("Failed to connect to GlassFlow. Make sure it's running.")
except errors.PipelineConfigError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Complete Example

Here’s a complete example that demonstrates creating a deduplication pipeline:

from glassflow_clickhouse_etl import Pipeline, models, errors
import time

def create_deduplication_pipeline():
    """Create a pipeline that deduplicates user events"""
    # Pipeline configuration
    config = models.PipelineConfig(
        pipeline_id="user-deduplication-pipeline",
        source=models.SourceConfig(
            type="kafka",
            connection_params=models.KafkaConnectionParams(
                brokers=["localhost:9092"],
                protocol="PLAINTEXT",
                skip_auth=True
            ),
            topics=[
                models.TopicConfig(
                    name="user_events",
                    consumer_group_initial_offset="earliest",
                    schema=models.Schema(
                        type="json",
                        fields=[
                            models.SchemaField(name="event_id", type="string"),
                            models.SchemaField(name="user_id", type="string"),
                            models.SchemaField(name="action", type="string"),
                            models.SchemaField(name="timestamp", type="string")
                        ]
                    ),
                    deduplication=models.DeduplicationConfig(
                        enabled=True,
                        id_field="event_id",
                        id_field_type="string",
                        time_window="12h"
                    )
                )
            ]
        ),
        sink=models.SinkConfig(
            type="clickhouse",
            host="localhost",
            port="8123",
            username="default",
            password="secret",
            database="default",
            table="user_events_dedup",
            max_batch_size=1000,
            max_delay_time="1s",
            table_mapping=[
                models.TableMapping(
                    source_id="user_events",
                    field_name="event_id",
                    column_name="event_id",
                    column_type="String"
                ),
                models.TableMapping(
                    source_id="user_events",
                    field_name="user_id",
                    column_name="user_id",
                    column_type="String"
                ),
                models.TableMapping(
                    source_id="user_events",
                    field_name="action",
                    column_name="action",
                    column_type="String"
                ),
                models.TableMapping(
                    source_id="user_events",
                    field_name="timestamp",
                    column_name="timestamp",
                    column_type="DateTime"
                )
            ]
        )
    )

    # Create and deploy pipeline
    pipeline = Pipeline(config, url="http://localhost:8080")
    try:
        pipeline.create()
        print(f"Pipeline {config.pipeline_id} created successfully")
        # Wait for pipeline to start
        time.sleep(10)
        print("Pipeline is ready to process events")
    except errors.PipelineAlreadyExistsError:
        print(f"Pipeline {config.pipeline_id} already exists")
    except Exception as e:
        print(f"Error creating pipeline: {e}")

if __name__ == "__main__":
    create_deduplication_pipeline()
