Tool Use

In the OpenAI Function Calling tutorial, we showed how we can combine the OpenAI Runtime with the Memory Runtime to perform function calling without worrying about managing the conversation history. Although this lifts the burden from the user to manage the history, it still has the caveat that the actual call to the tool has to be done locally by the user, and then the response has to be provided to the LLM to return its final answer.

In this tutorial, we will show how to automate the call to the actual function by wrapping that in a custom MLServer model deployment. We will also demonstrate how to add support for multiple functions and route the data flow path through your pipeline, depending on the LLM's choice of which function to call.

In this example, we will define two functions, get_temperature and get_wind_speed, which return the temperature and the wind speed for a geographical location defined by the latitude and the longitude, respectively.

The pipeline looks like this:

We see three routes that the data flow can take:

  • temperature route when the model is asked about the current temperature in a specific location - triggered when the get_temperature tensor is present.

  • wind speed route when the model is asked about the current wind speed in a specific location - triggered when the get_wind_speed tensor is present.

  • default route, which returns immediately the response of the model if none of the above routes are taken.

The triggering mechanism is a feature in Seldon Core 2 (SC2). For further details on the triggering mechanism in SC2, we invite the reader to check our docs here.

One may wonder how those routing tensors are extracted. To extract the routing tensors, the OpenAI Runtime looks at the response content, and the tool calls tensors returned by the LLM and tries to extract the values for some particular keys the user provides in the model-settings.json file. For example, the tool_call tensor may look something like:

"tool_calls": [
    {
        "id": "call_62136354",
        "type": "function",
        "function": {
            "arguments": "{'order_id': 'order_12345'}",
            "name": "get_temperature",
        },
    }
]

In this case, it will be enough to specify to extract the value under the "name" key, which will be "get_temperature". After extracting the value, the OpenAI runtime appends to the inference response outputs list an empty tensor with the name get_temperature. The presence of the get_temperature tensor will then trigger only the route responsible for fetching the temperature for the queried location.

Note that the LLM can also be asked some questions completely unrelated to the temperature of the wind speed at a location. In this case, we should not call any function, and we should return the answer immediately by routing the data through the default route. In this case, the OpenAI runtime will return an empty tensor called default, which is used to trigger the default route.

Custom MLServer models

Now that we have an idea of what the pipeline should look like, let's begin by implementing the custom MLServer model which will return the temperature and the wind speed.

We begin by defining the actual calls to the meto API:

import requests
from typing import Any, Dict


def get_data(latitude: float, longitude: float) -> Dict[str, Any]:
    response = requests.get(f"https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}&current=temperature_2m,wind_speed_10m&hourly=temperature_2m,relative_humidity_2m,wind_speed_10m")
    return response.json()


def get_temperature(latitude, longitude):
    data = get_data(latitude, longitude)
    return data['current']['temperature_2m']


def get_wind_speed(latitude, longitude):
    data = get_data(latitude, longitude)
    return data['current']['wind_speed_10m']

In the code above, we defined the get_data function which makes the general call to the API, and then get_temperature and get_wind_speed which extract the approriate fileds from the response.

The custom MLServer model that will wrap the actual function of interest will only receive the tool_calls tensor from the OpenAI Model. This suffices, because all the arguments that should be passed to the function are stored in the tool_calls tensor. Thus, we will need define some functions which will parse the content of the tool_calls tensor.

import json
from typing import Any, Dict, Union, Tuple, List, Optional

from mlserver.codecs import StringCodec
from mlserver.types import InferenceRequest, RequestInput, ResponseOutput


def extract_tensor_by_name(
    tensors: Union[List[RequestInput], List[ResponseOutput]], name: str
) -> Optional[Union[RequestInput, ResponseOutput]]:
    for tensor in tensors:
        if tensor.name == name:
            return tensor

    return None


def get_history(payload: InferenceRequest) -> List[Dict[str, Any]]:
    tensor = extract_tensor_by_name(payload.inputs, "history")
    tensor = StringCodec.decode_input(tensor)
    return [json.loads(tc) for tc in tensor]

def get_args(history: List[Dict[str, Any]], label: str) -> Tuple[str, Dict[str, Any]]:
    tool_calls = history[-1]['tool_calls']
    tool_calls = [json.loads(tc) for tc in tool_calls]
    args = None

    for tool_call in tool_calls:
        if tool_call['function']['name'] == label:
            tool_call_id = tool_call['id']
            args = json.loads(tool_call['function']['arguments'])
            break

    return tool_call_id, args


