Miguel A. Cabrera Minagorri

2023-09-06

Handling computer vision events in real time with Kafka and Pipeless

Combining computer vision and real-time data processing creates lots of interesting opportunities. Imagine a world where cameras, sensors, and intelligent algorithms work together to interpret and respond to the visual world as it unfolds, instantaneously. This is real-time computer vision, a field where the ability to capture and process visual data with lightning speed opens up a myriad of possibilities across industries.

This article explains how you can generate and process computer vision events in real time using Pipeless and Kafka. Pipeless is an open-source computer vision framework to build and deploy apps in minutes. Kafka is a popular open-source distributed event streaming platform.

If you prefer playing with the code to reading, you can go directly to the ready-to-run example in this doc section. It contains everything you need to get started, including a docker-compose.yaml to create a Kafka cluster locally and step-by-step instructions.

Context

Previous articles explained how to create a Pipeless project, load a model to identify cats in a video, and draw bounding boxes over the video. That's cool, but not really useful. Today, you will learn how to connect Pipeless with Kafka so that, instead of drawing bounding boxes over the input video, you can react to events in real time. In this case, the event will be a cat appearing in the video.

In case you missed the previous articles, you can find them below:

For this demonstration you will process a local video file, which is not very practical for real-world applications. Future tutorials will cover how to use Pipeless to process video streams from remote URLs and RTMP/RTSP flows.

Architecture

The following schema represents the architecture of what you will deploy:

Pipeless + Kafka schema

In short, you will take an input video stream, analyze it with Pipeless to identify what appears on the video and export events to a Kafka topic. Those events can be consumed to take any required actions depending on the application.

As an example from a real-world scenario, consider a potato processing plant where a camera continuously monitors the incoming potatoes. If a potato with an incorrect color appears, the system triggers an air flow mechanism to eject the problematic potato from the production line. You can also envision a restaurant tracking the time patrons spend at their tables. Here the events would be people arriving at or leaving a table.

Pipeless Kafka plugin

From a high-level point of view, you can see Kafka as a distributed system organized into topics. You can post messages to topics (produce) and read messages from topics (consume).
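To make the produce/consume idea concrete, here is a toy sketch in plain Python. It is not Kafka and does not use any Kafka client; the InMemoryBroker class and its methods are hypothetical names invented for illustration. It only models a topic as an append-only log that consumers read without removing anything.

```python
from collections import defaultdict


class InMemoryBroker:
    """Toy stand-in for a broker: each topic is an append-only list of messages."""

    def __init__(self):
        self.topics = defaultdict(list)

    def produce(self, topic, message):
        # Appending never removes anything; the offset is the position in the log.
        self.topics[topic].append(message)
        return len(self.topics[topic]) - 1

    def consume(self, topic, offset=0):
        # Reading does not delete messages, so any consumer can re-read from offset 0.
        return list(self.topics[topic][offset:])


broker = InMemoryBroker()
broker.produce("pipeless", "There is a cat!")
broker.produce("pipeless", "There is a cat!")

first_read = broker.consume("pipeless")
second_read = broker.consume("pipeless")  # same content: consuming is non-destructive
latest_only = broker.consume("pipeless", offset=1)  # skip the first message
```

Reading from offset 0 plays the same role as the --from-beginning flag used with the console consumer later in this article.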

Pipeless provides a plugin to easily send messages to a Kafka topic. It takes care of configuring the Kafka client, so you just need to provide some environment variables. The Kafka plugin is included in the pipeless-ai-plugins package, which you can install with pip. Using the Kafka plugin is not mandatory: it is just a wrapper around the Kafka producer client that makes things even easier, and nothing stops you from using the Kafka client directly instead.

You can find the whole documentation about the Kafka plugin in this doc section.

Detecting events with Pipeless

Let's re-create the cats application so that, instead of drawing bounding boxes, it detects events and sends them to Kafka. You can find the whole application ready to run in this doc section, including step-by-step instructions and all the required resources, such as the cats detection model and a docker-compose.yaml with a Kafka cluster.

Please clone that repo and move to the examples/kafka directory to easily follow the next sections.

Find the Pipeless installation instructions and requirements here.

Disabling video output

This particular scenario does not require a video output. The original example modified the video frames on the fly and produced a new video as its output. Now we are interested in the events, not in the video itself, so we will modify the project configuration to disable the output video.

The following is the whole content of the configuration file (config.yaml):

input:
  address:
    host: localhost
    port: 1234
  video:
    enable: true
    uri: file:///home/path/pipeless/examples/kafka/cats.mp4
log_level: INFO
output:
  video:
    enable: false
worker:
  n_workers: 1
plugins:
  order: 'kafka'

IMPORTANT: Remember to edit the uri of the input video so that it contains the absolute path to your local directory.
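If you are unsure what the absolute URI should look like, a small helper like the following can print it for you. This is a minimal sketch using only the Python standard library; the video_uri function is a hypothetical helper, not part of Pipeless, and it assumes you run it from the directory that contains cats.mp4.

```python
from pathlib import Path


def video_uri(relative_path):
    """Return an absolute file:// URI for a path relative to the current directory."""
    # resolve() makes the path absolute; as_uri() only works on absolute paths.
    return Path(relative_path).resolve().as_uri()


uri = video_uri("cats.mp4")
print(uri)
```

You can paste the printed value into the uri field of config.yaml.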

For reference, the following are the differences with the original cats example:

 output:
-  address:
-    host: localhost
-    port: 1237
   video:
-    enable: true
-    uri: file:///home/example/path/pipeless/examples/cats/cats-output.mp4
+    enable: false
+plugins:
+  order: 'kafka'

As you can see, we also removed the output address section since we will not have an output component. Instead, we will send the events to Kafka directly from our process hook. Finally, we specified the plugins order. Since we will use only the Kafka plugin, the list has just one element.

Loading the Kafka plugin and producing events

To install the Kafka plugin just run the install command:

pipeless install plugin kafka

Once we have the plugin installed and added to the plugins order in the configuration, we can send messages to our Kafka topic:

self.plugins.kafka.produce('pipeless', 'There is a cat!')

The following is the whole code of the new app (app.py):

from pipeless_ai.lib.app.app import PipelessApp
import cv2

class App(PipelessApp):
    def before(self):
        self.xml_data = cv2.CascadeClassifier('cats.xml')

    def process(self, frame):
        model = self.xml_data

        # Create reduced frame for faster detection
        original_height, original_width, _ = frame.shape
        aspect_ratio = original_width / original_height
        reduced_width = 600
        reduced_height = int(reduced_width / aspect_ratio)
        reduced_frame = cv2.resize(frame, (reduced_width, reduced_height))
        bounding_boxes = model.detectMultiScale(reduced_frame, minSize = (30, 30))

        # Notify that there is a cat
        if len(bounding_boxes) > 0:
            self.plugins.kafka.produce('pipeless', 'There is a cat!')

Note that the only differences with the original cats example are the following:

def process(self, frame):
-# Draw the bounding boxes over the original frame
-for box in bounding_boxes:
-    a, b, width, height = box
-    # Recalculate bounding box for the original image
-    a = int(a * (original_width / reduced_width))
-    b = int(b * (original_height / reduced_height))
-    width = int(width * (original_width / reduced_width))
-    height = int(height * (original_height / reduced_height))
-    cv2.rectangle(frame, (a, b), (a + width, b + height), (255, 0, 255), 2)
+# Notify that there is a cat
+if len(bounding_boxes) > 0:
+    self.plugins.kafka.produce('pipeless', 'There is a cat!')
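The message in the example is a fixed string, which does not tell a consumer how many cats were found or when. As a sketch of one possible alternative, the following builds a JSON payload. The build_cat_event function and its field names are hypothetical, and passing its result to produce assumes the plugin accepts any string as the message, as it does with 'There is a cat!' above.

```python
import json
from datetime import datetime, timezone


def build_cat_event(bounding_boxes, detected_at=None):
    """Serialize a detection into a JSON string (hypothetical event schema)."""
    if detected_at is None:
        detected_at = datetime.now(timezone.utc)
    return json.dumps({
        "event": "cat_detected",
        "count": len(bounding_boxes),
        # Cast to plain ints: OpenCV returns NumPy integers, which json cannot serialize.
        "boxes": [[int(v) for v in box] for box in bounding_boxes],
        "detected_at": detected_at.isoformat(),
    })


# Fixed inputs so the result is reproducible.
sample = build_cat_event([(10, 20, 30, 40)], datetime(2023, 9, 6, tzinfo=timezone.utc))
print(sample)
```

Inside process you could then call self.plugins.kafka.produce('pipeless', build_cat_event(bounding_boxes)). Note that the boxes are in the coordinates of the reduced frame, and the timestamp is the wall-clock time of processing, not the position in the video.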

Finally, let's configure the Kafka plugin with our cluster address. It is as simple as exporting an environment variable:

export KAFKA_BOOTSTRAP_SERVERS="localhost:9094"

And that's all you need!
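A mistyped address is an easy mistake to make, so you may want to sanity-check the variable before running the app. The following is a minimal sketch with a hypothetical parse_bootstrap_servers helper. It assumes the usual comma-separated host:port format and says nothing about how the plugin itself reads the variable.

```python
import os


def parse_bootstrap_servers(value):
    """Split a 'host:port,host:port' string into (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        # rpartition splits on the last colon, leaving the host on the left.
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers


# Falls back to the address used in this article when the variable is unset.
raw = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9094")
print(parse_bootstrap_servers(raw))
```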

Running the application

Now let's start a local Kafka cluster using the docker-compose.yaml file provided in the examples/kafka directory. The details are out of the scope of this article, so just run the following command from the example directory:

docker compose up

And let's run pipeless to start processing our video and sending events:

pipeless run

Verify the events on Kafka

The commands in this section must be executed within the Kafka container. Exec into the container by running:

docker compose exec kafka bash

The included docker-compose.yaml configures Kafka to create topics automatically. You can verify that the pipeless topic was created on the first write by running:

kafka-topics.sh --list --bootstrap-server localhost:9094

The example code only sends information to Kafka. It does not consume from the topic, so the topic still contains all the information we have sent to it. It is your task to listen for messages on the Kafka topics and take actions based on those messages. This is out of the Pipeless scope since each application has its own requirements on what to do with the events. Let's run a consumer to verify the information is arriving at the topic:

kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic pipeless --from-beginning

Use Ctrl + C to stop the consumer.

Now it is your turn to complete the application by consuming messages from the Kafka topic, processing that information, and taking any required actions. A simple example would be sending yourself a notification when there is a cat in the video, along with the time at which it appeared.
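As a starting point, keep in mind that the app sends one message for every frame where a cat is detected, so a consumer that notifies on each message could fire many times per second. The following sketch shows one way to collapse those bursts into a single "cat appeared" notification. It assumes you have a timestamp for each consumed message (for example, the one your Kafka client attaches to each record); the appearances function and the 2-second gap are illustrative choices, not part of Pipeless.

```python
from datetime import datetime, timedelta


def appearances(timestamps, gap=timedelta(seconds=2)):
    """Yield the first timestamp of each burst of detections.

    A new burst starts when more than `gap` has passed since the previous message.
    """
    previous = None
    for ts in timestamps:
        if previous is None or ts - previous > gap:
            yield ts
        previous = ts


# Simulated message times: three frames close together, then two more 10 seconds later.
start = datetime(2023, 9, 6, 12, 0, 0)
message_times = [start + timedelta(seconds=s) for s in (0.0, 0.04, 0.08, 10.0, 10.04)]

for ts in appearances(message_times):
    print(f"Cat appeared at {ts.isoformat()}")
```

In a real consumer you would replace message_times with the timestamps of the records read from the pipeless topic and swap print for your notification mechanism.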

Get involved

If you like Pipeless you can help us by sharing and starring our GitHub repository.

We also appreciate feedback. You can share your thoughts via a GitHub issue or by opening a thread on our GitHub discussions forum. You can also follow Pipeless on Twitter (now X).

Finally, if you feel confident, you can send us a PR adding new features, plugins or models!

Copyright © 2023 Pipeless, Inc.