Native Kafka Integration
Seldon provides a native Kafka integration when you specify `serverType: kafka` in your SeldonDeployment.
When `serverType: kafka` is specified you must also set the environment variables `KAFKA_BROKER`, `KAFKA_INPUT_TOPIC` and `KAFKA_OUTPUT_TOPIC` in `svcOrchSpec`. An example is shown below for a TensorFlow CIFAR-10 model:
```yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: tfserving-cifar10
spec:
  protocol: tensorflow
  transport: rest
  serverType: kafka
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - args:
          - --port=8500
          - --rest_api_port=8501
          - --model_name=resnet32
          - --model_base_path=gs://seldon-models/tfserving/cifar10/resnet32
          - --enable_batching
          image: tensorflow/serving
          name: resnet32
          ports:
          - containerPort: 8501
            name: http
    svcOrchSpec:
      env:
      - name: KAFKA_BROKER
        value: 10.12.10.16:9094
      - name: KAFKA_INPUT_TOPIC
        value: cifar10-rest-input
      - name: KAFKA_OUTPUT_TOPIC
        value: cifar10-rest-output
    graph:
      name: resnet32
      type: MODEL
      endpoint:
        service_port: 8501
    name: model
    replicas: 1
```

The above creates a REST TensorFlow deployment using the tensorflow protocol and connects it to the input and output topics.
Details
For the SeldonDeployment:

- Start with any Seldon inference graph
- Set `spec.serverType` to `kafka`
- Add a `spec.predictors[].svcOrchSpec.env` with settings for `KAFKA_BROKER`, `KAFKA_INPUT_TOPIC` and `KAFKA_OUTPUT_TOPIC`.
For the input Kafka topic:

- Create request streams for the input prediction matching your specified protocol and transport.
  - For REST: the JSON representation of a predict request in the given protocol.
  - For gRPC: the protobuf binary serialization of the request for the given protocol. You should also add a metadata field called `proto-name` with the package name of the protobuf message so it can be decoded, for example `tensorflow.serving.PredictRequest`. We can only support protobufs for the native gRPC protocols supported by Seldon.
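For example, with the REST tensorflow-protocol deployment above, each message published to `cifar10-rest-input` would be a TensorFlow Serving style predict request. A minimal sketch (a real CIFAR-10 request would carry one 32x32x3 image array per instance):

```json
{"instances": [[[[0.1, 0.2, 0.3]]]]}
```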
TLS Settings
To allow TLS connections to Kafka for the consumer and producer, add the following environment variables to the service orchestrator section:
- Set `KAFKA_SECURITY_PROTOCOL` to `ssl`
- If you have the values for keys and certificates, use:
  - `KAFKA_SSL_CA_CERT`
  - `KAFKA_SSL_CLIENT_CERT`
  - `KAFKA_SSL_CLIENT_KEY`
- If you have the file locations for the certificates, use:
  - `KAFKA_SSL_CA_CERT_FILE`
  - `KAFKA_SSL_CLIENT_CERT_FILE`
  - `KAFKA_SSL_CLIENT_KEY_FILE`
- If your key is password protected then add `KAFKA_SSL_CLIENT_KEY_PASS` (optional)
An example spec that gets these values from secrets comes from the Kafka KEDA demo; a sketch is shown below.
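The demo's exact spec is not reproduced here. A minimal sketch using standard Kubernetes `secretKeyRef` references, assuming a hypothetical secret `seldon-kafka-tls` holding the PEM values:

```yaml
svcOrchSpec:
  env:
  - name: KAFKA_SECURITY_PROTOCOL
    value: ssl
  - name: KAFKA_SSL_CA_CERT
    valueFrom:
      secretKeyRef:
        name: seldon-kafka-tls  # hypothetical secret name
        key: ca.crt
  - name: KAFKA_SSL_CLIENT_CERT
    valueFrom:
      secretKeyRef:
        name: seldon-kafka-tls
        key: tls.crt
  - name: KAFKA_SSL_CLIENT_KEY
    valueFrom:
      secretKeyRef:
        name: seldon-kafka-tls
        key: tls.key
```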
KEDA Scaling
KEDA can be used to scale Kafka SeldonDeployments by looking at the consumer lag.
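The demo's trigger spec is not reproduced here; the following is a minimal sketch of a Kafka trigger under Seldon's `kedaSpec`, with illustrative broker address, consumer group and TriggerAuthentication names:

```yaml
kedaSpec:
  minReplicaCount: 1
  maxReplicaCount: 5
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: 10.12.10.16:9094  # illustrative broker address
      consumerGroup: tfserving-cifar10    # illustrative consumer group name
      lagThreshold: "50"
      topic: cifar10-rest-input
    authenticationRef:
      name: seldon-kafka-auth             # TriggerAuthentication, sketched below
```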
In the sketch above we:

- define bootstrap servers for KEDA to connect to via `bootstrapServers`
- define the consumer group to monitor via `consumerGroup`
- set the lag to scale up on via `lagThreshold`
- monitor a particular topic via `topic`
- define TLS authentication via a KEDA TriggerAuthentication referenced by `authenticationRef`
The TriggerAuthentication we used for this extracts the TLS details from secrets, e.g. as sketched below.
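A sketch of such a TriggerAuthentication, assuming the same hypothetical `seldon-kafka-tls` secret (KEDA's Kafka scaler takes `tls`, `ca`, `cert` and `key` auth parameters):

```yaml
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: seldon-kafka-auth
spec:
  secretTargetRef:
  - parameter: tls        # secret key whose value is "enable"
    name: seldon-kafka-tls
    key: tls
  - parameter: ca
    name: seldon-kafka-tls
    key: ca.crt
  - parameter: cert
    name: seldon-kafka-tls
    key: tls.crt
  - parameter: key
    name: seldon-kafka-tls
    key: tls.key
```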
A worked example can be found here.