Pipeline Config

Learn how to create and manage ML pipelines in Seldon Core using Kubernetes custom resources, including model chaining and tensor mapping.

Pipelines allow one to connect flows of inference data transformed by Model components. A directed acyclic graph (DAG) of steps can be defined to join Models together. Each Model will need to be capable of receiving a V2 inference request and respond with a V2 inference response. An example Pipeline is shown below:

Unexpected error with integration github-files: Integration is not installed on this space

The steps list shows three models: tfsimple1, tfsimple2 and tfsimple3. These three models each take two tensors called INPUT0 and INPUT1 of integers. The models produce two outputs OUTPUT0 (the sum of the inputs) and OUTPUT1 (subtraction of the second input from the first).

tfsimple1 and tfsimple2 take as inputs the input to the Pipeline: the default assumption when no explicit inputs are defined. tfsimple3 takes one V2 tensor input from each of the outputs of tfsimple1 and tfsimple2. As the outputs of tfsimple1 and tfsimple2 have tensors named OUTPUT0 and OUTPUT1 their names need to be changed to respect the expected input tensors and this is done with a tensorMap component providing this tensor renaming. This is only required if your models can not be directly chained together.

The output of the Pipeline is the output from the tfsimple3 model.

Support for Cyclic Pipelines

Seldon Core 2 supports cyclic pipelines, enabling the creation of feedback loops within the inference graph. However, the cyclic pipelines should be used carefully, as incorrect configurations can lead to unintended behavior.

The risk of joining messages from different iterations (i.e., a message from iteration t might be joined with messages from t-1, t-2, ..., 1). If a feedback message re-enters the pipeline within the join window and reaches a step already holding messages from a previous iteration, Kafka Streams may join messages across iterations. This can trigger unintended message propagation. For more details on how Kafka Streams handles joins and the implications for feedback loops, refer to this Confluent blog post.

Seldon Core 2 provides a maxStepRevisits parameter in the pipeline manifest. This parameter limits the number of times a step can be revisited within a single pipeline execution. If the limit is reached, the pipeline execution will terminate, returning an error. This feature is useful in cyclic pipelines, where infinite loops might occur (e.g., in agentic workflows where control flow is determined by an LLM). It helps safeguard against unintended infinite loops. By default, the maxStepRevisits is set to 0 (i.e., no cycles), but you can adjust it according to your use case.

To enable a cyclic pipeline, set the allowCycles flag in your pipeline manifest:

apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
  name: pipeline
spec:
  allowCycles: true
  maxStepRevisits: 100
  ...

Detailed Specification

The full GoLang specification for a Pipeline is shown below:

type PipelineSpec struct {
	// External inputs to this pipeline, optional
	Input *PipelineInput `json:"input,omitempty"`

	// The steps of this inference graph pipeline
	Steps []PipelineStep `json:"steps"`

	// Synchronous output from this pipeline, optional
	Output *PipelineOutput `json:"output,omitempty"`

	// Dataflow specs
	Dataflow *DataflowSpec `json:"dataflow,omitempty"`

	// Allow cyclic pipeline
	AllowCycles bool `json:"allowCycles,omitempty"`

	// Maximum number of times a step can be revisited
	MaxStepRevisits uint32 `json:"maxStepRevisits,omitempty"` 
}

type DataflowSpec struct {
	// Flag to indicate whether the kafka input/output topics
	// should be cleaned up when the model is deleted
	// Default false
	CleanTopicsOnDelete bool `json:"cleanTopicsOnDelete,omitempty"`
}

// +kubebuilder:validation:Enum=inner;outer;any
type JoinType string

const (
	// data must be available from all inputs
	JoinTypeInner JoinType = "inner"
	// data will include any data from any inputs at end of window
	JoinTypeOuter JoinType = "outer"
	// first data input that arrives will be forwarded
	JoinTypeAny JoinType = "any"
)

type PipelineStep struct {
	// Name of the step
	Name string `json:"name"`

	// Previous step to receive data from
	Inputs []string `json:"inputs,omitempty"`

	// msecs to wait for messages from multiple inputs to arrive before joining the inputs
	JoinWindowMs *uint32 `json:"joinWindowMs,omitempty"`

	// Map of tensor name conversions to use e.g. output1 -> input1
	TensorMap map[string]string `json:"tensorMap,omitempty"`

	// Triggers required to activate step
	Triggers []string `json:"triggers,omitempty"`

	// +kubebuilder:default=inner
	InputsJoinType *JoinType `json:"inputsJoinType,omitempty"`

	TriggersJoinType *JoinType `json:"triggersJoinType,omitempty"`

	// Batch size of request required before data will be sent to this step
	Batch *PipelineBatch `json:"batch,omitempty"`
}

type PipelineBatch struct {
	Size     *uint32 `json:"size,omitempty"`
	WindowMs *uint32 `json:"windowMs,omitempty"`
	Rolling  bool    `json:"rolling,omitempty"`
}

type PipelineInput struct {
	// Previous external pipeline steps to receive data from
	ExternalInputs []string `json:"externalInputs,omitempty"`

	// Triggers required to activate inputs
	ExternalTriggers []string `json:"externalTriggers,omitempty"`

	// msecs to wait for messages from multiple inputs to arrive before joining the inputs
	JoinWindowMs *uint32 `json:"joinWindowMs,omitempty"`

	// +kubebuilder:default=inner
	JoinType *JoinType `json:"joinType,omitempty"`

	// +kubebuilder:default=inner
	TriggersJoinType *JoinType `json:"triggersJoinType,omitempty"`

	// Map of tensor name conversions to use e.g. output1 -> input1
	TensorMap map[string]string `json:"tensorMap,omitempty"`
}

type PipelineOutput struct {
	// Previous step to receive data from
	Steps []string `json:"steps,omitempty"`

	// msecs to wait for messages from multiple inputs to arrive before joining the inputs
	JoinWindowMs uint32 `json:"joinWindowMs,omitempty"`

	// +kubebuilder:default=inner
	StepsJoin *JoinType `json:"stepsJoin,omitempty"`

	// Map of tensor name conversions to use e.g. output1 -> input1
	TensorMap map[string]string `json:"tensorMap,omitempty"`
}

Last updated

Was this helpful?