Cyclic Pipeline
Learn how to deploy a cyclic pipeline using Seldon Core 2. In this example, you'll build a simple counter that begins at a user-defined starting value and increments by one until it reaches 10. If the starting value is already 10 or greater, the counter signals stop on its first pass, so the feedback loop never runs.
Before you begin
Ensure that you have installed Seldon Core 2 in the namespace seldon-mesh.
Ensure that you are performing these steps in the directory where you have downloaded the samples.
Get the IP address of the Seldon Core 2 instance running with Istio:
ISTIO_INGRESS=$(kubectl get svc seldon-mesh -n seldon-mesh -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "Seldon Core 2: http://$ISTIO_INGRESS"
Models
Start by implementing the first model: a simple counter.
from mlserver.model import MLModel
from mlserver.codecs import NumpyCodec, StringCodec
from mlserver.types import InferenceRequest, InferenceResponse
from mlserver.logging import logger


class Counter(MLModel):
    async def load(self) -> bool:
        self.ready = True
        return self.ready

    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        # Decode the first input tensor and increment it by one.
        x = NumpyCodec.decode_input(payload.inputs[0]) + 1
        # The name of the second output tensor selects the next pipeline step.
        message = "continue" if x.item() < 10 else "stop"
        return InferenceResponse(
            model_name=self.name,
            model_version=self.version,
            outputs=[
                NumpyCodec.encode_output(
                    name="output",
                    payload=x
                ),
                # Empty tensor whose name ("continue" or "stop") acts as the trigger.
                StringCodec.encode_output(
                    name=message,
                    payload=[""]
                ),
            ]
        )

This model produces two output tensors. The first contains the incremented number, while the second is an empty tensor labeled either continue or stop. This second tensor acts as a trigger, directing the data flow through either the feedback loop or the output path. For more information on triggering tensors, see the intro to pipelines page.
Next, define the second model — an identity model:
The identity model simply passes the input tensors through to the output while introducing a delay. This delay is crucial for preventing infinite loops in the pipeline, which can occur due to the join interval behavior in Kafka Streams. For further details, see Kafka documentation.
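The pass-through-with-delay behavior can be sketched without any serving framework. The following is a minimal illustration, not the sample's actual identity model: the function name `identity_with_delay` and the `delay_seconds` parameter are hypothetical, and a deployable version would wrap the same logic in an MLModel subclass like the Counter class.

```python
import asyncio
from typing import Any, List


async def identity_with_delay(
    tensors: List[Any], delay_seconds: float = 0.001
) -> List[Any]:
    """Return the input tensors unchanged after pausing for `delay_seconds`."""
    # Non-blocking pause: other coroutines keep running while this one waits.
    await asyncio.sleep(delay_seconds)
    # Shallow copy: the same tensor objects in a new list container.
    return list(tensors)


if __name__ == "__main__":
    # Hypothetical input values, used only to exercise the sketch.
    print(asyncio.run(identity_with_delay([3, ""], delay_seconds=0.001)))
```

Passing a delay of 0 gives the behavior described for the output path, where no pause is needed.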
Pipeline
This counter application pipeline consists of three models: the counter model, an identity model for the feedback loop, and another identity model for the output. The structure of the pipeline is illustrated as follows:
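One way to trace this structure is a small simulation in plain Python. It is only a sketch of the data flow: the function names are hypothetical, the identity steps are reduced to pass-through functions, and nothing here involves Kafka or Seldon Core 2 itself. The counter logic mirrors the predict method shown earlier.

```python
from typing import List, Tuple

LIMIT = 10  # the counter emits "stop" once the value reaches this number


def counter_step(value: int) -> Tuple[int, str]:
    """Mirror Counter.predict: increment, then choose the trigger name."""
    new_value = value + 1
    trigger = "continue" if new_value < LIMIT else "stop"
    return new_value, trigger


def identity_step(value: int) -> int:
    """Stand-in for both identity models: pass the value through unchanged."""
    return value


def run_pipeline(start: int) -> Tuple[int, List[int]]:
    """Return the pipeline output and every value the counter produced."""
    produced = []
    value, trigger = counter_step(start)
    produced.append(value)
    while trigger == "continue":
        value = identity_step(value)          # feedback loop path
        value, trigger = counter_step(value)  # counter is visited again
        produced.append(value)
    return identity_step(value), produced     # output path


if __name__ == "__main__":
    print(run_pipeline(0))
    print(run_pipeline(12))
```

Starting from 0, the counter is visited ten times and the output is 10. Starting from 12, the counter is visited once, emits stop, and the output is 13.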
Models deployment
To deploy the pipeline, you need to load each model into the cluster. The model-settings.json configuration for the counter model is as follows:
For the identity feedback loop model, reuse the model-settings.json file and configure it to include a 1-millisecond delay:
The one-millisecond delay is crucial to prevent infinite loops in the pipeline. It aligns with the join window applied to all input types for the counter model, as well as the join window configured for the identity model, which is specified in the pipeline definition.
Similarly, for the identity output model, reuse the same model-settings.json file without introducing any delay.
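As a rough sketch of how the three settings files relate, the snippet below builds them as Python dictionaries and serializes them to JSON. The fields name, implementation, and parameters.extra are standard MLServer model settings; everything else is an assumption made for illustration: the model names, the implementation paths `model.Counter` and `model.Identity`, and the `delay` key under `extra` are hypothetical and may differ from the sample files.

```python
import json
from typing import Any, Dict, Optional


def model_settings(
    name: str, implementation: str, delay_seconds: Optional[float] = None
) -> Dict[str, Any]:
    """Build the contents of a model-settings.json file as a dictionary."""
    settings: Dict[str, Any] = {"name": name, "implementation": implementation}
    if delay_seconds is not None:
        # Hypothetical key: custom values are assumed to live under parameters.extra.
        settings["parameters"] = {"extra": {"delay": delay_seconds}}
    return settings


# Assumed model names and implementation paths, for illustration only.
SETTINGS = {
    "counter": model_settings("counter", "model.Counter"),
    "identity-loop": model_settings("identity-loop", "model.Identity", 0.001),
    "identity-output": model_settings("identity-output", "model.Identity"),
}

if __name__ == "__main__":
    for name, settings in SETTINGS.items():
        print(name)
        print(json.dumps(settings, indent=2))
```

Only the feedback-loop model carries a delay (0.001 seconds, that is, one millisecond); the output model shares the same implementation without one.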
The manifest files for the three models are the following:
To deploy the models, use the following command:
Pipeline deployment
After the models are deployed, proceed to deploy the pipeline. The pipeline manifest file is defined as follows:
Note that the joinWindowMs parameter is set to 1 millisecond for both the identity loop and identity output models. This setting is essential to prevent messages from different iterations from being joined (e.g., a message from iteration t being joined with messages from iterations t-1, t-2, ..., 1). Additionally, the number of step revisits is limited to 100, which is the maximum number of times the pipeline can revisit a step during execution. Although this pipeline behaves deterministically and is guaranteed to terminate, the limit is especially useful in cyclic pipelines where a terminal state might not be reached (e.g., agentic workflows where control flow is determined by an LLM), because it safeguards against infinite loops.
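The idea behind a step-revisit limit can be illustrated with a small guard around a loop. This is a conceptual sketch in plain Python, not how Seldon Core 2 enforces the limit: `run_with_revisit_limit` and `StepRevisitLimitExceeded` are hypothetical names, and the choice to treat the first visit as "not a revisit" is an assumption.

```python
from typing import Callable, Tuple

Step = Callable[[int], Tuple[int, str]]


class StepRevisitLimitExceeded(RuntimeError):
    """Raised when a step would be revisited more often than allowed."""


def run_with_revisit_limit(
    step: Step, start: int, max_step_revisits: int = 100
) -> Tuple[int, int]:
    """Run `step` until it returns "stop"; return (final value, revisits)."""
    value, trigger = step(start)  # first visit, assumed not to count as a revisit
    revisits = 0
    while trigger == "continue":
        if revisits >= max_step_revisits:
            raise StepRevisitLimitExceeded(
                f"step revisited more than {max_step_revisits} times"
            )
        value, trigger = step(value)
        revisits += 1
    return value, revisits


def counting_step(value: int) -> Tuple[int, str]:
    """Deterministic step that terminates, like the counter in this example."""
    value += 1
    return value, ("continue" if value < 10 else "stop")


def never_stops(value: int) -> Tuple[int, str]:
    """Step that never reaches a terminal state."""
    return value + 1, "continue"
```

With `counting_step` and a start value of 0, the loop finishes after 9 revisits, well under the limit. With `never_stops`, the guard raises once 100 revisits have been used instead of looping forever.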
To deploy the pipeline, use the following command:
Testing the pipeline
To send a request to the pipeline, use the following command:
This request initiates the pipeline with an input value of 0. The pipeline increments this value step by step until it reaches 10, at which point it stops. The response includes the final counter value, 10, along with a message indicating that the pipeline has terminated.
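For readers who prefer Python to a command-line client, the request can be sketched with the standard library. The snippet only builds the request and does not send it. It assumes the Open Inference Protocol endpoint `/v2/models/<name>/infer` and the `Seldon-Model` header with a `.pipeline` suffix; the pipeline name `counter-pipeline`, the tensor name `input`, and the INT32 datatype are assumptions and may not match the sample manifests.

```python
import json
from urllib.request import Request


def build_infer_request(
    host: str, pipeline_name: str = "counter-pipeline", start_value: int = 0
) -> Request:
    """Build (but do not send) an inference request for the pipeline."""
    body = {
        "inputs": [
            {
                "name": "input",       # assumed tensor name
                "shape": [1],
                "datatype": "INT32",   # assumed datatype
                "data": [start_value],
            }
        ]
    }
    return Request(
        url=f"http://{host}/v2/models/{pipeline_name}/infer",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Routes the request to a pipeline rather than a single model.
            "Seldon-Model": f"{pipeline_name}.pipeline",
        },
        method="POST",
    )


if __name__ == "__main__":
    request = build_infer_request("203.0.113.10")  # placeholder address
    print(request.full_url)
    print(request.data.decode("utf-8"))
```

Once the address stored in ISTIO_INGRESS is reachable, the request could be sent with `urllib.request.urlopen(request)` and the JSON response inspected for the final counter value.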
Cleanup
To clean up the models and the pipeline, use the following commands: