package environment

import (
	"errors"
	"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
	"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
	"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
	"time"
)

var log = logging.GetLogger("execution_environment")

// ExecutionEnvironment is a partial image of an execution environment in CodeOcean.
type ExecutionEnvironment interface {
	// NextRunner gets the next available runner and marks it as running.
	// If no runner is available, it returns an error after a small timeout.
	NextRunner() (runner.Runner, error)
	// Refresh fetches the runners for this execution environment and sends them to the pool.
	// This function does not terminate. Instead, it fetches new runners periodically.
	Refresh()
}

// NomadExecutionEnvironment is a Nomad-specific implementation of ExecutionEnvironment.
// It is mapped to a job in Nomad.
// The jobId has to match the only Nomad task group name!
type NomadExecutionEnvironment struct {
	id               int
	jobId            string
	availableRunners chan runner.Runner
	allRunners       RunnerPool
	nomadApiClient   nomad.ExecutorApi
}

var executionEnvironment ExecutionEnvironment

// DebugInit initializes one execution environment so that its runners can be provided.
// ToDo: This should be replaced by a create execution environment route.
func DebugInit(runnersPool RunnerPool, nomadApi nomad.ExecutorApi) {
	executionEnvironment = &NomadExecutionEnvironment{
		id:               0,
		jobId:            "python",
		availableRunners: make(chan runner.Runner, 5),
		nomadApiClient:   nomadApi,
		allRunners:       runnersPool,
	}
	go executionEnvironment.Refresh()
}

// GetExecutionEnvironment returns a previously added ExecutionEnvironment.
// This way, you can access all runners for that environment.
func GetExecutionEnvironment(id int) (ExecutionEnvironment, error) {
	// TODO: Remove hardcoded execution environment
	return executionEnvironment, nil
}

// NextRunner returns the next available runner and marks it as running.
// If no runner becomes available within 50 ms, it returns an error.
func (environment *NomadExecutionEnvironment) NextRunner() (r runner.Runner, err error) {
	select {
	case r = <-environment.availableRunners:
		r.SetStatus(runner.StatusRunning)
		return r, nil
	case <-time.After(50 * time.Millisecond):
		return nil, errors.New("no runners available")
	}
}
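// provideRunner is a hypothetical helper (not part of the original API) sketching how a
// caller such as an HTTP handler might claim a runner: look up the environment by its id,
// then take the next idle runner, propagating the timeout error from NextRunner.
func provideRunner(environmentId int) (runner.Runner, error) {
	e, err := GetExecutionEnvironment(environmentId)
	if err != nil {
		return nil, err
	}
	return e.NextRunner()
}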
// Refresh fetches new runners periodically and adds them to the pool.
// Big ToDo: Improve this function! State that it also rescales the job; provide a context so it can be terminated...
func (environment *NomadExecutionEnvironment) Refresh() {
	for {
		runners, err := environment.nomadApiClient.LoadRunners(environment.jobId)
		if err != nil {
			log.WithError(err).Warn("Failed fetching runners")
			break
		}
		for _, r := range environment.unusedRunners(runners) {
			// ToDo: Listen on Nomad event stream
			log.WithField("allocation", r).Debug("Adding allocation")
			environment.allRunners.Add(r)
			environment.availableRunners <- r
		}
		jobScale, err := environment.nomadApiClient.JobScale(environment.jobId)
		if err != nil {
			log.WithError(err).Warn("Failed to get allocation count")
			break
		}
		neededRunners := cap(environment.availableRunners) - len(environment.availableRunners) + 1
		runnerCount := jobScale + neededRunners
		time.Sleep(50 * time.Millisecond)
		log.WithField("count", runnerCount).Debug("Set job scaling")
		err = environment.nomadApiClient.SetJobScale(environment.jobId, runnerCount, "Runner Requested")
		if err != nil {
			log.WithError(err).Warn("Failed to set allocation scaling")
			continue
		}
	}
}

// unusedRunners creates runner objects for all fetched runner ids that are not yet part of the pool.
func (environment *NomadExecutionEnvironment) unusedRunners(fetchedRunnerIds []string) (newRunners []runner.Runner) {
	newRunners = make([]runner.Runner, 0)
	for _, runnerId := range fetchedRunnerIds {
		_, ok := environment.allRunners.Get(runnerId)
		if !ok {
			newRunners = append(newRunners, runner.NewExerciseRunner(runnerId))
		}
	}
	return
}
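// desiredRunnerCount is a hypothetical helper extracted purely for illustration; it is not
// called anywhere and only mirrors the scaling arithmetic used in Refresh. With the channel
// capacity of 5 used in DebugInit, 2 idle runners and a current job scale of 3, it would
// request 3 + (5 - 2) + 1 = 7 allocations.
func desiredRunnerCount(jobScale, capacity, idle int) int {
	// Top up the pool: request enough allocations to refill the channel, plus one spare.
	return jobScale + (capacity - idle) + 1
}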