Amazon Kinesis Data Streams

Integrating AWS Kinesis Data Streams as a Source with GlassFlow

This tutorial will guide you through integrating AWS Kinesis Data Streams with GlassFlow. AWS Kinesis is a real-time data streaming service that allows you to collect and process large streams of data records in real-time. By integrating Kinesis Data Streams with GlassFlow, you can build powerful data pipelines for real-time analytics and processing.

Prerequisites

Before you start, make sure you have the following:

Scenario overview

This project simulates a scenario where Netflix viewing metrics data is monitored and processed in real-time to improve user experience and content delivery. Generated sample data streamed continuously into Amazon Kinesis Data Streams. The goal is to ingest metrics data into GlassFlow, analyze the user reactions based on the watch frequency, categorize the reactions into like, favorite, or dislike.

Step 1: AWS Configuration

We use Boto3 (AWS SDK for Python) to create a Kinesis stream and send sample data.

Create IAM user

Before using Boto3, you need to set up authentication credentials for your AWS account using either the IAM Console or the AWS CLI. You can either choose an existing root user or create a new one.

For instructions about how to create a user using the IAM Console, see Creating IAM users. Once the user has been created, see Managing access keys to learn how to create and retrieve the keys used to authenticate the user.

Copy both generated aws_access_key_id and aws_secret_access_key, you will use them when you configure the local credentials file and to set environment variables.

Configure credentials file

Use AWS CLI installed to configure your credentials file by running the below command:

aws configure

Alternatively, you can create the credentials file yourself. By default, its location is ~/.aws/credentials. in MacOS/Linux operating systems At a minimum, the credentials file should specify the access key and secret access key. In this example, the key and secret key for the account are specified in the default profile:

[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY

You may also want to add a default region to the AWS configuration file, which is located by default at ~/.aws/config:

[default]
region=us-west-2

Alternatively, you can pass an AWS_REGION name as an environment variable when creating clients and resources.

You have now configured credentials for the default profile as well as a default region to use when creating connections. See Configuration for in-depth configuration sources and options.

Step 2: Create a GlassFlow Pipeline

  1. Log in to GlassFlow WebApp:

  2. Create a New Space:

    • Go to the “Spaces” page and create a new space named kinesis-integration to organize your pipelines.

  3. Create a New Pipeline:

    • Within the new space, go to the “Pipelines” page and click “Create Pipeline.”

    • Provide a name for the pipeline, e.g., kinesis-to-glassflow.

    • Choose the space you created (e.g., kinesis-integration).

  4. Configure a Data Source:

    • Select "SDK". The GlassFlow SDK option requires you to implement the logic for sending data from AWS Kinesis Data Streams to the GlassFlow pipeline in Python.

  5. Add Transformation Stage (Optional):

    • To transform the incoming data, add a transformation function in GlassFlow.

    • Create a Python script transform.py that defines the logic for analyzing watch frequency from Netflix view metrics.

    import json
    
    
    def handler(data, log):
        log.info("Event:" + json.dumps(data), data=data)
    
        if data["watchfrequency"] == 1:
            data["reaction"] = "dislike"
        elif data["watchfrequency"] > 5:
            data["reaction"] = "favourite"
        else:
            data["reaction"] = "like"
    
        return data
  6. Configure Data Sink SDK:

    • Choose a data sink for your processed data. You can use SDK connectors for various data stores.

    • Configure the sink according to your needs (e.g., database connection details).

  7. Confirm Pipeline Creation:

    • Review your pipeline configuration and click “Create Pipeline.”

    • Copy the new Pipeline ID and Access Token for future reference.

Step 3: Setup a Sample Project

Create a folder

Start by creating a dedicated project folder. Create a directory for this project named glassflow-aws-kinesis, where you will place all the necessary files.

mkdir glassflow-aws-kinesis
cd glassflow-aws-kinesis

Create a new virtual environment

Create a new virtual environment in the same folder and activate that environment:

python -m venv .venv && source .venv/bin/activate

Install libraries

Install the GlassFlow, Google Cloud PubSub Python SDKs, and virtual environment package python-dotenvusing pip.

pip install glassflow python-dotenv faker schedule boto3

Create an environment configuration file

Create a .env file in your project directory with the following content:

AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=your_aws_region
AWS_KINESIS_STREAM_NAME=netflix-view-metrics
PIPELINE_ID=your_pipeline_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Replace placeholders with the actual values from your GlassFlow account and AWS credentials.

Step 4: Generate and Publish Data to AWS Kinesis Stream

Generate sample data stream with Netflix view events and push it to Kinesis using a Python script. It also creates a new Kinesis data stream service in your AWS environment if you do not have an existing one.

https://github.com/glassflow/glassflow-examples/blob/main/integrations/aws-kinesis-dynamodb/generator_kinesis/generator.py
import json
import random
import boto3
from faker import Faker
from botocore.exceptions import ClientError
from dotenv import load_dotenv
import os
import schedule


class DataGenerator:
    def __init__(self):
        self.fake = Faker()
        self.userid = 0
        self.channelid = 0
        self.genre = ""
        self.lastactive = None
        self.title = ""
        self.watchfrequency = 0
        self.etags = None

    def generate_record(self):
        data = {
            "userid": self.fake.uuid4(),
            "channelid": self.fake.pyint(min_value=1, max_value=50),
            "genre": random.choice(["thriller", "comedy", "romcom", "fiction"]),
            "lastactive": self.fake.date_time_between(
                start_date="-10m", end_date="now"
            ).isoformat(),
            "title": self.fake.name(),
            "watchfrequency": self.fake.pyint(min_value=1, max_value=10),
            "etags": self.fake.uuid4(),
        }
        return data


def create_kinesis_stream(kinesis_client, stream_name):
    try:
        kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)
        print("Created stream {}.".format(stream_name))
        stream_exists_waiter = kinesis_client.get_waiter("stream_exists")
        stream_exists_waiter.wait(StreamName=stream_name)
        response = kinesis_client.describe_stream(StreamName=stream_name)
        stream_details = response["StreamDescription"]
        print("Stream details: {}".format(stream_details))
    except ClientError as e:
        if e.response["Error"]["Code"] == "ResourceInUseException":
            print("Stream '{}' already exists.".format(stream_name))
        else:
            print("Couldn't create stream {}.".format(stream_name))
            raise


