package kubernetes

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
	"time"

	nomadApi "github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
	"github.com/openHPI/poseidon/pkg/dto"
	"github.com/openHPI/poseidon/pkg/logging"
	"github.com/openHPI/poseidon/pkg/monitoring"
	"github.com/openHPI/poseidon/pkg/storage"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/rest"
)

var (
	log = logging.GetLogger("kubernetes")

	ErrorExecutorCommunicationFailed = errors.New("communication with executor failed")
	ErrorEvaluation                  = errors.New("evaluation could not complete")
	ErrorPlacingAllocations          = errors.New("failed to place all allocations")
	ErrorLoadingJob                  = errors.New("failed to load job")
	ErrorNoAllocatedResourcesFound   = errors.New("no allocated resources found")

	ErrorLocalDestruction      RunnerDeletedReason = errors.New("the destruction should not cause external changes")
	ErrorOOMKilled             RunnerDeletedReason = fmt.Errorf("%s: %w", dto.ErrOOMKilled.Error(), ErrorLocalDestruction)
	ErrorAllocationRescheduled RunnerDeletedReason = fmt.Errorf("the allocation was rescheduled: %w", ErrorLocalDestruction)
	ErrorAllocationStopped     RunnerDeletedReason = errors.New("the allocation was stopped")

	ErrorAllocationStoppedUnexpectedly     RunnerDeletedReason = fmt.Errorf("%w unexpectedly", ErrorAllocationStopped)
	ErrorAllocationRescheduledUnexpectedly RunnerDeletedReason = fmt.Errorf(
		"%w correctly but rescheduled", ErrorAllocationStopped)

	// ErrorAllocationCompleted reports the reason for the stopped allocation.
	// We do not consider it an error but add it anyway for complete reporting.
	ErrorAllocationCompleted RunnerDeletedReason = errors.New("the allocation completed")
)

type allocationData struct {
	// allocClientStatus is the allocation state as defined by Nomad.
	allocClientStatus string
	// allocDesiredStatus defines whether the allocation wants to be running or to be stopped.
	allocDesiredStatus string
	jobID              string
	start              time.Time
	// stopExpected is used to suppress warnings that could be triggered by a race condition
	// between the inactivity timer and an external event leading to allocation rescheduling.
	stopExpected bool
	// allocNomadNode is kept for debugging purposes only.
	allocNomadNode string
}

// resultChannelWriteTimeout is used to detect the error case where more elements are written into a channel
// than expected.
const resultChannelWriteTimeout = 10 * time.Millisecond

type DeletedAllocationProcessor func(jobID string, reason RunnerDeletedReason) (removedByPoseidon bool)
type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration)

// AllocationProcessing includes the callbacks to interact with allocation events.
type AllocationProcessing struct {
	OnNew     NewAllocationProcessor
	OnDeleted DeletedAllocationProcessor
}

type RunnerDeletedReason error

// ExecutorAPI provides access to a container orchestration solution.
type ExecutorAPI interface {
	apiQuerier

	// LoadEnvironmentJobs loads all environment jobs.
	LoadEnvironmentJobs() ([]*appsv1.Deployment, error)

	// LoadRunnerJobs loads all runner jobs specific to the environment.
	LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*appsv1.Deployment, error)

	// LoadRunnerIDs returns the IDs of all runners with the specified ID prefix which are not about to
	// get stopped.
	LoadRunnerIDs(prefix string) (runnerIds []string, err error)

	// LoadRunnerPortMappings returns the mapped ports of the runner.
	LoadRunnerPortMappings(runnerID string) ([]v1.ContainerPort, error)

	// RegisterRunnerJob creates a runner job based on the template job.
	// It registers the job and waits until the registration completes.
	RegisterRunnerJob(template *appsv1.Deployment) error

	// MonitorEvaluation monitors the given evaluation ID.
	// It waits until the evaluation reaches one of the states complete, canceled, or failed.
	// If the evaluation was not successful, an error containing the failures is returned.
	// See also https://github.com/hashicorp/nomad/blob/7d5a9ecde95c18da94c9b6ace2565afbfdd6a40d/command/monitor.go#L175
	MonitorEvaluation(evaluationID string, ctx context.Context) error

	// WatchEventStream listens on the Nomad event stream for allocation and evaluation events.
	// Depending on the incoming event, any of the given callbacks is executed.
	// Do not run multiple times simultaneously.
	WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error

	// ExecuteCommand executes the given command in the job/runner with the given ID.
	// It writes the output of the command to stdout/stderr and reads input from stdin.
	// If tty is true, the command will run with a TTY.
	// Iff privilegedExecution is true, the command will be executed privileged.
	// The command is passed in the shell form (not the exec array form) and will be executed in a shell.
	ExecuteCommand(jobID string, ctx context.Context, command string, tty bool, privilegedExecution bool,
		stdin io.Reader, stdout, stderr io.Writer) (int, error)

	// MarkRunnerAsUsed marks the runner with the given ID as used. It also stores the timeout duration
	// in the metadata.
	MarkRunnerAsUsed(runnerID string, duration int) error
}
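// executeEcho is a minimal usage sketch for ExecuteCommand, assuming a runner
// job with the hypothetical ID "example-runner" exists: it runs a shell
// command without a TTY and without privileges, discarding all output.
// It is illustrative only and not used by the client itself.
func executeEcho(ctx context.Context, api ExecutorAPI) (int, error) {
	return api.ExecuteCommand("example-runner", ctx, "echo hello", false, false,
		strings.NewReader(""), io.Discard, io.Discard)
}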
// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
// Executor API and its return values.
type APIClient struct {
	apiQuerier
	evaluations storage.Storage[chan error]
	// allocations contains management data for all pending and running allocations.
	allocations storage.Storage[*allocationData]
	isListening bool
}

func (a *APIClient) LoadEnvironmentJobs() ([]*appsv1.Deployment, error) {
	// TODO: implement me.
	panic("implement me")
}

func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*appsv1.Deployment, error) {
	go a.initializeAllocations(environmentID)

	runnerIDs, err := a.LoadRunnerIDs(RunnerJobID(environmentID, ""))
	if err != nil {
		return []*appsv1.Deployment{}, fmt.Errorf("couldn't load jobs: %w", err)
	}

	var occurredError error
	jobs := make([]*appsv1.Deployment, 0, len(runnerIDs))
	for _, id := range runnerIDs {
		job, err := a.apiQuerier.deployment(id)
		if err != nil {
			if occurredError == nil {
				occurredError = ErrorLoadingJob
			}
			occurredError = fmt.Errorf("%w: couldn't load job info for runner %s - %v", occurredError, id, err)
			continue
		}
		jobs = append(jobs, &job)
	}
	return jobs, occurredError
}

func (a *APIClient) LoadRunnerIDs(prefix string) (runnerIds []string, err error) {
	// TODO: implement me.
	panic("implement me")
}

func (a *APIClient) LoadRunnerPortMappings(runnerID string) ([]v1.ContainerPort, error) {
	// TODO: implement me.
	panic("implement me")
}

func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) error {
	// TODO: implement me.
	panic("implement me")
}
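// writeEvaluationResult sketches how an evaluation result could be delivered
// to a waiting MonitorEvaluation call without blocking forever: the write is
// abandoned after resultChannelWriteTimeout, which indicates that more results
// were written than expected. This helper is illustrative and not yet wired
// into the client.
func writeEvaluationResult(ch chan error, result error) error {
	select {
	case ch <- result:
		return nil
	case <-time.After(resultChannelWriteTimeout):
		return fmt.Errorf("%w: timed out writing the evaluation result", ErrorEvaluation)
	}
}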
func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error {
	// TODO: implement me.
	panic("implement me")
}

func (a *APIClient) ExecuteCommand(jobID string, ctx context.Context, command string, tty bool,
	privilegedExecution bool, stdin io.Reader, stdout, stderr io.Writer) (int, error) {
	// TODO: implement me.
	panic("implement me")
}

func (a *APIClient) MarkRunnerAsUsed(runnerID string, duration int) error {
	// TODO: implement me.
	panic("implement me")
}

// NewExecutorAPI creates a new API client.
// One client is usually sufficient for the complete runtime of the API.
func NewExecutorAPI(kubernetesConfig *rest.Config) (ExecutorAPI, error) {
	client := &APIClient{
		apiQuerier:  &kubernetesAPIClient{},
		evaluations: storage.NewLocalStorage[chan error](),
		allocations: storage.NewMonitoredLocalStorage[*allocationData](
			monitoring.MeasurementNomadAllocations,
			func(p *write.Point, object *allocationData, _ storage.EventType) {
				p.AddTag(monitoring.InfluxKeyJobID, object.jobID)
				p.AddTag(monitoring.InfluxKeyClientStatus, object.allocClientStatus)
				p.AddTag(monitoring.InfluxKeyNomadNode, object.allocNomadNode)
			}, 0, nil),
	}
	err := client.init(kubernetesConfig)
	return client, err
}

func (a *APIClient) initializeAllocations(environmentID dto.EnvironmentID) {
	allocationStubs, err := a.listAllocations()
	if err != nil {
		log.WithError(err).Warn("Could not initialize allocations")
		return
	}
	for _, stub := range allocationStubs {
		switch {
		case IsEnvironmentTemplateID(stub.JobID):
			continue
		case !strings.HasPrefix(stub.JobID, RunnerJobID(environmentID, "")):
			continue
		case stub.ClientStatus == structs.AllocClientStatusPending || stub.ClientStatus == structs.AllocClientStatusRunning:
			log.WithField("jobID", stub.JobID).WithField("status", stub.ClientStatus).Debug("Recovered Allocation")
			a.allocations.Add(stub.ID, &allocationData{
				allocClientStatus:  stub.ClientStatus,
				allocDesiredStatus: stub.DesiredStatus,
				jobID:              stub.JobID,
				start:              time.Unix(0, stub.CreateTime),
				allocNomadNode:     stub.NodeName,
			})
		}
	}
}
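// markUsed sketches what MarkRunnerAsUsed might do to a loaded Deployment:
// record the usage flag and timeout duration in its annotations. The
// annotation keys are hypothetical, and persisting the change via the API
// server is left to the apiQuerier.
func markUsed(deployment *appsv1.Deployment, duration int) {
	if deployment.Annotations == nil {
		deployment.Annotations = map[string]string{}
	}
	deployment.Annotations["poseidon/used"] = "true"
	deployment.Annotations["poseidon/timeout"] = fmt.Sprintf("%d", duration)
}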