From 6b69a2d73237867c1044796471e728da3182d1af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Mon, 23 Oct 2023 14:36:14 +0200 Subject: [PATCH] Refactor Nomad Recovery from an approach that loaded the runners only once at the startup to a method that will be repeated i.e. if the Nomad Event Stream connection interrupts. --- .golangci.yaml | 2 +- cmd/poseidon/main.go | 10 +-- internal/api/environments_test.go | 9 ++- internal/api/ws/codeocean_writer.go | 2 +- internal/environment/abstract_manager.go | 2 +- internal/environment/aws_environment.go | 2 +- internal/environment/aws_manager.go | 4 +- internal/environment/nomad_environment.go | 38 +++++++--- .../environment/nomad_environment_test.go | 4 +- internal/environment/nomad_manager.go | 50 +++++++++--- internal/environment/nomad_manager_test.go | 48 +++++------- internal/nomad/nomad.go | 5 +- internal/runner/abstract_manager.go | 2 - internal/runner/aws_manager_test.go | 1 + internal/runner/execution_environment.go | 2 +- internal/runner/execution_environment_mock.go | 12 +-- internal/runner/manager.go | 4 - internal/runner/nomad_manager.go | 76 +++++++++++++------ internal/runner/nomad_manager_test.go | 35 +++++++-- internal/runner/nomad_runner.go | 8 +- pkg/dto/dto.go | 10 ++- tests/constants.go | 5 +- 22 files changed, 211 insertions(+), 120 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index 9699a5c..5c35dbb 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -37,7 +37,7 @@ linters-settings: importas: unaliased: false lll: - line-length: 120 + line-length: 150 makezero: always: true maligned: diff --git a/cmd/poseidon/main.go b/cmd/poseidon/main.go index 6456686..4ba130a 100644 --- a/cmd/poseidon/main.go +++ b/cmd/poseidon/main.go @@ -193,20 +193,20 @@ func createManagerHandler(handler managerCreator, enabled bool, return runnerManager, environmentManager } -func createNomadManager(ctx context.Context) ( - runnerManager runner.Manager, environmentManager environment.ManagerHandler) { +func createNomadManager(ctx context.Context) (runner.Manager, environment.ManagerHandler) { // API initialization nomadAPIClient, err := nomad.NewExecutorAPI(&config.Config.Nomad) if err != nil { log.WithError(err).WithField("nomad config", config.Config.Nomad).Fatal("Error creating Nomad API client") } - runnerManager = runner.NewNomadRunnerManager(nomadAPIClient, ctx) - environmentManager, err = environment. - NewNomadEnvironmentManager(runnerManager, nomadAPIClient, config.Config.Server.TemplateJobFile, ctx) + runnerManager := runner.NewNomadRunnerManager(nomadAPIClient, ctx) + environmentManager, err := environment. + NewNomadEnvironmentManager(runnerManager, nomadAPIClient, config.Config.Server.TemplateJobFile) if err != nil { log.WithError(err).Fatal("Error initializing environment manager") } + go environmentManager.KeepEnvironmentsSynced(runnerManager.SynchronizeRunners, ctx) return runnerManager, environmentManager } diff --git a/internal/api/environments_test.go b/internal/api/environments_test.go index a28db77..3c7ccbd 100644 --- a/internal/api/environments_test.go +++ b/internal/api/environments_test.go @@ -3,6 +3,7 @@ package api import ( "bytes" "encoding/json" + "fmt" "github.com/gorilla/mux" "github.com/openHPI/poseidon/internal/environment" "github.com/openHPI/poseidon/internal/nomad" @@ -19,6 +20,8 @@ import ( "testing" ) +const jobHCLBasicFormat = "job \"%s\" {}" + type EnvironmentControllerTestSuite struct { tests.MemoryLeakTestSuite manager *environment.ManagerHandlerMock @@ -92,10 +95,10 @@ func (s *EnvironmentControllerTestSuite) TestList() { call.Run(func(args mock.Arguments) { firstEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, - "job \""+nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger)+"\" {}") + fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) s.Require().NoError(err) secondEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, - "job \""+nomad.TemplateJobID(tests.AnotherEnvironmentIDAsInteger)+"\" {}") + fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) s.Require().NoError(err) call.ReturnArguments = mock.Arguments{[]runner.ExecutionEnvironment{firstEnvironment, secondEnvironment}, nil} }) @@ -156,7 +159,7 @@ func (s *EnvironmentControllerTestSuite) TestGet() { call.Run(func(args mock.Arguments) { testEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, - "job \""+nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger)+"\" {}") + fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) s.Require().NoError(err) call.ReturnArguments = mock.Arguments{testEnvironment, nil} }) diff --git a/internal/api/ws/codeocean_writer.go b/internal/api/ws/codeocean_writer.go index cbe70a6..8fa7946 100644 --- a/internal/api/ws/codeocean_writer.go +++ b/internal/api/ws/codeocean_writer.go @@ -107,7 +107,7 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) { case errors.Is(info.Err, context.DeadlineExceeded) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout): cw.send(&dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout}) case errors.Is(info.Err, runner.ErrOOMKilled): - cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: runner.ErrOOMKilled.Error()}) + cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: dto.ErrOOMKilled.Error()}) case errors.Is(info.Err, nomad.ErrorAllocationCompleted), errors.Is(info.Err, runner.ErrDestroyedByAPIRequest): message := "the allocation stopped as expected" log.WithContext(cw.ctx).WithError(info.Err).Trace(message) diff --git a/internal/environment/abstract_manager.go b/internal/environment/abstract_manager.go index 28b7705..d923c1b 100644 --- a/internal/environment/abstract_manager.go +++ b/internal/environment/abstract_manager.go @@ -58,7 +58,7 @@ func (n *AbstractManager) Delete(id dto.EnvironmentID) (bool, error) { } n.runnerManager.DeleteEnvironment(id) - if err := e.Delete(false); err != nil { + if err := e.Delete(runner.ErrDestroyedByAPIRequest); err != nil { return true, fmt.Errorf("could not delete environment: %w", err) } return true, nil diff --git a/internal/environment/aws_environment.go b/internal/environment/aws_environment.go index 7d864c9..eb10978 100644 --- a/internal/environment/aws_environment.go +++ b/internal/environment/aws_environment.go @@ -45,7 +45,7 @@ func (a *AWSEnvironment) SetImage(awsEndpoint string) { a.awsEndpoint = awsEndpoint } -func (a *AWSEnvironment) Delete(_ bool) error { +func (a *AWSEnvironment) Delete(_ runner.DestroyReason) error { return nil } diff --git a/internal/environment/aws_manager.go b/internal/environment/aws_manager.go index ba4e8cd..92e9a0a 100644 --- a/internal/environment/aws_manager.go +++ b/internal/environment/aws_manager.go @@ -15,9 +15,7 @@ type AWSEnvironmentManager struct { } func NewAWSEnvironmentManager(runnerManager runner.Manager) *AWSEnvironmentManager { - m := &AWSEnvironmentManager{&AbstractManager{nil, runnerManager}} - runnerManager.Load() - return m + return &AWSEnvironmentManager{&AbstractManager{nil, runnerManager}} } func (a *AWSEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) { diff --git a/internal/environment/nomad_environment.go b/internal/environment/nomad_environment.go index 777fbe6..4597ba9 100644 --- a/internal/environment/nomad_environment.go +++ b/internal/environment/nomad_environment.go @@ -219,13 +219,15 @@ func (n *NomadEnvironment) Register() error { return nil } -func (n *NomadEnvironment) Delete(local bool) error { +func (n *NomadEnvironment) Delete(reason runner.DestroyReason) error { n.cancel() - if !local { - err := n.removeRunners() - if err != nil { - return err - } + + err := n.removeRunners(reason) + if err != nil { + return err + } + + if !errors.Is(reason, runner.ErrLocalDestruction) { err = n.apiClient.DeleteJob(*n.job.ID) if err != nil { return fmt.Errorf("couldn't delete environment job: %w", err) @@ -238,7 +240,10 @@ func (n *NomadEnvironment) ApplyPrewarmingPoolSize() error { required := int(n.PrewarmingPoolSize()) - int(n.idleRunners.Length()) if required < 0 { - return fmt.Errorf("%w. Runners to remove: %d", ErrScaleDown, -required) + log.WithError(ErrScaleDown). + WithField(dto.KeyEnvironmentID, n.ID().ToString()). + WithField("offset", -required).Warn("Too many idle runner") + return nil } return n.createRunners(uint(required), true) } @@ -260,6 +265,12 @@ func (n *NomadEnvironment) Sample() (runner.Runner, bool) { } func (n *NomadEnvironment) AddRunner(r runner.Runner) { + if replacedRunner, ok := n.idleRunners.Get(r.ID()); ok { + err := replacedRunner.Destroy(runner.ErrDestroyedAndReplaced) + if err != nil { + log.WithError(err).Warn("failed removing runner before replacing it") + } + } n.idleRunners.Add(r.ID(), r) } @@ -364,10 +375,19 @@ func (n *NomadEnvironment) createRunner(forcePull bool) error { } // removeRunners removes all (idle and used) runners for the given environment n. -func (n *NomadEnvironment) removeRunners() error { +func (n *NomadEnvironment) removeRunners(reason runner.DestroyReason) error { // This prevents a race condition where the number of required runners is miscalculated in the up-scaling process // based on the number of allocation that has been stopped at the moment of the scaling. - n.idleRunners.Purge() + for _, r := range n.idleRunners.List() { + n.idleRunners.Delete(r.ID()) + if err := r.Destroy(runner.ErrLocalDestruction); err != nil { + log.WithError(err).Warn("failed to remove runner locally") + } + } + + if errors.Is(reason, runner.ErrLocalDestruction) { + return nil + } ids, err := n.apiClient.LoadRunnerIDs(nomad.RunnerJobID(n.ID(), "")) if err != nil { diff --git a/internal/environment/nomad_environment_test.go b/internal/environment/nomad_environment_test.go index 883b248..d3b6177 100644 --- a/internal/environment/nomad_environment_test.go +++ b/internal/environment/nomad_environment_test.go @@ -165,7 +165,7 @@ func (s *MainTestSuite) TestParseJob() { environment, err := NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, apiMock, templateEnvironmentJobHCL) s.NoError(err) s.NotNil(environment.job) - s.NoError(environment.Delete(false)) + s.NoError(environment.Delete(tests.ErrCleanupDestroyReason)) }) s.Run("returns error when given wrong job", func() { @@ -231,7 +231,7 @@ func (s *MainTestSuite) TestNomadEnvironment_DeleteLocally() { environment, err := NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, apiMock, templateEnvironmentJobHCL) s.Require().NoError(err) - err = environment.Delete(true) + err = environment.Delete(runner.ErrLocalDestruction) s.NoError(err) apiMock.AssertExpectations(s.T()) } diff --git a/internal/environment/nomad_manager.go b/internal/environment/nomad_manager.go index 8e55ea7..ee01e0b 100644 --- a/internal/environment/nomad_manager.go +++ b/internal/environment/nomad_manager.go @@ -3,6 +3,7 @@ package environment import ( "context" _ "embed" + "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" @@ -13,6 +14,7 @@ import ( "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/util" + "math" "os" "time" ) @@ -36,7 +38,6 @@ func NewNomadEnvironmentManager( runnerManager runner.Manager, apiClient nomad.ExecutorAPI, templateJobFile string, - ctx context.Context, ) (*NomadEnvironmentManager, error) { if err := loadTemplateEnvironmentJobHCL(templateJobFile); err != nil { return nil, err @@ -44,10 +45,6 @@ func NewNomadEnvironmentManager( m := &NomadEnvironmentManager{&AbstractManager{nil, runnerManager}, apiClient, templateEnvironmentJobHCL} - if err := util.RetryExponentialWithContext(ctx, func() error { return m.Load() }); err != nil { - log.WithError(err).Error("Error recovering the execution environments") - } - runnerManager.Load() return m, nil } @@ -72,7 +69,8 @@ func (m *NomadEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) ( ok = true default: executionEnvironment.SetConfigFrom(fetchedEnvironment) - err = fetchedEnvironment.Delete(true) + // We destroy only this (second) local reference to the environment. + err = fetchedEnvironment.Delete(runner.ErrDestroyedAndReplaced) if err != nil { log.WithError(err).Warn("Failed to remove environment locally") } @@ -113,7 +111,7 @@ func (m *NomadEnvironmentManager) fetchEnvironments() error { fetchedEnvironment := newNomadEnvironmentFromJob(job, m.api) localEnvironment.SetConfigFrom(fetchedEnvironment) // We destroy only this (second) local reference to the environment. - if err = fetchedEnvironment.Delete(true); err != nil { + if err = fetchedEnvironment.Delete(runner.ErrDestroyedAndReplaced); err != nil { log.WithError(err).Warn("Failed to remove environment locally") } } else { @@ -124,7 +122,7 @@ func (m *NomadEnvironmentManager) fetchEnvironments() error { // Remove local environments that are not remote environments. for _, localEnvironment := range m.runnerManager.ListEnvironments() { if _, ok := remoteEnvironments[localEnvironment.ID().ToString()]; !ok { - err := localEnvironment.Delete(true) + err := localEnvironment.Delete(runner.ErrLocalDestruction) log.WithError(err).Warn("Failed to remove environment locally") } } @@ -138,7 +136,7 @@ func (m *NomadEnvironmentManager) CreateOrUpdate( if isExistingEnvironment { // Remove existing environment to force downloading the newest Docker image. // See https://github.com/openHPI/poseidon/issues/69 - err = environment.Delete(false) + err = environment.Delete(runner.ErrEnvironmentUpdated) if err != nil { return false, fmt.Errorf("failed to remove the environment: %w", err) } @@ -172,7 +170,39 @@ func (m *NomadEnvironmentManager) CreateOrUpdate( return !isExistingEnvironment, nil } -func (m *NomadEnvironmentManager) Load() error { +// KeepEnvironmentsSynced loads all environments, runner existing at Nomad, and watches Nomad events to handle further changes. +func (m *NomadEnvironmentManager) KeepEnvironmentsSynced(synchronizeRunners func(ctx context.Context) error, ctx context.Context) { + err := util.RetryConstantAttemptsWithContext(math.MaxInt, ctx, func() error { + // Load Environments + if err := m.load(); err != nil { + log.WithContext(ctx).WithError(err).Warn("Loading Environments failed! Retrying...") + return err + } + + // Load Runners and keep them synchronized. + if err := synchronizeRunners(ctx); err != nil { + log.WithContext(ctx).WithError(err).Warn("Loading and synchronizing Runners failed! Retrying...") + return err + } + + return nil + }) + if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { + log.WithContext(ctx).WithError(err).Fatal("Stopped KeepEnvironmentsSynced") + } +} + +// Load recovers all environments from the Jobs in Nomad. +// As it replaces the environments the idle runners stored are not tracked anymore. +func (m *NomadEnvironmentManager) load() error { + // We have to destroy the environments first as otherwise they would just be replaced and old goroutines might stay running. + for _, environment := range m.runnerManager.ListEnvironments() { + err := environment.Delete(runner.ErrDestroyedAndReplaced) + if err != nil { + log.WithError(err).Warn("Failed deleting environment locally. Possible memory leak") + } + } + templateJobs, err := m.api.LoadEnvironmentJobs() if err != nil { return fmt.Errorf("couldn't load template jobs: %w", err) diff --git a/internal/environment/nomad_manager_test.go b/internal/environment/nomad_manager_test.go index daaf798..375ebdf 100644 --- a/internal/environment/nomad_manager_test.go +++ b/internal/environment/nomad_manager_test.go @@ -96,9 +96,6 @@ func (s *CreateOrUpdateTestSuite) TestCreateOrUpdatesSetsForcePullFlag() { } func (s *MainTestSuite) TestNewNomadEnvironmentManager() { - disableRecovery, cancel := context.WithCancel(context.Background()) - cancel() - executorAPIMock := &nomad.ExecutorAPIMock{} executorAPIMock.On("LoadEnvironmentJobs").Return([]*nomadApi.Job{}, nil) executorAPIMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) @@ -110,7 +107,7 @@ func (s *MainTestSuite) TestNewNomadEnvironmentManager() { previousTemplateEnvironmentJobHCL := templateEnvironmentJobHCL s.Run("returns error if template file does not exist", func() { - _, err := NewNomadEnvironmentManager(runnerManagerMock, executorAPIMock, "/non-existent/file", disableRecovery) + _, err := NewNomadEnvironmentManager(runnerManagerMock, executorAPIMock, "/non-existent/file") s.Error(err) }) @@ -122,12 +119,12 @@ func (s *MainTestSuite) TestNewNomadEnvironmentManager() { f := createTempFile(s.T(), templateJobHCL) defer os.Remove(f.Name()) - m, err := NewNomadEnvironmentManager(runnerManagerMock, executorAPIMock, f.Name(), disableRecovery) + m, err := NewNomadEnvironmentManager(runnerManagerMock, executorAPIMock, f.Name()) s.NoError(err) s.NotNil(m) s.Equal(templateJobHCL, m.templateEnvironmentHCL) - s.NoError(environment.Delete(false)) + s.NoError(environment.Delete(tests.ErrCleanupDestroyReason)) }) s.Run("returns error if template file is invalid", func() { @@ -135,7 +132,7 @@ func (s *MainTestSuite) TestNewNomadEnvironmentManager() { f := createTempFile(s.T(), templateJobHCL) defer os.Remove(f.Name()) - m, err := NewNomadEnvironmentManager(runnerManagerMock, executorAPIMock, f.Name(), disableRecovery) + m, err := NewNomadEnvironmentManager(runnerManagerMock, executorAPIMock, f.Name()) s.Require().NoError(err) _, err = NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, m.templateEnvironmentHCL) s.Error(err) @@ -145,9 +142,6 @@ func (s *MainTestSuite) TestNewNomadEnvironmentManager() { } func (s *MainTestSuite) TestNomadEnvironmentManager_Get() { - disableRecovery, cancel := context.WithCancel(context.Background()) - cancel() - apiMock := &nomad.ExecutorAPIMock{} mockWatchAllocations(s.TestCtx, apiMock) apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) @@ -158,7 +152,7 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_Get() { }) runnerManager := runner.NewNomadRunnerManager(apiMock, s.TestCtx) - m, err := NewNomadEnvironmentManager(runnerManager, apiMock, "", disableRecovery) + m, err := NewNomadEnvironmentManager(runnerManager, apiMock, "") s.Require().NoError(err) s.Run("Returns error when not found", func() { @@ -177,7 +171,7 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_Get() { s.NoError(err) s.Equal(expectedEnvironment, environment) - err = environment.Delete(false) + err = environment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) }) @@ -211,11 +205,11 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_Get() { s.NoError(err) s.Equal(fetchedEnvironment.Image(), environment.Image()) - err = fetchedEnvironment.Delete(false) + err = fetchedEnvironment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) - err = environment.Delete(false) + err = environment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) - err = localEnvironment.Delete(false) + err = localEnvironment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) }) runnerManager.DeleteEnvironment(tests.DefaultEnvironmentIDAsInteger) @@ -237,18 +231,15 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_Get() { s.NoError(err) s.Equal(fetchedEnvironment.Image(), environment.Image()) - err = fetchedEnvironment.Delete(false) + err = fetchedEnvironment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) - err = environment.Delete(false) + err = environment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) }) }) } func (s *MainTestSuite) TestNomadEnvironmentManager_List() { - disableRecovery, cancel := context.WithCancel(context.Background()) - cancel() - apiMock := &nomad.ExecutorAPIMock{} apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) @@ -259,7 +250,7 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_List() { }) runnerManager := runner.NewNomadRunnerManager(apiMock, s.TestCtx) - m, err := NewNomadEnvironmentManager(runnerManager, apiMock, "", disableRecovery) + m, err := NewNomadEnvironmentManager(runnerManager, apiMock, "") s.Require().NoError(err) s.Run("with no environments", func() { @@ -279,7 +270,7 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_List() { s.Equal(1, len(environments)) s.Equal(localEnvironment, environments[0]) - err = localEnvironment.Delete(false) + err = localEnvironment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) }) runnerManager.DeleteEnvironment(tests.DefaultEnvironmentIDAsInteger) @@ -306,9 +297,9 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_List() { s.True(ok) s.Equal(fetchedEnvironment.job, nomadEnvironment.job) - err = fetchedEnvironment.Delete(false) + err = fetchedEnvironment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) - err = nomadEnvironment.Delete(false) + err = nomadEnvironment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) }) } @@ -331,14 +322,17 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_Load() { _, ok := runnerManager.GetEnvironment(tests.DefaultEnvironmentIDAsInteger) s.Require().False(ok) - _, err := NewNomadEnvironmentManager(runnerManager, apiMock, "", s.TestCtx) + m, err := NewNomadEnvironmentManager(runnerManager, apiMock, "") + s.Require().NoError(err) + + err = m.load() s.Require().NoError(err) environment, ok := runnerManager.GetEnvironment(tests.DefaultEnvironmentIDAsInteger) s.Require().True(ok) s.Equal("python:latest", environment.Image()) - err = environment.Delete(false) + err = environment.Delete(tests.ErrCleanupDestroyReason) s.Require().NoError(err) }) @@ -352,7 +346,7 @@ func (s *MainTestSuite) TestNomadEnvironmentManager_Load() { _, ok := runnerManager.GetEnvironment(tests.DefaultEnvironmentIDAsInteger) s.Require().False(ok) - _, err := NewNomadEnvironmentManager(runnerManager, apiMock, "", context.Background()) + _, err := NewNomadEnvironmentManager(runnerManager, apiMock, "") s.Require().NoError(err) _, ok = runnerManager.GetEnvironment(tests.DefaultEnvironmentIDAsInteger) diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index 2f9b0ef..e7115ea 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -27,8 +27,9 @@ var ( ErrorPlacingAllocations = errors.New("failed to place all allocations") ErrorLoadingJob = errors.New("failed to load job") ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found") - ErrorOOMKilled RunnerDeletedReason = errors.New("the allocation was OOM Killed") - ErrorAllocationRescheduled RunnerDeletedReason = errors.New("the allocation was rescheduled") + ErrorLocalDestruction RunnerDeletedReason = errors.New("the destruction should not cause external changes") + ErrorOOMKilled RunnerDeletedReason = fmt.Errorf("%s: %w", dto.ErrOOMKilled.Error(), ErrorLocalDestruction) + ErrorAllocationRescheduled RunnerDeletedReason = fmt.Errorf("the allocation was rescheduled: %w", ErrorLocalDestruction) ErrorAllocationStopped RunnerDeletedReason = errors.New("the allocation was stopped") ErrorAllocationStoppedUnexpectedly RunnerDeletedReason = fmt.Errorf("%w unexpectedly", ErrorAllocationStopped) ErrorAllocationRescheduledUnexpectedly RunnerDeletedReason = fmt.Errorf( diff --git a/internal/runner/abstract_manager.go b/internal/runner/abstract_manager.go index 236f69d..faf702f 100644 --- a/internal/runner/abstract_manager.go +++ b/internal/runner/abstract_manager.go @@ -123,5 +123,3 @@ func (n *AbstractManager) Get(runnerID string) (Runner, error) { func (n *AbstractManager) Return(_ Runner) error { return nil } - -func (n *AbstractManager) Load() {} diff --git a/internal/runner/aws_manager_test.go b/internal/runner/aws_manager_test.go index 6008322..529d8c1 100644 --- a/internal/runner/aws_manager_test.go +++ b/internal/runner/aws_manager_test.go @@ -111,5 +111,6 @@ func createBasicEnvironmentMock(id dto.EnvironmentID) *ExecutionEnvironmentMock environment.On("MemoryLimit").Return(uint(0)) environment.On("NetworkAccess").Return(false, nil) environment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false) + environment.On("ApplyPrewarmingPoolSize").Return(nil) return environment } diff --git a/internal/runner/execution_environment.go b/internal/runner/execution_environment.go index 9d77a40..f3dd8e9 100644 --- a/internal/runner/execution_environment.go +++ b/internal/runner/execution_environment.go @@ -39,7 +39,7 @@ type ExecutionEnvironment interface { Register() error // Delete removes this environment and all it's runner from the executor and Poseidon itself. // Iff local the environment is just removed from Poseidon without external escalation. - Delete(local bool) error + Delete(reason DestroyReason) error // Sample returns and removes an arbitrary available runner. // ok is true iff a runner was returned. diff --git a/internal/runner/execution_environment_mock.go b/internal/runner/execution_environment_mock.go index 9562f91..2b328cb 100644 --- a/internal/runner/execution_environment_mock.go +++ b/internal/runner/execution_environment_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.33.2. DO NOT EDIT. +// Code generated by mockery v2.36.0. DO NOT EDIT. package runner @@ -45,13 +45,13 @@ func (_m *ExecutionEnvironmentMock) CPULimit() uint { return r0 } -// Delete provides a mock function with given fields: local -func (_m *ExecutionEnvironmentMock) Delete(local bool) error { - ret := _m.Called(local) +// Delete provides a mock function with given fields: reason +func (_m *ExecutionEnvironmentMock) Delete(reason DestroyReason) error { + ret := _m.Called(reason) var r0 error - if rf, ok := ret.Get(0).(func(bool) error); ok { - r0 = rf(local) + if rf, ok := ret.Get(0).(func(DestroyReason) error); ok { + r0 = rf(reason) } else { r0 = ret.Error(0) } diff --git a/internal/runner/manager.go b/internal/runner/manager.go index 1bfa880..76c5191 100644 --- a/internal/runner/manager.go +++ b/internal/runner/manager.go @@ -51,8 +51,4 @@ type Accessor interface { // Return signals that the runner is no longer used by the caller and can be claimed by someone else. // The runner is deleted or cleaned up for reuse depending on the used executor. Return(r Runner) error - - // Load fetches all already created runners from the executor and registers them. - // It should be called during the startup process (e.g. on creation of the Manager). - Load() } diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index fc3c7fc..d24474f 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -10,9 +10,9 @@ import ( "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/logging" "github.com/openHPI/poseidon/pkg/monitoring" + "github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/util" "github.com/sirupsen/logrus" - "math" "strconv" "time" ) @@ -30,12 +30,9 @@ type NomadRunnerManager struct { } // NewNomadRunnerManager creates a new runner manager that keeps track of all runners. -// It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad. -// If you cancel the context the background synchronization will be stopped. +// KeepRunnersSynced has to be started separately. func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager { - m := &NomadRunnerManager{NewAbstractManager(ctx), apiClient} - go m.keepRunnersSynced(ctx) - return m + return &NomadRunnerManager{NewAbstractManager(ctx), apiClient} } func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) { @@ -80,40 +77,64 @@ func (m *NomadRunnerManager) Return(r Runner) error { return err } -func (m *NomadRunnerManager) Load() { +// SynchronizeRunners loads all runners and keeps them synchronized (without a retry mechanism). +func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error { + // Load Runners + if err := m.load(); err != nil { + return fmt.Errorf("failed loading runners: %w", err) + } + + // Watch for changes regarding the existing or new runners. + err := m.apiClient.WatchEventStream(ctx, + &nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped}) + + if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { + err = fmt.Errorf("nomad Event Stream failed!: %w", err) + } + return err +} + +// Load recovers all runners for all existing environments. +func (m *NomadRunnerManager) load() error { + newUsedRunners := storage.NewLocalStorage[Runner]() + for _, environment := range m.environments.List() { environmentLogger := log.WithField(dto.KeyEnvironmentID, environment.ID().ToString()) + runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID()) if err != nil { - environmentLogger.WithError(err).Warn("Error fetching the runner jobs") + return fmt.Errorf("failed fetching the runner jobs: %w", err) } for _, job := range runnerJobs { - m.loadSingleJob(job, environmentLogger, environment) + m.loadSingleJob(job, environmentLogger, environment, newUsedRunners) } err = environment.ApplyPrewarmingPoolSize() if err != nil { - environmentLogger.WithError(err).Error("Couldn't scale environment") + return fmt.Errorf("couldn't scale environment: %w", err) } } + + m.updateUsedRunners(newUsedRunners) + return nil } func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger *logrus.Entry, - environment ExecutionEnvironment) { + environment ExecutionEnvironment, newUsedRunners storage.Storage[Runner]) { configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName) if configTaskGroup == nil { - environmentLogger.Infof("Couldn't find config task group in job %s, skipping ...", *job.ID) + environmentLogger.Warnf("Couldn't find config task group in job %s, skipping ...", *job.ID) return } isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID) if err != nil { - environmentLogger.WithError(err).Warn("Error loading runner portMappings") + environmentLogger.WithError(err).Warn("Error loading runner portMappings, skipping ...") return } newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.onRunnerDestroyed) log.WithField("isUsed", isUsed).WithField(dto.KeyRunnerID, newJob.ID()).Debug("Recovered Runner") if isUsed { - m.usedRunners.Add(newJob.ID(), newJob) + newUsedRunners.Add(newJob.ID(), newJob) timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey]) if err != nil { environmentLogger.WithError(err).Warn("Error loading timeout from meta values") @@ -125,18 +146,23 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger } } -func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) { - err := util.RetryConstantAttemptsWithContext(math.MaxInt, ctx, func() error { - err := m.apiClient.WatchEventStream(ctx, - &nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped}) - if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { - log.WithContext(ctx).WithError(err).Errorf("Nomad Event Stream failed! Retrying...") - err = fmt.Errorf("KeepRunnersSynced: %w", err) +func (m *NomadRunnerManager) updateUsedRunners(newUsedRunners storage.Storage[Runner]) { + for _, r := range m.usedRunners.List() { + var reason DestroyReason + if _, ok := newUsedRunners.Get(r.ID()); ok { + reason = ErrDestroyedAndReplaced + } else { + reason = ErrLocalDestruction + log.WithError(reason).WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered") } - return err - }) - if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { - log.WithContext(ctx).WithError(err).Fatal("Stopped Restarting the Nomad Event Stream") + m.usedRunners.Delete(r.ID()) + if err := r.Destroy(reason); err != nil { + log.WithError(err).WithField(dto.KeyRunnerID, r.ID()).Warn("failed to destroy runner locally") + } + } + + for _, r := range newUsedRunners.List() { + m.usedRunners.Add(r.ID(), r) } } diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 1e7bbf0..30de4a9 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -59,6 +59,7 @@ func mockRunnerQueries(ctx context.Context, apiMock *nomad.ExecutorAPIMock, retu call.ReturnArguments = mock.Arguments{nil} }) apiMock.On("LoadEnvironmentJobs").Return([]*nomadApi.Job{}, nil) + apiMock.On("LoadRunnerJobs", mock.AnythingOfType("dto.EnvironmentID")).Return([]*nomadApi.Job{}, nil) apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil) apiMock.On("LoadRunnerIDs", tests.DefaultRunnerID).Return(returnedRunnerIds, nil) apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) @@ -258,12 +259,19 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() { }) }) - go s.nomadRunnerManager.keepRunnersSynced(s.TestCtx) + go func() { + err := s.nomadRunnerManager.SynchronizeRunners(s.TestCtx) + if err != nil { + log.WithError(err).Error("failed to synchronize runners") + } + }() <-time.After(10 * time.Millisecond) s.Require().Equal(1, len(hook.Entries)) s.Equal(logrus.ErrorLevel, hook.LastEntry().Level) - s.Equal(hook.LastEntry().Data[logrus.ErrorKey], tests.ErrDefault) + err, ok := hook.LastEntry().Data[logrus.ErrorKey].(error) + s.Require().True(ok) + s.ErrorIs(err, tests.ErrDefault) } func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { @@ -285,7 +293,12 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { }) }) - go s.nomadRunnerManager.keepRunnersSynced(s.TestCtx) + go func() { + err := s.nomadRunnerManager.SynchronizeRunners(s.TestCtx) + if err != nil { + log.WithError(err).Error("failed to synchronize runners") + } + }() <-time.After(10 * time.Millisecond) r, ok := environment.Sample() @@ -313,7 +326,12 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { }) }) - go s.nomadRunnerManager.keepRunnersSynced(s.TestCtx) + go func() { + err := s.nomadRunnerManager.SynchronizeRunners(s.TestCtx) + if err != nil { + log.WithError(err).Error("failed to synchronize runners") + } + }() <-time.After(tests.ShortTimeout) _, ok = environment.Sample() @@ -515,7 +533,8 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { s.ExpectedGoroutingIncrease++ // We dont care about destroying the created runner. call.Return([]*nomadApi.Job{job}, nil) - runnerManager.Load() + err := runnerManager.load() + s.NoError(err) environmentMock.AssertExpectations(s.T()) }) @@ -533,7 +552,8 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { s.Require().Zero(runnerManager.usedRunners.Length()) - runnerManager.Load() + err := runnerManager.load() + s.NoError(err) _, ok := runnerManager.usedRunners.Get(tests.DefaultRunnerID) s.True(ok) @@ -557,7 +577,8 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { s.Require().Zero(runnerManager.usedRunners.Length()) - runnerManager.Load() + err := runnerManager.load() + s.NoError(err) s.Require().NotZero(runnerManager.usedRunners.Length()) diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 1d13124..1cc5999 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -41,9 +41,12 @@ var ( ErrorUnknownExecution = errors.New("unknown execution") ErrorFileCopyFailed = errors.New("file copy failed") ErrFileNotFound = errors.New("file not found or insufficient permissions") + ErrLocalDestruction DestroyReason = nomad.ErrorLocalDestruction ErrOOMKilled DestroyReason = nomad.ErrorOOMKilled ErrDestroyedByAPIRequest DestroyReason = errors.New("the client wants to stop the runner") ErrCannotStopExecution DestroyReason = errors.New("the execution did not stop after SIGQUIT") + ErrDestroyedAndReplaced DestroyReason = fmt.Errorf("the runner will be destroyed and replaced: %w", ErrLocalDestruction) + ErrEnvironmentUpdated DestroyReason = errors.New("the environment will be destroyed and updated") ) // NomadJob is an abstraction to communicate with Nomad environments. @@ -258,10 +261,7 @@ func (r *NomadJob) Destroy(reason DestroyReason) (err error) { } } - // local determines if a reason is present that the runner should only be removed locally (without requesting Nomad). - local := errors.Is(reason, nomad.ErrorAllocationRescheduled) || - errors.Is(reason, ErrOOMKilled) - if local { + if errors.Is(reason, ErrLocalDestruction) { log.WithContext(r.ctx).Debug("Runner destroyed locally") return nil } diff --git a/pkg/dto/dto.go b/pkg/dto/dto.go index f57def9..56f1f01 100644 --- a/pkg/dto/dto.go +++ b/pkg/dto/dto.go @@ -216,10 +216,12 @@ const ( var ( ErrUnknownWebSocketMessageType = errors.New("unknown WebSocket message type") - ErrMissingType = errors.New("type is missing") - ErrMissingData = errors.New("data is missing") - ErrInvalidType = errors.New("invalid type") - ErrNotSupported = errors.New("not supported") + // ErrOOMKilled is the exact message that CodeOcean expects to further handle these specific cases. + ErrOOMKilled = errors.New("the allocation was OOM Killed") + ErrMissingType = errors.New("type is missing") + ErrMissingData = errors.New("data is missing") + ErrInvalidType = errors.New("invalid type") + ErrNotSupported = errors.New("not supported") ) // WebSocketMessage is the type for all messages send in the WebSocket to the client. diff --git a/tests/constants.go b/tests/constants.go index 457a151..19bb9f0 100644 --- a/tests/constants.go +++ b/tests/constants.go @@ -25,13 +25,14 @@ const ( AnotherRunnerID = AnotherEnvironmentIDAsString + "-" + AnotherUUID DefaultExecutionID = "s0m3-3x3cu710n-1d" DefaultMockID = "m0ck-1d" - TinyTimeout = 10 * time.Millisecond ShortTimeout = 100 * time.Millisecond DefaultTestTimeout = 10 * time.Minute ) var ( - ErrDefault = errors.New("an error occurred") + ErrDefault = errors.New("an error occurred") + ErrCleanupDestroyReason = errors.New("destruction required for cleanup") + DefaultPortMappings = []nomadApi.PortMapping{{To: 42, Value: 1337, Label: "lit", HostIP: "127.0.0.1"}} DefaultMappedPorts = []*dto.MappedPort{{ExposedPort: 42, HostAddress: "127.0.0.1:1337"}} )