Getting Started with Pipeless 🚀
Make sure to have Pipeless and its dependencies installed before continuing. Check the installation guide.
Pipeless basic concepts
A Pipeless project is structured in stages. Each stage is like a box that performs a very specific task. You can think of a stage as a micro pipeline. Within each stage you can implement pre-processing, processing and post-processing, as well as run models.
When you provide a new stream to Pipeless, you have to specify a sorted list of stages. We call that list the "frame execution path" or just the frame path, and it represents the whole pipeline that the frames of that stream will go through. Execution paths are run in parallel for all frames, no matter which stream the frame belongs to. A single stage can be used in as many streams as you need.
A Pipeless project consists of at least one stage. So your project will be a directory where each sub-directory defines a stage.
Within each stage folder you create the files that define your Pipeless hooks. Each hook represents a processing step in the stage micro pipeline (i.e. pre-processing, processing or post-processing). All of them are optional and you can implement them in several programming languages.
Finally, each stage is isolated from the others. Every hook of a stage receives a structure that contains several fields, which we usually call the "frame data". You can modify the frame data fields (some of them are read-only). The subsequent hooks of the stage, or the hooks of a following stage, will have access to the modified data. Ideally, you should design your stages so that they are self-contained and do not depend on other stages' data.
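To make the flow of frame data concrete, the following is a minimal sketch in plain Python. It is only a mental model and not how Pipeless is implemented internally: the STAGES table, the run_frame_path helper and the three hook bodies are hypothetical names introduced for illustration.

```python
# Illustrative model only: this is NOT Pipeless's internal implementation.
# Stage names, hook bodies and run_frame_path are hypothetical.

def mark_pre_process(frame_data, context):
    # A hook changes the frame data in place; it does not return anything.
    frame_data["user_data"]["resized"] = True

def check_process(frame_data, context):
    # Later hooks see the changes made by earlier hooks.
    frame_data["user_data"]["seen_resized"] = frame_data["user_data"].get("resized", False)

def label_post_process(frame_data, context):
    # The stage context is only read here, never modified.
    frame_data["user_data"]["label"] = context["label"]

# Each stage maps hook kinds to functions; every hook is optional.
STAGES = {
    "stage-a": {
        "context": {},
        "hooks": {"pre-process": mark_pre_process, "process": check_process},
    },
    "stage-b": {
        "context": {"label": "done"},
        "hooks": {"post-process": label_post_process},
    },
}

HOOK_ORDER = ("pre-process", "process", "post-process")

def run_frame_path(frame_data, frame_path):
    """Run one frame through the stages of a frame path, in order."""
    for stage_name in frame_path:
        stage = STAGES[stage_name]
        for kind in HOOK_ORDER:
            hook = stage["hooks"].get(kind)
            if hook is not None:
                hook(frame_data, stage["context"])
    return frame_data

frame = {"frame_number": 0, "user_data": {}}
result = run_frame_path(frame, ["stage-a", "stage-b"])
```

In this sketch the frame path is the list passed to run_frame_path, and the same frame data dictionary is handed to every hook, so a change made in stage-a is visible in stage-b.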
Create a new project
Run the following command:
pipeless init my_project --template scaffold
cd my_project
The above command creates a directory for the project root and, inside it, a directory called my-stage, which contains a scaffold of a new stage. If you remove the --template option, an interactive shell will ask you a few questions to generate the stages.
The created stage can be executed correctly, but the code it contains does nothing. You can add code to the generated files later, taking inspiration from the examples.
Let's start Pipeless and provide a stream. Run the following command from my_project:
pipeless start --stages-dir .
Pipeless will load your stages and you will see something like:
⚙️ Loading stages from my_project
⏳ Loading stage 'my_stage' from my_project/my-stage
Loading hook from my_project/my-stage/post-process.py
Loading hook from my_project/my-stage/process.py
Loading hook from my_project/my-stage/pre-process.py
You should now be able to submit streams:
pipeless add stream --input-uri "https://pipeless-public.s3.eu-west-3.amazonaws.com/cats.mp4" --output-uri "screen" --frame-path "my-stage"
We told Pipeless to read the video from the provided URL and show the results on the screen. The output is optional, since in many computer vision use cases you don't actually want to see the output video but to execute some code on certain events. The frame path indicates the stages that should be executed for each frame of the provided stream.
The CLI allows you to easily add, edit or remove streams. You can also configure backpressure mechanisms or export the events that happen on the streams to monitor them.
To see all the available options and commands run:
pipeless --help
Creating a new stage
Creating a new stage is as simple as creating a new folder. Pipeless adopts the folder name as the stage name, which you will use when providing the frame path of a new stream.
You can run the following command from inside your project directory to generate a stage and its files using the interactive shell:
pipeless generate stage
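If you prefer to see what "a stage is just a folder" means in practice, here is a rough sketch that creates a stage directory by hand. The create_stage helper, the my-new-stage name and the empty hook template are hypothetical, and the sketch assumes Python hooks; the pipeless generate stage command remains the supported way to scaffold a stage.

```python
from pathlib import Path
import tempfile

# Hypothetical empty hook body, assuming Python hooks.
HOOK_TEMPLATE = "def hook(frame_data, context):\n    pass\n"

def create_stage(project_dir, stage_name):
    """Create a stage folder containing one empty file per hook kind."""
    stage_dir = Path(project_dir) / stage_name
    stage_dir.mkdir(parents=True, exist_ok=True)
    for kind in ("pre-process", "process", "post-process"):
        hook_file = stage_dir / f"{kind}.py"
        if not hook_file.exists():  # never overwrite existing hooks
            hook_file.write_text(HOOK_TEMPLATE)
    return stage_dir

# A temporary directory stands in for the project root in this sketch.
project = tempfile.mkdtemp()
stage = create_stage(project, "my-new-stage")
created = sorted(p.name for p in stage.iterdir())
```

All three hook files are optional, so you can delete the ones a stage does not need.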
Stage context
A stage can have its own global context. The stage context is provided to the hooks as read-only. This lets you perform slow tasks just once, when Pipeless initializes the stage, instead of repeating the same operation on every frame. For example, you can open connections to external servers or load a model file from the filesystem.
To define context initialization logic for a stage, create an init.{py,rs,...} file containing a function called init that returns any data you want to have available in the context. The following is an example that initializes a stage context with an XML file that we can later use in the hooks:
import cv2

def init():
    return {
        'model': cv2.CascadeClassifier('/home/path/pipeless/examples/cats/cats.xml')
    }
Pipeless loads stages when you run pipeless start. In the near future we plan to support loading and unloading stages dynamically, but at the moment you need to restart Pipeless for changes to take effect.
Writing hooks
A stage is composed of zero or more hooks. There are three kinds of hooks: pre-processing, processing and post-processing. Only one hook of each kind can be added to a stage.
The process hook is special: it can be either implemented via code or provided as a JSON file to perform inference using one of the supported inference runtimes. See the inference runtime section below.
To interactively create a new hook move to your project directory and run the following command:
pipeless generate hook
Implementing a code hook
Defining a code hook is as simple as creating a file containing a hook function that receives the frame data and the stage context (read-only). Depending on the hook type, you will create a file named pre-process.xx, process.xx or post-process.xx, where xx is the extension of the programming language.
The following is an example of a Python hook for processing:
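import cv2

def hook(frame_data, context):
    # The model was loaded once in init.py and is read from the stage context
    model = context['model']
    rgb_frame_arr = frame_data['modified']
    # Detect objects that are at least 30x30 pixels in size
    bounding_boxes = model.detectMultiScale(rgb_frame_arr, minSize=(30, 30))
    frame_data['inference_output'] = bounding_boxes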
In the hook above we take our model from the stage context. The model was loaded from an XML file during the stage initialization (init.py, see the section above). We then take the RGB frame array from the frame data, run our model on it and update the inference_output field of the frame data.
Note that in most cases you do not implement a process hook via code. Instead, you use an inference runtime and pass your model to it, implementing only the pre-process and post-process hooks.
Inference Runtime - JSON hooks
When you have a model and want to perform inference using one of the supported inference runtimes, you use a process.json file. Within this JSON you define the model to use, the inference runtime and the runtime parameters.
To provide the inputs, you set the inference_input field of the frame data in your pre-process hook. To parse the output, you read the inference_output field from the frame data in your post-process hook.
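As a rough sketch of how the two code hooks around a process.json could look, the example below prepares inference_input and reads inference_output. The input layout (float32 in [0, 1], batch, channels, height, width) and the way the output is summarized are assumptions made for illustration; the real shapes depend on your model and runtime. Both functions are shown in one block under different names, but in a stage each would be named hook and live in its own file (pre-process.py and post-process.py).

```python
import numpy as np

def pre_process_hook(frame_data, context):
    frame = frame_data["modified"]  # RGB array: height, width, channels
    # Assumption: the model expects float32 values in [0, 1] laid out as
    # (batch, channels, height, width). Check your model's real input spec.
    tensor = frame.astype(np.float32) / 255.0
    tensor = np.transpose(tensor, (2, 0, 1))  # HWC -> CHW
    frame_data["inference_input"] = np.expand_dims(tensor, axis=0)

def post_process_hook(frame_data, context):
    output = frame_data["inference_output"]
    # Decoding the output is model specific; this sketch only records its
    # shape. Note that it overwrites any previous user_data.
    frame_data["user_data"] = {"output_shape": list(np.shape(output))}
```

A real post-process hook would typically decode the output into boxes or labels and draw them on the modified field.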
The following is an example of using the ONNX Runtime with the CPU execution provider:
{
  "runtime": "onnx",
  "model_uri": "https://pipeless-public.s3.eu-west-3.amazonaws.com/yolov8n.onnx",
  "inference_params": {
    "execution_provider": "cpu"
  }
}
Frame data fields
The frame data provided to the hook functions contains the following fields:
- uuid: The internal unique id of the frame.
- original: An array containing the RGB pixels in format height,width,channels. You can read the original field but it is not editable.
- modified: An array containing the modifiable RGB pixels in format height,width,channels. You can read and edit the modified field. When an output video is enabled it shows the content of the modified field. When you provide a frame path with several stages to a stream, you probably want to work with modified instead of original in order to maintain the changes done to the frame in all stages.
- width: Frame width.
- height: Frame height.
- pts: Presentation timestamp of the frame.
- duration: Amount of time that this frame is shown when playing the video.
- fps: Stream FPS.
- input_ts: Timestamp of when this frame reached Pipeless. Mainly used internally.
- inference_input: When using an inference runtime (process.json) you provide the inference inputs by setting the inference_input field in the pre-process hook.
- inference_output: When using an inference runtime (process.json) you can obtain the inference outputs by reading the inference_output field in the post-process hook.
- user_data: This field allows you to associate data with the frame that you can later retrieve from subsequent hooks, even if those hooks belong to other stages. It can contain any type of data; the only restriction is that dictionaries must use strings as keys. For more information check this example.
- frame_number: Number of the frame in the stream.
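The following hedged sketch shows a hook that only reads metadata fields and stores a small summary in user_data. The keys inside user_data (frame_id, seconds_into_stream, size) are hypothetical names chosen for this example, and the sketch assumes that frame_number starts at 0 and that fps is a positive number.

```python
def hook(frame_data, context):
    # Read-only use of the metadata fields described above.
    frame_number = frame_data["frame_number"]
    fps = frame_data["fps"]

    # Assumption: frame_number starts at 0 and fps is positive.
    seconds = frame_number / fps if fps else 0.0

    # user_data travels with the frame to later hooks and stages.
    # Dictionaries stored here must use string keys.
    # Note that this assignment overwrites any previous user_data.
    frame_data["user_data"] = {
        "frame_id": str(frame_data["uuid"]),
        "seconds_into_stream": seconds,
        "size": f'{frame_data["width"]}x{frame_data["height"]}',
    }
```

A later hook, even in another stage of the frame path, can then read frame_data["user_data"]["seconds_into_stream"] without recomputing it.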
Stateless vs Stateful hooks
Pipeless lets you create two kinds of hooks:
- stateless (default): Multiple instances of a stateless hook can run at the same time for different frames, so they run with the highest level of parallelization. Stateless hooks should be used when an internal state is not required. For example, if you implement some pre-processing as a simple resize, your hook does not require an internal state.
- stateful: Stateful hooks are the kind of hook you want to use when some internal state is required. A good example is an object tracking model, since these models maintain internal state. Stateful hooks introduce a bit of latency compared to stateless hooks: the internal state is locked and released on each frame to avoid corrupting it, so multiple frames cannot be processed at the same time. Stateful hooks also ensure sequential frame processing, which means the hook runs for frames sorted by frame number.
By default all hooks are stateless unless you specify otherwise. As a general rule, use stateless hooks; switch to stateful hooks only when a hook needs to preserve some internal state.
To indicate that a hook must be stateful, add a comment to the top of the file as follows (the comment must be the first line):
# make stateful
def hook(frame_data, context):
    ...
When using JSON hooks, include a boolean make_stateful field at the JSON root:
{
  "make_stateful": true,
  "runtime": "onnx",
  "model_uri": "https://pipeless-public.s3.eu-west-3.amazonaws.com/yolov8n.onnx",
  "inference_params": {
    "execution_provider": "cpu"
  }
}
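For a stateful code hook, a minimal sketch could look like the following. It assumes that module-level variables in the hook file persist between calls, which is an assumption of this example and not something stated above; the _state dictionary and the keys written to user_data are hypothetical.

```python
# make stateful
# The comment above must stay on the first line of the hook file.

# Hypothetical internal state kept between frames. Because the hook is
# stateful, frames are expected to arrive one at a time, in order.
_state = {"frames_seen": 0, "last_frame_number": None}

def hook(frame_data, context):
    _state["frames_seen"] += 1
    previous = _state["last_frame_number"]
    _state["last_frame_number"] = frame_data["frame_number"]
    # Expose the running state to later hooks through user_data.
    frame_data["user_data"] = {
        "frames_seen": _state["frames_seen"],
        "previous_frame_number": previous,
    }
```

If this hook were stateless, several instances could update the counter at the same time and out of order, which is the situation stateful hooks are meant to prevent.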