Google Pub/Sub

Integrating Google Pub/Sub as a Source with GlassFlow using a managed connector

Google Pub/Sub is an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages. In this guide, you will learn how to process and transform real-time e-commerce order data with GlassFlow and the Google Cloud Pub/Sub service. The sequence of events is as follows:

  1. A data producer publishes an order details message to a Pub/Sub topic whenever a new order is placed. Each message includes the order ID, product ID, quantity, and order timestamp.

  2. A subscription attached to that topic delivers the messages to a subscriber, in this case GlassFlow. As the data arrives in real time, GlassFlow transforms it to add extra insights (for example, an estimated delivery date based on the order timestamp) and then makes it available for further processing or analytics.

GlassFlow has a built-in managed connector for the Google Pub/Sub service. This integration can ingest data directly from Pub/Sub and push events to the GlassFlow pipeline.

Prerequisites

To complete the tutorial you'll need the following:

  1. Python installed on your machine.

  2. pip installed to manage project packages.

  3. The Google Cloud CLI and a new Google Cloud project called glassflow-data-pipeline. To set them up, follow steps 1 to 7 in the "Before you begin" section of the Google Pub/Sub getting-started documentation.

Step 1: Set Up Google Pub/Sub

Create a topic and a subscription on Google Pub/Sub

Use the following gcloud pubsub topics create command to create a topic named ecommerce-orders. Don't change the name of the topic, because it's referenced throughout the rest of the tutorial.

gcloud pubsub topics create ecommerce-orders

Use the following gcloud pubsub subscriptions create command to create a subscription named ecommerce-orders-sub on the ecommerce-orders topic. Only messages published to the topic after the subscription is created are available to subscriber applications.

gcloud pubsub subscriptions create ecommerce-orders-sub --topic ecommerce-orders
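The Pub/Sub client libraries and API identify these resources by fully qualified names of the form projects/PROJECT_ID/topics/TOPIC_ID and projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID. The sketch below builds those names for the IDs used in this tutorial, which makes it clear how the short project and subscription IDs the GlassFlow connector asks for map to full resource names. The helper functions topic_name() and subscription_name() are illustrative, not part of any SDK.

```python
# Sketch: build fully qualified Pub/Sub resource names from short IDs.
# The path formats follow Google Cloud's naming scheme; the helper
# functions are illustrative and not part of google-cloud-pubsub.

PROJECT_ID = "glassflow-data-pipeline"
TOPIC_ID = "ecommerce-orders"
SUBSCRIPTION_ID = "ecommerce-orders-sub"


def topic_name(project_id: str, topic_id: str) -> str:
    """Return the full resource name of a topic."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_name(project_id: str, subscription_id: str) -> str:
    """Return the full resource name of a subscription."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


if __name__ == "__main__":
    print(topic_name(PROJECT_ID, TOPIC_ID))
    # → projects/glassflow-data-pipeline/topics/ecommerce-orders
    print(subscription_name(PROJECT_ID, SUBSCRIPTION_ID))
    # → projects/glassflow-data-pipeline/subscriptions/ecommerce-orders-sub
```

In real code you would normally use the equivalent helpers from google-cloud-pubsub, PublisherClient.topic_path() and SubscriberClient.subscription_path(), rather than formatting the strings yourself.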

Step 2: Create a GlassFlow Pipeline

  1. Log in to the GlassFlow WebApp.

  2. Create a New Space:

    • Go to the “Spaces” page and create a new space called google-pubsub-integration to organize your pipelines.

  3. Create a New Pipeline:

    • Within the new space, go to the “Pipelines” page and click “Create Pipeline.”

    • Provide a name for the pipeline, e.g., pubsub-to-glassflow.

    • Choose the space you created (e.g., google-pubsub-integration).

  4. Configure Google Pub/Sub Source Connector:

    • Select “Google Pub/Sub” from the list of built-in integrations.

    • Enter your Google Cloud Project ID (glassflow-data-pipeline) and the Subscription ID (e.g., ecommerce-orders-sub) you created.

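    • Paste the contents of your Google Cloud service account's JSON key file into the Credentials JSON field for authentication. The service account must have permission to consume messages from the subscription.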

  5. Add Transformation Stage (Optional):

    • To transform the incoming data, add a transformation function in GlassFlow.

    • Create a Python script, transform.py, that defines how order data is transformed; for example, by adding an estimated delivery date based on the order timestamp.

      # transform.py

      import json
      from datetime import datetime, timedelta


      def handler(data, log):
          # Log the incoming event; data holds the order as a dict
          log.info("Event:" + json.dumps(data), data=data)

          # Parse the UTC timestamp, e.g. "2024-01-01T12:00:00Z"
          order_timestamp = datetime.strptime(data["order_timestamp"], "%Y-%m-%dT%H:%M:%SZ")
          # Assume delivery takes 3 days
          estimated_delivery = order_timestamp + timedelta(days=3)
          data["estimated_delivery"] = estimated_delivery.strftime("%Y-%m-%d")
          return data

      Note that your code must implement the handler function; without it, the transformation function does not run.

  6. Configure Data Sink:

    • Choose the SDK option as the data sink for your processed data. This tutorial uses the GlassFlow Python SDK to consume data from the pipeline. You can also use built-in connectors such as Webhook.

    • Configure the sink according to your needs (e.g., connection details).

  7. Confirm Pipeline Creation:

    • Review your pipeline configuration and click “Create Pipeline.”

    • Copy the new Pipeline ID and Access Token; you will need them in Step 3.
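Before relying on the deployed pipeline, it can help to run the transformation logic from item 5 locally. In the pipeline, GlassFlow calls handler(data, log) for you; the sketch below replaces the log argument with a hypothetical StubLog class that only implements the info() call the handler makes. The handler body is repeated so the snippet runs on its own.

```python
# Sketch: check the transform.py logic locally, without GlassFlow.
# StubLog is a hypothetical stand-in for the logger GlassFlow passes in.
import json
from datetime import datetime, timedelta


class StubLog:
    def __init__(self):
        self.messages = []

    def info(self, message, **kwargs):
        # Record the message instead of sending it anywhere
        self.messages.append(message)


def handler(data, log):
    # Same logic as transform.py
    log.info("Event:" + json.dumps(data), data=data)
    order_timestamp = datetime.strptime(data["order_timestamp"], "%Y-%m-%dT%H:%M:%SZ")
    estimated_delivery = order_timestamp + timedelta(days=3)
    data["estimated_delivery"] = estimated_delivery.strftime("%Y-%m-%d")
    return data


if __name__ == "__main__":
    sample = {
        "order_id": "12345",
        "product_id": "67890",
        "quantity": 2,
        "order_timestamp": "2024-01-01T12:00:00Z",
    }
    log = StubLog()
    result = handler(dict(sample), log)  # copy, because handler mutates its input
    print(result["estimated_delivery"])  # → 2024-01-04
    print(len(log.messages))  # → 1
```

Because the handler uses a fixed strptime format, a timestamp in any other shape (for example 2024-01-01 12:00:00) raises ValueError, so the producer must send order_timestamp exactly as shown in Step 4.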

Step 3: Set Up a Sample Project

Create a folder

Start by creating a dedicated project directory named glassflow-google-pubsub, where you will place all the necessary files.

mkdir glassflow-google-pubsub
cd glassflow-google-pubsub

Create a new virtual environment

Create a new virtual environment in the same folder and activate that environment:

python -m venv .venv && source .venv/bin/activate

Install libraries

Install the GlassFlow Python SDK, the Google Cloud Pub/Sub client library, and python-dotenv (which loads settings from a .env file) using pip.

pip install glassflow python-dotenv google-cloud-pubsub

Create an environment configuration file

Create a .env file in your project directory with the following content, replacing the placeholders with your actual Google Pub/Sub project ID, topic ID, and subscription ID, and with the Pipeline ID and Access Token you copied in Step 2:

PROJECT_ID=glassflow-data-pipeline
TOPIC_ID=ecommerce-orders
SUBSCRIPTION_ID=ecommerce-orders-sub
PIPELINE_ID=your_pipeline_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token
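Because every script in this tutorial reads its settings from .env, a typo or a leftover placeholder tends to surface later as an authentication or not-found error. A small check such as the hypothetical missing_settings() function below can fail fast instead. It works on any dict, so you can pass it the result of dotenv_values(".env") from python-dotenv; the placeholder values it looks for are the ones in the template above.

```python
# Sketch: validate .env settings before running the scripts.
# missing_settings(), REQUIRED_KEYS, and PLACEHOLDERS are illustrative
# names, not part of python-dotenv or the GlassFlow SDK.

REQUIRED_KEYS = (
    "PROJECT_ID",
    "TOPIC_ID",
    "SUBSCRIPTION_ID",
    "PIPELINE_ID",
    "PIPELINE_ACCESS_TOKEN",
)

# Template values that still need to be replaced
PLACEHOLDERS = {"your_pipeline_id", "your_pipeline_access_token"}


def missing_settings(config):
    """Return the required keys that are absent, empty, or still placeholders."""
    problems = []
    for key in REQUIRED_KEYS:
        value = config.get(key)
        if not value or value in PLACEHOLDERS:
            problems.append(key)
    return problems


if __name__ == "__main__":
    # In the project you would pass dotenv_values(".env") instead.
    example = {
        "PROJECT_ID": "glassflow-data-pipeline",
        "TOPIC_ID": "ecommerce-orders",
        "SUBSCRIPTION_ID": "ecommerce-orders-sub",
        "PIPELINE_ID": "your_pipeline_id",
        "PIPELINE_ACCESS_TOKEN": "your_pipeline_access_token",
    }
    print(missing_settings(example))
    # → ['PIPELINE_ID', 'PIPELINE_ACCESS_TOKEN']
```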

Step 4: Publish Messages to Google Pub/Sub

Simulate publishing order data to the ecommerce-orders topic using the Google Cloud Pub/Sub client library for Python. Create a pubsub_publisher.py script and paste in the following code:

# pubsub_publisher.py

import json

from dotenv import dotenv_values
from google.cloud import pubsub_v1

# Read the Pub/Sub project and topic IDs from .env
config = dotenv_values(".env")
project_id = config.get("PROJECT_ID")
topic_id = config.get("TOPIC_ID")

# The client authenticates with Application Default Credentials
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

order_data = {
    "order_id": "12345",
    "product_id": "67890",
    "quantity": 2,
    "order_timestamp": "2024-01-01T12:00:00Z",
}

# Message data must be bytes, so encode the JSON as UTF-8.
# publish() returns a future; result() blocks until Pub/Sub returns the message ID.
future = publisher.publish(topic_path, data=json.dumps(order_data).encode("utf-8"))

print(f"Published message ID: {future.result()}")

Run the script with python pubsub_publisher.py. The console prints the ID of the published message, for example:

Published message ID: 10478246062331482
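The publisher script sends a single hard-coded order. To exercise the pipeline with more traffic, you could generate several orders, as in the sketch below. The make_order_payloads() helper is hypothetical; it encodes each order as UTF-8 JSON, as the publisher does, and formats order_timestamp as %Y-%m-%dT%H:%M:%SZ in UTC, which is the format the transform.py handler parses. Each payload can then be passed to publisher.publish(topic_path, data=payload).

```python
# Sketch: generate sample order messages for the ecommerce-orders topic.
# make_order_payloads() is illustrative; publishing is left to the
# PublisherClient from pubsub_publisher.py.
import json
import random
from datetime import datetime, timezone

# Must match the format that transform.py parses with strptime
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def make_order_payloads(count, seed=None):
    """Return `count` sample orders, each encoded as UTF-8 JSON bytes."""
    rng = random.Random(seed)  # a seed makes product IDs and quantities repeatable
    payloads = []
    for i in range(count):
        order = {
            "order_id": str(10000 + i),
            "product_id": str(rng.randint(10000, 99999)),
            "quantity": rng.randint(1, 5),
            # Current UTC time in the format transform.py expects
            "order_timestamp": datetime.now(timezone.utc).strftime(TIMESTAMP_FORMAT),
        }
        payloads.append(json.dumps(order).encode("utf-8"))
    return payloads


if __name__ == "__main__":
    for payload in make_order_payloads(3, seed=42):
        print(payload.decode("utf-8"))
        # In pubsub_publisher.py you would instead call:
        # publisher.publish(topic_path, data=payload).result()
```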

Step 5: Consume data from the GlassFlow pipeline

Create a new Python file called consumer.py to consume the transformed data from the GlassFlow pipeline. The pipeline continuously pulls new messages from the Google Pub/Sub subscription and transforms them; the script reads the next transformed event from the pipeline.

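# consumer.py

import json

import glassflow
from dotenv import dotenv_values

# Load the pipeline credentials saved in .env
config = dotenv_values(".env")

pipeline_client = glassflow.GlassFlowClient().pipeline_client(
    pipeline_id=config.get("PIPELINE_ID"),
    pipeline_access_token=config.get("PIPELINE_ACCESS_TOKEN"),
)

# Fetch the next transformed event from the pipeline
response = pipeline_client.consume()

if response.status_code == 200:
    data = response.json()
    print("Consuming data from GlassFlow pipeline...")
    print(json.dumps(data, indent=3))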

After you run python consumer.py, the console shows the transformed data, similar to the following:

Consuming data from GlassFlow pipeline...
{
   "order_id":"12345",
   "order_timestamp":"2024-01-01T12:00:00Z",
   "product_id":"67890",
   "quantity":2,
   "estimated_delivery":"2024-01-04"
}

From this point on, GlassFlow runs the transformation function whenever a new message is published to the ecommerce-orders topic.
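The consumer.py script reads a single event and exits. To keep reading as new orders arrive, you could wrap consume() in a polling loop. The sketch below assumes, as consumer.py does, that consume() returns an object with status_code and json(); it treats any status other than 200 as "nothing to read yet" and waits before retrying. poll_events() and FakeResponse are hypothetical names, and passing the consume function in as an argument lets the loop be tried without a live pipeline.

```python
# Sketch: keep consuming events from a GlassFlow pipeline.
# poll_events() assumes `consume` behaves like pipeline_client.consume()
# in consumer.py: it returns an object with .status_code and .json(),
# where 200 means an event was returned.
import time


class FakeResponse:
    """Hypothetical stand-in for a consume() response, for offline tests."""

    def __init__(self, status_code, payload=None):
        self.status_code = status_code
        self._payload = payload

    def json(self):
        return self._payload


def poll_events(consume, handle, max_polls=None, interval=1.0):
    """Call consume() repeatedly and pass each event to handle().

    Stops after max_polls calls (None means run forever) and returns
    the number of events handled.
    """
    handled = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        response = consume()
        if response.status_code == 200:
            handle(response.json())
            handled += 1
        else:
            # No event (or an error status): wait, then try again
            time.sleep(interval)
    return handled


if __name__ == "__main__":
    responses = [FakeResponse(200, {"order_id": "12345"}), FakeResponse(204)]
    count = poll_events(lambda: responses.pop(0), print, max_polls=2, interval=0)
    print(f"Handled {count} event(s)")
```

In consumer.py, the single consume() call could be replaced by poll_events(pipeline_client.consume, print). A fixed sleep keeps the sketch simple; a longer-running consumer might add backoff and explicit handling of error statuses.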

Conclusion

By integrating Google Pub/Sub with GlassFlow, you can efficiently stream real-time data into your pipelines, apply transformations, and store the results in various data sinks. This integration allows you to leverage the power of real-time messaging for applications such as real-time analytics, monitoring, and more.


© 2023 GlassFlow