Google Pub/Sub
Integrating Google Pub/Sub as a source with GlassFlow using the managed connector
Google Pub/Sub is an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages. In this guide, you will learn how to process and transform real-time e-commerce order data with GlassFlow and Google Cloud Pub/Sub service. The following is the sequence of events:
A data producer publishes an order details message to a Pub/Sub topic on the Google Pub/Sub service whenever a new order is placed. These order details include order ID, product ID, quantity, and order timestamp.
A subscriber client creates a subscription to that topic and consumes messages from the subscription. As the data is consumed in real time, GlassFlow transforms it to add insights (e.g., estimated delivery times based on the order timestamp) and then makes it available for further processing or analytics.
GlassFlow has a built-in managed connector for the Google Pub/Sub service. This integration can ingest data directly from Pub/Sub and push events to the GlassFlow pipeline.
Prerequisites
To complete the tutorial you'll need the following:
A GlassFlow account. Sign up for a free GlassFlow account.
Python installed on your machine.
pip installed to manage project packages.
Follow steps 1 to 7 in the “Before you begin” section of the Google Pub/Sub getting started documentation to install the Google Cloud CLI and create a new Google Cloud project called glassflow-data-pipeline.
Step 1: Set Up Google Pub/Sub
Create a topic and a subscription on Google Pub/Sub
Use the following gcloud pubsub topics create command to create a topic named ecommerce-orders. Don't change the name of the topic, because it's referenced throughout the rest of the tutorial.
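A minimal sketch of the command, assuming the gcloud CLI is authenticated and the default project is set to glassflow-data-pipeline:

```bash
# Create the topic that order messages will be published to
gcloud pubsub topics create ecommerce-orders
```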
Use the gcloud pubsub subscriptions create command to create a subscription. Only messages published to the topic after the subscription is created are available to subscriber applications.
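For example, assuming the subscription ID ecommerce-orders-sub that is referenced later in this guide:

```bash
# Create a pull subscription attached to the ecommerce-orders topic
gcloud pubsub subscriptions create ecommerce-orders-sub --topic=ecommerce-orders
```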
Step 2: Create a GlassFlow Pipeline
Log in to GlassFlow WebApp:
Navigate to the GlassFlow WebApp and log in to your account.
Create a New Space:
Go to the “Spaces” page and create a new space called google-pubsub-integration to organize your pipelines.
Create a New Pipeline:
Within the new space, go to the “Pipelines” page and click “Create Pipeline.”
Provide a name for the pipeline, e.g., pubsub-to-glassflow. Choose the space you created (e.g., google-pubsub-integration).
Configure Google Pub/Sub Source Connector:
Select “Google Pub/Sub” from the list of built-in integrations.
Enter your Google Cloud Project ID (glassflow-data-pipeline) and the Subscription ID (e.g., ecommerce-orders-sub) you created. Copy and paste the JSON key file content into Credentials JSON for authentication.
Add Transformation Stage (Optional):
To transform the incoming data, add a transformation function in GlassFlow.
Create a Python script transform.py that defines how the order data will be transformed, for example by adding an estimated delivery date based on the order timestamp (see the sketch below). Note that implementing the handler function is mandatory; without it, the transformation function will not run.
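A minimal sketch of transform.py, assuming the handler(data, log) signature used by GlassFlow transformation functions and an ISO-8601 order timestamp; the field names are illustrative:

```python
# transform.py - sketch of a GlassFlow transformation function.
# Assumes incoming events carry "order_id" and "order_time" fields
# (illustrative names) and that "order_time" is an ISO-8601 string.
from datetime import datetime, timedelta


def handler(data, log):
    # Estimate delivery as three days after the order timestamp
    order_time = datetime.fromisoformat(data["order_time"])
    data["estimated_delivery"] = (order_time + timedelta(days=3)).isoformat()
    log.info("Added estimated delivery for order %s", data.get("order_id"))
    return data
```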
Configure Data Sink:
Choose “SDK” as the data sink for your processed data. We use the GlassFlow Python SDK to consume data from the GlassFlow pipeline in Python. You can also use built-in connectors such as Webhook.
Configure the sink according to your needs (e.g., connection details).
Confirm Pipeline Creation:
Review your pipeline configuration and click “Create Pipeline.”
Copy the new Pipeline ID and Access Token for future reference.
Step 3: Set Up a Sample Project
Create a folder
Start by creating a dedicated project directory named glassflow-google-pubsub, where you will place all the necessary files.
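For example, from a terminal:

```bash
mkdir glassflow-google-pubsub
cd glassflow-google-pubsub
```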
Create a new virtual environment
Create a new virtual environment in the project folder and activate it:
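A sketch for macOS/Linux (on Windows, activate with .venv\Scripts\activate instead):

```bash
python -m venv .venv
source .venv/bin/activate
```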
Install libraries
Install the GlassFlow and Google Cloud Pub/Sub Python SDKs, along with python-dotenv for loading environment variables, using pip.
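For example, assuming the GlassFlow Python SDK is published on PyPI as glassflow:

```bash
pip install glassflow google-cloud-pubsub python-dotenv
```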
Create an environment configuration file
Create a .env file in your project directory with the following content, replacing the placeholders with your actual Google Pub/Sub project_id, topic_id, and subscription_id:
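A sketch of the .env file; the variable names are illustrative and only need to match what your scripts read. The Pipeline ID and Access Token copied in Step 2 are included so the consumer script can use them later:

```
PROJECT_ID=glassflow-data-pipeline
TOPIC_ID=ecommerce-orders
SUBSCRIPTION_ID=ecommerce-orders-sub
PIPELINE_ID=your-pipeline-id
PIPELINE_ACCESS_TOKEN=your-pipeline-access-token
```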
Step 4: Publish Messages to Google Pub/Sub
Simulate publishing order data to the ecommerce-orders topic using the Google Cloud Pub/Sub Python client library. Create a Python script named pubsub_publisher.py and copy and paste the following code:
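A minimal sketch of the publisher, assuming Application Default Credentials are configured for the project and the environment variables from the .env file above; the order fields are illustrative:

```python
# pubsub_publisher.py - sketch of publishing a sample order event.
# Assumes Application Default Credentials and the .env variables above.
import json
import os
import uuid
from datetime import datetime, timezone

from dotenv import load_dotenv
from google.cloud import pubsub_v1

load_dotenv()

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(os.environ["PROJECT_ID"], os.environ["TOPIC_ID"])

# Build a sample e-commerce order event (field names are illustrative)
order = {
    "order_id": str(uuid.uuid4()),
    "product_id": "P-1001",
    "quantity": 2,
    "order_time": datetime.now(timezone.utc).isoformat(),
}

# Publish the order as a JSON-encoded message and print the message ID
future = publisher.publish(topic_path, json.dumps(order).encode("utf-8"))
print(f"Published message ID: {future.result()}")
```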
Run the Python script, and the published message ID will be printed to your console.
Step 5: Consume data from the GlassFlow pipeline
Create a new Python file called consumer.py to consume transformed data from the GlassFlow pipeline. The pipeline continuously pulls new data from the Google Pub/Sub service, transforms it, and makes the result available to the consumer.
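A minimal sketch of consumer.py; the exact GlassFlow SDK calls (GlassFlowClient, pipeline_client, consume) are assumptions based on the Python SDK, so check the SDK reference for the current API:

```python
# consumer.py - sketch of polling the GlassFlow pipeline for transformed events.
# The SDK calls below are assumptions; consult the GlassFlow Python SDK docs.
import os
import time

from dotenv import load_dotenv
from glassflow import GlassFlowClient

load_dotenv()

client = GlassFlowClient()
pipeline_client = client.pipeline_client(
    pipeline_id=os.environ["PIPELINE_ID"],
    pipeline_access_token=os.environ["PIPELINE_ACCESS_TOKEN"],
)

while True:
    # Poll the pipeline for the next transformed event
    response = pipeline_client.consume()
    if response.status_code == 200:
        print("Transformed order:", response.json())
    time.sleep(1)
```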
After running the Python script, you will see the transformed data in the console output.
Now the transform function is triggered whenever a new message is published to the ecommerce-orders topic on the Google Pub/Sub service.
Conclusion
By integrating Google Pub/Sub with GlassFlow, you can efficiently stream real-time data into your pipelines, apply transformations, and store the results in various data sinks. This integration allows you to leverage the power of real-time messaging for applications such as real-time analytics, monitoring, and more.