diff --git a/internal/environment/environment.go b/internal/environment/environment.go index f3cd3c7..5381570 100644 --- a/internal/environment/environment.go +++ b/internal/environment/environment.go @@ -3,7 +3,6 @@ package environment import ( "context" "encoding/json" - "errors" "fmt" "github.com/google/uuid" nomadApi "github.com/hashicorp/nomad/api" @@ -12,16 +11,13 @@ import ( "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" "strconv" + "sync" ) const ( portNumberBase = 10 ) -var ( - ErrorUpdatingExecutionEnvironment = errors.New("errors occurred when updating environment") -) - type NomadEnvironment struct { jobHCL string job *nomadApi.Job @@ -173,6 +169,11 @@ func (n *NomadEnvironment) SetNetworkAccess(allow bool, exposedPorts []uint16) { // Register creates a Nomad job based on the default job configuration and the given parameters. // It registers the job with Nomad and waits until the registration completes. func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error { + // To avoid docker image issues. See https://github.com/openHPI/poseidon/issues/69 + if err := n.Delete(apiClient); err != nil { + return fmt.Errorf("failed to remove the environment: %w", err) + } + nomad.SetForcePullFlag(n.job, true) // This must be the default as otherwise new runners could have different images. evalID, err := apiClient.RegisterNomadJob(n.job) if err != nil { @@ -188,7 +189,7 @@ func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error { } func (n *NomadEnvironment) Delete(apiClient nomad.ExecutorAPI) error { - err := n.removeRunners(apiClient, uint(n.idleRunners.Length())) + err := n.removeRunners(apiClient) if err != nil { return err } @@ -205,36 +206,10 @@ func (n *NomadEnvironment) Scale(apiClient nomad.ExecutorAPI) error { if required > 0 { return n.createRunners(apiClient, uint(required), true) } else { - return n.removeRunners(apiClient, uint(-required)) + return n.removeIdleRunners(apiClient, uint(-required)) } } -func (n *NomadEnvironment) UpdateRunnerSpecs(apiClient nomad.ExecutorAPI) error { - runners, err := apiClient.LoadRunnerIDs(n.ID().ToString()) - if err != nil { - return fmt.Errorf("update environment couldn't load runners: %w", err) - } - - var occurredError error - for _, id := range runners { - // avoid taking the address of the loop variable - runnerID := id - updatedRunnerJob := n.DeepCopyJob() - updatedRunnerJob.ID = &runnerID - updatedRunnerJob.Name = &runnerID - nomad.SetForcePullFlag(updatedRunnerJob, true) - - err := apiClient.RegisterRunnerJob(updatedRunnerJob) - if err != nil { - if occurredError == nil { - occurredError = ErrorUpdatingExecutionEnvironment - } - occurredError = fmt.Errorf("%w; new api error for runner %s - %v", occurredError, id, err) - } - } - return occurredError -} - func (n *NomadEnvironment) Sample(apiClient nomad.ExecutorAPI) (runner.Runner, bool) { r, ok := n.idleRunners.Sample() if ok { @@ -347,7 +322,7 @@ func (n *NomadEnvironment) createRunner(apiClient nomad.ExecutorAPI, forcePull b return nil } -func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI, count uint) error { +func (n *NomadEnvironment) removeIdleRunners(apiClient nomad.ExecutorAPI, count uint) error { log.WithField("runnersToDelete", count).WithField("id", n.ID()).Debug("Removing idle runners") for i := 0; i < int(count); i++ { r, ok := n.idleRunners.Sample() @@ -361,3 +336,29 @@ func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI, count uint } return nil } + +func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI) error { + // Only to avoid timing issues as an idle runner is also removed when Nomad has deleted the allocation. + for _, r := range n.idleRunners.List() { + n.idleRunners.Delete(r.ID()) + } + + ids, err := apiClient.LoadRunnerIDs(nomad.RunnerJobID(n.ID(), "")) + if err != nil { + return fmt.Errorf("failed to load runner ids: %w", err) + } + + var wg sync.WaitGroup + for _, id := range ids { + wg.Add(1) + go func(jobID string) { + defer wg.Done() + deleteErr := apiClient.DeleteJob(jobID) + if deleteErr != nil { + err = deleteErr + } + }(id) + } + wg.Wait() + return err +} diff --git a/internal/environment/environment_test.go b/internal/environment/environment_test.go index 571867d..87a2e07 100644 --- a/internal/environment/environment_test.go +++ b/internal/environment/environment_test.go @@ -110,8 +110,10 @@ func TestRegisterFailsWhenNomadJobRegistrationFails(t *testing.T) { expectedErr := tests.ErrDefault apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", expectedErr) + apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) + apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) - environment := &NomadEnvironment{"", &nomadApi.Job{}, nil} + environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()} environment.SetID(tests.DefaultEnvironmentIDAsInteger) err := environment.Register(apiClientMock) @@ -125,8 +127,10 @@ func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing. apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil) apiClientMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil) + apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) + apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) - environment := &NomadEnvironment{"", &nomadApi.Job{}, nil} + environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()} environment.SetID(tests.DefaultEnvironmentIDAsInteger) err := environment.Register(apiClientMock) @@ -139,8 +143,10 @@ func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil) apiClientMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(tests.ErrDefault) + apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) + apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) - environment := &NomadEnvironment{"", &nomadApi.Job{}, nil} + environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()} environment.SetID(tests.DefaultEnvironmentIDAsInteger) err := environment.Register(apiClientMock) diff --git a/internal/environment/manager.go b/internal/environment/manager.go index 017bb9f..70e30e8 100644 --- a/internal/environment/manager.go +++ b/internal/environment/manager.go @@ -134,10 +134,6 @@ func (m *NomadEnvironmentManager) CreateOrUpdate(id dto.EnvironmentID, request d if err != nil { return false, fmt.Errorf("error registering template job in API: %w", err) } - err = environment.UpdateRunnerSpecs(m.api) - if err != nil { - return false, fmt.Errorf("error updating runner jobs in API: %w", err) - } err = environment.Scale(m.api) if err != nil { return false, fmt.Errorf("error scaling template job in API: %w", err) diff --git a/internal/environment/manager_test.go b/internal/environment/manager_test.go index 69c0a5b..b914596 100644 --- a/internal/environment/manager_test.go +++ b/internal/environment/manager_test.go @@ -54,6 +54,8 @@ func (s *CreateOrUpdateTestSuite) SetupTest() { func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentReturnsError() { s.apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", tests.ErrDefault) + s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) + s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false) s.runnerManagerMock.On("SetEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true) _, err := s.manager.CreateOrUpdate(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request) @@ -62,6 +64,8 @@ func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentRe func (s *CreateOrUpdateTestSuite) TestCreateOrUpdatesSetsForcePullFlag() { s.apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", nil) + s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) + s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false) s.runnerManagerMock.On("SetEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true) s.apiMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil) @@ -131,6 +135,8 @@ func TestNewNomadEnvironmentManager(t *testing.T) { func TestNomadEnvironmentManager_Get(t *testing.T) { apiMock := &nomad.ExecutorAPIMock{} mockWatchAllocations(apiMock) + apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) + apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) call := apiMock.On("LoadEnvironmentJobs") call.Run(func(args mock.Arguments) { call.ReturnArguments = mock.Arguments{[]*nomadApi.Job{}, nil} diff --git a/internal/runner/manager.go b/internal/runner/manager.go index c558769..7ac0c4e 100644 --- a/internal/runner/manager.go +++ b/internal/runner/manager.go @@ -52,8 +52,6 @@ type ExecutionEnvironment interface { Delete(apiClient nomad.ExecutorAPI) error // Scale manages if the executor has enough idle runner according to the PrewarmingPoolSize. Scale(apiClient nomad.ExecutorAPI) error - // UpdateRunnerSpecs updates all Runner of the passed environment to have the same definition as the environment. - UpdateRunnerSpecs(apiClient nomad.ExecutorAPI) error // Sample returns and removes an arbitrary available runner. // ok is true iff a runner was returned.