Mobility Project

A practical example of configuring a new data pipeline for a car-sharing (Mobility) project.

The mobility project processes real-time ride events from an API to monitor fuel consumption and manage the supply of cars available for sharing. Its primary objectives are to identify low-fuel vehicles and to find the nearest fuel station based on GPS coordinates and fuel type. It also offers drivers a discount when a vehicle is low on fuel and a nearby fuel station is found.

You will use the GlassFlow WebApp to create a data processing pipeline.

Setting Up the Pipeline with GlassFlow

Prerequisites

To start with this setup, you need a free GlassFlow account.

Sign up for a free account

Step 1. Log in to GlassFlow WebApp

Navigate to the GlassFlow WebApp and log in with your credentials.

Step 2. Create a New Pipeline

Click on "Create New Pipeline" and provide a name. You can name it "Fuel Management".

Step 3. Configure a Data Source

Select "SDK" to configure the pipeline to use the Python SDK for ingesting ride event data from the API.

Step 4. Define the Transformer

The transformer processes ride event data to identify low-fuel vehicles, find the nearest fuel stations, and calculate discounts. Copy and paste the following transformation function into the transformer's built-in editor.

https://github.com/glassflow/glassflow-examples/blob/main/use-cases/mobility/transform.py
"""
Transform function by the user
"""
import json
import requests


def handler(data, log):
    log.info("Event:" + json.dumps(data), data=data)
    try:
        transformed = handle(data)
    except Exception as e:
        log.error("Error in transformation", error=str(e))
        raise e
    return transformed


def get_nearest_fuel_station(gps_cordinates, fuel_type):
    print("get nearest fuel station")
    url = "https://mock-mobility-s3r3lbzina-ey.a.run.app/mobility/gas-stations/nearest"
    resp = requests.get(url,
                        params={
                            'cordinates_lat': gps_cordinates[0],
                            'cordinates_long': gps_cordinates[1],
                            'fuel_type': fuel_type
                        })
    if resp.status_code == 200:
        fuel_station = resp.json()
        print(fuel_station)
        return fuel_station
    else:
        print("error getting nearest fuel station")
        print(resp.status_code)
        return None


def handle(data: dict):
    data['discount'] = {"discount": False}
    if not data['is_electric'] and data['current_fuel_percentage'] < 25:
        # find nearest gas station using a partner API
        fuel_station = get_nearest_fuel_station(data['gps_cordinates'],
                                                data['fuel_type'])
        if fuel_station:
            data['discount'] = {
                "discount": True,
                "fuel_station": fuel_station,
                "discount_type": "fuel"
            }

    return data

Note that your code must implement the handler function. Without it, the transformation will fail.

The handler function is the entry point: it logs the incoming event and calls handle, which contains the transformation logic and modifies the event data based on specific conditions.

  • If the vehicle is not electric and its current fuel percentage is below 25%, it calls the get_nearest_fuel_station function to find the nearest fuel station via the mock API server.

  • If a fuel station is found, it updates the 'discount' key with details about the discount offered by the fuel station.
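
To sanity-check the discount rule before pasting it into the editor, the logic can be exercised locally without network access. The sketch below is a simplified variant under stated assumptions: apply_discount and fake_lookup are hypothetical names, the station lookup is passed in as a parameter instead of calling the mock API, and the sample event and station payload contain only the fields the transformer reads. The real payloads may carry more fields.

```python
import copy

LOW_FUEL_THRESHOLD = 25  # percent, the same cut-off the transformer uses


def apply_discount(event, lookup):
    """Return a copy of event with the 'discount' key set.

    `lookup` is a hypothetical stand-in for get_nearest_fuel_station:
    it takes (gps_cordinates, fuel_type) and returns a station or None.
    """
    result = copy.deepcopy(event)
    result["discount"] = {"discount": False}
    is_low = result["current_fuel_percentage"] < LOW_FUEL_THRESHOLD
    if not result["is_electric"] and is_low:
        station = lookup(result["gps_cordinates"], result["fuel_type"])
        if station:
            result["discount"] = {
                "discount": True,
                "fuel_station": station,
                "discount_type": "fuel",
            }
    return result


def fake_lookup(gps_cordinates, fuel_type):
    # Made-up station payload; the real API response shape may differ.
    return {"name": "Example Station", "fuel_type": fuel_type}


# Sample events with illustrative values only.
low_fuel_event = {
    "is_electric": False,
    "current_fuel_percentage": 12,
    "gps_cordinates": [52.52, 13.40],
    "fuel_type": "diesel",
}
electric_event = dict(low_fuel_event, is_electric=True)

print(apply_discount(low_fuel_event, fake_lookup)["discount"]["discount"])
print(apply_discount(electric_event, fake_lookup)["discount"]["discount"])
```

Injecting the lookup keeps the check independent of the mock server, so a failing API does not mask a mistake in the rule itself.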

Step 5. Configure a Data Sink

Select "SDK" to configure the pipeline to use the Python SDK for sending data to the fleet management file. In a real-world project, you would send the data to dashboards and notification systems.

Step 6. Confirm the Pipeline

Confirm the pipeline settings in the final step and click "Create Pipeline".

Step 7. Copy the Pipeline Credentials

Once the pipeline is created, copy its credentials such as Pipeline ID and Access Token.

Sending Data to the Pipeline

Generate data for the mobility project and publish it to the data pipeline in GlassFlow using the Python SDK.

Create an environment configuration file

Add a .env file in the project directory with the following configuration variables and their values:

PIPELINE_ID=your_pipeline_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Replace your_pipeline_id and your_pipeline_access_token with the values you copied in Step 7.
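
A missing or unedited variable in .env usually surfaces only later as a failed request, so it can help to check the configuration up front. The following is a minimal sketch: missing_settings is a hypothetical helper, not part of the GlassFlow SDK, and it works on any dictionary, such as the one returned by dotenv_values(".env") from python-dotenv.

```python
REQUIRED_KEYS = ("PIPELINE_ID", "PIPELINE_ACCESS_TOKEN")
PLACEHOLDERS = {"your_pipeline_id", "your_pipeline_access_token"}


def missing_settings(config):
    """Return the required keys that are absent, empty, or still placeholders."""
    missing = []
    for key in REQUIRED_KEYS:
        value = (config.get(key) or "").strip()
        if not value or value in PLACEHOLDERS:
            missing.append(key)
    return missing


# Illustrative input; in a real script this would be dotenv_values(".env").
example_config = {
    "PIPELINE_ID": "abc123",
    "PIPELINE_ACCESS_TOKEN": "your_pipeline_access_token",
}
print(missing_settings(example_config))  # ['PIPELINE_ACCESS_TOKEN']
```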

Prerequisites

To complete this part, you'll need the following:

  1. Python installed on your machine.

  2. Pip installed to manage project packages.

Install required libraries

Install the required libraries listed in the requirements.txt file, including the GlassFlow SDK, using the pip command in a terminal.

pip install -r requirements.txt

Publish real-time API events to the pipeline

Create a new Python script file called producer_api.py in your project root directory and insert the code below. This Python script serves as a data producer, fetching mobility events data from a mock API server and publishing it to a GlassFlow pipeline.

https://github.com/glassflow/glassflow-examples/blob/main/use-cases/mobility/producer_api.py
"""
Get mobility events data via a mockserver and publish it to glassflow
"""

import glassflow
from dotenv import dotenv_values
import requests


def get_mock_events():
    """
    Get mock events from the mock server
    """
    res = requests.get(
        "https://mock-mobility-s3r3lbzina-ey.a.run.app/mobility/producer/events/ride-completed"
    )
    if res.status_code == 200:
        return res.json()
    else:
        print("Failed to get mock events")
        return None


def main():
    config = dotenv_values(".env")
    pipeline_id = config.get("PIPELINE_ID")
    token = config.get("PIPELINE_ACCESS_TOKEN")

    client = glassflow.GlassFlowClient()
    pipeline_client = client.pipeline_client(
        pipeline_id=pipeline_id, pipeline_access_token=token
    )
    counter = 0
    while counter < 1000:
        try:
            event = get_mock_events()
            counter += 1
            if event:
                req = pipeline_client.publish(request_body=event[0])

                if req.status_code == 200:
                    print("Event published successfully", event[0])
                else:
                    print("Failed to publish event")
                    print(req.text)
        except Exception as e:
            print(e)
            break
        except KeyboardInterrupt:
            break


if __name__ == "__main__":
    main()

Run the script

Run the Python script producer_api.py:

python producer_api.py

This script repeatedly fetches mock mobility events from a mock API server and publishes them to the specified GlassFlow pipeline. It stops after 1,000 fetch attempts or at the first exception.
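
The producer loop exits at the first exception and sends requests back to back. If you want it to tolerate brief outages, one option is to wrap each call in a retry with exponential backoff. The sketch below illustrates that general technique; it is not something the original script or the GlassFlow SDK provides, and with_retries, its parameters, and flaky_fetch are hypothetical names used for illustration.

```python
import time


def with_retries(action, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call action() until it succeeds or the attempts run out.

    Waits base_delay, then 2 * base_delay, and so on between attempts.
    Re-raises the last exception if every attempt fails.
    """
    for attempt in range(attempts):
        try:
            return action()
        except Exception:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))


# Demonstration with a stand-in action that fails twice, then succeeds.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return [{"event": "ride-completed"}]


delays = []  # record the waits instead of actually sleeping
result = with_retries(flaky_fetch, sleep=delays.append)
print(result, delays)
```

Note that get_mock_events returns None on a non-200 response instead of raising, so it would need to raise an exception for a wrapper like this to retry it.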

Consume Data

Consume transformed data from the mobility project data pipeline in GlassFlow and store it in a local file. You'll use the GlassFlow Python SDK to interact with the pipeline and retrieve the transformed data in real time.

Consume transformed data

Create a Python script consumer_file.py inside the mobility folder and add the following code:

https://github.com/glassflow/glassflow-examples/blob/main/use-cases/mobility/consumer_file.py
"""Get transformed data and store it locally on disk"""

import glassflow
import sys
from dotenv import dotenv_values
import json


def main():
    config = dotenv_values(".env")
    print(config)
    pipeline_id = config.get("PIPELINE_ID")
    token = config.get("PIPELINE_ACCESS_TOKEN")

    client = glassflow.GlassFlowClient()
    pipeline_client = client.pipeline_client(
        pipeline_id=pipeline_id, pipeline_access_token=token
    )

    with open("mobility_data_transformed.txt", "a+") as f:
        while True:
            try:
                # consume transformed data from the pipeline
                res = pipeline_client.consume()
                if res.status_code == 200:
                    # get the transformed data as json
                    data = res.body.event
                    print("Data consumed successfully")
                    print(data)
                    f.write(json.dumps(data) + "\n")
                    f.flush()
            except KeyboardInterrupt:
                print("exiting")
                sys.exit(0)


if __name__ == "__main__":
    main()

Run the script

python consumer_file.py

The script continuously consumes data from the pipeline and appends it to the local file mobility_data_transformed.txt. You can see an example of consumed data here. To watch new records arrive in the file, run this command in another terminal window:

tail -f mobility_data_transformed.txt

You can extend this functionality to push the consumed data to cloud storage buckets or real-time databases per your project requirements.
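
Because the consumer writes one JSON document per line, the output file can be read back with the standard library before pushing it anywhere else. The sketch below assumes each consumed event is a JSON object carrying the discount key set by the transformer; load_events, discounted, and the sample lines are hypothetical and for illustration only.

```python
import json


def load_events(lines):
    """Parse one JSON document per non-empty line, skipping malformed lines."""
    events = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # for example, a partially written last line
    return events


def discounted(events):
    """Keep only events the transformer marked with a discount."""
    return [
        e for e in events
        if isinstance(e, dict) and e.get("discount", {}).get("discount")
    ]


# Illustrative lines; with the real file, pass open("mobility_data_transformed.txt").
sample_lines = [
    '{"fuel_type": "diesel", "discount": {"discount": true, "discount_type": "fuel"}}',
    '{"fuel_type": "petrol", "discount": {"discount": false}}',
    '{"broken": ',
]
events = load_events(sample_lines)
print(len(events), len(discounted(events)))  # 2 1
```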

See other use cases for complex scenarios.

© 2023 GlassFlow