def get_memory_id(payload: InferenceRequest) -> str:
    memory_id = extract_tensor_by_name(payload.inputs, "memory_id")
    return StringCodec.decode_input(memory_id)[0]


def get_tools(payload: InferenceRequest) -> List[str]:
    tools = StringCodec.decode_input(extract_tensor_by_name(payload.inputs, "tools"))
    return [json.loads(tc) for tc in tools]


def get_args_temperature(history: List[Dict[str, Any]]) -> Tuple[str, Dict[str, Any]]:
    return get_args(history, "get_temperature")


def get_args_wind_speed(history: List[Dict[str, Any]]) -> Tuple[str, Dict[str, Any]]:
    return get_args(history, "get_wind_speed")

We will also define a helper function, which constructs the output response in the format expected by the Memory Runtime component:

from mlserver.types import InferenceResponse


def construct_response(
    name: str, 
    version: str, 
    response: Union[str, Dict[str, Any]], 
    tool_call_id: str, 
    memory_id: str,
    tools: List[Dict[str, Any]]
) -> InferenceResponse:
    return InferenceResponse(
            model_name=name,
            model_version=version,
            outputs=[
                StringCodec.encode_output(
                    name="role",
                    payload=["tool"]
                ),
                StringCodec.encode_output(
                    name="content",
                    payload=[json.dumps(response) if not isinstance(response, str) else response]
                ),
                StringCodec.encode_output(
                    name="type",
                    payload=["text"]
                ),
                StringCodec.encode_output(
                    name="tool_call_id",
                    payload=[tool_call_id]
                ),
                StringCodec.encode_output(
                    name="memory_id",
                    payload=[memory_id]
                ),
                StringCodec.encode_output(
                    name="tools",
                    payload=[json.dumps(tool) for tool in tools]
                ),
            ]
        )

In essence, the function above only returns the response of the function we called. Some additionals tensors (e.g., tools) are forwarded which are not essential for this tutorial, but will be used in the following ones.

With all the helper functions in place, we can define the two custom MLServer models as follows:

from mlserver.model import MLModel
from mlserver.types import InferenceRequest, InferenceResponse


class GetTemperature(MLModel):
    async def load(self) -> bool:
        self.ready = True
        return self.ready

    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        memory_id = get_memory_id(payload)
        tools = get_tools(payload)

        history = get_history(payload)
        tool_call_id, args = get_args_temperature(history)
        temperature = get_temperature(args['latitude'], args['longitude'])

        return construct_response(
            name=self.name,
            version=self.version,
            response={"temperature": temperature},
            tool_call_id=tool_call_id,
            memory_id=memory_id,
            tools=tools
        )
        

class GetWindSpeed(MLModel):
    async def load(self) -> bool:
        self.ready = True
        return self.ready

    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        memory_id = get_memory_id(payload)
        tools = get_tools(payload)

        history = get_history(payload)
        tool_call_id, args = get_args_wind_speed(history)
        wind_speed = get_wind_speed(args['latitude'], args['longitude'])

        return construct_response(
            name=self.name,
            version=self.version,
            response={"wind_speed": wind_speed},
            tool_call_id=tool_call_id,
            memory_id=memory_id,
            tools=tools
        )

We can now load the two models in SC2. The model-settings.json for the temperature model looks as follows:

!cat models/get-temperature/model-settings.json
{
    "name": "get-weather",
    "implementation": "model.GetTemperature",
    "parameters": {
        "version": "v0.1.0"
    }
}

Similarly, for the wind speed model, we have the following model-settings.json file:

!cat models/get-wind-speed/model-settings.json
{
    "name": "get-weather",
    "implementation": "model.GetWindSpeed",
    "parameters": {
        "version": "v0.1.0"
    }
}

For the two models, we will define a single manifest file which looks like this:

!cat manifest/function-models.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: get-temperature
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/get-temperature"
  requirements:
  - mlserver
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: get-wind-speed
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/get-wind-speed"
  requirements:
  - mlserver
---

We can load the models by running the following command:

!kubectl apply -f manifest/function-models.yaml -n seldon
!kubectl wait --for condition=ready --timeout=300s models --all -n seldon
model.mlops.seldon.io/get-temperature created
model.mlops.seldon.io/get-wind-speed created
model.mlops.seldon.io/get-temperature condition met
model.mlops.seldon.io/get-wind-speed condition met

At this point, we have our function models deployed. There is one more custom MLServer model that we need to deploy before starting to create the pipelines - that is the tail model. All that the tail model is supposed to be doing is to get an inference request containing the history of the conversation, parse it, and return the last entrence. The tail model looks like this:

