Mobility Project
A practical example of configuring a new data pipeline for a car-sharing (Mobility) project.
The mobility project aims to process real-time ride events from an API to monitor fuel consumption and manage the supply of cars available for sharing. The primary objectives include identifying low-fuel vehicles and finding the nearest fuel station based on GPS coordinates and fuel type. It also offers drivers a discount based on the fuel level and proximity to the closest fuel station.
You will use the GlassFlow WebApp to create a data processing pipeline.
Setting Up the Pipeline with GlassFlow
Prerequisites
To start with this setup, you need a free GlassFlow account.
Step 1. Log in to GlassFlow WebApp
Navigate to the GlassFlow WebApp and log in with your credentials.
Step 2. Create a New Pipeline
Click on "Create New Pipeline" and provide a name. You can name it "Fuel Management".
Step 3. Configure a Data Source
Select "SDK" to configure the pipeline to use Python SDK for ingesting ride event data from the API.
Step 4. Define the Transformer
The transformer processes ride event data to identify low-fuel vehicles, find the nearest fuel stations, and calculate discounts. Copy and paste the following transformation function into the transformer's built-in editor.
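The sketch below illustrates what such a function could look like. The handler(data, log) signature follows the GlassFlow transformer convention, while the mock fuel-station endpoint and the event field names (vehicle_type, fuel_percentage, gps_coordinates, fuel_type) are assumptions made for illustration; adjust them to your actual event schema and mock API server.

```python
import requests

# Placeholder URL for the mock API server that returns fuel stations; replace with your own.
FUEL_STATION_API = "http://localhost:8000/fuel-stations"


def get_nearest_fuel_station(gps_coordinates, fuel_type):
    """Query the mock API server for the nearest fuel station offering the given fuel type."""
    response = requests.get(
        FUEL_STATION_API,
        params={
            "lat": gps_coordinates["lat"],
            "lon": gps_coordinates["lon"],
            "fuel_type": fuel_type,
        },
        timeout=5,
    )
    if response.status_code == 200:
        return response.json()
    return None


def handler(data, log):
    """Mandatory entry point: receives one ride event and returns the (possibly modified) event."""
    # Only non-electric vehicles with less than 25% fuel need a refuelling suggestion.
    if data.get("vehicle_type") != "electric" and data.get("fuel_percentage", 100) < 25:
        station = get_nearest_fuel_station(data["gps_coordinates"], data["fuel_type"])
        if station is not None:
            # Attach the discount offered by the nearest fuel station to the event.
            data["discount"] = {
                "station_name": station.get("name"),
                "discount_percentage": station.get("discount_percentage"),
            }
    return data
```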
Note that the handler function is mandatory to implement in your code. Without it, the transformation will not work.
The handler function contains all the transformation logic; the event data is modified based on specific conditions:
If the vehicle is not electric and its current fuel percentage is below 25%, it calls the get_nearest_fuel_station function to find the nearest fuel station via the mock API server.
If a fuel station is found, it updates the 'discount' key with details about the discount offered by the fuel station.
Step 5. Configure a Data Sink
Select "SDK" to configure the pipeline to use Python SDK for sending data to the fleet management file. In a real-world project, you send data to dashboards and notification systems.
Step 6. Confirm the Pipeline
Confirm the pipeline settings in the final step and click "Create Pipeline".
Step 7. Copy the Pipeline Credentials
Once the pipeline is created, copy its credentials, such as the Pipeline ID and Access Token.
Sending Data to the Pipeline
Generate data for the mobility project and publish it to the data pipeline in GlassFlow using the Python SDK.
Create an environment configuration file
Add a .env file in the project directory with the following configuration variables and their values:
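For example (the variable names PIPELINE_ID and PIPELINE_ACCESS_TOKEN are assumed here; make sure they match what your producer and consumer scripts read):

```
PIPELINE_ID=your_pipeline_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token
```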
Replace your_pipeline_id and your_pipeline_access_token with the appropriate values obtained in the previous steps.
Prerequisites
To complete this part you'll need the following:
Python installed on your machine.
Download and install pip to manage project packages.
Install required libraries
Install the required libraries, including the GlassFlow SDK, listed in the requirements.txt file by using the pip command in a terminal.
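For example, from the project directory:

```bash
pip install -r requirements.txt
```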
Publish real-time API events to the pipeline
Create a new Python script file called producer_api.py in your project root directory and insert the code below. This Python script serves as a data producer, fetching mobility events data from a mock API server and publishing it to a GlassFlow pipeline.
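A minimal sketch of such a producer is shown below. The mock API URL is a placeholder, the environment variable names match the .env file created earlier, and the GlassFlow client calls (GlassFlowClient, pipeline_client, publish) should be checked against the version of the GlassFlow Python SDK you have installed.

```python
import os
import time

import glassflow
import requests
from dotenv import load_dotenv

load_dotenv()

# Placeholder URL for the mock API server that emits ride events; replace with your own.
MOCK_EVENTS_API = "http://localhost:8000/ride-events"


def main():
    client = glassflow.GlassFlowClient()
    # Credentials come from the .env file created earlier.
    pipeline_client = client.pipeline_client(
        pipeline_id=os.getenv("PIPELINE_ID"),
        pipeline_access_token=os.getenv("PIPELINE_ACCESS_TOKEN"),
    )

    while True:
        # Fetch the next batch of ride events from the mock API server
        # (assumed here to return a JSON list of event objects).
        response = requests.get(MOCK_EVENTS_API, timeout=5)
        response.raise_for_status()
        for event in response.json():
            # Publish each event to the GlassFlow pipeline.
            pipeline_client.publish(request_body=event)
            print(f"Published event: {event}")
        time.sleep(1)


if __name__ == "__main__":
    main()
```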
Run the script
Run the Python script producer_api.py:
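```bash
python producer_api.py
```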
This script continuously fetches mock mobility events data from a mock API server and publishes it to the specified GlassFlow pipeline.
Consume Data
Consume transformed data from the mobility project data pipeline in GlassFlow and store it locally on a file. You'll use the GlassFlow Python SDK to interact with the pipeline and retrieve the transformed data in real-time.
Consume transformed data
Create a Python script consumer_file.py inside the mobility folder and add the following code:
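A minimal sketch of the consumer is shown below. The output file name mobility_events.jsonl is an arbitrary choice, and the consume call should be verified against the version of the GlassFlow Python SDK you have installed.

```python
import json
import os
import time

import glassflow
from dotenv import load_dotenv

load_dotenv()

# Arbitrary local file used to store the transformed events, one JSON object per line.
OUTPUT_FILE = "mobility_events.jsonl"


def main():
    client = glassflow.GlassFlowClient()
    # Credentials come from the .env file created earlier.
    pipeline_client = client.pipeline_client(
        pipeline_id=os.getenv("PIPELINE_ID"),
        pipeline_access_token=os.getenv("PIPELINE_ACCESS_TOKEN"),
    )

    while True:
        # Pull the next transformed event from the pipeline.
        response = pipeline_client.consume()
        if response.status_code == 200:
            event = response.json()
            # Append the event as one JSON line to the local file.
            with open(OUTPUT_FILE, "a") as f:
                f.write(json.dumps(event) + "\n")
            print(f"Consumed event: {event}")
        else:
            # No new event yet; wait briefly before polling again.
            time.sleep(0.5)


if __name__ == "__main__":
    main()
```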
Run the script
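Run the Python script consumer_file.py:

```bash
python consumer_file.py
```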
The script will start consuming data continuously from the pipeline and storing it locally on disk. You can check the updates written to the file by running this command in another terminal window:
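```bash
tail -f mobility_events.jsonl
```

Here mobility_events.jsonl is the output file name used in the consumer sketch above; substitute the file name your consumer script writes to.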
You can extend this functionality to push the consumed data to cloud storage buckets or real-time databases per your project requirements.
See other use cases for complex scenarios.