def send_kinesis_record(kinesis_client, stream_name, record_generator):
    kinesis_record = [
        {
            "Data": bytes(json.dumps(record_generator()), "utf-8"),
            "PartitionKey": "partition_key",
        }
    ]
    kinesis_client.put_records(StreamName=stream_name, Records=kinesis_record)
    print("Sent record to Kinesis.", kinesis_record)


def main():
    kinesis_stream_name = os.environ["AWS_KINESIS_STREAM_NAME"]
    kinesis_client = boto3.client("kinesis", region_name=os.environ["AWS_REGION"])

    data_generator = DataGenerator()

    create_kinesis_stream(kinesis_client, kinesis_stream_name)

    EVENTS_PER_SECOND = 5
    schedule.every(float(1 / EVENTS_PER_SECOND)).seconds.do(
        send_kinesis_record,
        kinesis_client,
        kinesis_stream_name,
        data_generator.generate_record,
    )

    while True:
        schedule.run_pending()


if __name__ == "__main__":
    load_dotenv()
    main()

Step 5: Consume Data from the GlassFlow pipeline

Create a new Python file called consumer.pyand consume transformed data from the GlassFlow pipeline. The pipeline continuously checks for new data from the Kinesis service, transforms it as needed, and consumes it.

import glassflow

pipeline_client = glassflow.GlassFlowClient().pipeline_client(
	pipeline_id="your_pipeline_id",
	pipeline_access_token="PIPELINE_ACCESS_TOKEN"
)
response = pipeline_client.consume()

if response.status_code == 200:
	data = response.json()
	print("Consumed Data from GlassFlow pipeline...", data)

After running the Python script, you will see the transformed data in the console output something like this:

Consumed Data from GlassFlow pipeline...
{
   "channelid":18,
   "etags":"b10730dd-2fa1-414a-bea6-135f95801aa2",
   "genre":"thriller",
   "lastactive":"2024-05-11T15:35:35.328987",
   "title":"James Jimenez",
   "userid":"8fb239be-ea55-4e92-be53-658b702e1a31",
   "watchfrequency":2,
   "reaction":"like"
}

Conclusion

By integrating AWS Kinesis Data Streams with GlassFlow, you can efficiently handle real-time data streams and apply transformations before storing the results in various data sinks. If you need a built-in GlassFlow connector for Amazon Kinesis Data Streams service, raise an issue on GitHub.

Last updated

Logo

© 2023 GlassFlow