Scheduler
Last updated
Was this helpful?
Last updated
Was this helpful?
The Seldon scheduler API provides a gRPC service to allow Models, Servers, Experiments, and Pipelines to be managed. In Kubernetes, the manager deployed by Seldon translates Kubernetes custom resource definitions into calls to the Seldon Scheduler.
In non-Kubernetes environments, users of Seldon can create a client that controls Seldon resources directly through this API.
syntax = "proto3";
package seldon.mlops.scheduler;
option go_package = "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler";
import "google/protobuf/timestamp.proto";
// [START Messages]
// Request message for the Scheduler.LoadModel RPC.
message LoadModelRequest {
  // Definition of the model to load.
  Model model = 1;
}
// Model groups the metadata and the individual specs that make up a model
// definition. It is carried by LoadModelRequest and may be echoed back in
// ModelVersionStatus.modelDefn.
message Model {
  // Name, kind, version and optional Kubernetes metadata.
  MetaData meta = 1;

  // Artifact location, requirements and runtime settings.
  ModelSpec modelSpec = 2;

  // Replica counts and payload logging flag.
  DeploymentSpec deploymentSpec = 3;

  // Input/output topic names.
  StreamSpec streamSpec = 4;
}
// MetaData identifies a resource. Only `name` has implicit presence; the
// remaining fields are declared `optional` and so carry explicit presence.
message MetaData {
  string name = 1;

  optional string kind = 2;

  optional string version = 3;

  // Kubernetes specific config
  optional KubernetesMeta kubernetesMeta = 4;
}
// DeploymentSpec holds the replica settings of a Model plus a payload
// logging switch.
// NOTE(review): the relationship between replicas, minReplicas and
// maxReplicas (e.g. autoscaling bounds) is not stated in this file — confirm.
message DeploymentSpec {
  uint32 replicas = 1;

  uint32 minReplicas = 2;

  uint32 maxReplicas = 3;

  bool logPayloads = 4;
}
// ModelSpec carries the model details: where the artifacts live, what the
// hosting server must provide, and optional runtime/explainer/LLM settings.
//
// Field numbers 7 and 10 belong to the `model_spec` oneof declared at the
// end of the message; numbers 8 and 9 are regular fields.
message ModelSpec {
  // storage uri from where to download the artifacts
  string uri = 1;

  // Optional v2 version folder to select
  optional uint32 artifactVersion = 2;

  // Storage auth configuration
  optional StorageConfig storageConfig = 3;

  // list of capabilities the server must satisfy to run this model
  repeated string requirements = 4;

  // Requested memory
  optional uint64 memoryBytes = 5;

  // the particular model server to load the model. If unspecified will be
  // chosen.
  optional string server = 6;

  // parameters to load with model
  repeated ParameterSpec parameters = 8;

  // model specific settings that are sent by the agent
  optional ModelRuntimeInfo modelRuntimeInfo = 9;

  // ensure only one of explainer or llm is specified at a time
  oneof model_spec {
    // optional black box explainer details
    ExplainerSpec explainer = 7;

    // optional LLM specific settings
    LlmSpec llm = 10;
  }
}
// ParameterSpec is a single name/value pair; ModelSpec.parameters holds a
// list of them ("parameters to load with model").
message ParameterSpec {
  string name = 1;

  string value = 2;
}
// ExplainerSpec holds the explainer details referenced from
// ModelSpec.explainer.
message ExplainerSpec {
  string type = 1;

  // Semantically a one-of: reference either a model or a pipeline. The two
  // fields are independent `optional` fields, so the schema itself does not
  // enforce the exclusivity.
  optional string modelRef = 2;

  optional string pipelineRef = 3;
}
// LlmSpec holds the LLM specific settings referenced from ModelSpec.llm.
// NOTE(review): presumably follows the same "either model or pipeline"
// convention as ExplainerSpec — confirm; the schema does not enforce it.
message LlmSpec {
  optional string modelRef = 1;

  optional string pipelineRef = 2;
}
// ModelRuntimeInfo carries runtime-specific settings for a model. Exactly
// one variant of the oneof can be set at a time: MLServer settings or a
// Triton config.
message ModelRuntimeInfo {
  oneof modelRuntimeInfo {
    MLServerModelSettings mlserver = 1;

    TritonModelConfig triton = 2;
  }
}
// MLServerModelSettings is the MLServer variant of ModelRuntimeInfo.
message MLServerModelSettings {
  uint32 parallelWorkers = 1;
}
// TritonModelConfig is the Triton variant of ModelRuntimeInfo.
message TritonModelConfig {
  // Zero or more CPU entries, each carrying an instance count.
  repeated TritonCPU cpu = 1;
}
// TritonCPU is one entry of TritonModelConfig.cpu.
message TritonCPU {
  uint32 instanceCount = 1;
}
// KubernetesMeta is the Kubernetes specific metadata attached (optionally)
// to many resources and status messages in this file.
message KubernetesMeta {
  string namespace = 1;

  // NOTE(review): presumably mirrors the Kubernetes object's
  // metadata.generation — confirm.
  int64 generation = 2;
}
// StreamSpec names the input and output topics of a Model.
// NOTE(review): field numbering starts at 2. If number 1 belonged to a field
// that was removed, it should be protected with `reserved 1;` — confirm
// against the schema history.
message StreamSpec {
  string inputTopic = 2;

  string outputTopic = 3;
}
// StorageConfig is the storage auth configuration used by ModelSpec. The
// oneof allows either a secret name or an rclone config string, never both.
message StorageConfig {
  oneof config {
    string storageSecretName = 1;

    string storageRcloneConfig = 2;
  }
}
// Response message for the Scheduler.LoadModel RPC. Currently has no fields.
message LoadModelResponse {}
// ModelReference represents a unique model.
message ModelReference {
  string name = 1;

  // Explicit-presence version; callers can leave it unset.
  optional uint32 version = 2;
}
// Request message for the Scheduler.UnloadModel RPC.
message UnloadModelRequest {
  // Reference to the model to unload.
  ModelReference model = 1;

  optional KubernetesMeta kubernetesMeta = 2;
}
// Response message for the Scheduler.UnloadModel RPC. Currently has no
// fields.
message UnloadModelResponse {}
// ModelStatusResponse provides the current assignment of the model onto a
// server. It is streamed by the ModelStatus and SubscribeModelStatus RPCs.
message ModelStatusResponse {
  string modelName = 1;

  // Status of each version of the model.
  repeated ModelVersionStatus versions = 2;

  bool deleted = 3;
}
// ModelVersionStatus describes one version of a model: the server it is
// placed on, the state of each replica and the aggregated model state.
// NOTE(review): field numbering starts at 2. If number 1 belonged to a field
// that was removed, it should be protected with `reserved 1;` — confirm
// against the schema history.
message ModelVersionStatus {
  uint32 version = 2;

  string serverName = 3;

  optional KubernetesMeta kubernetesMeta = 4;

  // Per-replica status.
  // NOTE(review): the int32 key is presumably the replica index — confirm.
  map<int32, ModelReplicaStatus> modelReplicaState = 5;

  ModelStatus state = 6;

  // Model definition, when included by the sender.
  optional Model modelDefn = 7;
}
// ModelStatus is the aggregated state of a model version, used by
// ModelVersionStatus.state.
message ModelStatus {
  // Lifecycle states of a model. Zero is the "unknown" default.
  enum ModelState {
    ModelStateUnknown = 0;
    ModelProgressing = 1;
    ModelAvailable = 2;
    ModelFailed = 3;
    ModelTerminating = 4;
    ModelTerminated = 5;
    ModelTerminateFailed = 6;
    ScheduleFailed = 7;
  }

  ModelState state = 1;

  // Free-text reason accompanying `state`.
  string reason = 2;

  uint32 availableReplicas = 3;

  uint32 unavailableReplicas = 4;

  google.protobuf.Timestamp lastChangeTimestamp = 5;
}
// ModelReplicaStatus is the state of a single model replica; it is the value
// type of ModelVersionStatus.modelReplicaState.
message ModelReplicaStatus {
  // States a model replica can be in. Zero is the "unknown" default.
  enum ModelReplicaState {
    ModelReplicaStateUnknown = 0;
    LoadRequested = 1;
    Loading = 2;
    Loaded = 3;
    LoadFailed = 4;
    UnloadRequested = 5;
    Unloading = 6;
    Unloaded = 7;
    UnloadFailed = 8;
    Available = 9;
    LoadedUnavailable = 10;
    UnloadEnvoyRequested = 11;
    Draining = 12;
  }

  ModelReplicaState state = 1;

  // Free-text reason accompanying `state`.
  string reason = 2;

  google.protobuf.Timestamp lastChangeTimestamp = 3;
}
// Request message for the Scheduler.ServerStatus RPC.
message ServerStatusRequest {
  string subscriberName = 1;

  // Leave empty for all servers
  optional string name = 2;
}
// ServerStatusResponse provides details of current server status. It is
// streamed by the ServerStatus and SubscribeServerStatus RPCs.
message ServerStatusResponse {
  // Type of ServerStatus update. At the moment the scheduler doesn't combine
  // multiple types of updates in the same response. However, the Type enum is
  // forward-compatible with this possibility, by setting members to
  // power-of-two values. This means enum values can be used as flags and
  // combined with bitwise OR, with the exception of StatusResponseTypeUnknown.
  enum Type {
    StatusResponseTypeUnknown = 0;
    StatusUpdate = 1;
    NonAuthoritativeReplicaInfo = 2;
    ScalingRequest = 4;
  }

  // Kind of update carried by this response. Declared first but numbered 7.
  Type type = 7;

  string serverName = 1;

  // Resource details, one entry per server replica.
  repeated ServerReplicaResources resources = 2;

  int32 expectedReplicas = 3;

  int32 availableReplicas = 4;

  int32 numLoadedModelReplicas = 5;

  optional KubernetesMeta kubernetesMeta = 6;
}
// ServerReplicaResources describes a single server replica; it is the
// element type of ServerStatusResponse.resources.
message ServerReplicaResources {
  uint32 replicaIdx = 1;

  uint64 totalMemoryBytes = 2;

  uint64 availableMemoryBytes = 3;

  int32 numLoadedModels = 4;

  uint32 overCommitPercentage = 5;
}
// Request message for the Scheduler.SubscribeModelStatus RPC.
message ModelSubscriptionRequest {
  // Name of the subscription caller
  string subscriberName = 1;
}
// Request message for the Scheduler.ModelStatus RPC.
message ModelStatusRequest {
  string subscriberName = 1;

  // NOTE(review): presumably, leaving this unset requests all models, as the
  // sibling ServerStatusRequest/PipelineStatusRequest comments describe for
  // their `name` field — confirm.
  optional ModelReference model = 2;

  bool allVersions = 3;
}
// Request message for the Scheduler.ServerNotify RPC; carries a batch of
// server notifications.
message ServerNotifyRequest {
  repeated ServerNotify servers = 1;

  bool isFirstSync = 2;
}
// ServerNotify describes one server inside a ServerNotifyRequest.
// Declaration order differs from field-number order: minReplicas (5) and
// maxReplicas (6) are declared before shared (3) and kubernetesMeta (4).
message ServerNotify {
  string name = 1;

  uint32 expectedReplicas = 2;

  uint32 minReplicas = 5;

  uint32 maxReplicas = 6;

  bool shared = 3;

  optional KubernetesMeta kubernetesMeta = 4;
}
// Response message for the Scheduler.ServerNotify RPC. Currently has no
// fields.
message ServerNotifyResponse {}
// Request message for the Scheduler.SubscribeServerStatus RPC.
message ServerSubscriptionRequest {
  // Name of the subscription caller
  string subscriberName = 1;
}
// Experiments
// Request message for the Scheduler.StartExperiment RPC.
message StartExperimentRequest {
  // Definition of the experiment to start.
  Experiment experiment = 1;
}
// ResourceType is the type of Experiment.resourceType.
// Because MODEL is the zero value, an Experiment that leaves resourceType
// unset reads back as MODEL.
enum ResourceType {
  MODEL = 0;
  PIPELINE = 1;
}
// Experiment defines an experiment: its candidates, an optional mirror and
// optional configuration. Carried by StartExperimentRequest.
message Experiment {
  string name = 1;

  // NOTE(review): presumably names the default candidate — confirm.
  optional string default = 2;

  repeated ExperimentCandidate candidates = 3;

  optional ExperimentMirror mirror = 4;

  optional ExperimentConfig config = 5;

  optional KubernetesMeta kubernetesMeta = 6;

  // MODEL or PIPELINE; MODEL (0) when unset.
  ResourceType resourceType = 7;
}
// ExperimentConfig holds optional settings for an Experiment.
message ExperimentConfig {
  bool stickySessions = 1;
}
// ExperimentCandidate is one entry of Experiment.candidates: a name paired
// with a weight.
message ExperimentCandidate {
  string name = 1;

  uint32 weight = 2;
}
// ExperimentMirror is the optional mirror of an Experiment: a name paired
// with a percentage.
message ExperimentMirror {
  string name = 1;

  uint32 percent = 2;
}
// Response message for the Scheduler.StartExperiment RPC. Currently has no
// fields.
message StartExperimentResponse {}
// Request message for the Scheduler.StopExperiment RPC.
message StopExperimentRequest {
  // Name of the experiment to stop.
  string name = 1;
}
// Response message for the Scheduler.StopExperiment RPC. Currently has no
// fields.
message StopExperimentResponse {}
// Request message for the Scheduler.SubscribeExperimentStatus RPC.
message ExperimentSubscriptionRequest {
  // Name of the subscription caller
  string subscriberName = 1;
}
// ExperimentStatusResponse reports the status of one experiment. It is
// streamed by the ExperimentStatus and SubscribeExperimentStatus RPCs.
message ExperimentStatusResponse {
  string experimentName = 1;

  bool active = 2;

  bool candidatesReady = 3;

  bool mirrorReady = 4;

  string statusDescription = 5;

  optional KubernetesMeta kubernetesMeta = 6;
}
// Request message for the Scheduler.LoadPipeline RPC.
message LoadPipelineRequest {
  // Definition of the pipeline to load.
  Pipeline pipeline = 1;
}
// Request message for the Scheduler.ExperimentStatus RPC.
message ExperimentStatusRequest {
  string subscriberName = 1;

  // Leave empty for all experiments
  optional string name = 2;
}
// Pipeline defines a pipeline: its steps plus optional input and output
// specifications. Carried by LoadPipelineRequest and PipelineWithState.
// Declaration order differs from field-number order: `input` (7) is declared
// last, after kubernetesMeta (6).
message Pipeline {
  string name = 1;

  string uid = 2;

  uint32 version = 3;

  repeated PipelineStep steps = 4;

  optional PipelineOutput output = 5;

  optional KubernetesMeta kubernetesMeta = 6;

  optional PipelineInput input = 7;
}
// PipelineStep is one step of a Pipeline: its inputs and triggers, how each
// set is joined, and optional batching.
message PipelineStep {
  // Join operation applied to a set of inputs or triggers. An identical enum
  // is nested in PipelineInput and PipelineOutput.
  enum JoinOp {
    INNER = 0;
    OUTER = 1;
    ANY = 2;
  }

  string name = 1;

  repeated string inputs = 2;

  // Join window millisecs, some nonzero default (TBD)
  optional uint32 joinWindowMs = 3;

  // optional map of tensor name mappings
  map<string, string> tensorMap = 4;

  // Join applied to `inputs`; INNER (0) when unset.
  JoinOp inputsJoin = 5;

  repeated string triggers = 6;

  // Join applied to `triggers`; INNER (0) when unset.
  JoinOp triggersJoin = 7;

  Batch batch = 8;
}
// Batch holds the batching settings of a PipelineStep. Both fields are
// `optional`, so "unset" is distinguishable from zero.
message Batch {
  optional uint32 size = 1;

  // Window in milliseconds, per the field name's Ms suffix.
  optional uint32 windowMs = 2;
}
// PipelineInput is the optional input specification of a Pipeline: external
// inputs and triggers and how each set is joined.
message PipelineInput {
  // Join operation applied to a set of inputs or triggers. An identical enum
  // is nested in PipelineStep and PipelineOutput.
  enum JoinOp {
    INNER = 0;
    OUTER = 1;
    ANY = 2;
  }

  repeated string externalInputs = 1;

  repeated string externalTriggers = 2;

  // Join window millisecs for output, default 0
  // NOTE(review): the wording says "for output" although this is the input
  // message — possibly copied from PipelineOutput; confirm.
  optional uint32 joinWindowMs = 3;

  // INNER (0) when unset.
  JoinOp joinType = 4;

  // INNER (0) when unset.
  JoinOp triggersJoin = 5;

  // optional map of tensor name mappings
  map<string, string> tensorMap = 6;
}
// PipelineOutput is the optional output specification of a Pipeline: which
// steps feed the output and how they are joined.
message PipelineOutput {
  // Join operation applied to the output steps. An identical enum is nested
  // in PipelineStep and PipelineInput.
  enum JoinOp {
    INNER = 0;
    OUTER = 1;
    ANY = 2;
  }

  repeated string steps = 1;

  // Join window millisecs for output, default 0
  uint32 joinWindowMs = 2;

  // INNER (0) when unset.
  JoinOp stepsJoin = 3;

  // optional map of tensor name mappings
  map<string, string> tensorMap = 4;
}
// Response message for the Scheduler.LoadPipeline RPC. Currently has no
// fields.
message LoadPipelineResponse {}
// Request message for the Scheduler.UnloadPipeline RPC.
message UnloadPipelineRequest {
  // Name of the pipeline to unload.
  string name = 1;
}
// Response message for the Scheduler.UnloadPipeline RPC. Currently has no
// fields.
message UnloadPipelineResponse {}
// Request message for the Scheduler.PipelineStatus RPC.
message PipelineStatusRequest {
  string subscriberName = 1;

  // Leave empty for all pipelines
  optional string name = 2;

  bool allVersions = 3;
}
// Request message for the Scheduler.SubscribePipelineStatus RPC.
message PipelineSubscriptionRequest {
  // Name of the subscription caller
  string subscriberName = 1;
}
// PipelineStatusResponse reports the versions of one pipeline together with
// their state. It is streamed by the PipelineStatus and
// SubscribePipelineStatus RPCs.
message PipelineStatusResponse {
  string pipelineName = 1;

  repeated PipelineWithState versions = 2;
}
// PipelineWithState pairs a Pipeline definition with the state of that
// version; it is the element type of PipelineStatusResponse.versions.
message PipelineWithState {
  Pipeline pipeline = 1;

  PipelineVersionState state = 2;
}
// PipelineVersionState is the state of one pipeline version, used by
// PipelineWithState.state.
message PipelineVersionState {
  // Lifecycle states of a pipeline version. Zero is the "unknown" default.
  enum PipelineStatus {
    PipelineStatusUnknown = 0;
    PipelineCreate = 1;
    PipelineCreating = 2;
    PipelineReady = 3;
    PipelineFailed = 4;
    PipelineTerminate = 5;
    PipelineTerminating = 6;
    PipelineTerminated = 7;
  }

  uint32 pipelineVersion = 1;

  PipelineStatus status = 2;

  // Free-text reason accompanying `status`.
  string reason = 3;

  google.protobuf.Timestamp lastChangeTimestamp = 4;

  bool modelsReady = 5;
}
// Request message for the Scheduler.SchedulerStatus RPC.
message SchedulerStatusRequest {
  string subscriberName = 1;
}
// Response message for the Scheduler.SchedulerStatus RPC.
message SchedulerStatusResponse {
  string applicationVersion = 1;
}
// Request message for the Scheduler.SubscribeControlPlane RPC.
message ControlPlaneSubscriptionRequest {
  // Name of the subscription caller
  string subscriberName = 1;
}
// ControlPlaneResponse is the message streamed by the
// Scheduler.SubscribeControlPlane RPC; each one carries a single event.
message ControlPlaneResponse {
  // Events that can be carried on the control plane stream.
  enum Event {
    UNKNOWN_EVENT = 0;

    // initial sync for the servers
    SEND_SERVERS = 1;

    // send models / pipelines / experiments
    SEND_RESOURCES = 2;
  }

  Event event = 1;
}
// [END Messages]
// [START Services]
// Scheduler is the gRPC service through which Models, Servers, Experiments
// and Pipelines are managed.
//
// Every RPC is declared in the plain `rpc ...;` form. The original mixed
// `{}` and `{};` (an empty options body, sometimes followed by a stray empty
// statement); all three spellings produce the same service descriptor.
service Scheduler {
  // Unary RPCs that notify the scheduler about servers and load/unload or
  // start/stop resources.
  rpc ServerNotify(ServerNotifyRequest) returns (ServerNotifyResponse);
  rpc LoadModel(LoadModelRequest) returns (LoadModelResponse);
  rpc UnloadModel(UnloadModelRequest) returns (UnloadModelResponse);
  rpc LoadPipeline(LoadPipelineRequest) returns (LoadPipelineResponse);
  rpc UnloadPipeline(UnloadPipelineRequest) returns (UnloadPipelineResponse);
  rpc StartExperiment(StartExperimentRequest) returns (StartExperimentResponse);
  rpc StopExperiment(StopExperimentRequest) returns (StopExperimentResponse);

  // Status queries. All are server-streaming except SchedulerStatus, which
  // is unary.
  rpc ServerStatus(ServerStatusRequest) returns (stream ServerStatusResponse);
  rpc ModelStatus(ModelStatusRequest) returns (stream ModelStatusResponse);
  rpc PipelineStatus(PipelineStatusRequest) returns (stream PipelineStatusResponse);
  rpc ExperimentStatus(ExperimentStatusRequest) returns (stream ExperimentStatusResponse);
  rpc SchedulerStatus(SchedulerStatusRequest) returns (SchedulerStatusResponse);

  // Server-streaming subscriptions; they reuse the response messages of the
  // status RPCs above.
  rpc SubscribeServerStatus(ServerSubscriptionRequest) returns (stream ServerStatusResponse);
  rpc SubscribeModelStatus(ModelSubscriptionRequest) returns (stream ModelStatusResponse);
  rpc SubscribeExperimentStatus(ExperimentSubscriptionRequest) returns (stream ExperimentStatusResponse);
  rpc SubscribePipelineStatus(PipelineSubscriptionRequest) returns (stream PipelineStatusResponse);

  // control plane stream with controller
  rpc SubscribeControlPlane(ControlPlaneSubscriptionRequest) returns (stream ControlPlaneResponse);
}
// [END Services]