Cyclic Pipeline

Learn how to deploy a cyclic pipeline using Core 2. In this example, you'll build a simple counter that starts at a user-defined value and increments it by one until it reaches 10. If the starting value is already 10 or greater, the counter step runs only once and the pipeline returns the incremented value without entering the feedback loop.

Models

Start by implementing the first model: a simple counter.

from mlserver.model import MLModel
from mlserver.codecs import NumpyCodec, StringCodec
from mlserver.types import InferenceRequest, InferenceResponse
from mlserver.logging import logger


class Counter(MLModel):
    async def load(self) -> bool:
        self.ready = True
        return self.ready

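    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        # Decode the incoming number and increment it by one
        x = NumpyCodec.decode_input(payload.inputs[0]) + 1
        # Name the second (empty) output tensor after the branch to take next
        message = "continue" if x.item() < 10 else "stop"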
        return InferenceResponse(
            model_name=self.name,
            model_version=self.version,
            outputs=[
                NumpyCodec.encode_output(
                    name="output",
                    payload=x
                ),
                StringCodec.encode_output(
                    name=message,
                    payload=[""]
                ),
            ]
        )

This model produces two output tensors. The first contains the incremented number, while the second is an empty tensor labeled either continue or stop. This second tensor acts as a trigger, directing the data flow through either the feedback loop or the output path. For more information on triggering tensors, see the intro to pipelines page.
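To make the routing concrete, the plain-Python sketch below shows how the name of the trigger tensor selects the next step. This is a simplified model and not how Core 2 implements routing internally. It assumes only that a step with a trigger runs when a tensor of that name is present in the counter's outputs. The names counter_step, TRIGGERS and next_steps are hypothetical.

```python
from typing import Dict, List, Tuple


def counter_step(x: int) -> Dict[str, object]:
    """Mirror Counter.predict: increment x and name the trigger tensor."""
    value = x + 1
    trigger = "continue" if value < 10 else "stop"
    # The trigger tensor's content is empty; only its name matters for routing
    return {"output": value, trigger: ""}


# Hypothetical routing table mirroring the `triggers` fields of the pipeline
TRIGGERS: List[Tuple[str, str]] = [
    ("identity-loop", "continue"),
    ("identity-output", "stop"),
]


def next_steps(outputs: Dict[str, object]) -> List[str]:
    """Return the downstream steps whose trigger tensor is present in outputs."""
    return [step for step, tensor in TRIGGERS if tensor in outputs]
```

For example, next_steps(counter_step(3)) yields ["identity-loop"]. By contrast, next_steps(counter_step(9)) yields ["identity-output"].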

Next, define the second model, an identity model:

import asyncio
from mlserver.logging import logger
from mlserver import MLModel, ModelSettings
from mlserver.types import (
    InferenceRequest, InferenceResponse, ResponseOutput
)


class IdentityModel(MLModel):
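    def __init__(self, settings: ModelSettings):
        super().__init__(settings)
        self.params = settings.parameters
        # Fall back to an empty dict when parameters or parameters.extra are missing
        self.extra = (self.params.extra if self.params is not None else None) or {}
        # Optional delay in seconds, read from parameters.extra.delay (defaults to 0)
        self.delay = self.extra.get("delay", 0)

    async def load(self) -> bool:
        self.ready = True
        return self.ready

    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        # Wait for the configured delay before echoing the inputs back
        if self.delay:
            await asyncio.sleep(self.delay)

        # Return every input tensor unchanged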
        return InferenceResponse(
            model_name=self.name,
            model_version=self.version,
            outputs=[
                ResponseOutput(
                    name=request_input.name,
                    shape=request_input.shape,
                    datatype=request_input.datatype,
                    parameters=request_input.parameters,
                    data=request_input.data
                ) for request_input in payload.inputs
            ]
        )

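The identity model passes its input tensors through to the output unchanged. Before responding, it can optionally wait for a configurable delay, which is read in seconds from the delay entry in parameters.extra. This delay is crucial for preventing infinite loops in the pipeline. Such loops can otherwise occur because of how Kafka Streams joins messages that arrive within the same join window. For further details, see the Kafka Streams documentation.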
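The effect of the delay on timing can be illustrated with a small asyncio sketch that does not depend on MLServer. The sketch assumes tensors are represented as plain dicts, and the function name delayed_identity is hypothetical. It only shows that the pass-through leaves the data untouched while making the response arrive roughly the configured delay later.

```python
import asyncio
import time
from typing import Dict, List, Tuple


async def delayed_identity(
    inputs: List[Dict], delay_s: float
) -> Tuple[List[Dict], float]:
    """Echo the inputs unchanged after sleeping for delay_s seconds.

    Returns the outputs together with the elapsed time in milliseconds.
    """
    start = time.perf_counter()
    if delay_s:
        await asyncio.sleep(delay_s)
    elapsed_ms = (time.perf_counter() - start) * 1000
    # Copy each tensor dict so the outputs are independent of the inputs
    return [dict(t) for t in inputs], elapsed_ms
```

For example, asyncio.run(delayed_identity(tensors, 0.001)) returns the same tensors after about one millisecond.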

Pipeline

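This counter application pipeline consists of three models: the counter model, an identity model for the feedback loop, and another identity model for the output. The pipeline input feeds the counter step. When the counter emits the continue trigger, the identity-loop step passes the incremented value back to the counter. When it emits the stop trigger, the identity-output step forwards the result to the pipeline output.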
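You can simulate the control flow of this graph locally in plain Python, without Kafka or Core 2, to check the expected result before deploying. This is a simplified sketch. It assumes every message is processed in order and ignores join windows and timing. run_counter_pipeline is a hypothetical helper.

```python
from typing import Tuple


def run_counter_pipeline(start: int, limit: int = 10) -> Tuple[int, int]:
    """Simulate counter -> identity-loop -> counter ... -> identity-output.

    Returns (final value, number of times the counter step ran).
    """
    value, counter_runs = start, 0
    while True:
        # counter step: increment and decide which trigger to emit
        value += 1
        counter_runs += 1
        if value >= limit:
            # "stop" trigger: identity-output forwards the value as the result
            return value, counter_runs
        # "continue" trigger: identity-loop feeds the value back to the counter
```

With a start of 0, the counter runs ten times and the result is 10. With a start of 10, it runs once and returns 11.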

Models deployment

To deploy the pipeline, you need to load each model into the cluster. The model-settings.json configuration for the counter model is as follows:

{
    "name": "counter",
    "implementation": "model.Counter",
    "parameters": {
        "version": "v0.1.0"
    }
}

For the identity feedback loop model, use a similar model-settings.json file that adds a 1-millisecond delay. The delay is given in seconds because the model passes it to asyncio.sleep:

{
    "name": "identity-loop",
    "implementation": "model.IdentityModel",
    "parameters": {
        "version": "v0.1.0",
        "extra": {
            "delay": 0.001
        }
    }
}

The 1-millisecond delay is crucial to prevent infinite loops in the pipeline. It matches the join window applied to all inputs of the counter step and the join window configured for the identity steps in the pipeline definition.
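The reasoning behind matching the delay to the join window can be illustrated with a simplified model of a windowed join. The sketch assumes a symmetric window in which two records are joined when their timestamps differ by at most the window size. This is the general idea behind Kafka Streams join windows, but the sketch ignores grace periods and other details. The helper names and the timings used are hypothetical.

```python
from typing import List


def joins(t_left_ms: float, t_right_ms: float, window_ms: float) -> bool:
    """Simplified symmetric windowed join: two records pair up when
    their timestamps are at most window_ms apart."""
    return abs(t_left_ms - t_right_ms) <= window_ms


def iteration_timestamps(start_ms: float, step_ms: float, n: int) -> List[float]:
    """Timestamps of n successive loop iterations spaced step_ms apart."""
    return [start_ms + i * step_ms for i in range(n)]
```

If consecutive iterations are spaced further apart than the window, no two iterations can be joined. If they arrive within the window, messages from different iterations can be paired.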

For the identity output model, use the same configuration without the delay:

{
    "name": "identity-output",
    "implementation": "model.IdentityModel",
    "parameters": {
        "version": "v0.1.0"
    }
}
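If you prefer to generate the three configurations instead of writing them by hand, the sketch below writes each one to its own folder. The layout is an assumption consistent with the storageUri paths in the manifests below: one folder per model name, holding model-settings.json next to the model.py that defines Counter and IdentityModel. write_model_settings is a hypothetical helper.

```python
import json
from pathlib import Path

# The three configurations shown above; only identity-loop sets a delay (seconds)
SETTINGS = {
    "counter": {"implementation": "model.Counter", "extra": None},
    "identity-loop": {"implementation": "model.IdentityModel", "extra": {"delay": 0.001}},
    "identity-output": {"implementation": "model.IdentityModel", "extra": None},
}


def write_model_settings(root: Path, version: str = "v0.1.0") -> None:
    """Write <root>/<model-name>/model-settings.json for each model."""
    for name, spec in SETTINGS.items():
        parameters = {"version": version}
        if spec["extra"] is not None:
            parameters["extra"] = spec["extra"]
        settings = {
            "name": name,
            "implementation": spec["implementation"],
            "parameters": parameters,
        }
        model_dir = root / name
        model_dir.mkdir(parents=True, exist_ok=True)
        (model_dir / "model-settings.json").write_text(json.dumps(settings, indent=4))
```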

The manifest files for the three models are the following:

cat ./models/counter.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: counter
spec:
  storageUri: "gs://seldon-models/scv2/examples/cyclic-pipeline/counter"
  requirements:
  - mlserver

cat ./models/identity-loop.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: identity-loop
spec:
  storageUri: "gs://seldon-models/scv2/examples/cyclic-pipeline/identity-loop"
  requirements:
  - mlserver

cat ./models/identity-output.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: identity-output
spec:
  storageUri: "gs://seldon-models/scv2/examples/cyclic-pipeline/identity-output"
  requirements:
  - mlserver

To deploy the counter model, use the following command:

seldon model load -f ./models/counter.yaml
{}
seldon model status counter -w ModelAvailable | jq -M .

To deploy the identity loop model, use the following command:

seldon model load -f ./models/identity-loop.yaml
{}
seldon model status identity-loop -w ModelAvailable | jq -M .

To deploy the identity output model, use the following command:

seldon model load -f ./models/identity-output.yaml
{}
seldon model status identity-output -w ModelAvailable | jq -M .
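The three load-and-wait steps above can be scripted. This sketch rebuilds exactly the CLI invocations shown on this page, without the jq formatting, and runs them with subprocess. It assumes the seldon CLI is installed and configured for your cluster, and the helper names are hypothetical. With dry_run=True it returns the commands without executing anything.

```python
import subprocess
from typing import List

MODELS = ["counter", "identity-loop", "identity-output"]


def deploy_commands(models: List[str] = MODELS) -> List[List[str]]:
    """Build the load and wait commands shown above, one pair per model."""
    commands = []
    for name in models:
        commands.append(["seldon", "model", "load", "-f", f"./models/{name}.yaml"])
        commands.append(["seldon", "model", "status", name, "-w", "ModelAvailable"])
    return commands


def deploy(dry_run: bool = True) -> List[List[str]]:
    """Run the commands in order, or just return them when dry_run is True."""
    commands = deploy_commands()
    if not dry_run:
        for cmd in commands:
            # check=True stops at the first command that exits with an error
            subprocess.run(cmd, check=True)
    return commands
```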

Pipeline deployment

After the models are deployed, proceed to deploy the pipeline. The pipeline manifest file is defined as follows:

apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
  name: counter-pipeline
spec:
  allowCycles: true
  maxStepRevisits: 100
  steps:
  - name: counter
    inputsJoinType: any
    inputs:
    - counter-pipeline.inputs
    - identity-loop.outputs
  - name: identity-output
    joinWindowMs: 1
    inputs:
    - counter.outputs
    triggers:
    - counter.outputs.stop
  - name: identity-loop
    joinWindowMs: 1
    inputs:
    - counter.outputs.output
    triggers:
    - counter.outputs.continue
  output:
    steps:
    - identity-output.outputs

Note that the joinWindowMs parameter is set to 1 millisecond for both the identity-loop and identity-output steps. This setting prevents messages from different iterations from being joined (for example, a message from iteration t being joined with messages from iterations t-1, t-2, ..., 1). In addition, maxStepRevisits caps the number of times the pipeline can revisit a step during execution, here at 100. This pipeline is deterministic and guaranteed to terminate. The cap is most useful in cyclic pipelines that might never reach a terminal state (for example, agentic workflows where an LLM decides the control flow), because it guards against infinite loops.
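The safeguard that maxStepRevisits provides can be sketched as a loop with a revisit counter. This is an illustration, not Core 2's implementation. It assumes the first visit to a step does not count as a revisit, and that convention may differ from how Core 2 counts. run_loop and MaxStepRevisitsExceeded are hypothetical names.

```python
from typing import Callable, Tuple


class MaxStepRevisitsExceeded(RuntimeError):
    """Raised when a step is revisited more times than allowed."""


def run_loop(
    step: Callable[[int], Tuple[int, bool]], start: int, max_step_revisits: int = 100
) -> int:
    """Apply `step` repeatedly until it signals stop, enforcing a revisit cap.

    `step` returns (new_value, stop_flag); the first visit is not a revisit.
    """
    value, revisits = start, -1
    while True:
        revisits += 1
        if revisits > max_step_revisits:
            raise MaxStepRevisitsExceeded(
                f"step revisited more than {max_step_revisits} times"
            )
        value, stop = step(value)
        if stop:
            return value
```

A terminating step such as the counter returns normally. A step that never signals stop is cut off once the cap is exceeded.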

To deploy the pipeline, use the following command:

seldon pipeline load -f counter-pipeline.yaml
seldon pipeline status counter-pipeline -w PipelineReady | jq -M .

Testing the pipeline

To send a request to the pipeline, use the following command:

seldon pipeline infer counter-pipeline \
  '{"inputs":[{"name":"counter-pipeline.inputs","shape":[1],"datatype":"INT32","data":[0]}]}' | jq -M .
{
  "model_name": "",
  "outputs": [
    {
      "data": [
        10
      ],
      "name": "output",
      "shape": [
        1,
        1
      ],
      "datatype": "INT32",
      "parameters": {
        "content_type": "np"
      }
    },
    {
      "data": [
        ""
      ],
      "name": "stop",
      "shape": [
        1,
        1
      ],
      "datatype": "BYTES",
      "parameters": {
        "content_type": "str"
      }
    }
  ]
}

This request starts the pipeline with an input value of 0. The counter increments the value by one on each pass through the loop. When the value reaches 10, the counter emits the stop trigger. The response therefore contains the final counter value, 10, along with the empty stop tensor, which shows that the request left the loop through the output path.
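The same request can also be sent over HTTP with the Open Inference Protocol (V2) instead of the CLI. In this sketch, mesh_url is a placeholder for your cluster's inference endpoint. The Seldon-Model header with the .pipeline suffix follows the routing convention used elsewhere in the Core 2 docs; check it against your installation. The helper names are hypothetical, and parse_response reads a response shaped like the one shown above.

```python
import json
import urllib.request
from typing import Dict, Tuple


def build_request(start: int) -> Dict:
    """V2 request body matching the CLI example above."""
    return {
        "inputs": [
            {"name": "counter-pipeline.inputs", "shape": [1], "datatype": "INT32", "data": [start]}
        ]
    }


def parse_response(body: Dict) -> Tuple[int, str]:
    """Return (final counter value, name of the trigger tensor) from a response."""
    outputs = {o["name"]: o for o in body["outputs"]}
    trigger = "stop" if "stop" in outputs else "continue"
    return outputs["output"]["data"][0], trigger


def infer(mesh_url: str, start: int) -> Tuple[int, str]:
    """POST the request to the pipeline (endpoint and header are assumptions)."""
    req = urllib.request.Request(
        f"{mesh_url}/v2/models/counter-pipeline/infer",
        data=json.dumps(build_request(start)).encode(),
        headers={"Content-Type": "application/json", "Seldon-Model": "counter-pipeline.pipeline"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return parse_response(json.load(resp))
```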

Cleanup

To clean up the models and the pipeline, use the following commands:

seldon pipeline unload -f counter-pipeline.yaml
seldon model unload -f ./models/counter.yaml
{}
seldon model unload -f ./models/identity-loop.yaml
{}
seldon model unload -f ./models/identity-output.yaml
{}
