diff --git a/internal/api/websocket_test.go b/internal/api/websocket_test.go index 07b2432..4f59fa4 100644 --- a/internal/api/websocket_test.go +++ b/internal/api/websocket_test.go @@ -437,7 +437,7 @@ func newRunnerWithNotMockedRunnerManager(t *testing.T, apiMock *nomad.ExecutorAP require.NoError(t, err) e.SetID(eID) e.SetPrewarmingPoolSize(0) - runnerManager.SetEnvironment(e) + runnerManager.StoreEnvironment(e) e.AddRunner(runnerJob) r, err = runnerManager.Claim(e.ID(), int(tests.DefaultTestTimeout.Seconds())) diff --git a/internal/environment/environment.go b/internal/environment/environment.go index 5381570..0318180 100644 --- a/internal/environment/environment.go +++ b/internal/environment/environment.go @@ -3,6 +3,7 @@ package environment import ( "context" "encoding/json" + "errors" "fmt" "github.com/google/uuid" nomadApi "github.com/hashicorp/nomad/api" @@ -18,6 +19,8 @@ const ( portNumberBase = 10 ) +var ErrScaleDown = errors.New("cannot scale down the environment") + type NomadEnvironment struct { jobHCL string job *nomadApi.Job @@ -33,6 +36,23 @@ func NewNomadEnvironment(jobHCL string) (*NomadEnvironment, error) { return &NomadEnvironment{jobHCL, job, runner.NewLocalRunnerStorage()}, nil } +func NewNomadEnvironmentFromRequest(jobHCL string, id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) ( + *NomadEnvironment, error) { + environment, err := NewNomadEnvironment(jobHCL) + if err != nil { + return nil, err + } + environment.SetID(id) + + // Set options according to request + environment.SetPrewarmingPoolSize(request.PrewarmingPoolSize) + environment.SetCPULimit(request.CPULimit) + environment.SetMemoryLimit(request.MemoryLimit) + environment.SetImage(request.Image) + environment.SetNetworkAccess(request.NetworkAccess, request.ExposedPorts) + return environment, nil +} + func (n *NomadEnvironment) ID() dto.EnvironmentID { id, err := nomad.EnvironmentIDFromTemplateJobID(*n.job.ID) if err != nil { @@ -169,11 +189,6 @@ func (n *NomadEnvironment) SetNetworkAccess(allow bool, exposedPorts []uint16) { // Register creates a Nomad job based on the default job configuration and the given parameters. // It registers the job with Nomad and waits until the registration completes. func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error { - // To avoid docker image issues. See https://github.com/openHPI/poseidon/issues/69 - if err := n.Delete(apiClient); err != nil { - return fmt.Errorf("failed to remove the environment: %w", err) - } - nomad.SetForcePullFlag(n.job, true) // This must be the default as otherwise new runners could have different images. evalID, err := apiClient.RegisterNomadJob(n.job) if err != nil { @@ -200,14 +215,13 @@ func (n *NomadEnvironment) Delete(apiClient nomad.ExecutorAPI) error { return nil } -func (n *NomadEnvironment) Scale(apiClient nomad.ExecutorAPI) error { +func (n *NomadEnvironment) ApplyPrewarmingPoolSize(apiClient nomad.ExecutorAPI) error { required := int(n.PrewarmingPoolSize()) - n.idleRunners.Length() - if required > 0 { - return n.createRunners(apiClient, uint(required), true) - } else { - return n.removeIdleRunners(apiClient, uint(-required)) + if required < 0 { + return fmt.Errorf("%w. Runners to remove: %d", ErrScaleDown, -required) } + return n.createRunners(apiClient, uint(required), true) } func (n *NomadEnvironment) Sample(apiClient nomad.ExecutorAPI) (runner.Runner, bool) { @@ -322,32 +336,18 @@ func (n *NomadEnvironment) createRunner(apiClient nomad.ExecutorAPI, forcePull b return nil } -func (n *NomadEnvironment) removeIdleRunners(apiClient nomad.ExecutorAPI, count uint) error { - log.WithField("runnersToDelete", count).WithField("id", n.ID()).Debug("Removing idle runners") - for i := 0; i < int(count); i++ { - r, ok := n.idleRunners.Sample() - if !ok { - return fmt.Errorf("could not delete expected idle runner: %w", runner.ErrRunnerNotFound) - } - err := apiClient.DeleteJob(r.ID()) - if err != nil { - return fmt.Errorf("could not delete expected Nomad idle runner: %w", err) - } - } - return nil -} - +// removeRunners removes all (idle and used) runners for the given environment n. func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI) error { - // Only to avoid timing issues as an idle runner is also removed when Nomad has deleted the allocation. - for _, r := range n.idleRunners.List() { - n.idleRunners.Delete(r.ID()) - } + // This prevents a race condition where the number of required runners is miscalculated in the up-scaling process + // based on the number of allocation that has been stopped at the moment of the scaling. + n.idleRunners.Purge() ids, err := apiClient.LoadRunnerIDs(nomad.RunnerJobID(n.ID(), "")) if err != nil { return fmt.Errorf("failed to load runner ids: %w", err) } + // Block execution until Nomad confirmed all deletion requests. var wg sync.WaitGroup for _, id := range ids { wg.Add(1) diff --git a/internal/environment/manager.go b/internal/environment/manager.go index 70e30e8..1dd5321 100644 --- a/internal/environment/manager.go +++ b/internal/environment/manager.go @@ -88,7 +88,7 @@ func (m *NomadEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) ( } ok = false case !ok: - m.runnerManager.SetEnvironment(fetchedEnvironment) + m.runnerManager.StoreEnvironment(fetchedEnvironment) executionEnvironment = fetchedEnvironment ok = true default: @@ -114,32 +114,39 @@ func (m *NomadEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironmen func (m *NomadEnvironmentManager) CreateOrUpdate(id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) ( created bool, err error) { - environment, ok := m.runnerManager.GetEnvironment(id) - if !ok { - environment, err = NewNomadEnvironment(m.templateEnvironmentHCL) + // Check if execution environment is already existing (in the local memory). + environment, isExistingEnvironment := m.runnerManager.GetEnvironment(id) + if isExistingEnvironment { + // Remove existing environment to force downloading the newest Docker image. + // See https://github.com/openHPI/poseidon/issues/69 + err = environment.Delete(m.api) if err != nil { - return false, fmt.Errorf("error creating Nomad environment: %w", err) + return false, fmt.Errorf("failed to remove the environment: %w", err) } - environment.SetID(id) } - environment.SetPrewarmingPoolSize(request.PrewarmingPoolSize) - environment.SetCPULimit(request.CPULimit) - environment.SetMemoryLimit(request.MemoryLimit) - environment.SetImage(request.Image) - environment.SetNetworkAccess(request.NetworkAccess, request.ExposedPorts) - created = m.runnerManager.SetEnvironment(environment) + // Create a new environment with the given request options. + environment, err = NewNomadEnvironmentFromRequest(m.templateEnvironmentHCL, id, request) + if err != nil { + return false, fmt.Errorf("error creating Nomad environment: %w", err) + } + // Keep a copy of environment specification in memory. + m.runnerManager.StoreEnvironment(environment) + + // Register template Job with Nomad. err = environment.Register(m.api) if err != nil { return false, fmt.Errorf("error registering template job in API: %w", err) } - err = environment.Scale(m.api) + + // Launch idle runners based on the template job. + err = environment.ApplyPrewarmingPoolSize(m.api) if err != nil { return false, fmt.Errorf("error scaling template job in API: %w", err) } - return created, nil + return !isExistingEnvironment, nil } func (m *NomadEnvironmentManager) Delete(id dto.EnvironmentID) (bool, error) { @@ -181,7 +188,7 @@ func (m *NomadEnvironmentManager) Load() error { job: job, idleRunners: runner.NewLocalRunnerStorage(), } - m.runnerManager.SetEnvironment(environment) + m.runnerManager.StoreEnvironment(environment) jobLogger.Info("Successfully recovered environment") } return nil diff --git a/internal/environment/manager_test.go b/internal/environment/manager_test.go index b914596..9e5a30d 100644 --- a/internal/environment/manager_test.go +++ b/internal/environment/manager_test.go @@ -57,7 +57,7 @@ func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentRe s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false) - s.runnerManagerMock.On("SetEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true) + s.runnerManagerMock.On("StoreEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true) _, err := s.manager.CreateOrUpdate(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request) s.ErrorIs(err, tests.ErrDefault) } @@ -67,7 +67,7 @@ func (s *CreateOrUpdateTestSuite) TestCreateOrUpdatesSetsForcePullFlag() { s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false) - s.runnerManagerMock.On("SetEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true) + s.runnerManagerMock.On("StoreEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true) s.apiMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil) s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) call := s.apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")) @@ -155,7 +155,7 @@ func TestNomadEnvironmentManager_Get(t *testing.T) { expectedEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL) expectedEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger) require.NoError(t, err) - runnerManager.SetEnvironment(expectedEnvironment) + runnerManager.StoreEnvironment(expectedEnvironment) environment, err := m.Get(tests.DefaultEnvironmentIDAsInteger, false) assert.NoError(t, err) @@ -181,7 +181,7 @@ func TestNomadEnvironmentManager_Get(t *testing.T) { localEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL) require.NoError(t, err) localEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger) - runnerManager.SetEnvironment(localEnvironment) + runnerManager.StoreEnvironment(localEnvironment) environment, err := m.Get(tests.DefaultEnvironmentIDAsInteger, false) assert.NoError(t, err) @@ -234,7 +234,7 @@ func TestNomadEnvironmentManager_List(t *testing.T) { localEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL) require.NoError(t, err) localEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger) - runnerManager.SetEnvironment(localEnvironment) + runnerManager.StoreEnvironment(localEnvironment) environments, err := m.List(false) assert.NoError(t, err) diff --git a/internal/nomad/job.go b/internal/nomad/job.go index 8eda3f8..91e1acc 100644 --- a/internal/nomad/job.go +++ b/internal/nomad/job.go @@ -99,7 +99,11 @@ func FindOrCreateConfigTask(taskGroup *nomadApi.TaskGroup) *nomadApi.Task { if task.Config == nil { task.Config = make(map[string]interface{}) } - task.Config["command"] = ConfigTaskCommand + // This function should allow concurrency in the "Find" case. + // Therefore, this condition is necessary to remove concurrent writes in the "Find" case. + if v, ok := task.Config["command"]; !(ok && v == ConfigTaskCommand) { + task.Config["command"] = ConfigTaskCommand + } return task } @@ -125,8 +129,14 @@ func FindOrCreateDefaultTask(taskGroup *nomadApi.TaskGroup) *nomadApi.Task { if task.Config == nil { task.Config = make(map[string]interface{}) } - task.Config["command"] = TaskCommand - task.Config["args"] = TaskArgs + // This function should allow concurrency in the "Find" case. + if v, ok := task.Config["command"]; !(ok && v == TaskCommand) { + task.Config["command"] = TaskCommand + } + v, ok := task.Config["args"] + if args, isStringArray := v.([]string); !(ok && isStringArray && len(args) == 1 && args[0] == TaskArgs[0]) { + task.Config["args"] = TaskArgs + } return task } diff --git a/internal/runner/execution_environment_mock.go b/internal/runner/execution_environment_mock.go index 21c7f35..3734095 100644 --- a/internal/runner/execution_environment_mock.go +++ b/internal/runner/execution_environment_mock.go @@ -19,6 +19,20 @@ func (_m *ExecutionEnvironmentMock) AddRunner(r Runner) { _m.Called(r) } +// ApplyPrewarmingPoolSize provides a mock function with given fields: apiClient +func (_m *ExecutionEnvironmentMock) ApplyPrewarmingPoolSize(apiClient nomad.ExecutorAPI) error { + ret := _m.Called(apiClient) + + var r0 error + if rf, ok := ret.Get(0).(func(nomad.ExecutorAPI) error); ok { + r0 = rf(apiClient) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // CPULimit provides a mock function with given fields: func (_m *ExecutionEnvironmentMock) CPULimit() uint { ret := _m.Called() @@ -205,20 +219,6 @@ func (_m *ExecutionEnvironmentMock) Sample(apiClient nomad.ExecutorAPI) (Runner, return r0, r1 } -// Scale provides a mock function with given fields: apiClient -func (_m *ExecutionEnvironmentMock) Scale(apiClient nomad.ExecutorAPI) error { - ret := _m.Called(apiClient) - - var r0 error - if rf, ok := ret.Get(0).(func(nomad.ExecutorAPI) error); ok { - r0 = rf(apiClient) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // SetCPULimit provides a mock function with given fields: limit func (_m *ExecutionEnvironmentMock) SetCPULimit(limit uint) { _m.Called(limit) @@ -253,17 +253,3 @@ func (_m *ExecutionEnvironmentMock) SetNetworkAccess(allow bool, ports []uint16) func (_m *ExecutionEnvironmentMock) SetPrewarmingPoolSize(count uint) { _m.Called(count) } - -// UpdateRunnerSpecs provides a mock function with given fields: apiClient -func (_m *ExecutionEnvironmentMock) UpdateRunnerSpecs(apiClient nomad.ExecutorAPI) error { - ret := _m.Called(apiClient) - - var r0 error - if rf, ok := ret.Get(0).(func(nomad.ExecutorAPI) error); ok { - r0 = rf(apiClient) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/internal/runner/manager.go b/internal/runner/manager.go index 7ac0c4e..8832ab3 100644 --- a/internal/runner/manager.go +++ b/internal/runner/manager.go @@ -31,6 +31,8 @@ type ExecutionEnvironment interface { // PrewarmingPoolSize sets the number of idle runner of this environment that should be prewarmed. PrewarmingPoolSize() uint SetPrewarmingPoolSize(count uint) + // ApplyPrewarmingPoolSize creates idle runners according to the PrewarmingPoolSize. + ApplyPrewarmingPoolSize(apiClient nomad.ExecutorAPI) error // CPULimit sets the share of cpu that a runner should receive at minimum. CPULimit() uint SetCPULimit(limit uint) @@ -50,8 +52,6 @@ type ExecutionEnvironment interface { Register(apiClient nomad.ExecutorAPI) error // Delete removes this environment and all it's runner from the executor and Poseidon itself. Delete(apiClient nomad.ExecutorAPI) error - // Scale manages if the executor has enough idle runner according to the PrewarmingPoolSize. - Scale(apiClient nomad.ExecutorAPI) error // Sample returns and removes an arbitrary available runner. // ok is true iff a runner was returned. @@ -74,9 +74,8 @@ type Manager interface { // Iff the requested environment is not stored it returns false. GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironment, bool) - // SetEnvironment stores the environment in Poseidons memory. - // It returns true iff a new environment is stored and false iff an existing environment was updated. - SetEnvironment(environment ExecutionEnvironment) bool + // StoreEnvironment stores the environment in Poseidons memory. + StoreEnvironment(environment ExecutionEnvironment) // DeleteEnvironment removes the specified execution environment in Poseidons memory. // It does nothing if the specified environment can not be found. @@ -129,10 +128,8 @@ func (m *NomadRunnerManager) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvi return m.environments.Get(id) } -func (m *NomadRunnerManager) SetEnvironment(environment ExecutionEnvironment) bool { - _, ok := m.environments.Get(environment.ID()) +func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) { m.environments.Add(environment) - return !ok } func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) { @@ -215,7 +212,7 @@ func (m *NomadRunnerManager) Load() { for _, job := range runnerJobs { m.loadSingleJob(job, environmentLogger, environment) } - err = environment.Scale(m.apiClient) + err = environment.ApplyPrewarmingPoolSize(m.apiClient) if err != nil { environmentLogger.WithError(err).Error("Couldn't scale environment") } diff --git a/internal/runner/manager_mock.go b/internal/runner/manager_mock.go index 8a93cd4..f43c52f 100644 --- a/internal/runner/manager_mock.go +++ b/internal/runner/manager_mock.go @@ -137,16 +137,7 @@ func (_m *ManagerMock) Return(r Runner) error { return r0 } -// SetEnvironment provides a mock function with given fields: environment -func (_m *ManagerMock) SetEnvironment(environment ExecutionEnvironment) bool { - ret := _m.Called(environment) - - var r0 bool - if rf, ok := ret.Get(0).(func(ExecutionEnvironment) bool); ok { - r0 = rf(environment) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 +// StoreEnvironment provides a mock function with given fields: environment +func (_m *ManagerMock) StoreEnvironment(environment ExecutionEnvironment) { + _m.Called(environment) } diff --git a/internal/runner/manager_test.go b/internal/runner/manager_test.go index 9682b09..1d2be6a 100644 --- a/internal/runner/manager_test.go +++ b/internal/runner/manager_test.go @@ -82,8 +82,7 @@ func mockIdleRunners(environmentMock *ExecutionEnvironmentMock) { func (s *ManagerTestSuite) setDefaultEnvironment() { s.exerciseEnvironment.On("ID").Return(defaultEnvironmentID) - created := s.nomadRunnerManager.SetEnvironment(s.exerciseEnvironment) - s.Require().True(created) + s.nomadRunnerManager.StoreEnvironment(s.exerciseEnvironment) } func (s *ManagerTestSuite) waitForRunnerRefresh() { @@ -93,8 +92,7 @@ func (s *ManagerTestSuite) waitForRunnerRefresh() { func (s *ManagerTestSuite) TestSetEnvironmentAddsNewEnvironment() { anotherEnvironment := &ExecutionEnvironmentMock{} anotherEnvironment.On("ID").Return(anotherEnvironmentID) - created := s.nomadRunnerManager.SetEnvironment(anotherEnvironment) - s.Require().True(created) + s.nomadRunnerManager.StoreEnvironment(anotherEnvironment) job, ok := s.nomadRunnerManager.environments.Get(anotherEnvironmentID) s.True(ok) diff --git a/internal/runner/storage.go b/internal/runner/storage.go index 795d9d9..2d2117a 100644 --- a/internal/runner/storage.go +++ b/internal/runner/storage.go @@ -21,6 +21,9 @@ type Storage interface { // It does nothing if no runner with the id is present in the store. Delete(id string) + // Purge removes all runners from the storage. + Purge() + // Length returns the number of currently stored runners in the storage. Length() int @@ -72,6 +75,12 @@ func (s *localRunnerStorage) Delete(id string) { delete(s.runners, id) } +func (s *localRunnerStorage) Purge() { + s.Lock() + defer s.Unlock() + s.runners = make(map[string]Runner) +} + func (s *localRunnerStorage) Sample() (Runner, bool) { s.Lock() defer s.Unlock()