Python SDK
The GlassFlow Python SDK provides a programmatic way to create and manage data pipelines. This approach is ideal for developers who prefer code-based configuration, automated deployment, and integration with existing Python workflows.
Installation
Install the GlassFlow Python SDK using pip:
```bash
pip install glassflow-clickhouse-etl
```
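After installing, a quick import confirms the package is available:

```python
# Quick sanity check that the SDK installed correctly
import glassflow_clickhouse_etl
```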
Basic Usage
- Import Required Modules
```python
from glassflow_clickhouse_etl import Pipeline, models, errors
import json
```
- Define Pipeline Configuration
Create a pipeline configuration using the `models.PipelineConfig` class:
```python
# Define source configuration
source_config = models.SourceConfig(
    type="kafka",
    provider="confluent",
    connection_params=models.KafkaConnectionParams(
        brokers=["localhost:9092"],
        protocol="PLAINTEXT",
        skip_auth=True
    ),
    topics=[
        models.TopicConfig(
            name="user_events",
            consumer_group_initial_offset="earliest",
            schema=models.Schema(
                type="json",
                fields=[
                    models.SchemaField(name="user_id", type="string"),
                    models.SchemaField(name="name", type="string"),
                    models.SchemaField(name="email", type="string"),
                    models.SchemaField(name="created_at", type="string")
                ]
            ),
            deduplication=models.DeduplicationConfig(
                enabled=True,
                id_field="user_id",
                id_field_type="string",
                time_window="1h"
            )
        )
    ]
)

# Define sink configuration
sink_config = models.SinkConfig(
    type="clickhouse",
    host="localhost",
    port="8123",
    username="default",
    password="secret",
    database="default",
    table="user_events",
    max_batch_size=1000,
    max_delay_time="1s",
    table_mapping=[
        models.TableMapping(
            source_id="user_events",
            field_name="user_id",
            column_name="user_id",
            column_type="String"
        ),
        models.TableMapping(
            source_id="user_events",
            field_name="name",
            column_name="name",
            column_type="String"
        ),
        models.TableMapping(
            source_id="user_events",
            field_name="email",
            column_name="email",
            column_type="String"
        ),
        models.TableMapping(
            source_id="user_events",
            field_name="created_at",
            column_name="created_at",
            column_type="DateTime"
        )
    ]
)

# Create pipeline configuration
pipeline_config = models.PipelineConfig(
    pipeline_id="user-events-pipeline",
    source=source_config,
    sink=sink_config
)
```
- Create and Deploy the Pipeline
```python
# Create pipeline instance
pipeline = Pipeline(pipeline_config, url="http://localhost:8080")

# Check if a pipeline already exists
try:
    existing_pipeline_id = pipeline.get_running_pipeline()
    if existing_pipeline_id == pipeline_config.pipeline_id:
        print(f"Pipeline {pipeline_config.pipeline_id} already exists")
    else:
        # Delete the existing pipeline, then create the new one
        pipeline.delete()
        pipeline.create()
except errors.PipelineNotFoundError:
    # No pipeline is running; create a new one
    pipeline.create()
    print(f"Pipeline {pipeline_config.pipeline_id} created successfully")
```
Note: The `url` parameter specifies the GlassFlow API endpoint. By default it uses `http://localhost:8080`, which assumes a local Docker deployment. For production deployments, specify the appropriate URL:
```python
# Local development
pipeline = Pipeline(pipeline_config, url="http://localhost:8080")

# Production deployment
pipeline = Pipeline(pipeline_config, url="https://glassflow.yourdomain.com")

# Custom port
pipeline = Pipeline(pipeline_config, url="http://localhost:9090")
```
Pipeline Configuration Options
Kafka Connection Options
Plain Text Connection (Local Development)
```python
connection_params = models.KafkaConnectionParams(
    brokers=["localhost:9092"],
    protocol="PLAINTEXT",
    skip_auth=True
)
```
SASL SSL Connection (Production)
```python
connection_params = models.KafkaConnectionParams(
    brokers=["kafka-cluster.example.com:9093"],
    protocol="SASL_SSL",
    mechanism="SCRAM-SHA-256",
    username="your_username",
    password="base64_encoded_password",
    root_ca="base64_encoded_certificate",
    skip_auth=False
)
```
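The `password` and `root_ca` values are passed as base64-encoded strings (see Configuration Format Requirements below). A minimal sketch of producing them with Python's standard library; the raw password and certificate path are placeholders:

```python
import base64

# Encode the raw password as a base64 string (placeholder value)
encoded_password = base64.b64encode(b"your_raw_password").decode("ascii")

# Encode the PEM CA certificate file as a base64 string (placeholder path)
with open("ca.pem", "rb") as f:
    encoded_root_ca = base64.b64encode(f.read()).decode("ascii")
```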
Kafka Data Types
The `SchemaField` `type` parameter accepts the following Kafka data types:
- `string` - for text data, including datetime strings
- `int8`, `int16`, `int32`, `int64` - for integer values
- `float32`, `float64` - for floating-point numbers
- `bool` - for boolean values
- `bytes` - for binary data
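For example, a schema that mixes several of these types might look like this (the field names are illustrative):

```python
schema = models.Schema(
    type="json",
    fields=[
        models.SchemaField(name="event_id", type="string"),
        models.SchemaField(name="retry_count", type="int32"),
        models.SchemaField(name="score", type="float64"),
        models.SchemaField(name="is_active", type="bool")
    ]
)
```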
ClickHouse Connection Options
Basic Connection
```python
sink_config = models.SinkConfig(
    type="clickhouse",
    host="localhost",
    port="8123",
    username="default",
    password="secret",
    database="default",
    table="user_events",
    secure=False
)
```
Secure Connection (TLS)
```python
sink_config = models.SinkConfig(
    type="clickhouse",
    host="clickhouse-cluster.example.com",
    port="8443",
    username="default",
    password="base64_encoded_password",
    database="default",
    table="user_events",
    secure=True
)
```
Deduplication Configuration
```python
deduplication = models.DeduplicationConfig(
    enabled=True,
    id_field="user_id",       # Field to use for deduplication
    id_field_type="string",   # Data type of the ID field
    time_window="1h"          # Time window for deduplication
)
```
Time Window Options:
- `"30s"` - 30 seconds
- `"1m"` - 1 minute
- `"1h"` - 1 hour
- `"12h"` - 12 hours
- `"24h"` - 24 hours
Sink Configuration
```python
sink_config = models.SinkConfig(
    # ... other config
    max_batch_size=1000,   # Maximum records per batch
    max_delay_time="1s"    # Maximum time to wait before flushing a batch
)
```
Configuration Format Requirements
- Port fields: Must be strings (e.g., `port="8123"`, not `port=8123`)
- Time windows: Use string format (e.g., `"1h"`, `"30s"`, `"12h"`)
- Passwords: Should be base64-encoded strings
- Batch sizes: Must be integers
Pipeline Management
Creating a Pipeline
```python
def create_pipeline():
    # Define your pipeline configuration
    pipeline_config = models.PipelineConfig(
        pipeline_id="my-pipeline",
        source=source_config,
        sink=sink_config
    )

    # Create pipeline instance
    pipeline = Pipeline(pipeline_config, url="http://localhost:8080")

    # Check if a pipeline already exists
    try:
        existing_pipeline_id = pipeline.get_running_pipeline()
        if existing_pipeline_id == pipeline_config.pipeline_id:
            print(f"Pipeline {pipeline_config.pipeline_id} already exists")
        else:
            # Delete the existing pipeline, then create the new one
            pipeline.delete()
            pipeline.create()
    except errors.PipelineNotFoundError:
        # No pipeline is running; create a new one
        pipeline.create()
        print(f"Pipeline {pipeline_config.pipeline_id} created successfully")
```
Deleting a Pipeline
Important: Only one pipeline can be active at a time in the current version. Deleting a pipeline will stop the currently running pipeline.
```python
def delete_pipeline():
    """Delete the currently running pipeline"""
    try:
        pipeline = Pipeline(url="http://localhost:8080")
        pipeline.delete()
        print("Pipeline deleted successfully")
    except errors.PipelineNotFoundError:
        print("No running pipeline found")
    except Exception as e:
        print(f"Error deleting pipeline: {e}")

# Simple deletion at script end
if __name__ == "__main__":
    Pipeline(url="http://localhost:8080").delete()
```
Checking Pipeline Status
```python
def check_pipeline_status():
    """Check if a pipeline is currently running"""
    try:
        pipeline_id = Pipeline(url="http://localhost:8080").get_running_pipeline()
        print(f"Pipeline {pipeline_id} is currently running")
        return pipeline_id
    except errors.PipelineNotFoundError:
        print("No pipeline is currently running")
        return None
```
Advanced Configuration
Joining Multiple Topics
To create a pipeline that joins multiple Kafka topics:
```python
# Define join configuration
join_config = models.JoinConfig(
    enabled=True,
    type="temporal",
    sources=[
        models.JoinSourceConfig(
            source_id="user_events",
            join_key="user_id",
            time_window="1h",
            orientation="left"
        ),
        models.JoinSourceConfig(
            source_id="order_events",
            join_key="user_id",
            time_window="1h",
            orientation="right"
        )
    ]
)

# Add join config to pipeline
pipeline_config.join = join_config
```
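Each `source_id` in the join must refer to a topic defined in the source configuration, so `order_events` needs its own `TopicConfig`. A minimal sketch, with illustrative field names:

```python
# Hypothetical second topic to back the order_events side of the join
order_events_topic = models.TopicConfig(
    name="order_events",
    consumer_group_initial_offset="earliest",
    schema=models.Schema(
        type="json",
        fields=[
            models.SchemaField(name="order_id", type="string"),
            models.SchemaField(name="user_id", type="string"),
            models.SchemaField(name="amount", type="float64")
        ]
    )
)

# Add it alongside user_events in the source configuration
source_config.topics.append(order_events_topic)
```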
Loading Configuration from JSON
You can also load pipeline configuration from a JSON file:
```python
def load_pipeline_config(config_path: str) -> models.PipelineConfig:
    """Load pipeline configuration from a JSON file"""
    with open(config_path, 'r') as f:
        config_data = json.load(f)
    return models.PipelineConfig(**config_data)

# Load configuration
pipeline_config = load_pipeline_config('pipeline_config.json')
```
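Since the file contents are unpacked directly into `models.PipelineConfig`, the top-level JSON keys mirror the keyword arguments shown above. An abridged sketch of `pipeline_config.json`, assuming the nested keys follow the model field names:

```json
{
  "pipeline_id": "user-events-pipeline",
  "source": {
    "type": "kafka",
    "connection_params": {
      "brokers": ["localhost:9092"],
      "protocol": "PLAINTEXT",
      "skip_auth": true
    },
    "topics": [
      {
        "name": "user_events",
        "consumer_group_initial_offset": "earliest",
        "schema": {
          "type": "json",
          "fields": [{"name": "user_id", "type": "string"}]
        }
      }
    ]
  },
  "sink": {
    "type": "clickhouse",
    "host": "localhost",
    "port": "8123",
    "username": "default",
    "password": "secret",
    "database": "default",
    "table": "user_events",
    "table_mapping": [
      {
        "source_id": "user_events",
        "field_name": "user_id",
        "column_name": "user_id",
        "column_type": "String"
      }
    ]
  }
}
```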
Error Handling
The SDK provides comprehensive error handling:
```python
try:
    pipeline.create()
except errors.PipelineAlreadyExistsError:
    print("Pipeline already exists")
except errors.ConnectionError:
    print("Failed to connect to GlassFlow. Make sure it's running.")
except errors.PipelineConfigError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```
Complete Example
Here’s a complete example that demonstrates creating a deduplication pipeline:
```python
from glassflow_clickhouse_etl import Pipeline, models, errors
import time

def create_deduplication_pipeline():
    """Create a pipeline that deduplicates user events"""
    # Pipeline configuration
    config = models.PipelineConfig(
        pipeline_id="user-deduplication-pipeline",
        source=models.SourceConfig(
            type="kafka",
            connection_params=models.KafkaConnectionParams(
                brokers=["localhost:9092"],
                protocol="PLAINTEXT",
                skip_auth=True
            ),
            topics=[
                models.TopicConfig(
                    name="user_events",
                    consumer_group_initial_offset="earliest",
                    schema=models.Schema(
                        type="json",
                        fields=[
                            models.SchemaField(name="event_id", type="string"),
                            models.SchemaField(name="user_id", type="string"),
                            models.SchemaField(name="action", type="string"),
                            models.SchemaField(name="timestamp", type="string")
                        ]
                    ),
                    deduplication=models.DeduplicationConfig(
                        enabled=True,
                        id_field="event_id",
                        id_field_type="string",
                        time_window="12h"
                    )
                )
            ]
        ),
        sink=models.SinkConfig(
            type="clickhouse",
            host="localhost",
            port="8123",
            username="default",
            password="secret",
            database="default",
            table="user_events_dedup",
            max_batch_size=1000,
            max_delay_time="1s",
            table_mapping=[
                models.TableMapping(
                    source_id="user_events",
                    field_name="event_id",
                    column_name="event_id",
                    column_type="String"
                ),
                models.TableMapping(
                    source_id="user_events",
                    field_name="user_id",
                    column_name="user_id",
                    column_type="String"
                ),
                models.TableMapping(
                    source_id="user_events",
                    field_name="action",
                    column_name="action",
                    column_type="String"
                ),
                models.TableMapping(
                    source_id="user_events",
                    field_name="timestamp",
                    column_name="timestamp",
                    column_type="DateTime"
                )
            ]
        )
    )

    # Create and deploy pipeline
    pipeline = Pipeline(config, url="http://localhost:8080")
    try:
        pipeline.create()
        print(f"Pipeline {config.pipeline_id} created successfully")
        # Wait for pipeline to start
        time.sleep(10)
        print("Pipeline is ready to process events")
    except errors.PipelineAlreadyExistsError:
        print(f"Pipeline {config.pipeline_id} already exists")
    except Exception as e:
        print(f"Error creating pipeline: {e}")

if __name__ == "__main__":
    create_deduplication_pipeline()
```
Next Steps
- Explore the Pipeline Configuration documentation for detailed configuration options
- Check out the demo scripts for more examples
- Learn about monitoring and observability for your pipelines