import json

from mlserver import MLModel
from mlserver.codecs import StringCodec
from mlserver.types import InferenceRequest, InferenceResponse


class TailModel(MLModel):
    async def load(self) -> bool:
        self.ready = True
        return self.ready

    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        response = json.loads(
            StringCodec.decode_input(
                extract_tensor_by_name(payload.inputs, "history")
            )[-1]
        )
        return InferenceResponse(
            model_name=self.name,
            model_version=self.version,
            outputs=[
                StringCodec.encode_output(
                    name="role",
                    payload=[response["role"]]
                ),
                StringCodec.encode_output(
                    name="content",
                    payload=[response["content"][0]["value"]]
                ),
                StringCodec.encode_output(
                    name="type",
                    payload=[response["content"][0]["type"]]
                ),
            ]
        )

The model-settings.json file for the tail model is the following:

!cat models/tail/model-settings.json
{
    "name": "tail",
    "implementation": "model.TailModel",
    "parameters": {
        "version": "v0.1.0"
    }
}

The associated manifest file is:

!cat manifest/tail.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: default-tail
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/tail"
  requirements:
  - mlserver
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: response-tail
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/tail"
  requirements:
  - mlserver
---

We can now deploy the identity models by running the following commands:

!kubectl apply -f manifest/tail.yaml -n seldon
!kubectl wait --for condition=ready --timeout=300s models --all -n seldon
model.mlops.seldon.io/default-tail created
model.mlops.seldon.io/response-tail created
model.mlops.seldon.io/default-tail condition met
model.mlops.seldon.io/get-temperature condition met
model.mlops.seldon.io/get-wind-speed condition met
model.mlops.seldon.io/response-tail condition met

LLM models

At this point, we are done with the custom MLServer models. We can now move on to deploying the ChatGPT/LLM and the memory/history models.

We begin by deploying the LLM models. The model settings for the LLM model look like this:

!cat models/llm/model-settings.json
{
    "name": "openai_chat_completions",
    "implementation": "mlserver_llm_api.LLMRuntime",
    "parameters": {
        "extra": {
            "provider_id": "openai",
            "config": {
                "model_id": "gpt-4o",
                "model_type": "chat.completions"
            },
            "prompt_utils": {
                "extract_tensors": {
                    "keys": ["name"]
                }
            }
        }
    }
}

What is important to note in the model settings above is the "extract_tensors" field which has the "key" field inside pointing to a list containing "name". This field informs the OpenAI runtime to look for the key "name" in the output of the LLM, and use the corresponding value as a name for an empty tensor, which will be added to the inference response output list. This tensor will serve as a conditional tensor, which will trigger different routes in the pipeline.

As can be seen in the pipeline, we will have to deploy two instances of the same LLM. The manifest file looks as follows:

!cat manifest/llm.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: default-llm
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/llm"
  requirements:
  - openai
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: response-llm
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/llm"
  requirements:
  - openai
---

We can deploy the models above by running the following command:

!kubectl apply -f manifest/llm.yaml -n seldon
!kubectl wait --for condition=ready --timeout=300s models --all -n seldon
model.mlops.seldon.io/default-llm created
model.mlops.seldon.io/response-llm created
model.mlops.seldon.io/default-llm condition met
model.mlops.seldon.io/default-tail condition met
model.mlops.seldon.io/get-temperature condition met
model.mlops.seldon.io/get-wind-speed condition met
model.mlops.seldon.io/response-llm condition met
model.mlops.seldon.io/response-tail condition met

Memory components

Finally, we can deploy the memory components. The model-settings.json file for the memory component looks like this:

!cat models/memory/model-settings.json
{
    "name": "memory",
    "implementation": "mlserver_memory.ConversationalMemory",
    "parameters": {
        "extra": {
            "database": "filesys",
            "config": {
                "window_size": 100,
                "tensor_names": ["content", "role", "type", "tool_call_id", "tool_calls"]
            }
        }
    }
}

In the above configuration, we are specifying that we are interested in keeping a window size of 100 latest messages and that we are interested in storing some fields specific to the OpenAI function calling API.

As before, we will have to deploy multiple instances of the memory component, 6 in this case. The manifest file looks like this:

!cat manifest/memory.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: history-1
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/memory"
  requirements:
  - memory
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: history-2
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/memory"
  requirements:
  - memory
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: history-3
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/memory"
  requirements:
  - memory
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: history-4
spec:
  storageUri: "gs://seldon-models/llm-runtimes-settings/agentic-workflows/tool-use/memory"
  requirements:
  - memory
