Amazon DynamoDB

Integrating Amazon DynamoDB as a Sink with GlassFlow

This tutorial will guide you through integrating AWS DynamoDB as a data sink with GlassFlow. DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability. By using DynamoDB as a data sink, you can store and manage your processed data efficiently.

Prerequisites

Before you start, make sure you have the following:

Scenario overview

This project simulates a scenario where Netflix viewing metrics data is monitored and processed in real-time to improve user experience and content delivery. You generate sample Netflix viewing metrics data and ingest metrics data into GlassFlow, analyze the user reactions based on the watch frequency, categorize movie reactions into like, favorite, or dislike.

Step 1: AWS Configuration

Create IAM user

Set up authentication credentials for your AWS account using either the IAM Console or the AWS CLI. You can either choose an existing root user or create a new one.

For instructions about how to create a user using the IAM Console, see Creating IAM users. Once the user has been created, see Managing access keys to learn how to create and retrieve the keys used to authenticate the user.

Copy both generated aws_access_key_id and aws_secret_access_key, you will use them when you configure the local credentials file and to set environment variables.

Configure credentials file

Use AWS CLI installed to configure your credentials file by running the below command:

aws configure

Alternatively, you can create the credentials file yourself. By default, its location is ~/.aws/credentials. in MacOS/Linux operating systems At a minimum, the credentials file should specify the access key and secret access key. In this example, the key and secret key for the account are specified in the default profile:

[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY

You may also want to add a default region to the AWS configuration file, which is located by default at ~/.aws/config:

[default]
region=us-west-2

Alternatively, you can pass an AWS_REGION name as an environment variable when creating clients and resources.

You have now configured credentials for the default profile as well as a default region to use when creating connections. See Configuration for in-depth configuration sources and options.

Step 2: Create a GlassFlow Pipeline

  1. Log in to GlassFlow WebApp:

  2. Create a New Space:

    • Go to the “Spaces” page and create a new space named dynamodb-integration to organize your pipelines.

  3. Create a New Pipeline:

    • Within the new space, go to the “Pipelines” page and click “Create Pipeline.”

    • Provide a name for the pipeline, e.g., netflix-metrics-to-dynamodb.

    • Choose the space you created (e.g., dynamodb-integration).

  4. Configure a Data Source:

    • Select "SDK". The GlassFlow SDK option requires you to implement the logic for sending the sample data to the GlassFlow pipeline in Python.

  5. Add Transformation Stage (Optional):

    • To transform the incoming data, add a transformation function in GlassFlow.

    • Create a Python script transform.py that defines the logic for analyzing watch frequency from Netflix view metrics.

    import json
    
    
    def handler(data, log):
        log.info("Event:" + json.dumps(data), data=data)
    
        if data["watchfrequency"] == 1:
            data["reaction"] = "dislike"
        elif data["watchfrequency"] > 5:
            data["reaction"] = "favourite"
        else:
            data["reaction"] = "like"
    
        return data
  6. Configure a Data Sink:

    • Select "SDK". The GlassFlow SDK option requires you to implement the logic for storing transformed data in DynamoDB.

  7. Confirm Pipeline Creation:

    • Review your pipeline configuration and click “Create Pipeline.”

    • Copy the new Pipeline ID and Access Token for future reference.

Step 3: Setup a Sample Project

Create a folder

Start by creating a dedicated project folder. Create a directory for this project named glassflow-aws-dynamodb, where you will place all the necessary files.

mkdir glassflow-aws-dynamodb
cd glassflow-aws-dynamodb

Create a new virtual environment

Create a new virtual environment in the same folder and activate that environment:

python -m venv .venv && source .venv/bin/activate

Install libraries

Install the GlassFlow, Google Cloud PubSub Python SDKs, and virtual environment package python-dotenvusing pip.

pip install glassflow python-dotenv faker boto3

Create an environment configuration file

Create a .env file in your project directory with the following content:

AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=your_aws_region
AWS_DYNAMODB_TABLE_NAME=NetflixViewMetrics
PIPELINE_ID=your_pipeline_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Replace placeholders with the actual values from your GlassFlow account and AWS credentials.

Step 4: Generate and Publish Data to GlassFlow Pipeline

Here's a sample Python script producer.py to generate Netflix view metrics and push them to your GlassFlow pipeline:

import json
import random
from faker import Faker
import glassflow

# Initialize Faker for generating sample data
fake = Faker()

# Replace these values with your GlassFlow pipeline details
PIPELINE_ID = 'your_pipeline_id'
PIPELINE_ACCESS_TOKEN = 'your_pipeline_access_token'

# Initialize GlassFlow client
pipeline_client = glassflow.GlassFlowClient().pipeline_client(
	pipeline_id=PIPELINE_ID,
	pipeline_access_token=PIPELINE_ACCESS_TOKEN
)

# Generate sample Netflix view metrics data
def generate_sample_data():
    data = {
        "userid": fake.uuid4(),
        "channelid": fake.pyint(min_value=1, max_value=50),
        "genre": random.choice(["thriller", "comedy", "romcom", "fiction"]),
        "lastactive": fake.date_time_between(start_date="-10m", end_date="now").isoformat(),
        "title": fake.name(),
        "watchfrequency": fake.pyint(min_value=1, max_value=10),
        "etags": fake.uuid4()
    }
    return data

# Push data to GlassFlow pipeline
for _ in range(5):  # Generate 5 sample records
    sample_data = generate_sample_data()
    response = pipeline_client.publish(data)
    if response.status_code == 200:
        print(f"Successfully pushed data: {sample_data}")
    else:
        print(f"Failed to push data: {response.text}")

print("Data pushing completed.")

Execute the Python script producer.py to send sample Netflix view metrics to GlassFlow:

python producer.py

Step 5: Consume Data from the GlassFlow pipeline and Store it in DynamoDB

We use Boto3 (AWS SDK for Python) to create a table NetflixViewMetrics and store processed Netflix view metrics data in DynamoDB. Write a Python script to do so. You can find a sample DynamoDB connector code using GlassFlow Python SDK on GitHub:

It's time to validate the data stored in the DynamoDB NoSQL database. Go to the DynamoDB service and the table NetflixViewMetricsis created during the configuration, we can see the data is loading continuously:

That's it! You have successfully used GlassFlow to process real-time data and deliver your streaming data to the DynamoDB.

Conclusion

By following these steps, you have successfully integrated AWS DynamoDB as a data sink with GlassFlow and pushed sample Netflix view metrics data using the GlassFlow Python SDK. This setup enables efficient data management with DynamoDB and provides a robust solution for handling and storing processed data.

Last updated

Logo

© 2023 GlassFlow