syntax = "proto3";

package seldon.mlops.scheduler;

// timestamp.proto supplies google.protobuf.Timestamp (used by the
// lastChangeTimestamp fields); chainer.proto supplies the chainer.* messages
// used by the Scheduler.PipelineStatusEvent RPC.
import "google/protobuf/timestamp.proto";
import "mlops/chainer/chainer.proto";

option go_package = "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler";
// [START Messages]
// Request message for the Scheduler.LoadModel RPC.
message LoadModelRequest {
  // The model definition to load.
  Model model = 1;
}
// Model bundles a model's metadata with its model, deployment, stream and
// dataflow specifications.
message Model {
  // Identifying metadata (name, kind, version, Kubernetes details).
  MetaData meta = 1;
  // Artifact location and server requirements.
  ModelSpec modelSpec = 2;
  // Replica counts and payload logging settings.
  DeploymentSpec deploymentSpec = 3;
  // Input/output topic names.
  StreamSpec streamSpec = 4;
  // Dataflow settings.
  DataflowSpec dataflowSpec = 5;
}
// MetaData identifies a resource by name, with optional kind, version and
// Kubernetes-specific details.
message MetaData {
  string name = 1;
  optional string kind = 2;
  optional string version = 3;
  // Kubernetes specific config.
  optional KubernetesMeta kubernetesMeta = 4;
}
// DataflowSpec holds dataflow settings. It is referenced by both Model and
// Pipeline.
message DataflowSpec {
  // Clean up the Kafka topic on model delete.
  bool cleanTopicsOnDelete = 1;
}
// DeploymentSpec carries the replica counts and payload logging flag of a
// Model.
message DeploymentSpec {
  uint32 replicas = 1;
  // Lower and upper replica bounds.
  // NOTE(review): presumably used for scaling decisions; confirm with the
  // scheduler implementation.
  uint32 minReplicas = 2;
  uint32 maxReplicas = 3;
  // Whether payloads are logged.
  bool logPayloads = 4;
}
// ModelSpec describes the artifact to load and the requirements placed on the
// server that will run it.
message ModelSpec {
  // Storage URI from where to download the artifacts.
  string uri = 1;
  // Optional v2 version folder to select.
  optional uint32 artifactVersion = 2;
  // Storage auth configuration.
  optional StorageConfig storageConfig = 3;
  // List of capabilities the server must satisfy to run this model.
  repeated string requirements = 4;
  // Requested memory, in bytes.
  optional uint64 memoryBytes = 5;
  // The particular model server to load the model onto. If unspecified, one
  // will be chosen.
  optional string server = 6;
  // Parameters to load with the model.
  repeated ParameterSpec parameters = 8;
  // Model specific settings that are sent by the agent.
  optional ModelRuntimeInfo modelRuntimeInfo = 9;

  // Ensures only one of explainer or llm is specified at a time.
  oneof model_spec {
    // Optional black box explainer details.
    ExplainerSpec explainer = 7;
    // Optional LLM specific settings.
    LlmSpec llm = 10;
  }
}
// ParameterSpec is a single name/value pair; ModelSpec.parameters holds a list
// of them.
message ParameterSpec {
  string name = 1;
  string value = 2;
}
// ExplainerSpec holds the explainer details that can be set on a ModelSpec.
message ExplainerSpec {
  // Explainer type.
  string type = 1;
  // Semantically, exactly one of modelRef or pipelineRef is expected: the
  // explainer refers either to a model or to a pipeline. The schema itself
  // does not enforce this (the two fields are not in a oneof).
  optional string modelRef = 2;
  optional string pipelineRef = 3;
}
// LlmSpec holds the LLM specific settings that can be set on a ModelSpec.
message LlmSpec {
  // Optional references to a model or a pipeline.
  // NOTE(review): presumably only one of the two is expected to be set, as
  // with ExplainerSpec; confirm with the consumer.
  optional string modelRef = 1;
  optional string pipelineRef = 2;
}
// ModelRuntimeInfo carries runtime-specific settings for exactly one kind of
// model server.
message ModelRuntimeInfo {
  // At most one runtime configuration is set.
  oneof modelRuntimeInfo {
    MLServerModelSettings mlserver = 1;
    TritonModelConfig triton = 2;
  }
}
// MLServerModelSettings is the MLServer variant of ModelRuntimeInfo.
message MLServerModelSettings {
  // Number of parallel workers.
  uint32 parallelWorkers = 1;
}
// TritonModelConfig is the Triton variant of ModelRuntimeInfo.
message TritonModelConfig {
  // CPU instance settings.
  repeated TritonCPU cpu = 1;
}
// TritonCPU is one entry of TritonModelConfig.cpu.
message TritonCPU {
  // Number of instances.
  uint32 instanceCount = 1;
}
// KubernetesMeta carries the Kubernetes specific details attached to models,
// servers, experiments and pipelines.
message KubernetesMeta {
  string namespace = 1;
  // NOTE(review): presumably the generation of the Kubernetes resource;
  // confirm with the controller.
  int64 generation = 2;
}
// StreamSpec names the input and output topics of a Model.
// NOTE(review): field number 1 is not used here. If it belonged to a removed
// field, consider adding `reserved 1;` so it cannot be reused.
message StreamSpec {
  string inputTopic = 2;
  string outputTopic = 3;
}
// StorageConfig is the storage auth configuration of a ModelSpec, given either
// as a secret name or as an rclone configuration string.
message StorageConfig {
  // At most one of the two forms is set.
  oneof config {
    string storageSecretName = 1;
    string storageRcloneConfig = 2;
  }
}
// Response message for the Scheduler.LoadModel RPC. Currently has no fields.
message LoadModelResponse {}
// ModelReference represents a unique model: a name plus an optional version.
message ModelReference {
  string name = 1;
  optional uint32 version = 2;
}
// Request message for the Scheduler.UnloadModel RPC.
message UnloadModelRequest {
  // The model to unload.
  ModelReference model = 1;
  // Optional Kubernetes specific details.
  optional KubernetesMeta kubernetesMeta = 2;
}
// Response message for the Scheduler.UnloadModel RPC. Currently has no fields.
message UnloadModelResponse {}
// ModelStatusResponse provides the current assignment of the model onto a
// server. It is streamed by the Scheduler.ModelStatus and
// Scheduler.SubscribeModelStatus RPCs.
message ModelStatusResponse {
  // Model operation associated with a response.
  enum ModelOperation {
    ModelOperationUnknown = 0;
    ModelCreate = 1;
    ModelDelete = 2;
  }

  string modelName = 1;
  // Per-version status entries.
  repeated ModelVersionStatus versions = 2;
  bool deleted = 3;
  bool keepTopics = 4;
  // NOTE(review): plain uint64; the unit and epoch are not stated in this
  // file. Confirm with the producer.
  uint64 timestamp = 5;
  ModelOperation operation = 6;
}
// ModelVersionStatus is the status of one version of a model, as listed in
// ModelStatusResponse.versions.
// NOTE(review): field number 1 is not used here. If it belonged to a removed
// field, consider adding `reserved 1;` so it cannot be reused.
message ModelVersionStatus {
  uint32 version = 2;
  string serverName = 3;
  optional KubernetesMeta kubernetesMeta = 4;
  // Replica status keyed by an int32.
  // NOTE(review): the key is presumably the replica index; confirm.
  map<int32, ModelReplicaStatus> modelReplicaState = 5;
  // Aggregate status of this version.
  ModelStatus state = 6;
  // Optional model definition for this version.
  optional Model modelDefn = 7;
}
// ModelStatus is the aggregate status of a model version: a state with a
// reason, replica counts, and a second state/reason pair prefixed "modelGw".
message ModelStatus {
  // States shared by the `state` and `modelGwState` fields.
  enum ModelState {
    ModelStateUnknown = 0;
    ModelProgressing = 1;
    ModelAvailable = 2;
    ModelFailed = 3;
    ModelTerminating = 4;
    ModelTerminated = 5;
    ModelTerminateFailed = 6;
    ScheduleFailed = 7;
    ModelScaledDown = 8;
    ModelCreate = 9;
    ModelTerminate = 10;
  }

  ModelState state = 1;
  // Reason accompanying `state`.
  string reason = 2;
  uint32 availableReplicas = 3;
  uint32 unavailableReplicas = 4;
  google.protobuf.Timestamp lastChangeTimestamp = 5;
  // NOTE(review): "modelGw" presumably stands for the model gateway (compare
  // ModelSubscriptionRequest.isModelGateway); confirm.
  ModelState modelGwState = 6;
  // Reason accompanying `modelGwState`.
  string modelGwReason = 7;
}
// ModelReplicaStatus is the status of a single model replica, used as the
// value type of ModelVersionStatus.modelReplicaState.
message ModelReplicaStatus {
  // States a model replica can be reported in.
  enum ModelReplicaState {
    ModelReplicaStateUnknown = 0;
    LoadRequested = 1;
    Loading = 2;
    Loaded = 3;
    LoadFailed = 4;
    UnloadRequested = 5;
    Unloading = 6;
    Unloaded = 7;
    UnloadFailed = 8;
    Available = 9;
    LoadedUnavailable = 10;
    UnloadEnvoyRequested = 11;
    Draining = 12;
  }

  ModelReplicaState state = 1;
  // Reason accompanying `state`.
  string reason = 2;
  google.protobuf.Timestamp lastChangeTimestamp = 3;
}
// Request message for the Scheduler.ServerStatus RPC.
message ServerStatusRequest {
  string subscriberName = 1;
  // Server name. Leave empty for all servers.
  optional string name = 2;
}
// ServerStatusResponse provides details of current server status. It is
// streamed by the Scheduler.ServerStatus and Scheduler.SubscribeServerStatus
// RPCs.
message ServerStatusResponse {
  // Type of ServerStatus update. At the moment the scheduler doesn't combine
  // multiple types of updates in the same response. However, the Type enum is
  // forward-compatible with this possibility, by setting members to
  // power-of-two values. This means enum values can be used as flags and
  // combined with bitwise OR, with the exception of StatusResponseTypeUnknown.
  enum Type {
    StatusResponseTypeUnknown = 0;
    StatusUpdate = 1;
    NonAuthoritativeReplicaInfo = 2;
    ScalingRequest = 4;
  }

  // Declared first, but carries field number 7.
  Type type = 7;
  string serverName = 1;
  // Per-replica resource details.
  repeated ServerReplicaResources resources = 2;
  int32 expectedReplicas = 3;
  int32 availableReplicas = 4;
  int32 numLoadedModelReplicas = 5;
  optional KubernetesMeta kubernetesMeta = 6;
}
// ServerReplicaResources reports memory and model counts for one server
// replica, as listed in ServerStatusResponse.resources.
message ServerReplicaResources {
  // Index of the replica.
  uint32 replicaIdx = 1;
  // Total and available memory, in bytes.
  uint64 totalMemoryBytes = 2;
  uint64 availableMemoryBytes = 3;
  int32 numLoadedModels = 4;
  // Over-commit setting, expressed as a percentage.
  uint32 overCommitPercentage = 5;
}
// Request message for the Scheduler.SubscribeModelStatus RPC.
message ModelSubscriptionRequest {
  // Name of the subscription caller.
  string subscriberName = 1;
  // Whether the subscriber is a model gateway.
  bool isModelGateway = 2;
}
// Request message for the Scheduler.ModelStatus RPC.
message ModelStatusRequest {
  string subscriberName = 1;
  // Optional model to report on.
  // NOTE(review): presumably all models are reported when unset, as with the
  // other *StatusRequest messages; confirm.
  optional ModelReference model = 2;
  // Whether all versions are requested.
  bool allVersions = 3;
}
// Request message for the Scheduler.ServerNotify RPC.
message ServerNotifyRequest {
  // Servers being notified about.
  repeated ServerNotify servers = 1;
  // Whether this is the first sync.
  bool isFirstSync = 2;
}
// ServerNotify describes one server inside a ServerNotifyRequest.
// Fields are declared out of numeric order: minReplicas and maxReplicas carry
// numbers 5 and 6.
message ServerNotify {
  string name = 1;
  uint32 expectedReplicas = 2;
  uint32 minReplicas = 5;
  uint32 maxReplicas = 6;
  bool shared = 3;
  optional KubernetesMeta kubernetesMeta = 4;
}
// Response message for the Scheduler.ServerNotify RPC. Currently has no
// fields.
message ServerNotifyResponse {}
// Request message for the Scheduler.SubscribeServerStatus RPC.
message ServerSubscriptionRequest {
  // Name of the subscription caller.
  string subscriberName = 1;
}
// Experiments
// Request message for the Scheduler.StartExperiment RPC.
message StartExperimentRequest {
  // The experiment to start.
  Experiment experiment = 1;
}
// ResourceType is the kind of resource an Experiment refers to. MODEL is the
// zero value and therefore the default when the field is unset.
enum ResourceType {
  MODEL = 0;
  PIPELINE = 1;
}
// Experiment defines a named experiment over a set of weighted candidates,
// with an optional default, mirror and configuration.
message Experiment {
  string name = 1;
  // Optional default.
  // NOTE(review): presumably the name of the default candidate; confirm.
  optional string default = 2;
  repeated ExperimentCandidate candidates = 3;
  optional ExperimentMirror mirror = 4;
  optional ExperimentConfig config = 5;
  optional KubernetesMeta kubernetesMeta = 6;
  // Kind of resource the experiment refers to (model or pipeline).
  ResourceType resourceType = 7;
}
// ExperimentConfig holds the optional configuration of an Experiment.
message ExperimentConfig {
  // Whether sticky sessions are enabled.
  bool stickySessions = 1;
}
// ExperimentCandidate is one named, weighted entry of Experiment.candidates.
message ExperimentCandidate {
  string name = 1;
  uint32 weight = 2;
}
// ExperimentMirror is the optional mirror of an Experiment: a name and a
// percentage.
message ExperimentMirror {
  string name = 1;
  uint32 percent = 2;
}
// Response message for the Scheduler.StartExperiment RPC. Currently has no
// fields.
message StartExperimentResponse {}
// Request message for the Scheduler.StopExperiment RPC.
message StopExperimentRequest {
  // Name of the experiment to stop.
  string name = 1;
}
// Response message for the Scheduler.StopExperiment RPC. Currently has no
// fields.
message StopExperimentResponse {}
// Request message for the Scheduler.SubscribeExperimentStatus RPC.
message ExperimentSubscriptionRequest {
  // Name of the subscription caller.
  string subscriberName = 1;
}
// ExperimentStatusResponse reports the status of one experiment. It is
// streamed by the Scheduler.ExperimentStatus and
// Scheduler.SubscribeExperimentStatus RPCs.
message ExperimentStatusResponse {
  string experimentName = 1;
  bool active = 2;
  // Readiness flags for the candidates and the mirror.
  bool candidatesReady = 3;
  bool mirrorReady = 4;
  // Free-form description of the status.
  string statusDescription = 5;
  optional KubernetesMeta kubernetesMeta = 6;
}
// Request message for the Scheduler.LoadPipeline RPC.
message LoadPipelineRequest {
  // The pipeline definition to load.
  Pipeline pipeline = 1;
}
// Request message for the Scheduler.ExperimentStatus RPC.
message ExperimentStatusRequest {
  string subscriberName = 1;
  // Experiment name. Leave empty for all experiments.
  optional string name = 2;
}
// Pipeline defines a versioned pipeline: its steps plus optional input,
// output, Kubernetes and dataflow settings.
message Pipeline {
  string name = 1;
  string uid = 2;
  uint32 version = 3;
  repeated PipelineStep steps = 4;
  optional PipelineOutput output = 5;
  optional KubernetesMeta kubernetesMeta = 6;
  optional PipelineInput input = 7;
  // Dataflow specific config.
  optional DataflowSpec dataflowSpec = 8;
  // Allow cycles in the pipeline.
  bool allowCycles = 9;
  // Max number of times a step can be revisited.
  uint32 maxStepRevisits = 10;
}
// PipelineStep is one step of a Pipeline: its inputs and triggers, how each
// set is joined, and optional tensor renaming and batching.
message PipelineStep {
  // Join operation applied to a step's inputs or triggers. INNER is the zero
  // value and therefore the default.
  enum JoinOp {
    INNER = 0;
    OUTER = 1;
    ANY = 2;
  }

  string name = 1;
  repeated string inputs = 2;
  // Join window in milliseconds, some nonzero default (TBD).
  optional uint32 joinWindowMs = 3;
  // Optional map of tensor name mappings.
  map<string, string> tensorMap = 4;
  // Join operation for `inputs`.
  JoinOp inputsJoin = 5;
  repeated string triggers = 6;
  // Join operation for `triggers`.
  JoinOp triggersJoin = 7;
  Batch batch = 8;
}
// Batch holds the optional batching settings of a PipelineStep.
message Batch {
  optional uint32 size = 1;
  // Window in milliseconds.
  optional uint32 windowMs = 2;
}
// PipelineInput describes the external inputs and triggers of a Pipeline and
// how each set is joined.
message PipelineInput {
  // Join operation applied to the external inputs or triggers. INNER is the
  // zero value and therefore the default.
  enum JoinOp {
    INNER = 0;
    OUTER = 1;
    ANY = 2;
  }

  repeated string externalInputs = 1;
  repeated string externalTriggers = 2;
  // Join window in milliseconds, default 0.
  // NOTE(review): the previous comment said "for output", which looks copied
  // from PipelineOutput; confirm the intended wording.
  optional uint32 joinWindowMs = 3;
  JoinOp joinType = 4;
  // Join operation for `externalTriggers`.
  JoinOp triggersJoin = 5;
  // Optional map of tensor name mappings.
  map<string, string> tensorMap = 6;
}
// PipelineOutput describes which steps feed the output of a Pipeline and how
// they are joined.
message PipelineOutput {
  // Join operation applied to the output steps. INNER is the zero value and
  // therefore the default.
  enum JoinOp {
    INNER = 0;
    OUTER = 1;
    ANY = 2;
  }

  repeated string steps = 1;
  // Join window in milliseconds for the output, default 0.
  uint32 joinWindowMs = 2;
  // Join operation for `steps`.
  JoinOp stepsJoin = 3;
  // Optional map of tensor name mappings.
  map<string, string> tensorMap = 4;
}
// Response message for the Scheduler.LoadPipeline RPC. Currently has no
// fields.
message LoadPipelineResponse {}
// Request message for the Scheduler.UnloadPipeline RPC.
message UnloadPipelineRequest {
  // Name of the pipeline to unload.
  string name = 1;
}
// Response message for the Scheduler.UnloadPipeline RPC. Currently has no
// fields.
message UnloadPipelineResponse {}
// Request message for the Scheduler.PipelineStatus RPC.
message PipelineStatusRequest {
  string subscriberName = 1;
  // Pipeline name. Leave empty for all pipelines.
  optional string name = 2;
  // Whether all versions are requested.
  bool allVersions = 3;
}
// Request message for the Scheduler.SubscribePipelineStatus RPC.
message PipelineSubscriptionRequest {
  // Name of the subscription caller.
  string subscriberName = 1;
  // IP of the subscription caller.
  string subscriberIp = 2;
  // If true, the subscription is for a pipeline gateway.
  bool isPipelineGateway = 3;
}
// PipelineStatusResponse reports the versions of one pipeline together with
// their state. It is streamed by the Scheduler.PipelineStatus and
// Scheduler.SubscribePipelineStatus RPCs.
message PipelineStatusResponse {
  // Pipeline operation associated with a response.
  enum PipelineOperation {
    PipelineOperationUnknown = 0;
    PipelineCreate = 1;
    PipelineDelete = 2;
  }

  string pipelineName = 1;
  // Pipeline versions, each paired with its state.
  repeated PipelineWithState versions = 2;
  // NOTE(review): plain uint64; the unit and epoch are not stated in this
  // file. Confirm with the producer.
  uint64 timestamp = 3;
  PipelineOperation operation = 4;
}
// PipelineWithState pairs a pipeline definition with the state of that
// version.
message PipelineWithState {
  Pipeline pipeline = 1;
  PipelineVersionState state = 2;
}
// PipelineVersionState is the state of one pipeline version: a status with a
// reason, a models-ready flag, and a second status/reason pair prefixed
// "pipelineGw".
message PipelineVersionState {
  // Statuses shared by the `status` and `pipelineGwStatus` fields.
  enum PipelineStatus {
    PipelineStatusUnknown = 0;
    PipelineCreate = 1;
    PipelineCreating = 2;
    PipelineReady = 3;
    PipelineFailed = 4;
    PipelineTerminate = 5;
    PipelineTerminating = 6;
    PipelineTerminated = 7;
    PipelineRebalancing = 8;
  }

  uint32 pipelineVersion = 1;
  PipelineStatus status = 2;
  // Reason accompanying `status`.
  string reason = 3;
  google.protobuf.Timestamp lastChangeTimestamp = 4;
  bool modelsReady = 5;
  // NOTE(review): "pipelineGw" presumably stands for the pipeline gateway
  // (compare PipelineSubscriptionRequest.isPipelineGateway); confirm.
  PipelineStatus pipelineGwStatus = 6;
  // Reason accompanying `pipelineGwStatus`.
  string pipelineGwReason = 7;
}
// Request message for the Scheduler.SchedulerStatus RPC.
message SchedulerStatusRequest {
  string subscriberName = 1;
}
// Response message for the Scheduler.SchedulerStatus RPC.
message SchedulerStatusResponse {
  // Version string of the application.
  string applicationVersion = 1;
}
// Request message for the Scheduler.SubscribeControlPlane RPC.
message ControlPlaneSubscriptionRequest {
  // Name of the subscription caller.
  string subscriberName = 1;
}
// ControlPlaneResponse is streamed by the Scheduler.SubscribeControlPlane RPC
// and carries a single event.
message ControlPlaneResponse {
  // Events that can be sent on the control plane stream.
  enum Event {
    UNKNOWN_EVENT = 0;
    // Initial sync for the servers.
    SEND_SERVERS = 1;
    SEND_MODELS = 2;
    SEND_PIPELINES = 3;
    SEND_EXPERIMENTS = 4;
  }

  Event event = 1;
}
// ModelUpdateMessage describes a create or delete operation on a model
// version. It is embedded in ModelUpdateStatusMessage.
message ModelUpdateMessage {
  // Operation carried by the message.
  enum ModelOperation {
    Unknown = 0;
    Create = 1;
    Delete = 2;
  }

  ModelOperation op = 1;
  // Model name and version.
  string model = 2;
  uint32 version = 3;
  string uid = 4;
  // NOTE(review): plain uint64; the unit and epoch are not stated in this
  // file. Confirm with the producer.
  uint64 timestamp = 5;
  string stream = 6;
}
// Request message for the Scheduler.ModelStatusEvent RPC: reports the outcome
// of a model update.
message ModelUpdateStatusMessage {
  // The update this status refers to.
  ModelUpdateMessage update = 1;
  bool success = 2;
  // Reason accompanying the outcome.
  string reason = 3;
}
// Response message for the Scheduler.ModelStatusEvent RPC. Currently has no
// fields.
message ModelUpdateStatusResponse {}
// [END Messages]
// [START Services]
// Scheduler is the RPC API of the scheduler. It exposes unary calls for
// servers, models, pipelines and experiments, plus server-streaming calls for
// status queries and subscriptions.
service Scheduler {
  // Server notification.
  rpc ServerNotify(ServerNotifyRequest) returns (ServerNotifyResponse) {}

  // Model, pipeline and experiment lifecycle (unary).
  rpc LoadModel(LoadModelRequest) returns (LoadModelResponse) {}
  rpc UnloadModel(UnloadModelRequest) returns (UnloadModelResponse) {}
  rpc LoadPipeline(LoadPipelineRequest) returns (LoadPipelineResponse) {}
  rpc UnloadPipeline(UnloadPipelineRequest) returns (UnloadPipelineResponse) {}
  rpc StartExperiment(StartExperimentRequest) returns (StartExperimentResponse) {}
  rpc StopExperiment(StopExperimentRequest) returns (StopExperimentResponse) {}

  // Status queries. All but SchedulerStatus stream their responses.
  rpc ServerStatus(ServerStatusRequest) returns (stream ServerStatusResponse) {}
  rpc ModelStatus(ModelStatusRequest) returns (stream ModelStatusResponse) {}
  rpc PipelineStatus(PipelineStatusRequest) returns (stream PipelineStatusResponse) {}
  rpc ExperimentStatus(ExperimentStatusRequest) returns (stream ExperimentStatusResponse) {}
  rpc SchedulerStatus(SchedulerStatusRequest) returns (SchedulerStatusResponse) {}

  // Status subscriptions (server-streaming).
  rpc SubscribeServerStatus(ServerSubscriptionRequest) returns (stream ServerStatusResponse) {}
  rpc SubscribeModelStatus(ModelSubscriptionRequest) returns (stream ModelStatusResponse) {}
  rpc SubscribeExperimentStatus(ExperimentSubscriptionRequest) returns (stream ExperimentStatusResponse) {}
  rpc SubscribePipelineStatus(PipelineSubscriptionRequest) returns (stream PipelineStatusResponse) {}

  // Status events reported back to the scheduler (unary).
  rpc PipelineStatusEvent(chainer.PipelineUpdateStatusMessage) returns (chainer.PipelineUpdateStatusResponse) {}
  rpc ModelStatusEvent(ModelUpdateStatusMessage) returns (ModelUpdateStatusResponse) {}

  // Control plane stream with controller.
  rpc SubscribeControlPlane(ControlPlaneSubscriptionRequest) returns (stream ControlPlaneResponse) {}
}
// [END Services]