---

To deploy the memory components, run the following commands:

!kubectl apply -f manifest/memory.yaml -n seldon
!kubectl wait --for condition=ready --timeout=300s models --all -n seldon
model.mlops.seldon.io/history-1 created
model.mlops.seldon.io/history-2 created
model.mlops.seldon.io/history-3 created
model.mlops.seldon.io/history-4 created
model.mlops.seldon.io/default-llm condition met
model.mlops.seldon.io/default-tail condition met
model.mlops.seldon.io/get-temperature condition met
model.mlops.seldon.io/get-wind-speed condition met
model.mlops.seldon.io/history-1 condition met
model.mlops.seldon.io/history-2 condition met
model.mlops.seldon.io/history-3 condition met
model.mlops.seldon.io/history-4 condition met
model.mlops.seldon.io/response-llm condition met
model.mlops.seldon.io/response-tail condition met

Pipelines

We are now ready to define the pipeline in the schema above. Note that we separated components with comments to make it easier to visualize them.

!cat manifest/pipeline.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
  name: routing-pipeline
spec:
  steps:
  # This is the interaction with the default llm and where the routing is decided
  - name: history-1
    inputs:
    - routing-pipeline.inputs.memory_id
    - routing-pipeline.inputs.role
    - routing-pipeline.inputs.content
    - routing-pipeline.inputs.type
    - routing-pipeline.inputs.tool_calls
    - routing-pipeline.inputs.tools
  - name: default-llm
    inputs:
    - history-1.outputs
  - name: history-2
    inputs:
    - default-llm.outputs.memory_id
    - default-llm.outputs.role
    - default-llm.outputs.content
    - default-llm.outputs.type
    - default-llm.outputs.tool_calls
    - default-llm.outputs.tools
  # This step is added to the pipeline to demonstrate the use of the `identity` tool
  - name: default-tail
    inputs:
    - history-2.outputs
    triggers:
    - default-llm.outputs.default
  # This step is added to the pipeline to demonstrate the use of the `get_temperature` tool
  - name: get-temperature
    inputs:
    - history-2.outputs
    triggers:
    - default-llm.outputs.get_temperature
  # This step is added to the pipeline to demonstrate the use of the `get_wind_speed` tool
  - name: get-wind-speed
    inputs:
    - history-2.outputs
    triggers:
    - default-llm.outputs.get_wind_speed
  # This step is to formulate the response
  - name: history-3
    inputsJoinType: any
    inputs:
    - get-temperature.outputs.memory_id
    - get-temperature.outputs.role
    - get-temperature.outputs.content
    - get-temperature.outputs.type
    - get-temperature.outputs.tool_call_id
    
    - get-wind-speed.outputs.memory_id
    - get-wind-speed.outputs.role
    - get-wind-speed.outputs.content
    - get-wind-speed.outputs.type
    - get-wind-speed.outputs.tool_call_id
  - name: response-llm
    inputs:
    - history-3.outputs
  - name: history-4
    inputs:
    - response-llm.outputs.memory_id
    - response-llm.outputs.role
    - response-llm.outputs.content
    - response-llm.outputs.type
  - name: response-tail
    inputs:
    - history-4.outputs
  output:
    steps:
    - response-tail.outputs
    - default-tail.outputs
    stepsJoin: any
  

To deploy the pipeline, run the following commands:

!kubectl apply -f manifest/pipeline.yaml -n seldon
!kubectl wait --for condition=ready --timeout=300s pipelines --all -n seldon
pipeline.mlops.seldon.io/routing-pipeline created
pipeline.mlops.seldon.io/routing-pipeline condition met

Before sending the requests to the pipeline, we define a helper function to extract the IP address of the seldon-mesh service. This will help us define the endpoint to which we want to send the requests.

import requests
import subprocess

def get_mesh_ip():
    cmd = f"kubectl get svc seldon-mesh -n seldon -o jsonpath='{{.status.loadBalancer.ingress[0].ip}}'"
    return subprocess.check_output(cmd, shell=True).decode('utf-8')

To inform the LLM about the functions we want to use (i.e., get_temperature and get_wind_speed), we define the tools object, which contains all the metadata needed by the LLM (e.g., name, description, arguments, etc.) to be able to provide the appropriate arguments to those functions. See OpenAI tutorial on function calling for more details.

