Conditional pipeline with pandas query model
The model is implemented as an MLServer custom runtime. A pandas query string is supplied as a model parameter at creation time, and the runtime uses that query to filter the data passed to the model.
from mlserver import MLModel
from mlserver.types import InferenceRequest, InferenceResponse
from mlserver.codecs import PandasCodec
from mlserver.errors import MLServerError
import pandas as pd
from fastapi import status
from mlserver.logging import logger

QUERY_KEY = "query"


class ModelParametersMissing(MLServerError):
    def __init__(self, model_name: str, reason: str):
        super().__init__(
            f"Parameters missing for model {model_name} {reason}",
            status.HTTP_400_BAD_REQUEST,
        )


class PandasQueryRuntime(MLModel):
    async def load(self) -> bool:
        logger.info("Loading with settings %s", self.settings)
        if self.settings.parameters is None or \
           self.settings.parameters.extra is None:
            raise ModelParametersMissing(self.name, "no settings.parameters.extra found")
        # use .get so a missing "query" key falls through to the explicit error below
        self.query = self.settings.parameters.extra.get(QUERY_KEY)
        if self.query is None:
            raise ModelParametersMissing(self.name, "no settings.parameters.extra.query found")
        self.ready = True
        return self.ready

    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        input_df: pd.DataFrame = PandasCodec.decode_request(payload)
        # run the configured query on input_df and keep only the matching rows
        output_df = input_df.query(self.query)
        if output_df.empty:
            output_df = pd.DataFrame({"status": ["no rows satisfied " + self.query]})
        else:
            output_df["status"] = "row satisfied " + self.query
        return PandasCodec.encode_response(self.name, output_df, self.version)
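Before deploying, the runtime can be exercised locally. The snippet below is a minimal sketch rather than part of the upstream example: it assumes the class above is importable as PandasQueryRuntime and that mlserver and pandas are installed, and it builds ModelSettings/ModelParameters equivalent to the query parameter set in the Model manifests further down.

# Local sketch (assumptions: PandasQueryRuntime is importable, mlserver and pandas installed).
import asyncio

import pandas as pd
from mlserver.codecs import PandasCodec
from mlserver.settings import ModelParameters, ModelSettings


async def main():
    # mirrors the "query" parameter from the choice-is-one Model manifest
    settings = ModelSettings(
        name="choice-is-one",
        implementation=PandasQueryRuntime,
        parameters=ModelParameters(extra={"query": "choice == 1"}),
    )
    model = PandasQueryRuntime(settings)
    await model.load()

    # encode a small DataFrame, run predict, and decode the filtered result
    request = PandasCodec.encode_request(pd.DataFrame({"choice": [1, 2, 1]}))
    response = await model.predict(request)
    print(PandasCodec.decode_response(response))


asyncio.run(main())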
Conditional Pipeline using PandasQuery

Two instances of the pandas query model (choice-is-one and choice-is-two) act as gates for two Triton models (mul10 and add10): each arithmetic step only runs when its gate's query matches the incoming choice value.
cat ../../models/choice1.yaml
echo "---"
cat ../../models/choice2.yaml
echo "---"
cat ../../models/add10.yaml
echo "---"
cat ../../models/mul10.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: choice-is-one
spec:
  storageUri: "gs://seldon-models/scv2/examples/pandasquery"
  requirements:
  - mlserver
  - python
  parameters:
  - name: query
    value: "choice == 1"
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: choice-is-two
spec:
  storageUri: "gs://seldon-models/scv2/examples/pandasquery"
  requirements:
  - mlserver
  - python
  parameters:
  - name: query
    value: "choice == 2"
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: add10
spec:
  storageUri: "gs://seldon-models/scv2/samples/triton_23-03/add10"
  requirements:
  - triton
  - python
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: mul10
spec:
  storageUri: "gs://seldon-models/scv2/samples/triton_23-03/mul10"
  requirements:
  - triton
  - python
kubectl apply -f ../../models/choice1.yaml -n ${NAMESPACE}
kubectl apply -f ../../models/choice2.yaml -n ${NAMESPACE}
kubectl apply -f ../../models/add10.yaml -n ${NAMESPACE}
kubectl apply -f ../../models/mul10.yaml -n ${NAMESPACE}
model.mlops.seldon.io/choice-is-one created
model.mlops.seldon.io/choice-is-two created
model.mlops.seldon.io/add10 created
model.mlops.seldon.io/mul10 created
kubectl wait --for condition=ready --timeout=300s model choice-is-one -n ${NAMESPACE}
kubectl wait --for condition=ready --timeout=300s model choice-is-two -n ${NAMESPACE}
kubectl wait --for condition=ready --timeout=300s model add10 -n ${NAMESPACE}
kubectl wait --for condition=ready --timeout=300s model mul10 -n ${NAMESPACE}
model.mlops.seldon.io/choice-is-one condition met
model.mlops.seldon.io/choice-is-two condition met
model.mlops.seldon.io/add10 condition met
model.mlops.seldon.io/mul10 condition met
cat ../../pipelines/choice.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
  name: choice
spec:
  steps:
  - name: choice-is-one
  - name: mul10
    inputs:
    - choice.inputs.INPUT
    triggers:
    - choice-is-one.outputs.choice
  - name: choice-is-two
  - name: add10
    inputs:
    - choice.inputs.INPUT
    triggers:
    - choice-is-two.outputs.choice
  output:
    steps:
    - mul10
    - add10
    stepsJoin: any
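Each pandas query model passes its input through only when its query matches, and the mul10 / add10 steps are triggered only when the corresponding gate produced output. With stepsJoin: any, the pipeline output is taken from whichever branch actually ran. Purely as an illustration (this is not how Seldon executes the pipeline, just the equivalent decision logic), the routing behaves like the sketch below.

# Illustrative only: plain-Python equivalent of the pipeline's branching.
# The real routing is done by Seldon Core v2 via triggers and stepsJoin: any.
def route(choice: int, inputs: list[float]) -> list[float]:
    if choice == 1:   # choice-is-one returns rows -> mul10 is triggered
        return [x * 10 for x in inputs]
    if choice == 2:   # choice-is-two returns rows -> add10 is triggered
        return [x + 10 for x in inputs]
    raise ValueError("neither gate matched; the pipeline would produce no output")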
kubectl apply -f ../../pipelines/choice.yaml -n ${NAMESPACE}
pipeline.mlops.seldon.io/choice created
kubectl wait --for condition=ready --timeout=300s pipelines choice -n ${NAMESPACE}
pipeline.mlops.seldon.io/choice condition met
seldon pipeline infer choice --inference-mode grpc \
'{"model_name":"choice","inputs":[{"name":"choice","contents":{"int_contents":[1]},"datatype":"INT32","shape":[1]},{"name":"INPUT","contents":{"fp32_contents":[5,6,7,8]},"datatype":"FP32","shape":[4]}]}' | jq -M .
{
"outputs": [
{
"name": "OUTPUT",
"datatype": "FP32",
"shape": [
"4"
],
"contents": {
"fp32Contents": [
50,
60,
70,
80
]
}
}
]
}
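The same request can also be sent over REST using the Open Inference Protocol. The snippet below is a sketch, not part of the upstream example: MESH_IP is a placeholder you must replace with the address of your Seldon mesh/ingress, and it assumes pipeline requests are routed by setting the Seldon-Model header to <pipeline-name>.pipeline.

# Sketch only; MESH_IP is a placeholder for your Seldon mesh/ingress address.
import requests

MESH_IP = "0.0.0.0:80"  # assumption: replace with your ingress address
payload = {
    "inputs": [
        {"name": "choice", "datatype": "INT32", "shape": [1], "data": [1]},
        {"name": "INPUT", "datatype": "FP32", "shape": [4], "data": [5, 6, 7, 8]},
    ]
}
r = requests.post(
    f"http://{MESH_IP}/v2/models/choice/infer",
    json=payload,
    headers={"Seldon-Model": "choice.pipeline"},  # route to the pipeline, not a single model
)
print(r.json())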
seldon pipeline infer choice --inference-mode grpc \
'{"model_name":"choice","inputs":[{"name":"choice","contents":{"int_contents":[2]},"datatype":"INT32","shape":[1]},{"name":"INPUT","contents":{"fp32_contents":[5,6,7,8]},"datatype":"FP32","shape":[4]}]}' | jq -M .
{
"outputs": [
{
"name": "OUTPUT",
"datatype": "FP32",
"shape": [
"4"
],
"contents": {
"fp32Contents": [
15,
16,
17,
18
]
}
}
]
}
kubectl delete -f ../../models/choice1.yaml -n ${NAMESPACE}
kubectl delete -f ../../models/choice2.yaml -n ${NAMESPACE}
kubectl delete -f ../../models/add10.yaml -n ${NAMESPACE}
kubectl delete -f ../../models/mul10.yaml -n ${NAMESPACE}
kubectl delete -f ../../pipelines/choice.yaml -n ${NAMESPACE}