From 2e4a975588c768d18931598fc5679389f5b6c298 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Mon, 14 Jun 2021 15:11:10 +0200 Subject: [PATCH] Implement even more merge request comments --- ci/demo-job.tpl.nomad | 28 ++- environment/manager.go | 91 ++++----- environment/manager_mock.go | 14 ++ environment/manager_test.go | 15 +- ...t-job.hcl => template-environment-job.hcl} | 2 +- go.sum | 2 - main.go | 5 +- nomad/executor_api_mock.go | 122 +++++++----- nomad/job.go | 81 +++----- nomad/job_test.go | 12 +- nomad/nomad.go | 49 +++-- nomad/nomad_test.go | 12 +- runner/manager.go | 185 ++++++++++-------- runner/manager_mock.go | 25 +-- runner/manager_test.go | 18 +- runner/nomad_environment_storage.go | 16 +- tests/e2e/environments_test.go | 10 +- tests/e2e/websocket_test.go | 5 +- 18 files changed, 368 insertions(+), 324 deletions(-) rename environment/{default-job.hcl => template-environment-job.hcl} (98%) diff --git a/ci/demo-job.tpl.nomad b/ci/demo-job.tpl.nomad index dcfef67..e9983cb 100644 --- a/ci/demo-job.tpl.nomad +++ b/ci/demo-job.tpl.nomad @@ -1,6 +1,6 @@ // This job is used by the e2e tests as a demo job. -job "0-default" { +job "template-0" { datacenters = ["dc1"] type = "batch" namespace = "${NOMAD_NAMESPACE}" @@ -49,4 +49,30 @@ job "0-default" { } } } + group "config" { + // We want to store whether a task is in use in order to recover from a downtime. + // Without a separate config task, marking a task as used would result in a restart of that task, + // as the meta information is passed to the container as environment variables. + count = 0 + task "config" { + driver = "exec" + config { + command = "whoami" + } + logs { + max_files = 1 + max_file_size = 1 + } + resources { + // minimum values + cpu = 1 + memory = 10 + } + } + meta { + environment = "0" + used = "false" + prewarmingPoolSize = "1" + } + } } diff --git a/environment/manager.go b/environment/manager.go index 24424be..398bba8 100644 --- a/environment/manager.go +++ b/environment/manager.go @@ -13,18 +13,23 @@ import ( "strconv" ) -// defaultJobHCL holds our default job in HCL format. +// templateEnvironmentJobHCL holds our default job in HCL format. // The default job is used when creating new job and provides // common settings that all the jobs share. -//go:embed default-job.hcl -var defaultJobHCL string +//go:embed template-environment-job.hcl +var templateEnvironmentJobHCL string var log = logging.GetLogger("environment") // Manager encapsulates API calls to the executor API for creation and deletion of execution environments. type Manager interface { + // Load fetches all already created execution environments from the executor and registers them at the runner manager. + // It should be called during the startup process (e.g. on creation of the Manager). + Load() error + // CreateOrUpdate creates/updates an execution environment on the executor. - // Iff the job was created, the returned boolean is true and the returned error is nil. + // If the job was created, the returned boolean is true, if it was updated, it is false. + // If err is not nil, that means the environment was neither created nor updated. CreateOrUpdate( id runner.EnvironmentID, request dto.ExecutionEnvironmentRequest, @@ -34,24 +39,27 @@ type Manager interface { Delete(id string) } -func NewNomadEnvironmentManager(runnerManager runner.Manager, apiClient nomad.ExecutorAPI) ( - *NomadEnvironmentManager, error) { - environmentManager := &NomadEnvironmentManager{runnerManager, apiClient, *parseJob(defaultJobHCL)} - err := environmentManager.loadExistingEnvironments() - return environmentManager, err +func NewNomadEnvironmentManager(runnerManager runner.Manager, + apiClient nomad.ExecutorAPI) (m *NomadEnvironmentManager) { + m = &NomadEnvironmentManager{runnerManager, apiClient, *parseJob(templateEnvironmentJobHCL)} + if err := m.Load(); err != nil { + log.WithError(err).Error("Error recovering the execution environments") + } + runnerManager.Load() + return } type NomadEnvironmentManager struct { - runnerManager runner.Manager - api nomad.ExecutorAPI - defaultJob nomadApi.Job + runnerManager runner.Manager + api nomad.ExecutorAPI + templateEnvironmentJob nomadApi.Job } func (m *NomadEnvironmentManager) CreateOrUpdate( id runner.EnvironmentID, request dto.ExecutionEnvironmentRequest, ) (bool, error) { - templateJob, err := m.api.RegisterTemplateJob(&m.defaultJob, int(id), + templateJob, err := m.api.RegisterTemplateJob(&m.templateEnvironmentJob, runner.TemplateJobID(id), request.PrewarmingPoolSize, request.CPULimit, request.MemoryLimit, request.Image, request.NetworkAccess, request.ExposedPorts) @@ -59,7 +67,7 @@ func (m *NomadEnvironmentManager) CreateOrUpdate( return false, err } - created, err := m.runnerManager.CreateOrUpdateEnvironment(id, request.PrewarmingPoolSize, templateJob) + created, err := m.runnerManager.CreateOrUpdateEnvironment(id, request.PrewarmingPoolSize, templateJob, true) if err != nil { return created, err } @@ -70,52 +78,13 @@ func (m *NomadEnvironmentManager) Delete(id string) { } -func (m *NomadEnvironmentManager) loadExistingEnvironments() error { - jobs, err := m.api.LoadAllJobs() +func (m *NomadEnvironmentManager) Load() error { + templateJobs, err := m.api.LoadEnvironmentJobs() if err != nil { - return fmt.Errorf("can't load template jobs: %w", err) + return fmt.Errorf("couldn't load template jobs: %w", err) } - var environmentTemplates, runnerJobs []*nomadApi.Job - for _, job := range jobs { - if nomad.IsEnvironmentTemplateID(*job.ID) { - environmentTemplates = append(environmentTemplates, job) - } else { - runnerJobs = append(runnerJobs, job) - } - } - m.recoverJobs(environmentTemplates, m.recoverEnvironmentTemplates) - m.recoverJobs(runnerJobs, m.recoverRunner) - - err = m.runnerManager.ScaleAllEnvironments() - if err != nil { - return fmt.Errorf("can not restore environment scaling: %w", err) - } - return nil -} - -type jobAdder func(id runner.EnvironmentID, job *nomadApi.Job, configTaskGroup *nomadApi.TaskGroup) error - -func (m *NomadEnvironmentManager) recoverEnvironmentTemplates(id runner.EnvironmentID, job *nomadApi.Job, - configTaskGroup *nomadApi.TaskGroup) error { - desiredIdleRunnersCount, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaPoolSizeKey]) - if err != nil { - return fmt.Errorf("Couldn't convert pool size to int: %w", err) - } - - m.runnerManager.RecoverEnvironment(id, job, uint(desiredIdleRunnersCount)) - return nil -} - -func (m *NomadEnvironmentManager) recoverRunner(id runner.EnvironmentID, job *nomadApi.Job, - configTaskGroup *nomadApi.TaskGroup) error { - isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue - m.runnerManager.RecoverRunner(id, job, isUsed) - return nil -} - -func (m *NomadEnvironmentManager) recoverJobs(jobs []*nomadApi.Job, onJob jobAdder) { - for _, job := range jobs { + for _, job := range templateJobs { jobLogger := log.WithField("jobID", *job.ID) if *job.Status != structs.JobStatusRunning { jobLogger.Info("Job not running, skipping ...") @@ -126,6 +95,11 @@ func (m *NomadEnvironmentManager) recoverJobs(jobs []*nomadApi.Job, onJob jobAdd jobLogger.Info("Couldn't find config task group in job, skipping ...") continue } + desiredIdleRunnersCount, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaPoolSizeKey]) + if err != nil { + jobLogger.Infof("Couldn't convert pool size to int: %v, skipping ...", err) + continue + } environmentID, err := runner.NewEnvironmentID(configTaskGroup.Meta[nomad.ConfigMetaEnvironmentKey]) if err != nil { jobLogger.WithField("environmentID", configTaskGroup.Meta[nomad.ConfigMetaEnvironmentKey]). @@ -133,12 +107,13 @@ func (m *NomadEnvironmentManager) recoverJobs(jobs []*nomadApi.Job, onJob jobAdd Error("Couldn't convert environment id of template job to int") continue } - err = onJob(environmentID, job, configTaskGroup) + _, err = m.runnerManager.CreateOrUpdateEnvironment(environmentID, uint(desiredIdleRunnersCount), job, false) if err != nil { jobLogger.WithError(err).Info("Could not recover job.") continue } } + return nil } func parseJob(jobHCL string) *nomadApi.Job { diff --git a/environment/manager_mock.go b/environment/manager_mock.go index 24367e1..674af83 100644 --- a/environment/manager_mock.go +++ b/environment/manager_mock.go @@ -39,3 +39,17 @@ func (_m *ManagerMock) CreateOrUpdate(id runner.EnvironmentID, request dto.Execu func (_m *ManagerMock) Delete(id string) { _m.Called(id) } + +// Load provides a mock function with given fields: +func (_m *ManagerMock) Load() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/environment/manager_test.go b/environment/manager_test.go index 355bd49..bd0c3c9 100644 --- a/environment/manager_test.go +++ b/environment/manager_test.go @@ -50,20 +50,21 @@ func (s *CreateOrUpdateTestSuite) SetupTest() { func (s *CreateOrUpdateTestSuite) mockRegisterTemplateJob(job *nomadApi.Job, err error) { s.apiMock.On("RegisterTemplateJob", - mock.AnythingOfType("*api.Job"), mock.AnythingOfType("int"), + mock.AnythingOfType("*api.Job"), mock.AnythingOfType("string"), mock.AnythingOfType("uint"), mock.AnythingOfType("uint"), mock.AnythingOfType("uint"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("[]uint16")). Return(job, err) } func (s *CreateOrUpdateTestSuite) mockCreateOrUpdateEnvironment(created bool, err error) { - s.runnerManagerMock.On("CreateOrUpdateEnvironment", - mock.AnythingOfType("EnvironmentID"), mock.AnythingOfType("uint"), mock.AnythingOfType("*api.Job")). + s.runnerManagerMock.On("CreateOrUpdateEnvironment", mock.AnythingOfType("EnvironmentID"), + mock.AnythingOfType("uint"), mock.AnythingOfType("*api.Job"), mock.AnythingOfType("bool")). Return(created, err) } func (s *CreateOrUpdateTestSuite) createJobForRequest() *nomadApi.Job { - return nomad.CreateTemplateJob(&s.manager.defaultJob, tests.DefaultEnvironmentIDAsInteger, + return nomad.CreateTemplateJob(&s.manager.templateEnvironmentJob, + runner.TemplateJobID(tests.DefaultEnvironmentIDAsInteger), s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit, s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts) } @@ -76,7 +77,7 @@ func (s *CreateOrUpdateTestSuite) TestRegistersCorrectTemplateJob() { s.NoError(err) s.apiMock.AssertCalled(s.T(), "RegisterTemplateJob", - &s.manager.defaultJob, int(s.environmentID), + &s.manager.templateEnvironmentJob, runner.TemplateJobID(s.environmentID), s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit, s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts) } @@ -98,7 +99,7 @@ func (s *CreateOrUpdateTestSuite) TestCreatesOrUpdatesCorrectEnvironment() { _, err := s.manager.CreateOrUpdate(s.environmentID, s.request) s.NoError(err) s.runnerManagerMock.AssertCalled(s.T(), "CreateOrUpdateEnvironment", - s.environmentID, s.request.PrewarmingPoolSize, templateJob) + s.environmentID, s.request.PrewarmingPoolSize, templateJob, true) } func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentReturnsError() { @@ -132,7 +133,7 @@ func TestParseJob(t *testing.T) { log = logger.WithField("pkg", "nomad") t.Run("parses the given default job", func(t *testing.T) { - job := parseJob(defaultJobHCL) + job := parseJob(templateEnvironmentJobHCL) assert.False(t, exited) assert.NotNil(t, job) }) diff --git a/environment/default-job.hcl b/environment/template-environment-job.hcl similarity index 98% rename from environment/default-job.hcl rename to environment/template-environment-job.hcl index de439fc..9c294b7 100644 --- a/environment/default-job.hcl +++ b/environment/template-environment-job.hcl @@ -1,6 +1,6 @@ // This is the default job configuration that is used when no path to another default configuration is given -job "0-default" { +job "template-0" { datacenters = ["dc1"] type = "batch" diff --git a/go.sum b/go.sum index c16234b..42f4071 100644 --- a/go.sum +++ b/go.sum @@ -609,8 +609,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= -github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= diff --git a/main.go b/main.go index 13a7db1..2f7fdfc 100644 --- a/main.go +++ b/main.go @@ -47,10 +47,7 @@ func initServer() *http.Server { } runnerManager := runner.NewNomadRunnerManager(nomadAPIClient, context.Background()) - environmentManager, err := environment.NewNomadEnvironmentManager(runnerManager, nomadAPIClient) - if err != nil { - log.WithError(err).Fatal("Error creating new Nomad environment manager") - } + environmentManager := environment.NewNomadEnvironmentManager(runnerManager, nomadAPIClient) return &http.Server{ Addr: config.Config.PoseidonAPIURL().Host, diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 95813e2..40cc0d0 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -142,52 +142,6 @@ func (_m *ExecutorAPIMock) JobScale(jobId string) (uint, error) { return r0, r1 } -// LoadAllJobs provides a mock function with given fields: -func (_m *ExecutorAPIMock) LoadAllJobs() ([]*api.Job, error) { - ret := _m.Called() - - var r0 []*api.Job - if rf, ok := ret.Get(0).(func() []*api.Job); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*api.Job) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// LoadEnvironmentTemplate provides a mock function with given fields: environmentID -func (_m *ExecutorAPIMock) LoadEnvironmentTemplate(environmentID string) (*api.Job, error) { - ret := _m.Called(environmentID) - - var r0 *api.Job - if rf, ok := ret.Get(0).(func(string) *api.Job); ok { - r0 = rf(environmentID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*api.Job) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(environmentID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // LoadJobList provides a mock function with given fields: func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) { ret := _m.Called() @@ -211,8 +165,31 @@ func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) { return r0, r1 } +// LoadRunnerJobs provides a mock function with given fields: environmentID +func (_m *ExecutorAPIMock) LoadRunnerJobs(environmentID string) ([]*api.Job, error) { + ret := _m.Called(environmentID) + + var r0 []*api.Job + if rf, ok := ret.Get(0).(func(string) []*api.Job); ok { + r0 = rf(environmentID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*api.Job) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(environmentID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LoadRunners provides a mock function with given fields: jobID -func (_m *ExecutorAPIMock) LoadRunners(jobID string) ([]string, error) { +func (_m *ExecutorAPIMock) LoadRunnerIDs(jobID string) ([]string, error) { ret := _m.Called(jobID) var r0 []string @@ -257,6 +234,29 @@ func (_m *ExecutorAPIMock) LoadEnvironmentTemplate(environmentID string) (*api.J return r0, r1 } +// LoadTemplateJobs provides a mock function with given fields: +func (_m *ExecutorAPIMock) LoadEnvironmentJobs() ([]*api.Job, error) { + ret := _m.Called() + + var r0 []*api.Job + if rf, ok := ret.Get(0).(func() []*api.Job); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*api.Job) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // MarkRunnerAsUsed provides a mock function with given fields: runnerID func (_m *ExecutorAPIMock) MarkRunnerAsUsed(runnerID string) error { ret := _m.Called(runnerID) @@ -306,13 +306,27 @@ func (_m *ExecutorAPIMock) RegisterNomadJob(job *api.Job) (string, error) { return r0, r1 } -// RegisterTemplateJob provides a mock function with given fields: defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts -func (_m *ExecutorAPIMock) RegisterTemplateJob(defaultJob *api.Job, environmentID int, prewarmingPoolSize uint, cpuLimit uint, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) (*api.Job, error) { - ret := _m.Called(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) +// RegisterRunnerJob provides a mock function with given fields: template +func (_m *ExecutorAPIMock) RegisterRunnerJob(template *api.Job) error { + ret := _m.Called(template) + + var r0 error + if rf, ok := ret.Get(0).(func(*api.Job) error); ok { + r0 = rf(template) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RegisterTemplateJob provides a mock function with given fields: defaultJob, id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts +func (_m *ExecutorAPIMock) RegisterTemplateJob(defaultJob *api.Job, id string, prewarmingPoolSize uint, cpuLimit uint, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) (*api.Job, error) { + ret := _m.Called(defaultJob, id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) var r0 *api.Job - if rf, ok := ret.Get(0).(func(*api.Job, int, uint, uint, uint, string, bool, []uint16) *api.Job); ok { - r0 = rf(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + if rf, ok := ret.Get(0).(func(*api.Job, string, uint, uint, uint, string, bool, []uint16) *api.Job); ok { + r0 = rf(defaultJob, id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*api.Job) @@ -320,8 +334,8 @@ func (_m *ExecutorAPIMock) RegisterTemplateJob(defaultJob *api.Job, environmentI } var r1 error - if rf, ok := ret.Get(1).(func(*api.Job, int, uint, uint, uint, string, bool, []uint16) error); ok { - r1 = rf(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + if rf, ok := ret.Get(1).(func(*api.Job, string, uint, uint, uint, string, bool, []uint16) error); ok { + r1 = rf(defaultJob, id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) } else { r1 = ret.Error(1) } diff --git a/nomad/job.go b/nomad/job.go index 2369dc7..cbb7660 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -6,18 +6,17 @@ import ( "fmt" nomadApi "github.com/hashicorp/nomad/api" "strconv" - "strings" ) const ( TaskGroupName = "default-group" TaskName = "default-task" + TemplateJobPrefix = "template" ConfigTaskGroupName = "config" DummyTaskName = "dummy" - defaultRunnerJobID = "default" DefaultTaskDriver = "docker" DefaultDummyTaskDriver = "exec" - DefaultTaskCommand = "true" + DefaultDummyTaskCommand = "true" ConfigMetaEnvironmentKey = "environment" ConfigMetaUsedKey = "used" ConfigMetaUsedValue = "true" @@ -26,39 +25,9 @@ const ( ) var ( - ErrorInvalidJobID = errors.New("invalid job id") ErrorConfigTaskGroupNotFound = errors.New("config task group not found in job") ) -// RunnerJobID creates the job id. This requires an environment id and a runner id. -func RunnerJobID(environmentID, runnerID string) string { - return fmt.Sprintf("%s-%s", environmentID, runnerID) -} - -// EnvironmentIDFromJobID returns the environment id that is part of the passed job id. -func EnvironmentIDFromJobID(jobID string) (int, error) { - parts := strings.Split(jobID, "-") - if len(parts) == 0 { - return 0, fmt.Errorf("empty job id: %w", ErrorInvalidJobID) - } - environmentID, err := strconv.Atoi(parts[0]) - if err != nil { - return 0, fmt.Errorf("invalid environment id par %v: %w", err, ErrorInvalidJobID) - } - return environmentID, nil -} - -// TemplateJobID creates the environment specific id of the template job. -func TemplateJobID(id string) string { - return RunnerJobID(id, defaultRunnerJobID) -} - -// IsEnvironmentTemplateID checks if the passed job id belongs to a template job. -func IsEnvironmentTemplateID(jobID string) bool { - parts := strings.Split(jobID, "-") - return len(parts) == 2 && parts[1] == defaultRunnerJobID -} - // FindConfigTaskGroup returns the config task group of a job. // The config task group should be included in all jobs. func FindConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { @@ -82,13 +51,13 @@ func SetMetaConfigValue(job *nomadApi.Job, key, value string) error { // RegisterTemplateJob creates a Nomad job based on the default job configuration and the given parameters. // It registers the job with Nomad and waits until the registration completes. func (a *APIClient) RegisterTemplateJob( - defaultJob *nomadApi.Job, - environmentID int, + basisJob *nomadApi.Job, + id string, prewarmingPoolSize, cpuLimit, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) (*nomadApi.Job, error) { - job := CreateTemplateJob(defaultJob, environmentID, prewarmingPoolSize, + job := CreateTemplateJob(basisJob, id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) evalID, err := a.apiQuerier.RegisterNomadJob(job) if err != nil { @@ -100,24 +69,33 @@ func (a *APIClient) RegisterTemplateJob( // CreateTemplateJob creates a Nomad job based on the default job configuration and the given parameters. // It registers the job with Nomad and waits until the registration completes. func CreateTemplateJob( - defaultJob *nomadApi.Job, - environmentID int, + basisJob *nomadApi.Job, + id string, prewarmingPoolSize, cpuLimit, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) *nomadApi.Job { - job := *defaultJob - templateJobID := TemplateJobID(strconv.Itoa(environmentID)) - job.ID = &templateJobID - job.Name = &templateJobID + job := *basisJob + job.ID = &id + job.Name = &id var taskGroup = createTaskGroup(&job, TaskGroupName, prewarmingPoolSize) configureTask(taskGroup, TaskName, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) - storeConfiguration(&job, environmentID, prewarmingPoolSize) + storeTemplateConfiguration(&job, prewarmingPoolSize) return &job } +func (a *APIClient) RegisterRunnerJob(template *nomadApi.Job) error { + storeRunnerConfiguration(template) + + evalID, err := a.apiQuerier.RegisterNomadJob(template) + if err != nil { + return fmt.Errorf("couldn't register runner job: %w", err) + } + return a.MonitorEvaluation(evalID, context.Background()) +} + func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *nomadApi.TaskGroup { var taskGroup *nomadApi.TaskGroup if len(job.TaskGroups) == 0 { @@ -207,17 +185,20 @@ func configureTask( configureNetwork(taskGroup, networkAccess, exposedPorts) } -func storeConfiguration(job *nomadApi.Job, id int, prewarmingPoolSize uint) { +func storeTemplateConfiguration(job *nomadApi.Job, prewarmingPoolSize uint) { taskGroup := findOrCreateConfigTaskGroup(job) - if taskGroup.Meta == nil { - taskGroup.Meta = make(map[string]string) - } - taskGroup.Meta[ConfigMetaEnvironmentKey] = strconv.Itoa(id) - taskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUnusedValue + taskGroup.Meta = make(map[string]string) taskGroup.Meta[ConfigMetaPoolSizeKey] = strconv.Itoa(int(prewarmingPoolSize)) } +func storeRunnerConfiguration(job *nomadApi.Job) { + taskGroup := findOrCreateConfigTaskGroup(job) + + taskGroup.Meta = make(map[string]string) + taskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUnusedValue +} + func findOrCreateConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { taskGroup := FindConfigTaskGroup(job) if taskGroup == nil { @@ -245,5 +226,5 @@ func createDummyTaskIfNotPresent(taskGroup *nomadApi.TaskGroup) { if task.Config == nil { task.Config = make(map[string]interface{}) } - task.Config["command"] = DefaultTaskCommand + task.Config["command"] = DefaultDummyTaskCommand } diff --git a/nomad/job_test.go b/nomad/job_test.go index 3d9db24..2ca945a 100644 --- a/nomad/job_test.go +++ b/nomad/job_test.go @@ -27,7 +27,7 @@ func createTestResources() *nomadApi.Resources { } func createTestJob() (job, base *nomadApi.Job) { - jobID := TemplateJobID(tests.DefaultEnvironmentIDAsString) + jobID := tests.DefaultJobID base = nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) job = nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) task := createTestTask() @@ -221,11 +221,9 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) { func TestCreateTemplateJobSetsAllGivenArguments(t *testing.T) { testJob, base := createTestJob() - testJobEnvironmentID, err := EnvironmentIDFromJobID(*testJob.ID) - assert.NoError(t, err) job := CreateTemplateJob( base, - testJobEnvironmentID, + tests.DefaultJobID, uint(*testJob.TaskGroups[0].Count), uint(*testJob.TaskGroups[0].Tasks[0].Resources.CPU), uint(*testJob.TaskGroups[0].Tasks[0].Resources.MemoryMB), @@ -244,7 +242,7 @@ func TestRegisterTemplateJobFailsWhenNomadJobRegistrationFails(t *testing.T) { apiClient := &APIClient{&apiMock} - _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger, + _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultJobID, 1, 2, 3, "image", false, []uint16{}) assert.ErrorIs(t, err, expectedErr) apiMock.AssertNotCalled(t, "EvaluationStream") @@ -267,7 +265,7 @@ func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing. apiClient := &APIClient{&apiMock} - _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger, + _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultJobID, 1, 2, 3, "image", false, []uint16{}) assert.NoError(t, err) } @@ -281,7 +279,7 @@ func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing apiClient := &APIClient{&apiMock} - _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger, + _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultJobID, 1, 2, 3, "image", false, []uint16{}) assert.ErrorIs(t, err, tests.ErrDefault) } diff --git a/nomad/nomad.go b/nomad/nomad.go index c7351bc..30057d1 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -18,6 +18,7 @@ var ( ErrorExecutorCommunicationFailed = errors.New("communication with executor failed") ErrorEvaluation = errors.New("evaluation could not complete") ErrorPlacingAllocations = errors.New("failed to place all allocations") + ErrorLoadingJob = errors.New("failed to load job") ) type AllocationProcessor func(*nomadApi.Allocation) @@ -26,21 +27,25 @@ type AllocationProcessor func(*nomadApi.Allocation) type ExecutorAPI interface { apiQuerier - // LoadAllJobs loads all existing jobs independent of the environment or if it is a template job. - LoadAllJobs() ([]*nomadApi.Job, error) + // LoadEnvironmentJobs loads all environment jobs. + LoadEnvironmentJobs() ([]*nomadApi.Job, error) - // LoadRunners loads all runners of the specified environment which are running and not about to get stopped. - LoadRunners(environmentID string) (runnerIds []string, err error) + // LoadRunnerJobs loads all runner jobs specific for the environment. + LoadRunnerJobs(environmentID string) ([]*nomadApi.Job, error) + + // LoadRunnerIDs returns the IDs of all runners of the specified environment which are running and not about to + // get stopped. + LoadRunnerIDs(environmentID string) (runnerIds []string, err error) // RegisterTemplateJob creates a template job based on the default job configuration and the given parameters. // It registers the job and waits until the registration completes. - RegisterTemplateJob(defaultJob *nomadApi.Job, environmentID int, + RegisterTemplateJob(defaultJob *nomadApi.Job, id string, prewarmingPoolSize, cpuLimit, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) (*nomadApi.Job, error) - // LoadEnvironmentTemplate loads the template job of the specified environment. - // Based on the template job new runners can be created. - LoadEnvironmentTemplate(environmentID string) (*nomadApi.Job, error) + // RegisterRunnerJob creates a runner job based on the template job. + // It registers the job and waits until the registration completes. + RegisterRunnerJob(template *nomadApi.Job) error // MonitorEvaluation monitors the given evaluation ID. // It waits until the evaluation reaches one of the states complete, canceled or failed. @@ -81,7 +86,7 @@ func (a *APIClient) init(nomadURL *url.URL, nomadNamespace string) error { return a.apiQuerier.init(nomadURL, nomadNamespace) } -func (a *APIClient) LoadRunners(environmentID string) (runnerIDs []string, err error) { +func (a *APIClient) LoadRunnerIDs(environmentID string) (runnerIDs []string, err error) { list, err := a.listJobs(environmentID) if err != nil { return nil, err @@ -95,12 +100,26 @@ func (a *APIClient) LoadRunners(environmentID string) (runnerIDs []string, err e return runnerIDs, nil } -func (a *APIClient) LoadEnvironmentTemplate(environmentID string) (*nomadApi.Job, error) { - job, err := a.job(TemplateJobID(environmentID)) +func (a *APIClient) LoadRunnerJobs(environmentID string) ([]*nomadApi.Job, error) { + runnerIDs, err := a.LoadRunnerIDs(environmentID) if err != nil { - return nil, fmt.Errorf("failed loading template job: %w", err) + return []*nomadApi.Job{}, fmt.Errorf("couldn't load jobs: %w", err) } - return job, nil + + var occurredError error + jobs := make([]*nomadApi.Job, 0, len(runnerIDs)) + for _, id := range runnerIDs { + job, err := a.apiQuerier.job(id) + if err != nil { + if occurredError == nil { + occurredError = ErrorLoadingJob + } + occurredError = fmt.Errorf("%w: couldn't load job info for runner %s - %v", occurredError, id, err) + continue + } + jobs = append(jobs, job) + } + return jobs, occurredError } func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) error { @@ -251,8 +270,8 @@ func (a *APIClient) MarkRunnerAsUsed(runnerID string) error { return nil } -func (a *APIClient) LoadAllJobs() ([]*nomadApi.Job, error) { - jobStubs, err := a.LoadJobList() +func (a *APIClient) LoadEnvironmentJobs() ([]*nomadApi.Job, error) { + jobStubs, err := a.listJobs(TemplateJobPrefix) if err != nil { return []*nomadApi.Job{}, fmt.Errorf("couldn't load jobs: %w", err) } diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index 1dfd4f3..672fed5 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -64,7 +64,7 @@ func (s *LoadRunnersTestSuite) TestErrorOfUnderlyingApiCallIsPropagated() { s.mock.On("listJobs", mock.AnythingOfType("string")). Return(nil, tests.ErrDefault) - returnedIds, err := s.nomadApiClient.LoadRunners(s.jobId) + returnedIds, err := s.nomadApiClient.LoadRunnerIDs(s.jobId) s.Nil(returnedIds) s.Equal(tests.ErrDefault, err) } @@ -73,7 +73,7 @@ func (s *LoadRunnersTestSuite) TestReturnsNoErrorWhenUnderlyingApiCallDoesNot() s.mock.On("listJobs", mock.AnythingOfType("string")). Return([]*nomadApi.JobListStub{}, nil) - _, err := s.nomadApiClient.LoadRunners(s.jobId) + _, err := s.nomadApiClient.LoadRunnerIDs(s.jobId) s.NoError(err) } @@ -81,7 +81,7 @@ func (s *LoadRunnersTestSuite) TestAvailableRunnerIsReturned() { s.mock.On("listJobs", mock.AnythingOfType("string")). Return([]*nomadApi.JobListStub{s.availableRunner}, nil) - returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) + returnedIds, _ := s.nomadApiClient.LoadRunnerIDs(s.jobId) s.Len(returnedIds, 1) s.Equal(s.availableRunner.ID, returnedIds[0]) } @@ -90,7 +90,7 @@ func (s *LoadRunnersTestSuite) TestPendingRunnerIsNotReturned() { s.mock.On("listJobs", mock.AnythingOfType("string")). Return([]*nomadApi.JobListStub{s.pendingRunner}, nil) - returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) + returnedIds, _ := s.nomadApiClient.LoadRunnerIDs(s.jobId) s.Empty(returnedIds) } @@ -98,7 +98,7 @@ func (s *LoadRunnersTestSuite) TestDeadRunnerIsNotReturned() { s.mock.On("listJobs", mock.AnythingOfType("string")). Return([]*nomadApi.JobListStub{s.deadRunner}, nil) - returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) + returnedIds, _ := s.nomadApiClient.LoadRunnerIDs(s.jobId) s.Empty(returnedIds) } @@ -112,7 +112,7 @@ func (s *LoadRunnersTestSuite) TestReturnsAllAvailableRunners() { s.mock.On("listJobs", mock.AnythingOfType("string")). Return(runnersList, nil) - returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) + returnedIds, _ := s.nomadApiClient.LoadRunnerIDs(s.jobId) s.Len(returnedIds, 2) s.Contains(returnedIds, s.availableRunner.ID) s.Contains(returnedIds, s.anotherAvailableRunner.ID) diff --git a/runner/manager.go b/runner/manager.go index ad883b8..0e4dafc 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -14,10 +14,12 @@ import ( ) var ( - log = logging.GetLogger("runner") - ErrUnknownExecutionEnvironment = errors.New("execution environment not found") - ErrNoRunnersAvailable = errors.New("no runners available for this execution environment") - ErrRunnerNotFound = errors.New("no runner found with this id") + log = logging.GetLogger("runner") + ErrUnknownExecutionEnvironment = errors.New("execution environment not found") + ErrNoRunnersAvailable = errors.New("no runners available for this execution environment") + ErrRunnerNotFound = errors.New("no runner found with this id") + ErrorUpdatingExecutionEnvironment = errors.New("errors occurred when updating environment") + ErrorInvalidJobID = errors.New("invalid job id") ) type EnvironmentID int @@ -38,7 +40,9 @@ type NomadJobID string type Manager interface { // CreateOrUpdateEnvironment creates the given environment if it does not exist. Otherwise, it updates // the existing environment and all runners. Iff a new Environment has been created, it returns true. - CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, teplateJob *nomadApi.Job) (bool, error) + // Iff scale is true, runners are created until the desiredIdleRunnersCount is reached. + CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job, + scale bool) (bool, error) // Claim returns a new runner. // It makes sure that the runner is not in use yet and returns an error if no runner could be provided. @@ -55,13 +59,9 @@ type Manager interface { // ScaleAllEnvironments checks for all environments if enough runners are created. ScaleAllEnvironments() error - // RecoverEnvironment adds a recovered Environment to the internal structure. - // This is intended to recover environments after a restart. - RecoverEnvironment(id EnvironmentID, templateJob *nomadApi.Job, desiredIdleRunnersCount uint) - - // RecoverRunner adds a recovered runner to the internal structure. - // This is intended to recover runners after a restart. - RecoverRunner(id EnvironmentID, job *nomadApi.Job, isUsed bool) + // Load fetches all already created runners from the executor and registers them. + // It should be called during the startup process (e.g. on creation of the Manager). + Load() } type NomadRunnerManager struct { @@ -79,7 +79,7 @@ func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *No NewLocalNomadEnvironmentStorage(), NewLocalRunnerStorage(), } - go m.updateRunners(ctx) + go m.keepRunnersSynced(ctx) return m } @@ -94,31 +94,36 @@ func (j *NomadEnvironment) ID() EnvironmentID { return j.environmentID } -func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) (bool, error) { +func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, + templateJob *nomadApi.Job, scale bool) (bool, error) { _, ok := m.environments.Get(id) if !ok { - return true, m.registerEnvironment(id, desiredIdleRunnersCount, templateJob) + return true, m.registerEnvironment(id, desiredIdleRunnersCount, templateJob, scale) } - return false, m.updateEnvironment(id, desiredIdleRunnersCount, templateJob) + return false, m.updateEnvironment(id, desiredIdleRunnersCount, templateJob, scale) } -func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) error { +func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint, + templateJob *nomadApi.Job, scale bool) error { m.environments.Add(&NomadEnvironment{ environmentID, NewLocalRunnerStorage(), desiredIdleRunnersCount, templateJob, }) - err := m.scaleEnvironment(environmentID) - if err != nil { - return fmt.Errorf("couldn't upscale environment %w", err) + if scale { + err := m.scaleEnvironment(environmentID) + if err != nil { + return fmt.Errorf("couldn't upscale environment %w", err) + } } return nil } // updateEnvironment updates all runners of the specified environment. This is required as attributes like the // CPULimit or MemoryMB could be changed in the new template job. -func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, newTemplateJob *nomadApi.Job) error { +func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, + newTemplateJob *nomadApi.Job, scale bool) error { environment, ok := m.environments.Get(id) if !ok { return ErrUnknownExecutionEnvironment @@ -135,31 +140,34 @@ func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunn return err } - return m.scaleEnvironment(id) + if scale { + err = m.scaleEnvironment(id) + } + return err } func (m *NomadRunnerManager) updateRunnerSpecs(environmentID EnvironmentID, templateJob *nomadApi.Job) error { - runners, err := m.apiClient.LoadRunners(environmentID.toString()) + runners, err := m.apiClient.LoadRunnerIDs(environmentID.toString()) if err != nil { return fmt.Errorf("update environment couldn't load runners: %w", err) } - var occurredErrors []string + + var occurredError error for _, id := range runners { // avoid taking the address of the loop variable runnerID := id updatedRunnerJob := *templateJob updatedRunnerJob.ID = &runnerID updatedRunnerJob.Name = &runnerID - _, err := m.apiClient.RegisterNomadJob(&updatedRunnerJob) + err := m.apiClient.RegisterRunnerJob(&updatedRunnerJob) if err != nil { - occurredErrors = append(occurredErrors, err.Error()) + if occurredError == nil { + occurredError = ErrorUpdatingExecutionEnvironment + } + occurredError = fmt.Errorf("%w; new api error for runner %s - %v", occurredError, id, err) } } - if len(occurredErrors) > 0 { - errorResult := strings.Join(occurredErrors, "\n") - return fmt.Errorf("%d errors occurred when updating environment: %s", len(occurredErrors), errorResult) - } - return nil + return occurredError } func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) { @@ -203,8 +211,8 @@ func (m *NomadRunnerManager) Return(r Runner) (err error) { } func (m *NomadRunnerManager) ScaleAllEnvironments() error { - for _, environmentID := range m.environments.List() { - err := m.scaleEnvironment(environmentID) + for _, environment := range m.environments.List() { + err := m.scaleEnvironment(environment.ID()) if err != nil { return fmt.Errorf("can not scale up: %w", err) } @@ -212,43 +220,35 @@ func (m *NomadRunnerManager) ScaleAllEnvironments() error { return nil } -func (m *NomadRunnerManager) RecoverEnvironment(id EnvironmentID, templateJob *nomadApi.Job, - desiredIdleRunnersCount uint) { - _, ok := m.environments.Get(id) - if ok { - log.Error("Recovering existing environment.") - return - } - environment := &NomadEnvironment{ - environmentID: id, - idleRunners: NewLocalRunnerStorage(), - } - m.environments.Add(environment) - log.WithField("environmentID", environment.environmentID).Info("Added recovered environment") - environment.desiredIdleRunnersCount = desiredIdleRunnersCount - environment.templateJob = templateJob -} - -func (m *NomadRunnerManager) RecoverRunner(id EnvironmentID, job *nomadApi.Job, isUsed bool) { - environment, ok := m.environments.Get(id) - if !ok { - log.Error("Environment missing. Can not recover runner") - return - } - - log.WithField("jobID", *job.ID). - WithField("environmentID", environment.environmentID). - Info("Added idle runner") - - newJob := NewNomadJob(*job.ID, m.apiClient) - if isUsed { - m.usedRunners.Add(newJob) - } else { - environment.idleRunners.Add(newJob) +func (m *NomadRunnerManager) Load() { + for _, environment := range m.environments.List() { + environmentLogger := log.WithField("environmentID", environment.ID()) + runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID().toString()) + if err != nil { + environmentLogger.WithError(err).Warn("Error fetching the runner jobs") + } + for _, job := range runnerJobs { + configTaskGroup := nomad.FindConfigTaskGroup(job) + if configTaskGroup == nil { + environmentLogger.Infof("Couldn't find config task group in job %s, skipping ...", *job.ID) + continue + } + isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue + newJob := NewNomadJob(*job.ID, m.apiClient) + if isUsed { + m.usedRunners.Add(newJob) + } else { + environment.idleRunners.Add(newJob) + } + } + err = m.scaleEnvironment(environment.ID()) + if err != nil { + environmentLogger.Error("Couldn't scale environment") + } } } -func (m *NomadRunnerManager) updateRunners(ctx context.Context) { +func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) { retries := 0 for ctx.Err() == nil { err := m.apiClient.WatchAllocations(ctx, m.onAllocationAdded, m.onAllocationStopped) @@ -261,17 +261,17 @@ func (m *NomadRunnerManager) updateRunners(ctx context.Context) { func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { log.WithField("id", alloc.JobID).Debug("Runner started") - if nomad.IsEnvironmentTemplateID(alloc.JobID) { + if IsEnvironmentTemplateID(alloc.JobID) { return } - environmentID, err := nomad.EnvironmentIDFromJobID(alloc.JobID) + environmentID, err := EnvironmentIDFromJobID(alloc.JobID) if err != nil { log.WithError(err).Warn("Allocation could not be added") return } - job, ok := m.environments.Get(EnvironmentID(environmentID)) + job, ok := m.environments.Get(environmentID) if ok { job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient)) } @@ -280,14 +280,14 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) { log.WithField("id", alloc.JobID).Debug("Runner stopped") - environmentID, err := nomad.EnvironmentIDFromJobID(alloc.JobID) + environmentID, err := EnvironmentIDFromJobID(alloc.JobID) if err != nil { log.WithError(err).Warn("Stopped allocation can not be handled") return } m.usedRunners.Delete(alloc.JobID) - job, ok := m.environments.Get(EnvironmentID(environmentID)) + job, ok := m.environments.Get(environmentID) if ok { job.idleRunners.Delete(alloc.JobID) } @@ -318,19 +318,40 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error { if err != nil { return fmt.Errorf("failed generating runner id") } - newRunnerID := nomad.RunnerJobID(environment.ID().toString(), newUUID.String()) + newRunnerID := RunnerJobID(environment.ID(), newUUID.String()) template := *environment.templateJob template.ID = &newRunnerID template.Name = &newRunnerID - evalID, err := m.apiClient.RegisterNomadJob(&template) - if err != nil { - return fmt.Errorf("couldn't register Nomad environment: %w", err) - } - err = m.apiClient.MonitorEvaluation(evalID, context.Background()) - if err != nil { - return fmt.Errorf("couldn't monitor evaluation: %w", err) - } - return nil + return m.apiClient.RegisterRunnerJob(&template) +} + +// RunnerJobID returns the nomad job id of the runner with the given environment id and uuid. +func RunnerJobID(environmentID EnvironmentID, uuid string) string { + return fmt.Sprintf("%d-%s", environmentID, uuid) +} + +// EnvironmentIDFromJobID returns the environment id that is part of the passed job id. +func EnvironmentIDFromJobID(jobID string) (EnvironmentID, error) { + parts := strings.Split(jobID, "-") + if len(parts) == 0 { + return 0, fmt.Errorf("empty job id: %w", ErrorInvalidJobID) + } + environmentID, err := strconv.Atoi(parts[0]) + if err != nil { + return 0, fmt.Errorf("invalid environment id par %v: %w", err, ErrorInvalidJobID) + } + return EnvironmentID(environmentID), nil +} + +// TemplateJobID returns the id of the template job for the environment with the given id. +func TemplateJobID(id EnvironmentID) string { + return fmt.Sprintf("%s-%d", nomad.TemplateJobPrefix, id) +} + +// IsEnvironmentTemplateID checks if the passed job id belongs to a template job. +func IsEnvironmentTemplateID(jobID string) bool { + parts := strings.Split(jobID, "-") + return len(parts) == 2 && parts[0] == nomad.TemplateJobPrefix } diff --git a/runner/manager_mock.go b/runner/manager_mock.go index 90e62f2..bdf5a24 100644 --- a/runner/manager_mock.go +++ b/runner/manager_mock.go @@ -35,20 +35,20 @@ func (_m *ManagerMock) Claim(id EnvironmentID) (Runner, error) { return r0, r1 } -// CreateOrUpdateEnvironment provides a mock function with given fields: id, desiredIdleRunnersCount, teplateJob -func (_m *ManagerMock) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, teplateJob *api.Job) (bool, error) { - ret := _m.Called(id, desiredIdleRunnersCount, teplateJob) +// CreateOrUpdateEnvironment provides a mock function with given fields: id, desiredIdleRunnersCount, teplateJob, scale +func (_m *ManagerMock) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, teplateJob *api.Job, scale bool) (bool, error) { + ret := _m.Called(id, desiredIdleRunnersCount, teplateJob, scale) var r0 bool - if rf, ok := ret.Get(0).(func(EnvironmentID, uint, *api.Job) bool); ok { - r0 = rf(id, desiredIdleRunnersCount, teplateJob) + if rf, ok := ret.Get(0).(func(EnvironmentID, uint, *api.Job, bool) bool); ok { + r0 = rf(id, desiredIdleRunnersCount, teplateJob, scale) } else { r0 = ret.Get(0).(bool) } var r1 error - if rf, ok := ret.Get(1).(func(EnvironmentID, uint, *api.Job) error); ok { - r1 = rf(id, desiredIdleRunnersCount, teplateJob) + if rf, ok := ret.Get(1).(func(EnvironmentID, uint, *api.Job, bool) error); ok { + r1 = rf(id, desiredIdleRunnersCount, teplateJob, scale) } else { r1 = ret.Error(1) } @@ -79,14 +79,9 @@ func (_m *ManagerMock) Get(runnerID string) (Runner, error) { return r0, r1 } -// RecoverEnvironment provides a mock function with given fields: id, templateJob, desiredIdleRunnersCount -func (_m *ManagerMock) RecoverEnvironment(id EnvironmentID, templateJob *api.Job, desiredIdleRunnersCount uint) { - _m.Called(id, templateJob, desiredIdleRunnersCount) -} - -// RecoverRunner provides a mock function with given fields: id, environment, isUsed -func (_m *ManagerMock) RecoverRunner(id EnvironmentID, job *api.Job, isUsed bool) { - _m.Called(id, job, isUsed) +// Load provides a mock function with given fields: +func (_m *ManagerMock) Load() { + _m.Called() } // Return provides a mock function with given fields: r diff --git a/runner/manager_test.go b/runner/manager_test.go index 2c28986..10a4b30 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -49,18 +49,17 @@ func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []strin <-time.After(10 * time.Minute) // 10 minutes is the default test timeout call.ReturnArguments = mock.Arguments{nil} }) - apiMock.On("LoadAllJobs").Return([]*nomadApi.Job{}, nil) + apiMock.On("LoadEnvironmentJobs").Return([]*nomadApi.Job{}, nil) apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string")).Return(nil) - apiMock.On("LoadRunners", tests.DefaultJobID).Return(returnedRunnerIds, nil) + apiMock.On("LoadRunnerIDs", tests.DefaultJobID).Return(returnedRunnerIds, nil) apiMock.On("JobScale", tests.DefaultJobID).Return(uint(len(returnedRunnerIds)), nil) apiMock.On("SetJobScale", tests.DefaultJobID, mock.AnythingOfType("uint"), "Runner Requested").Return(nil) - apiMock.On("LoadEnvironmentTemplate", mock.AnythingOfType("string")).Return(&nomadApi.Job{}, nil) - apiMock.On("RegisterNomadJob", mock.Anything).Return("", nil) + apiMock.On("RegisterRunnerJob", mock.Anything).Return(nil) apiMock.On("MonitorEvaluation", mock.Anything, mock.Anything).Return(nil) } func (s *ManagerTestSuite) registerDefaultEnvironment() { - err := s.nomadRunnerManager.registerEnvironment(defaultEnvironmentID, 0, &nomadApi.Job{}) + err := s.nomadRunnerManager.registerEnvironment(defaultEnvironmentID, 0, &nomadApi.Job{}, true) s.Require().NoError(err) } @@ -74,7 +73,8 @@ func (s *ManagerTestSuite) waitForRunnerRefresh() { } func (s *ManagerTestSuite) TestRegisterEnvironmentAddsNewJob() { - err := s.nomadRunnerManager.registerEnvironment(anotherEnvironmentID, defaultDesiredRunnersCount, &nomadApi.Job{}) + err := s.nomadRunnerManager. + registerEnvironment(anotherEnvironmentID, defaultDesiredRunnersCount, &nomadApi.Job{}, true) s.Require().NoError(err) job, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) s.True(ok) @@ -180,7 +180,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go s.nomadRunnerManager.updateRunners(ctx) + go s.nomadRunnerManager.keepRunnersSynced(ctx) <-time.After(10 * time.Millisecond) s.Require().Equal(1, len(hook.Entries)) @@ -208,7 +208,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go s.nomadRunnerManager.updateRunners(ctx) + go s.nomadRunnerManager.keepRunnersSynced(ctx) <-time.After(10 * time.Millisecond) _, ok = environment.idleRunners.Get(allocation.JobID) @@ -235,7 +235,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go s.nomadRunnerManager.updateRunners(ctx) + go s.nomadRunnerManager.keepRunnersSynced(ctx) <-time.After(10 * time.Millisecond) _, ok = environment.idleRunners.Get(allocation.JobID) diff --git a/runner/nomad_environment_storage.go b/runner/nomad_environment_storage.go index ee8ddac..94c6e92 100644 --- a/runner/nomad_environment_storage.go +++ b/runner/nomad_environment_storage.go @@ -6,8 +6,8 @@ import ( // NomadEnvironmentStorage is an interface for storing Nomad environments. type NomadEnvironmentStorage interface { - // List returns all keys of environments stored in this storage. - List() []EnvironmentID + // List returns all environments stored in this storage. + List() []*NomadEnvironment // Add adds an environment to the storage. // It overwrites the old environment if one with the same id was already stored. @@ -39,12 +39,14 @@ func NewLocalNomadEnvironmentStorage() *localNomadEnvironmentStorage { } } -func (s *localNomadEnvironmentStorage) List() []EnvironmentID { - keys := make([]EnvironmentID, 0, len(s.environments)) - for k := range s.environments { - keys = append(keys, k) +func (s *localNomadEnvironmentStorage) List() []*NomadEnvironment { + s.RLock() + defer s.RUnlock() + values := make([]*NomadEnvironment, 0, len(s.environments)) + for _, v := range s.environments { + values = append(values, v) } - return keys + return values } func (s *localNomadEnvironmentStorage) Add(environment *NomadEnvironment) { diff --git a/tests/e2e/environments_test.go b/tests/e2e/environments_test.go index ebe6e46..24b8274 100644 --- a/tests/e2e/environments_test.go +++ b/tests/e2e/environments_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.hpi.de/codeocean/codemoon/poseidon/api" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" - "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests/helpers" "io" @@ -46,7 +46,7 @@ func TestCreateOrUpdateEnvironment(t *testing.T) { t.Run("updates limits in Nomad correctly", func(t *testing.T) { updateRequest := request - updateRequest.CPULimit = 1337 + updateRequest.CPULimit = 150 updateRequest.MemoryLimit = 142 assertPutReturnsStatusAndZeroContent(t, path, updateRequest, http.StatusNoContent) @@ -103,7 +103,7 @@ func assertPutReturnsStatusAndZeroContent(t *testing.T, path string, func validateJob(t *testing.T, expected dto.ExecutionEnvironmentRequest) { t.Helper() - job := findNomadJob(t, tests.AnotherEnvironmentIDAsString) + job := findTemplateJob(t, tests.AnotherEnvironmentIDAsInteger) assertEqualValueStringPointer(t, nomadNamespace, job.Namespace) assertEqualValueStringPointer(t, "batch", job.Type) @@ -137,9 +137,9 @@ func validateJob(t *testing.T, expected dto.ExecutionEnvironmentRequest) { } } -func findNomadJob(t *testing.T, jobID string) *nomadApi.Job { +func findTemplateJob(t *testing.T, id runner.EnvironmentID) *nomadApi.Job { t.Helper() - job, _, err := nomadClient.Jobs().Info(nomad.TemplateJobID(jobID), nil) + job, _, err := nomadClient.Jobs().Info(runner.TemplateJobID(id), nil) if err != nil { t.Fatalf("Error retrieving Nomad job: %v", err) } diff --git a/tests/e2e/websocket_test.go b/tests/e2e/websocket_test.go index 0e218bf..5204dc8 100644 --- a/tests/e2e/websocket_test.go +++ b/tests/e2e/websocket_test.go @@ -170,7 +170,10 @@ func (s *E2ETestSuite) TestStderrFifoIsRemoved() { } func (s *E2ETestSuite) ListTempDirectory(runnerID string) string { - alloc, _, err := nomadClient.Allocations().Info(runnerID, nil) + allocListStub, _, err := nomadClient.Jobs().Allocations(runnerID, true, nil) + s.Require().NoError(err) + s.Require().Equal(1, len(allocListStub)) + alloc, _, err := nomadClient.Allocations().Info(allocListStub[0].ID, nil) s.Require().NoError(err) var stdout, stderr bytes.Buffer