tools = [
    json.dumps({
        "type": "function",
        "function": {
            "name": "get_temperature",
            "description": "Get current temperature for provided coordinates.",
            "parameters": {
                "type": "object",
                "properties": {
                    "latitude": {"type": "number"},
                    "longitude": {"type": "number"}
                },
                "required": ["latitude", "longitude"],
                "additionalProperties": False
            },
            "strict": True
        }
    }),
    json.dumps({
        "type": "function",
        "function": {
            "name": "get_wind_speed",
            "description": "Get current wind speed for provided coordinates.",
            "parameters": {
                "type": "object",
                "properties": {
                    "latitude": {"type": "number"},
                    "longitude": {"type": "number"}
                },
                "required": ["latitude", "longitude"],
                "additionalProperties": False
            },
            "strict": True
        }
    }),
]

We are now ready to start interacting with the pipeline. We begin by defining the memory UUID which will uniquely identify a chat history:

from uuid import uuid4
memory_id = str(uuid4())

We also define a helper function to send requests to the pipeline to avoid repetitive code:

def send_request(content: str):
    inference_request = {
        "inputs": [
            {
                "name": "memory_id",
                "shape": [1],
                "datatype": "BYTES",
                "data": [memory_id],
                "parameters": {"content_type": "str"},
            },
            {
                "name": "role",
                "shape": [1],
                "datatype": "BYTES",
                "data": ["user"]
            },
            {
                "name": "content",
                "shape": [1],
                "datatype": "BYTES",
                "data": [content]
            },
            {
                "name": "type",
                "shape": [1],
                "datatype": "BYTES",
                "data": ["text"]
            },
            {
                "name": "tools",
                "shape": [2],
                "datatype": "BYTES",
                "data": tools
            },
        ]
    }

    endpoint = f"http://{get_mesh_ip()}/v2/pipelines/routing-pipeline/infer"
    return requests.post(endpoint, json=inference_request)

We begin by asking a general question to the LLM. In this case, no function should be called, and the answer should be provided through the default branch (i.e., general knowledge of the LLM):

response = send_request("What is the capital of Romania?")
print(response.json()['outputs'][1]['data'][0])
The capital of Romania is Bucharest.

We see that the LLM provided the right answer. Let us now ask about the current temperature in Bucharest. In this case, the model should call the temperature model, and the data should flow through the temperature branch.

response = send_request("Can you tell me what's the temperature in Bucharest?")
print(response.json()['outputs'][1]['data'][0])
The current temperature in Bucharest is approximately 16.5°C.

We can also ask about the wind speed which will go through the wind speed branch.

response = send_request("What about the wind speed?")
print(response.json()['outputs'][1]['data'][0])
The current wind speed in Bucharest is approximately 8.2 km/h.

Note that we didn't have to specify that we are talking about Bucharest. The model is able to understand that from the context provided by the memory component.

We can query the model about a different location. In this case London.

response = send_request("Can you tell me what it the temperature in London?")
print(response.json()['outputs'][1]['data'][0])

response = send_request("What about the wind speed?")
print(response.json()['outputs'][1]['data'][0])
The current temperature in London is approximately 22.5°C.
The current wind speed in London is approximately 9.0 km/h.

Once again, the data flows through the temperature and wind speed branches, respectively, to retrieve the results.

In this example, the tools fetch real-time data from the Open-Meteo API, so the output will vary based on the current weather at the specified location.

We can continue the conversation with other questions completely unrelated to the previous ones:

response = send_request("What is 2 + 2 * 2?")
print(response.json()['outputs'][1]['data'][0])
2 + 2 * 2 equals 6. According to the order of operations, you first perform the multiplication and then the addition.

Or we can ask about what has been discussed until now:

response = send_request("Which were the cities I asked you about?")
print(response.json()['outputs'][1]['data'][0])
You asked about Bucharest and London.

To unload the pipeline and the models, run the following commands:

!kubectl delete pipelines --all -n seldon
!kubectl delete models --all -n seldon
pipeline.mlops.seldon.io "routing-pipeline" deleted
model.mlops.seldon.io "default-llm" deleted
model.mlops.seldon.io "default-tail" deleted
model.mlops.seldon.io "get-temperature" deleted
model.mlops.seldon.io "get-wind-speed" deleted
model.mlops.seldon.io "history-1" deleted
model.mlops.seldon.io "history-2" deleted
model.mlops.seldon.io "history-3" deleted
model.mlops.seldon.io "history-4" deleted
model.mlops.seldon.io "response-llm" deleted
model.mlops.seldon.io "response-tail" deleted

Congrats! You've just deployed an agentic pipeline with the LLM module!

Last updated

Was this helpful?