Google Pub/Sub

Integrating Google Pub/Sub as a Source with GlassFlow using the managed connector

Google Pub/Sub is an asynchronous, scalable messaging service that decouples the services producing messages from the services processing them. In this guide, you will learn how to process and transform real-time e-commerce order data with GlassFlow and the Google Cloud Pub/Sub service. The sequence of events is as follows:

  1. A data producer publishes an order details message to a Pub/Sub topic whenever a new order is placed. The order details include the order ID, product ID, quantity, and order timestamp.

  2. A subscriber client creates a subscription to that topic and consumes messages from it. As the data is consumed in real time, GlassFlow transforms it to add insights (e.g., an estimated delivery time based on the order timestamp) and then makes it available for further processing or analytics.

GlassFlow has a built-in managed connector for the Google Pub/Sub service. This integration can ingest data directly from Pub/Sub and push events to the GlassFlow pipeline.


To complete the tutorial, you'll need the following:

  1. Python installed on your machine.

  2. Pip installed to manage project packages.

  3. Steps 1 to 7 of the "Before you begin" section of the Google Pub/Sub getting-started documentation completed, so that the Google Cloud CLI is installed and a new Google Cloud project called glassflow-data-pipeline exists.

Step 1: Set Up Google Pub/Sub

Create a topic and a subscription on Google Pub/Sub

Use the following gcloud pubsub topics create command to create a topic named ecommerce-orders. Don't change the name of the topic, because it's referenced throughout the rest of the tutorial.

gcloud pubsub topics create ecommerce-orders

Use the gcloud pubsub subscriptions create command to create a subscription. Only messages published to the topic after the subscription is created are available to subscriber applications.

gcloud pubsub subscriptions create ecommerce-orders-sub --topic ecommerce-orders
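
If you would rather create the topic and subscription from Python instead of the gcloud CLI, the sketch below shows one way to do it with the google-cloud-pubsub client library. It is an alternative to the two commands above, not an additional step: running both will fail because the resources already exist. The function names resource_paths and create_topic_and_subscription are illustrative, and the code assumes your environment is already authenticated against the glassflow-data-pipeline project.

```python
from typing import Tuple


def resource_paths(project_id: str, topic_id: str, subscription_id: str) -> Tuple[str, str]:
    """Build the fully qualified topic and subscription names Pub/Sub expects."""
    topic_path = f"projects/{project_id}/topics/{topic_id}"
    subscription_path = f"projects/{project_id}/subscriptions/{subscription_id}"
    return topic_path, subscription_path


def create_topic_and_subscription(project_id: str, topic_id: str, subscription_id: str) -> None:
    """Create the topic first, then a subscription attached to it."""
    # Imported here so the path helper above works without the library installed.
    from google.cloud import pubsub_v1

    topic_path, subscription_path = resource_paths(project_id, topic_id, subscription_id)
    publisher = pubsub_v1.PublisherClient()
    publisher.create_topic(request={"name": topic_path})

    # The subscriber client holds a network channel, so close it when done.
    with pubsub_v1.SubscriberClient() as subscriber:
        subscriber.create_subscription(
            request={"name": subscription_path, "topic": topic_path}
        )


# Example call (requires Google Cloud credentials):
# create_topic_and_subscription(
#     "glassflow-data-pipeline", "ecommerce-orders", "ecommerce-orders-sub"
# )
```

The subscription is created against the same ecommerce-orders topic, which is what the rest of the tutorial relies on.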

Step 2: Create a GlassFlow Pipeline

  1. Log in to GlassFlow WebApp:

  2. Create a New Space:

    • Go to the “Spaces” page and create a new space called google-pubsub-integration to organize your pipelines.

  3. Create a New Pipeline:

    • Within the new space, go to the “Pipelines” page and click “Create Pipeline.”

    • Provide a name for the pipeline, e.g., pubsub-to-glassflow.

    • Choose the space you created (e.g., google-pubsub-integration).

  4. Configure Google Pub/Sub Source Connector:

    • Select “Google Pub/Sub” from the list of built-in integrations.

    • Enter your Google Cloud Project ID (glassflow-data-pipeline) and the Subscription ID (e.g., ecommerce-orders-sub) you created.

    • Copy and paste the content of your service account JSON key file into the Credentials JSON field for authentication.

  5. Add Transformation Stage (Optional):

    • To transform the incoming data, add a transformation function in GlassFlow.

    • Create a Python script that defines how the order data will be transformed, for example by adding an estimated delivery date based on the order timestamp.

      import json
      from datetime import datetime, timedelta

      def handler(data, log):
          # Log the incoming event for debugging
          log.info("Event:" + json.dumps(data), data=data)
          # Parse the UTC order timestamp, e.g. 2024-01-01T12:00:00Z
          order_timestamp = datetime.strptime(data["order_timestamp"], "%Y-%m-%dT%H:%M:%SZ")
          # Assume delivery takes 3 days
          estimated_delivery = order_timestamp + timedelta(days=3)
          data["estimated_delivery"] = estimated_delivery.strftime("%Y-%m-%d")
          return data

      Note that your code must implement the handler function. Without it, the transformation function will not run successfully.

  6. Configure Data Sink:

    • Choose SDK as the data sink for your processed data. This tutorial uses the GlassFlow Python SDK to consume data from the GlassFlow pipeline in Python. You can also use built-in connectors such as Webhook.

    • Configure the sink according to your needs (e.g., connection details).

  7. Confirm Pipeline Creation:

    • Review your pipeline configuration and click “Create Pipeline.”

    • Copy the new Pipeline ID and Access Token for future reference.
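
Before relying on the pipeline, it can help to exercise the transformation logic locally. The sketch below repeats the delivery-date logic from the transformation stage and calls it with a sample order. It is only a local approximation: the logger is a standard-library stand-in for whatever logger GlassFlow passes to the function, the logging call is simplified so that the stand-in accepts it, and sample_order is made-up test data.

```python
import json
import logging
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO)
# Stand-in for the logger that GlassFlow passes to the handler (assumption).
log = logging.getLogger("local-transform-test")


def handler(data, log):
    # Simplified logging call so a standard-library logger can be used here.
    log.info("Event: " + json.dumps(data))
    order_timestamp = datetime.strptime(data["order_timestamp"], "%Y-%m-%dT%H:%M:%SZ")
    # Assume delivery takes 3 days, as in the pipeline transformation.
    estimated_delivery = order_timestamp + timedelta(days=3)
    data["estimated_delivery"] = estimated_delivery.strftime("%Y-%m-%d")
    return data


# Made-up order with the four fields the producer publishes.
sample_order = {
    "order_id": "12345",
    "product_id": "67890",
    "quantity": 2,
    "order_timestamp": "2024-01-01T12:00:00Z",
}

# Pass a copy so the sample itself is not modified.
result = handler(dict(sample_order), log)
print(result["estimated_delivery"])  # prints 2024-01-04
```

A timestamp that does not match the expected format raises ValueError in strptime, so a check like this also shows quickly whether your producer's timestamp format agrees with the transformation.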

Step 3: Set Up a Sample Project

Create a folder

Start by creating a dedicated project folder named glassflow-google-pubsub, where you will place all the necessary files.

mkdir glassflow-google-pubsub
cd glassflow-google-pubsub

Create a new virtual environment

Create a new virtual environment in the same folder and activate that environment:

python -m venv .venv && source .venv/bin/activate

Install libraries

Install the GlassFlow and Google Cloud Pub/Sub Python SDKs, along with python-dotenv (used to load settings from a .env file), using pip.

pip install glassflow python-dotenv google-cloud-pubsub

Create an environment configuration file

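Create a .env file in your project directory with the following content, replacing the placeholders with your actual Google Pub/Sub project_id, topic_id, and subscription_id. The publisher script in Step 4 reads the PROJECT_ID and TOPIC_ID keys:

PROJECT_ID=<your-project-id>
TOPIC_ID=<your-topic-id>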
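
Once the .env file exists, a quick check can catch missing or empty values before any script talks to Google Cloud. The following is a minimal sketch, assuming the file uses the PROJECT_ID and TOPIC_ID keys that the publisher script reads. The helper names missing_keys and load_config are illustrative and not part of python-dotenv.

```python
from typing import Dict, List, Mapping, Optional, Sequence

# Keys the publisher script reads from .env (taken from that script).
REQUIRED_KEYS = ("PROJECT_ID", "TOPIC_ID")


def missing_keys(
    config: Mapping[str, Optional[str]],
    required: Sequence[str] = REQUIRED_KEYS,
) -> List[str]:
    """Return the required keys that are absent or empty in the config."""
    return [key for key in required if not config.get(key)]


def load_config(path: str = ".env") -> Dict[str, Optional[str]]:
    """Load the .env file and fail early if a required key is missing."""
    # Imported here so missing_keys can be used without python-dotenv installed.
    from dotenv import dotenv_values

    config = dotenv_values(path)
    missing = missing_keys(config)
    if missing:
        raise ValueError(f"Missing values in {path}: {', '.join(missing)}")
    return dict(config)


# Example: a config with no topic reports the missing key.
print(missing_keys({"PROJECT_ID": "glassflow-data-pipeline"}))  # prints ['TOPIC_ID']
```

Calling load_config() at the top of a script replaces the bare dotenv_values(".env") call and turns a confusing downstream error into a clear message.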


Step 4: Publish Messages to Google Pub/Sub

Simulate publishing order data to the ecommerce-orders topic using the Google Cloud Pub/Sub Python client library. Create a Python script file and paste in the following code:


import json

from dotenv import dotenv_values
from google.cloud import pubsub_v1

# Load the project and topic IDs from the .env file
config = dotenv_values(".env")
project_id = config.get("PROJECT_ID")
topic_id = config.get("TOPIC_ID")

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

# Sample order with the fields described in the introduction
order_data = {
    "order_id": "12345",
    "product_id": "67890",
    "quantity": 2,
    "order_timestamp": "2024-01-01T12:00:00Z",
}

# Pub/Sub message payloads must be bytes
future = publisher.publish(topic_path, data=json.dumps(order_data).encode("utf-8"))

# result() blocks until the publish completes and returns the message ID
print(f"Published message ID: {future.result()}")

Run the Python script and you will get the published message ID printed on your console:

Published message ID: 10478246062331482
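
A single hard-coded order is enough to verify the setup, but a stream of varied orders exercises the pipeline more realistically. The sketch below generates a few simulated orders in the same shape as order_data and encodes them as the UTF-8 JSON bytes that Pub/Sub expects. The helpers make_order and encode_order and the value ranges are illustrative assumptions, and the actual publish call is left as a comment because it needs the publisher and topic_path from the script above.

```python
import json
import random
from datetime import datetime, timezone

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # same format the transformation parses


def make_order(order_number: int, rng: random.Random) -> dict:
    """Build one simulated order with the four fields used in this tutorial."""
    return {
        "order_id": str(order_number),
        "product_id": str(rng.randint(10000, 99999)),  # arbitrary ID range
        "quantity": rng.randint(1, 5),  # arbitrary quantity range
        "order_timestamp": datetime.now(timezone.utc).strftime(TIMESTAMP_FORMAT),
    }


def encode_order(order: dict) -> bytes:
    """Serialize an order to the bytes payload Pub/Sub requires."""
    return json.dumps(order).encode("utf-8")


rng = random.Random(42)  # fixed seed so repeated runs produce the same product IDs
payloads = [encode_order(make_order(number, rng)) for number in range(1, 4)]

for payload in payloads:
    print(payload.decode("utf-8"))
    # To publish for real, reuse the client from the publisher script:
    # publisher.publish(topic_path, data=payload).result()
```

Generating the timestamp in UTC with the same format string keeps the simulated messages compatible with the strptime call in the transformation function.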

Step 5: Consume data from the GlassFlow pipeline

Create a new Python file called consumer.py to consume the transformed data from the GlassFlow pipeline. The pipeline continuously checks for new data from the Google Pub/Sub service and transforms it as needed, and the script then reads the transformed result.

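import glassflow

# Replace the placeholders with the Pipeline ID and Access Token copied in Step 2
pipeline_client = glassflow.GlassFlowClient().pipeline_client(
    pipeline_id="<your-pipeline-id>",
    pipeline_access_token="<your-access-token>",
)
response = pipeline_client.consume()

# A 200 status means a transformed event was available
if response.status_code == 200:
    data = response.json()
    print("Consumed Data from GlassFlow pipeline...", data)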

After running the Python script, you will see the transformed data in the console output, printed after a prefix like this:

Consumed Data from GlassFlow pipeline...

Now the transform function is triggered whenever a new message is published to the ecommerce-orders topic in Google Pub/Sub.
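
Because new messages can arrive at any time, a consumer usually polls rather than calling consume once. The sketch below wraps that idea in a small loop. It assumes only what the consumer script already relies on, namely that consume() returns a response with a status_code attribute and a json() method. The poll function, its parameters, and the fixed sleep between empty polls are illustrative choices, not part of the GlassFlow SDK.

```python
import time
from typing import Any, Callable


def poll(
    consume: Callable[[], Any],
    on_event: Callable[[Any], None],
    max_polls: int = 10,
    idle_seconds: float = 1.0,
) -> int:
    """Call consume() up to max_polls times and hand each 200 body to on_event.

    Returns the number of events received.
    """
    received = 0
    for _ in range(max_polls):
        response = consume()
        if response.status_code == 200:
            on_event(response.json())
            received += 1
        else:
            # Nothing available yet: wait briefly before asking again.
            time.sleep(idle_seconds)
    return received


# Example with the client from consumer.py (requires a running pipeline):
# poll(pipeline_client.consume, print, max_polls=30)
```

Passing consume as a callable keeps the loop independent of the SDK, so it can be tried out with a fake client before pointing it at a real pipeline.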


By integrating Google Pub/Sub with GlassFlow, you can efficiently stream real-time data into your pipelines, apply transformations, and store the results in various data sinks. This integration allows you to leverage the power of real-time messaging for applications such as real-time analytics, monitoring, and more.
