Consuming Kafka Events

The Model Performance Metrics module consumes Kafka events to capture inference responses produced by ML models. In a Seldon Core 2 setup, each Model has a corresponding Kafka topic to which inference responses are published. Each published message carries model and inference metadata, with the inference response itself as the message payload.

Message Consumption Workflow

The Kafka consumer is configured using the corresponding values in the Helm chart's values.yaml file. For more information on the configuration options, see the Kafka Helm chart values section on the Installation page.

How the consumer subscribes to Kafka topics

Upon startup, the Model Performance Metrics module creates a Kafka consumer that subscribes to and starts consuming Kafka topics matching a certain pattern.

  • The Kafka topic pattern used for matching is <KAFKA_CONSUMER_TOPIC_PREFIX>.<namespace>.<model_name>.outputs, where namespace is the Kubernetes namespace in which the model is deployed and model_name is the name of the model whose inference responses are consumed.

  • The topic prefix is read from the KAFKA_CONSUMER_TOPIC_PREFIX environment variable, falling back to the default value of seldon if the variable is not set.

  • The topic prefix value allows you to customise the topic names in case the default value is not suitable for your setup. This prefix must match the value used by the Seldon Core 2 components that publish inference responses. The sketch below shows how the matching pattern can be assembled from it.
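
As an illustration, the snippet below sketches in Python how such a matching pattern might be assembled from the environment variable. This is a minimal sketch, not the module's actual implementation; the example namespace (ml-prod) and model name (income-classifier) are made up.

    import os
    import re

    # Read the topic prefix, defaulting to "seldon" when the variable is unset.
    prefix = os.environ.get("KAFKA_CONSUMER_TOPIC_PREFIX", "seldon")

    # Matches <prefix>.<namespace>.<model_name>.outputs and captures the
    # namespace and model name for later use.
    topic_pattern = re.compile(
        rf"^{re.escape(prefix)}\.(?P<namespace>[^.]+)\.(?P<model_name>[^.]+)\.outputs$"
    )

    match = topic_pattern.match("seldon.ml-prod.income-classifier.outputs")
    if match:
        print(match.group("namespace"))   # ml-prod
        print(match.group("model_name"))  # income-classifier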

Note

The consumer will consume messages from all partitions for topics matching the pattern, regardless of whether the topic's model has been subscribed with the Model Performance Metrics module via a Subscription.

Polling Mechanism

The Kafka consumer polls the Kafka broker for new messages at regular intervals:

  • The polling interval is configured using the KAFKA_CONSUMER_POLL_INTERVAL environment variable, with a default value of 10s.

  • At each poll, the consumer reads the set of topics held in its internal state.

  • For each topic, the consumer tries to match the topic name against the aforementioned pattern.

  • If the name matches, the consumer consumes messages from all of the topic's partitions; otherwise, the topic is ignored.

  • The consumer refreshes the list of topics to consume in the background, at a frequency defined by the KAFKA_METADATA_REFRESH_FREQUENCY environment variable, with a default value of 1m. The sketch below illustrates this loop.
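
The loop below is a minimal Python sketch of this mechanism using the confluent-kafka client. The broker address, consumer group ID, and the simple duration parser are assumptions made for illustration; the module's actual implementation may differ.

    import os
    import re
    import time

    from confluent_kafka import Consumer, TopicPartition

    def parse_duration(value: str) -> float:
        # Minimal parser for duration strings such as "10s" or "1m".
        amount, unit = int(value[:-1]), value[-1]
        return float(amount * 60 if unit == "m" else amount)

    poll_interval = parse_duration(os.environ.get("KAFKA_CONSUMER_POLL_INTERVAL", "10s"))
    refresh_frequency = parse_duration(os.environ.get("KAFKA_METADATA_REFRESH_FREQUENCY", "1m"))

    prefix = os.environ.get("KAFKA_CONSUMER_TOPIC_PREFIX", "seldon")
    topic_pattern = re.compile(rf"^{re.escape(prefix)}\.[^.]+\.[^.]+\.outputs$")

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",    # assumed broker address
        "group.id": "model-performance-metrics",  # assumed consumer group
        "auto.offset.reset": "earliest",
    })

    last_refresh = 0.0
    while True:
        now = time.monotonic()
        if now - last_refresh >= refresh_frequency:
            # Refresh the internal topic state from broker metadata, then
            # assign every partition of every matching topic. Topics that do
            # not match the pattern are ignored.
            metadata = consumer.list_topics(timeout=10)
            assignment = [
                TopicPartition(topic, partition)
                for topic, topic_metadata in metadata.topics.items()
                if topic_pattern.match(topic)
                for partition in topic_metadata.partitions
            ]
            consumer.assign(assignment)
            last_refresh = now

        # Poll for a message; per-message extraction is covered in the next section.
        msg = consumer.poll(timeout=poll_interval)
        if msg is not None and msg.error() is None:
            print("received message on topic", msg.topic())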

Processing logic

Once a topic is matched and messages start arriving, the consumer processes each message by extracting the following pieces of information:

  • The pipeline name, taken from the message's headers - the headers should contain a key-value pair whose key is pipeline and whose value is the name of the pipeline the model is part of.

  • The namespace the model is deployed in - this is extracted from the topic name.

  • The model name - this is also extracted from the topic name.

  • The inference response ID - this is the message key. It matches the inference request ID if one was provided in the x-request-id header when the inference request was made to the inference server; otherwise, the inference server generates a unique ID for the request and uses it in the response.

  • The inference response timestamp - this is the message timestamp. The meaning of the timestamp depends on your Kafka configuration; it can be set at the broker level or per topic (for example, whether it records message creation time or log-append time).

  • The inference response itself - this is the message payload.

Once all of this information is extracted, the consumer passes the data to an internal service that stores it in a database.
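
The function below sketches this extraction in Python against confluent-kafka's Message API, including the ignore conditions listed in the notes that follow. store_inference_response is a hypothetical stand-in for the module's internal storage service, not its real interface.

    import re

    from confluent_kafka import TIMESTAMP_NOT_AVAILABLE

    TOPIC_PATTERN = re.compile(
        r"^[^.]+\.(?P<namespace>[^.]+)\.(?P<model_name>[^.]+)\.outputs$"
    )

    def store_inference_response(**fields) -> None:
        # Hypothetical stand-in for the internal service that writes to the database.
        print("storing inference response:", fields)

    def handle_message(msg) -> None:
        # Pipeline name: a header with key "pipeline". Ignore the message if
        # the header is missing or its value is empty.
        headers = dict(msg.headers() or [])
        pipeline = headers.get("pipeline")
        if not pipeline:
            return

        # Namespace and model name: extracted from the topic name.
        match = TOPIC_PATTERN.match(msg.topic())
        if match is None:
            return

        # Inference response ID (message key) and timestamp; ignore the
        # message if either is unavailable.
        key = msg.key()
        timestamp_type, timestamp_ms = msg.timestamp()
        if key is None or timestamp_type == TIMESTAMP_NOT_AVAILABLE:
            return

        store_inference_response(
            pipeline=pipeline.decode(),
            namespace=match.group("namespace"),
            model_name=match.group("model_name"),
            response_id=key.decode(),
            timestamp_ms=timestamp_ms,
            payload=msg.value(),  # the inference response itself
        )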

Notes

  • A failure to extract the pipeline name due to a missing header or the pipeline name being empty will result in the message being ignored.

  • A failure to extract the namespace, model name, inference response ID, or timestamp will result in the message being ignored.

  • The message key is used as the inference response ID. Messages are not deduplicated by key, so sending multiple messages with the same key results in multiple records being stored in the database.
