Real-time classified ads enrichment

A practical example of creating a pipeline to enrich classified ads in real time using AI, GlassFlow, LangChain, and Redis.

Learn how to build a data processing pipeline that enriches classified ads in real time. We'll use GlassFlow to process ads, enrich them with additional information, categorize them using LangChain and OpenAI, and store the enriched ads in Redis for fast, attribute-based search.

Pipeline components

Data source

The initial data source is a platform where users post classified ads. This could be an existing marketplace like Craigslist or eBay, or a custom-built website. GlassFlow can also continuously ingest new and updated ads from a custom website using a Webhook connector. For this demo, we generate sample ads from predefined JSON files and ingest them using the Python SDK. Sample input looks like this:

[
  {
    "id": "1",
    "user_id": "1",
    "title": "Math tutor for high school",
    "description": "I amd a Math graduate and graduated with honors in the university of Barcelona I can give you theoretical classes and practical labs where we go through problems and their solutions"
  },
  {
    "id": "2",
    "user_id": "2",
    "title": "Sage Barista Pro portafilter machine",
    "description": "My Sage Barista Pro is for sale because I'm moving.\n- regularly descaled\n- rinsed almost daily with coffee degreaser\n- very good condition, fine scratches, but nothing major and no dents\n- 4sieves and accessories included\n- heats up very quicly thanks to its thermoblock\n\nif you have any questions, just write a short message :-)",
    "images": [
      {
        "format": "image/jpg",
        "path": "data/ad2_image1.jpg"
      }
    ]
  },
  ...
]
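The sample ads reference images by file path, while the transformation's `Image` model expects a `format` and a base64-encoded `b64` field. Below is a minimal sketch of how a producer could convert one shape into the other before publishing. `encode_ad_images` is a hypothetical helper and not necessarily what producer.py does.

```python
import base64
from pathlib import Path


def encode_ad_images(ad: dict, base_dir: str = ".") -> dict:
    """Return a copy of `ad` whose images carry base64 data instead of file paths.

    Hypothetical helper: converts {"format", "path"} entries into the
    {"format", "b64"} shape expected by the transformation's Image model.
    """
    encoded = dict(ad)  # shallow copy so the caller's dict is not mutated
    images = []
    for image in ad.get("images") or []:
        raw = (Path(base_dir) / image["path"]).read_bytes()
        images.append({
            "format": image["format"],
            "b64": base64.b64encode(raw).decode("ascii"),
        })
    if images:
        encoded["images"] = images
    return encoded
```

Ads without an `images` key (like the first sample) pass through unchanged, which matches the transformation's `Optional` images field.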

Transformation

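The transformation function in GlassFlow processes each ingested ad. It uses AI to analyze the ad's images (descriptive tags, a short description, the number of people, and whether the image contains explicit content), then classifies the ad into a category and writes a short summary of it.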
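Because the category is produced by a language model, it can drift from the list of categories given in the prompt. Here is a minimal sketch of a post-check, assuming exact matching after trimming, lowercasing, and collapsing whitespace. `normalize_category` is hypothetical and not part of the example code.

```python
from typing import Optional

# Categories copied from the ad prompt in the transformation function.
ALLOWED_CATEGORIES = (
    "car, bike and boats",
    "services",
    "event tickets",
    "electronics",
    "home and garden",
    "jobs",
    "property",
    "fashion & beauty",
    "music, films and books",
    "courses and lessons",
    "give away and exchange",
)


def normalize_category(raw: str) -> Optional[str]:
    """Map a model-generated category onto the allowed list, or return None."""
    candidate = " ".join(raw.strip().lower().split())  # trim, lowercase, collapse spaces
    for category in ALLOWED_CATEGORIES:
        if candidate == category:
            return category
    return None
```

Ads whose category comes back as `None` could, for example, be flagged for review instead of being stored with an unexpected label.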

Data sink

The enriched ad data is stored in Redis, a high-performance in-memory database. The enriched data in Redis is made available to the frontend of the classified ads platform, ensuring that users see the most relevant and informative ads.

Setting Up the Pipeline with GlassFlow

You will use the GlassFlow WebApp to create a data processing pipeline.

Prerequisites

To start with this setup, you need a free GlassFlow account.


Step 1. Log in to GlassFlow WebApp

Navigate to the GlassFlow WebApp and log in with your credentials.

Step 2. Create a New Pipeline

Click on "Create New Pipeline" and provide a name. You can name it "classified-ads-enrichment".

Step 3. Configure a Data Source

Select the "SDK" data source to configure the pipeline to use the Python SDK. For demo purposes, we ingest data from a JSON file with sample classified ads using a Python script called producer.py.
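At its core, such a producer reads the ads and publishes them one at a time. Here is a minimal sketch, assuming the ads are stored as a JSON array in a single file. The `publish` callable is a hypothetical stand-in for the GlassFlow SDK publish call used in producer.py, whose exact signature is not shown here.

```python
import json
from pathlib import Path
from typing import Callable, Iterable


def load_ads(path: str) -> list:
    """Read a JSON array of classified ads from `path`."""
    with Path(path).open(encoding="utf-8") as f:
        ads = json.load(f)
    if not isinstance(ads, list):
        raise ValueError(f"{path} must contain a JSON array of ads")
    return ads


def publish_ads(ads: Iterable[dict], publish: Callable[[dict], None]) -> int:
    """Send each ad to `publish` and return how many were sent.

    `publish` is a hypothetical hook: in producer.py this role is played
    by the GlassFlow Python SDK client.
    """
    sent = 0
    for ad in ads:
        publish(ad)
        sent += 1
    return sent
```

For a dry run without a pipeline, `publish_ads(load_ads("<path-to-sample-ads.json>"), publish=print)` prints each ad instead of sending it.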

Step 4. Define the Transformer

The transformation function uses LangChain and OpenAI to analyze and categorize each ad, generate tags based on the ad images, and write a summary of the ad.

Copy and paste the following transformation function code into the transformer's built-in editor.

https://github.com/glassflow/glassflow-examples/blob/main/use-cases/classified-ads-data-enrichment/transform.py
import os
from typing import List, Optional

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import chain


OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")


class Image(BaseModel):
    format: str
    b64: str


class ClassifiedAd(BaseModel):
    id: str
    user_id: str
    title: str
    description: str
    images: Optional[List[Image]]


class ImageInfo(BaseModel):
    tags: List[str]
    description: str
    people_count: int
    has_explicit_content: bool


class ClassifiedAdEnriched(ClassifiedAd):
    images_info: List[ImageInfo]
    category: str
    summary: str


@chain
def image_model(inputs):
    model = ChatOpenAI(
        api_key=OPENAI_API_KEY,
        temperature=0.5,
        model="gpt-4o",
        max_tokens=1024
    )
    img_prompt = PromptTemplate(template="""
    Given the image, provide the following information:
    - A list of descriptive tags of the main objects in the image 
     (between 5 - 10 tags)
    - A short description of the image
    - The number of people in the image
    - Whether the image has explicit content or not
    
    {format_instructions}
    """)
    img_message = img_prompt.format(
        format_instructions=inputs["format_instructions"]
    )
    img_fmt = inputs['image'].format
    img_b64 = inputs['image'].b64
    msg = model.invoke(
        [
            HumanMessage(
                content=[
                    {"type": "text", "text": img_message},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:{img_fmt};base64,{img_b64}"}
                    }
                ]
            ),
        ]
    )
    return msg.content


@chain
def ad_model(inputs: dict):
    model = ChatOpenAI(
        api_key=OPENAI_API_KEY,
        temperature=0.5,
        model="gpt-4o",
        max_tokens=1024
    )

    ad = inputs["ad"]
    images_info = inputs["images_info"]

    images_info_str = ""
    if len(images_info) > 0:
        images_info_str = """
        - Images:
        """
        for img in images_info:
            images_info_str += f"\n- {img}"

    ad_prompt = PromptTemplate(template="""
        Given the following classified ad information:
        - Ad ID: {id}
        - User ID: {user_id}
        - Title: {title}
        - Description: {description}
        {image_descriptions} 
        
        Available ad categories:
            - car, bike and boats
            - services
            - event tickets
            - electronics
            - home and garden
            - jobs
            - property
            - fashion & beauty
            - music, films and books
            - courses and lessons
            - give away and exchange
            
        Write a short summary of the classified ad and classify the ad in one of 
        the available ad categories.
        
        {format_instructions}
        """)

    message = ad_prompt.format(
        format_instructions=inputs["format_instructions"],
        id=ad.id,
        user_id=ad.user_id,
        title=ad.title,
        description=ad.description,
        image_descriptions=images_info_str
    )

    msg = model.invoke(
        [
            HumanMessage(
                content=[
                    {"type": "text", "text": message},
                ]
            ),
        ]
    )
    return msg.content


def handler(data, logs):
    """
    data is a JSON object (a dict) with the classified ad data;
    "images" is optional:

    {
        "id": "",
        "user_id": "",
        "title": "",
        "description": "",
        "images": [
            {
                "format": "image/png",
                "b64": ""
            },
            {
                "format": "image/png",
                "b64": ""
            }
        ]
    }
    """
    # Validate the incoming event against the ClassifiedAd schema
    ad_data = ClassifiedAd(**data)

    # Step 1: describe each image (tags, description, people count, explicit content)
    logs.info("Processing Ad images...")
    images_info = []
    if ad_data.images:
        image_parser = JsonOutputParser(pydantic_object=ImageInfo)
        image_chain = image_model | image_parser

        for img in ad_data.images:
            img_info = image_chain.invoke(input={
                "image": img,
                "format_instructions": image_parser.get_format_instructions()
            })
            images_info.append(img_info)
            logs.debug(f"Image info: {img_info}")

    # Step 2: categorize and summarize the ad, using the image info as context
    logs.info("Processing Ad...")
    ad_parser = JsonOutputParser(pydantic_object=ClassifiedAdEnriched)
    ad_chain = ad_model | ad_parser

    # The parsed JSON output (a dict) is returned as the transformed event
    return ad_chain.invoke(input={
        "ad": ad_data,
        "images_info": images_info,
        "format_instructions": ad_parser.get_format_instructions()
    })

Note that your code must implement the handler function; without it, the transformation function fails.
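Without the LLM calls, the contract is small: a `handler(data, logs)` function receives one ad as a dict and returns the transformed dict. Here is a minimal sketch that uses the same signature and calls `logs.info` as the example does. The `title_length` field is purely illustrative and not part of the classified ads example.

```python
def handler(data, logs):
    """Minimal transformation: copy the event and add one derived field.

    `title_length` is an illustrative field, not part of the classified ads example.
    """
    logs.info(f"Processing ad {data.get('id')}")
    enriched = dict(data)  # avoid mutating the incoming event
    enriched["title_length"] = len(data.get("title", ""))
    return enriched
```

Because the sketch only calls `logs.info`, a standard `logging.Logger` can stand in for `logs` when trying it locally, for example `handler({"id": "1", "title": "Math tutor"}, logging.getLogger("transform"))`.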

By default, the transformer function uses a free OpenAI API key provided by GlassFlow.

You can also replace it with your own API key. To do so:

  1. Have an OpenAI API account.

  2. Create an API key.

  3. Set the API key in the transformation code (the OPENAI_API_KEY variable at the top of the code).
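The transformation code reads the key with `os.environ.get("OPENAI_API_KEY")`, so a missing key is only reported once the handler tries to call OpenAI. Here is a small sketch that fails fast with a clear message instead. `require_openai_api_key` is hypothetical and assumes the key is either passed in explicitly or available in that environment variable.

```python
import os
from typing import Optional


def require_openai_api_key(explicit_key: Optional[str] = None) -> str:
    """Return an OpenAI API key, preferring an explicitly provided one.

    Hypothetical helper: falls back to the OPENAI_API_KEY environment
    variable and raises early if neither is set.
    """
    key = explicit_key or os.environ.get("OPENAI_API_KEY")
    if not key:
        raise RuntimeError(
            "No OpenAI API key found: set OPENAI_API_KEY or pass the key explicitly"
        )
    return key
```

In the transformation code, `OPENAI_API_KEY = require_openai_api_key()` would then replace the plain environment lookup.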

Step 5. Choose a transformer dependency

The transformation function relies on the external openai library, so select it from the Dependencies dropdown menu. GlassFlow includes the selected library in the function deployment and runtime. Read more about Python dependencies for transformation.

Step 6. Configure a Data Sink

Select "SDK" as the data sink to configure the pipeline to use the Python SDK. The sink_connector.py script consumes the enriched classified ads from the pipeline and sends them to Redis. You use Docker to run both a Redis instance and the sink connector script.
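The key step in such a sink connector is writing each enriched ad to Redis under a predictable key. Here is a minimal sketch, assuming ads are stored as RedisJSON documents keyed `ad:<id>` (a hypothetical naming scheme) and that `client` is a redis-py `Redis` instance connected to Redis Stack, which ships the JSON module.

```python
from typing import Any, Dict

KEY_PREFIX = "ad:"  # hypothetical key naming scheme


def ad_key(ad: Dict[str, Any]) -> str:
    """Build the Redis key for an enriched ad from its id."""
    return f"{KEY_PREFIX}{ad['id']}"


def store_enriched_ad(client, ad: Dict[str, Any]) -> str:
    """Write `ad` as a JSON document at the root path and return its key.

    `client` is expected to be a redis-py Redis client; `client.json().set`
    requires the RedisJSON module, which Redis Stack provides.
    """
    key = ad_key(ad)
    client.json().set(key, "$", ad)
    return key
```

With Redis Stack on its default port, a call could look like `store_enriched_ad(redis.Redis(host="localhost", port=6379), enriched_ad)`. Keying by ad id means a re-published ad overwrites its earlier version instead of creating a duplicate.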

Step 7. Confirm the Pipeline

Confirm the pipeline settings in the final step and click "Create Pipeline".

Step 8. Copy the Pipeline Credentials

Once the pipeline is created, copy its credentials, such as the Pipeline ID and Access Token.

Send and consume data from the pipeline

Prerequisites

To complete this part you'll need the following:

Installation

  1. Clone the glassflow-examples repository to your local machine:

    git clone https://github.com/glassflow/glassflow-examples.git
  2. Navigate to the project directory:

    cd glassflow-examples/use-cases/classified-ads-data-enrichment
  3. Create a new virtual environment:

    python -m venv .venv && source .venv/bin/activate
  4. Install the required dependencies:

    pip install -r requirements.txt

Create an environment configuration file

Create a .env file in the project directory with the following configuration variables:

PIPELINE_ID=your_pipeline_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Replace your_pipeline_id and your_pipeline_access_token with appropriate values obtained from your GlassFlow account.
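The scripts need these two values at startup. Here is a minimal standard-library sketch for loading and checking them. It is a simplified stand-in for a library such as python-dotenv and handles only plain `KEY=value` lines and `#` comments. Both function names are hypothetical.

```python
import os
from pathlib import Path
from typing import Dict

REQUIRED_KEYS = ("PIPELINE_ID", "PIPELINE_ACCESS_TOKEN")


def load_env_file(path: str = ".env") -> Dict[str, str]:
    """Parse simple KEY=value lines, skipping blanks and # comments."""
    values = {}
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def pipeline_credentials(path: str = ".env") -> Dict[str, str]:
    """Return the pipeline credentials; real environment variables take precedence."""
    file_values = load_env_file(path)
    creds = {k: os.environ.get(k) or file_values.get(k, "") for k in REQUIRED_KEYS}
    missing = [k for k, v in creds.items() if not v]
    if missing:
        raise KeyError(f"Missing settings in {path}: {', '.join(missing)}")
    return creds
```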

Run Docker Compose

Run the following command in a terminal:

docker compose up

Docker Compose starts the sink connector service, which continuously listens to the GlassFlow pipeline and saves the enriched ads to the Redis database. It also runs Redis Stack, which includes the Redis server and Redis Insight.

Run the producer

Run the producer.py script in a terminal to publish the sample ads to the GlassFlow pipeline:

python producer.py

Once the Docker containers and the producer script are running, you can open the Redis Insight UI to view the enriched ads.

See the enriched data on Redis Insight

Open http://localhost:8001 in your web browser to explore the enriched data.

You should see the output with enriched ads.
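To search the enriched ads by attribute, you need a RediSearch index over the stored documents. The sketch below only builds the query string. It assumes a hypothetical index where `category` is a TAG field and `summary` is a TEXT field, and it backslash-escapes characters such as spaces, commas, and `&` that appear in the category names.

```python
import re
from typing import Optional

# Non-word characters (spaces, punctuation) are query syntax in RediSearch,
# so they are escaped with a backslash.
_SPECIAL = re.compile(r"(\W)")


def escape_token(value: str) -> str:
    """Backslash-escape every non-word character, including spaces."""
    return _SPECIAL.sub(r"\\\1", value)


def build_ad_query(category: Optional[str] = None, text: Optional[str] = None) -> str:
    """Build a RediSearch query string for the hypothetical ads index."""
    parts = []
    if category:
        parts.append(f"@category:{{{escape_token(category)}}}")
    words = [escape_token(w) for w in (text or "").split()]
    if words:
        parts.append(f"@summary:({' '.join(words)})")
    return " ".join(parts) or "*"  # "*" matches every document
```

With redis-py, the resulting string could be passed to `client.ft("idx:ads").search(query)`, where `idx:ads` is again a hypothetical index name.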

Summary

By following these steps, you created a GlassFlow pipeline that processes and enriches classified ads in real time. You can also experiment with updating the input ads and searching for ads by different attributes to see how the enriched information improves the search experience. Explore other use cases to see what else you can build with GlassFlow.


© 2023 GlassFlow