From 87f823756b6b5336f02f752601d7acbed1b224d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Thu, 10 Jun 2021 19:08:14 +0200 Subject: [PATCH] Implement merge request comments --- Makefile | 2 +- api/environments.go | 11 +- api/environments_test.go | 23 ++- api/runners_test.go | 2 +- environment/job.go | 47 ++--- environment/job_test.go | 22 ++- environment/manager.go | 91 ++++++++- environment/manager_mock.go | 13 +- environment/manager_test.go | 38 ++-- main.go | 6 +- nomad/api_querier.go | 12 +- nomad/api_querier_mock.go | 2 +- nomad/executor_api_mock.go | 4 +- nomad/job.go | 65 +++++-- nomad/nomad.go | 30 +-- nomad/nomad_test.go | 10 +- runner/manager.go | 231 ++++++++++------------- runner/manager_mock.go | 43 ++++- runner/manager_test.go | 32 ++-- runner/nomad_environment_storage.go | 54 +++--- runner/nomad_environment_storage_test.go | 82 ++++---- runner/runner.go | 7 +- runner/runner_test.go | 14 +- tests/constants.go | 6 +- tests/e2e/e2e_test.go | 16 +- tests/e2e/environments_test.go | 2 +- 26 files changed, 482 insertions(+), 383 deletions(-) diff --git a/Makefile b/Makefile index aebe793..6820551 100644 --- a/Makefile +++ b/Makefile @@ -81,7 +81,7 @@ coverhtml: coverage ## Generate HTML coverage report .PHONY: e2e-test e2e-test: deps ## Run e2e tests - @go test -count=1 ./tests/e2e -v + @go test -count=1 ./tests/e2e -v -args -dockerImage="drp.codemoon.xopic.de/openhpi/co_execenv_python:3.8" .PHONY: e2e-docker e2e-docker: docker ## Run e2e tests against the Docker container diff --git a/api/environments.go b/api/environments.go index 77746b3..4a9a5a9 100644 --- a/api/environments.go +++ b/api/environments.go @@ -6,6 +6,7 @@ import ( "github.com/gorilla/mux" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/environment" + "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "net/http" ) @@ -37,8 +38,12 @@ func (e *EnvironmentController) createOrUpdate(writer http.ResponseWriter, reque writeBadRequest(writer, fmt.Errorf("could not find %s", executionEnvironmentIDKey)) return } - - created, err := e.manager.CreateOrUpdate(id, *req) + environmentID, err := runner.NewEnvironmentID(id) + if err != nil { + writeBadRequest(writer, fmt.Errorf("could not update environment: %w", err)) + return + } + created, err := e.manager.CreateOrUpdate(environmentID, *req) if err != nil { writeInternalServerError(writer, err, dto.ErrorUnknown) } @@ -51,6 +56,6 @@ func (e *EnvironmentController) createOrUpdate(writer http.ResponseWriter, reque } // delete removes an execution environment from the executor -func (e *EnvironmentController) delete(writer http.ResponseWriter, request *http.Request) { // nolint:unused +func (e *EnvironmentController) delete(writer http.ResponseWriter, request *http.Request) { // nolint:unused ToDo } diff --git a/api/environments_test.go b/api/environments_test.go index 55b6272..351fd9f 100644 --- a/api/environments_test.go +++ b/api/environments_test.go @@ -8,9 +8,13 @@ import ( "github.com/stretchr/testify/suite" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/environment" + "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" + "math" "net/http" "net/http/httptest" + "strconv" + "strings" "testing" ) @@ -32,7 +36,7 @@ func (s *EnvironmentControllerTestSuite) SetupTest() { type CreateOrUpdateEnvironmentTestSuite struct { EnvironmentControllerTestSuite path string - id string + id runner.EnvironmentID body []byte } @@ -42,8 +46,8 @@ func TestCreateOrUpdateEnvironmentTestSuite(t *testing.T) { func (s *CreateOrUpdateEnvironmentTestSuite) SetupTest() { s.EnvironmentControllerTestSuite.SetupTest() - s.id = tests.DefaultEnvironmentIDAsString - testURL, err := s.router.Get(createOrUpdateRouteName).URL(executionEnvironmentIDKey, s.id) + s.id = tests.DefaultEnvironmentIDAsInteger + testURL, err := s.router.Get(createOrUpdateRouteName).URL(executionEnvironmentIDKey, strconv.Itoa(int(s.id))) if err != nil { s.T().Fatal(err) } @@ -98,3 +102,16 @@ func (s *CreateOrUpdateEnvironmentTestSuite) TestReturnsNoContentIfNotNewEnviron recorder := s.recordRequest() s.Equal(http.StatusNoContent, recorder.Code) } + +func (s *CreateOrUpdateEnvironmentTestSuite) TestReturnsNotFoundOnNonIntegerID() { + s.path = strings.Join([]string{BasePath, EnvironmentsPath, "/", "invalid-id"}, "") + recorder := s.recordRequest() + s.Equal(http.StatusNotFound, recorder.Code) +} + +func (s *CreateOrUpdateEnvironmentTestSuite) TestFailsOnTooLargeID() { + tooLargeIntStr := strconv.Itoa(math.MaxInt64) + "0" + s.path = strings.Join([]string{BasePath, EnvironmentsPath, "/", tooLargeIntStr}, "") + recorder := s.recordRequest() + s.Equal(http.StatusBadRequest, recorder.Code) +} diff --git a/api/runners_test.go b/api/runners_test.go index 2610506..755b7bd 100644 --- a/api/runners_test.go +++ b/api/runners_test.go @@ -29,7 +29,7 @@ type MiddlewareTestSuite struct { func (s *MiddlewareTestSuite) SetupTest() { s.manager = &runner.ManagerMock{} - s.runner = runner.NewNomadJob("runner", nil) + s.runner = runner.NewNomadJob(tests.DefaultRunnerID, nil) s.capturedRunner = nil s.runnerRequest = func(runnerId string) *http.Request { path, err := s.router.Get("test-runner-id").URL(RunnerIdKey, runnerId) diff --git a/environment/job.go b/environment/job.go index a3c2fdf..4fde8a1 100644 --- a/environment/job.go +++ b/environment/job.go @@ -6,6 +6,7 @@ import ( nomadApi "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/jobspec2" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "strconv" ) @@ -15,34 +16,34 @@ import ( //go:embed default-job.hcl var defaultJobHCL string -// registerDefaultJob creates a Nomad job based on the default job configuration and the given parameters. +// registerTemplateJob creates a Nomad job based on the default job configuration and the given parameters. // It registers the job with Nomad and waits until the registration completes. -func (m *NomadEnvironmentManager) registerDefaultJob( - environmentID string, +func (m *NomadEnvironmentManager) registerTemplateJob( + environmentID runner.EnvironmentID, prewarmingPoolSize, cpuLimit, memoryLimit uint, image string, networkAccess bool, - exposedPorts []uint16) error { - job := createDefaultJob(m.defaultJob, environmentID, prewarmingPoolSize, + exposedPorts []uint16) (*nomadApi.Job, error) { + job := createTemplateJob(m.defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) evalID, err := m.api.RegisterNomadJob(job) if err != nil { - return err + return nil, err } - return m.api.MonitorEvaluation(evalID, context.Background()) + return job, m.api.MonitorEvaluation(evalID, context.Background()) } -func createDefaultJob( +func createTemplateJob( defaultJob nomadApi.Job, - environmentID string, + environmentID runner.EnvironmentID, prewarmingPoolSize, cpuLimit, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) *nomadApi.Job { job := defaultJob - defaultJobID := nomad.DefaultJobID(environmentID) - job.ID = &defaultJobID - job.Name = &defaultJobID + templateJobID := nomad.TemplateJobID(strconv.Itoa(int(environmentID))) + job.ID = &templateJobID + job.Name = &templateJobID var taskGroup = createTaskGroup(&job, nomad.TaskGroupName, prewarmingPoolSize) configureTask(taskGroup, nomad.TaskName, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) @@ -155,43 +156,43 @@ func configureTask( configureNetwork(taskGroup, networkAccess, exposedPorts) } -func storeConfiguration(job *nomadApi.Job, id string, prewarmingPoolSize uint) { - taskGroup := getConfigTaskGroup(job) - checkForDummyTask(taskGroup) +func storeConfiguration(job *nomadApi.Job, id runner.EnvironmentID, prewarmingPoolSize uint) { + taskGroup := findOrCreateConfigTaskGroup(job) if taskGroup.Meta == nil { taskGroup.Meta = make(map[string]string) } - taskGroup.Meta[nomad.ConfigMetaEnvironmentKey] = id + taskGroup.Meta[nomad.ConfigMetaEnvironmentKey] = strconv.Itoa(int(id)) taskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUnusedValue taskGroup.Meta[nomad.ConfigMetaPoolSizeKey] = strconv.Itoa(int(prewarmingPoolSize)) } -func getConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { +func findOrCreateConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { taskGroup := nomad.FindConfigTaskGroup(job) if taskGroup == nil { - taskGroup = nomadApi.NewTaskGroup(nomad.ConfigTaskName, 0) + taskGroup = nomadApi.NewTaskGroup(nomad.ConfigTaskGroupName, 0) } + createDummyTaskIfNotPresent(taskGroup) return taskGroup } -// checkForDummyTask ensures that a dummy task is in the task group so that the group is accepted by Nomad. -func checkForDummyTask(taskGroup *nomadApi.TaskGroup) { +// createDummyTaskIfNotPresent ensures that a dummy task is in the task group so that the group is accepted by Nomad. +func createDummyTaskIfNotPresent(taskGroup *nomadApi.TaskGroup) { var task *nomadApi.Task for _, t := range taskGroup.Tasks { - if t.Name == nomad.ConfigTaskName { + if t.Name == nomad.DummyTaskName { task = t break } } if task == nil { - task = nomadApi.NewTask(nomad.ConfigTaskName, nomad.DefaultConfigTaskDriver) + task = nomadApi.NewTask(nomad.DummyTaskName, nomad.DefaultDummyTaskDriver) taskGroup.Tasks = append(taskGroup.Tasks, task) } if task.Config == nil { task.Config = make(map[string]interface{}) } - task.Config["command"] = nomad.DefaultConfigTaskCommand + task.Config["command"] = nomad.DefaultTaskCommand } diff --git a/environment/job_test.go b/environment/job_test.go index 9680a80..8f2943e 100644 --- a/environment/job_test.go +++ b/environment/job_test.go @@ -53,7 +53,7 @@ func createTestResources() *nomadApi.Resources { } func createTestJob() (*nomadApi.Job, *nomadApi.Job) { - jobID := nomad.DefaultJobID(tests.DefaultEnvironmentIDAsString) + jobID := nomad.TemplateJobID(tests.DefaultEnvironmentIDAsString) base := nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) job := nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) task := createTestTask() @@ -245,12 +245,14 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) { assert.Equal(t, task, taskGroup.Tasks[0], "it should not create a new task") } -func TestCreateJobSetsAllGivenArguments(t *testing.T) { +func TestCreateTemplateJobSetsAllGivenArguments(t *testing.T) { testJob, base := createTestJob() manager := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.APIClient{}, *base} - job := createDefaultJob( + testJobEnvironmentID, err := nomad.EnvironmentIDFromJobID(*testJob.ID) + assert.NoError(t, err) + job := createTemplateJob( manager.defaultJob, - tests.DefaultEnvironmentIDAsString, + runner.EnvironmentID(testJobEnvironmentID), uint(*testJob.TaskGroups[0].Count), uint(*testJob.TaskGroups[0].Tasks[0].Resources.CPU), uint(*testJob.TaskGroups[0].Tasks[0].Resources.MemoryMB), @@ -261,7 +263,7 @@ func TestCreateJobSetsAllGivenArguments(t *testing.T) { assert.Equal(t, *testJob, *job) } -func TestRegisterJobWhenNomadJobRegistrationFails(t *testing.T) { +func TestRegisterTemplateJobFailsWhenNomadJobRegistrationFails(t *testing.T) { apiMock := nomad.ExecutorAPIMock{} expectedErr := errors.New("test error") @@ -273,12 +275,12 @@ func TestRegisterJobWhenNomadJobRegistrationFails(t *testing.T) { defaultJob: nomadApi.Job{}, } - err := m.registerDefaultJob("id", 1, 2, 3, "image", false, []uint16{}) + _, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{}) assert.Equal(t, expectedErr, err) apiMock.AssertNotCalled(t, "EvaluationStream") } -func TestRegisterJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) { +func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) { apiMock := nomad.ExecutorAPIMock{} evaluationID := "id" @@ -291,11 +293,11 @@ func TestRegisterJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) { defaultJob: nomadApi.Job{}, } - err := m.registerDefaultJob("id", 1, 2, 3, "image", false, []uint16{}) + _, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{}) assert.NoError(t, err) } -func TestRegisterJobReturnsErrorWhenMonitoringEvaluationFails(t *testing.T) { +func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing.T) { apiMock := nomad.ExecutorAPIMock{} evaluationID := "id" expectedErr := errors.New("test error") @@ -309,6 +311,6 @@ func TestRegisterJobReturnsErrorWhenMonitoringEvaluationFails(t *testing.T) { defaultJob: nomadApi.Job{}, } - err := m.registerDefaultJob("id", 1, 2, 3, "image", false, []uint16{}) + _, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{}) assert.Equal(t, expectedErr, err) } diff --git a/environment/manager.go b/environment/manager.go index 756d740..e84f9db 100644 --- a/environment/manager.go +++ b/environment/manager.go @@ -1,7 +1,9 @@ package environment import ( + "fmt" nomadApi "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/structs" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" @@ -16,7 +18,7 @@ type Manager interface { // CreateOrUpdate creates/updates an execution environment on the executor. // Iff the job was created, the returned boolean is true and the returned error is nil. CreateOrUpdate( - id string, + id runner.EnvironmentID, request dto.ExecutionEnvironmentRequest, ) (bool, error) @@ -24,9 +26,11 @@ type Manager interface { Delete(id string) } -func NewNomadEnvironmentManager(runnerManager runner.Manager, apiClient nomad.ExecutorAPI) *NomadEnvironmentManager { +func NewNomadEnvironmentManager(runnerManager runner.Manager, apiClient nomad.ExecutorAPI) ( + *NomadEnvironmentManager, error) { environmentManager := &NomadEnvironmentManager{runnerManager, apiClient, *parseJob(defaultJobHCL)} - return environmentManager + err := environmentManager.loadExistingEnvironments() + return environmentManager, err } type NomadEnvironmentManager struct { @@ -36,14 +40,10 @@ type NomadEnvironmentManager struct { } func (m *NomadEnvironmentManager) CreateOrUpdate( - id string, + id runner.EnvironmentID, request dto.ExecutionEnvironmentRequest, ) (bool, error) { - idInt, err := strconv.Atoi(id) - if err != nil { - return false, err - } - err = m.registerDefaultJob(id, + templateJob, err := m.registerTemplateJob(id, request.PrewarmingPoolSize, request.CPULimit, request.MemoryLimit, request.Image, request.NetworkAccess, request.ExposedPorts) @@ -51,7 +51,7 @@ func (m *NomadEnvironmentManager) CreateOrUpdate( return false, err } - created, err := m.runnerManager.CreateOrUpdateEnvironment(runner.EnvironmentID(idInt), request.PrewarmingPoolSize) + created, err := m.runnerManager.CreateOrUpdateEnvironment(id, request.PrewarmingPoolSize, templateJob) if err != nil { return created, err } @@ -61,3 +61,74 @@ func (m *NomadEnvironmentManager) CreateOrUpdate( func (m *NomadEnvironmentManager) Delete(id string) { } + +func (m *NomadEnvironmentManager) loadExistingEnvironments() error { + jobs, err := m.api.LoadAllJobs() + if err != nil { + return fmt.Errorf("can't load template jobs: %w", err) + } + + var environmentTemplates, runnerJobs []*nomadApi.Job + for _, job := range jobs { + if nomad.IsEnvironmentTemplateID(*job.ID) { + environmentTemplates = append(environmentTemplates, job) + } else { + runnerJobs = append(runnerJobs, job) + } + } + m.recoverJobs(environmentTemplates, m.recoverEnvironmentTemplates) + m.recoverJobs(runnerJobs, m.recoverRunner) + + err = m.runnerManager.ScaleAllEnvironments() + if err != nil { + return fmt.Errorf("can not restore environment scaling: %w", err) + } + return nil +} + +type jobAdder func(id runner.EnvironmentID, job *nomadApi.Job, configTaskGroup *nomadApi.TaskGroup) error + +func (m *NomadEnvironmentManager) recoverEnvironmentTemplates(id runner.EnvironmentID, job *nomadApi.Job, + configTaskGroup *nomadApi.TaskGroup) error { + desiredIdleRunnersCount, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaPoolSizeKey]) + if err != nil { + return fmt.Errorf("Couldn't convert pool size to int: %w", err) + } + + m.runnerManager.RecoverEnvironment(id, job, uint(desiredIdleRunnersCount)) + return nil +} + +func (m *NomadEnvironmentManager) recoverRunner(id runner.EnvironmentID, job *nomadApi.Job, + configTaskGroup *nomadApi.TaskGroup) error { + isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue + m.runnerManager.RecoverRunner(id, job, isUsed) + return nil +} + +func (m *NomadEnvironmentManager) recoverJobs(jobs []*nomadApi.Job, onJob jobAdder) { + for _, job := range jobs { + jobLogger := log.WithField("jobID", *job.ID) + if *job.Status != structs.JobStatusRunning { + jobLogger.Info("Job not running, skipping ...") + continue + } + configTaskGroup := nomad.FindConfigTaskGroup(job) + if configTaskGroup == nil { + jobLogger.Info("Couldn't find config task group in job, skipping ...") + continue + } + environmentID, err := runner.NewEnvironmentID(configTaskGroup.Meta[nomad.ConfigMetaEnvironmentKey]) + if err != nil { + jobLogger.WithField("environmentID", configTaskGroup.Meta[nomad.ConfigMetaEnvironmentKey]). + WithError(err). + Error("Couldn't convert environment id of template job to int") + continue + } + err = onJob(environmentID, job, configTaskGroup) + if err != nil { + jobLogger.WithError(err).Info("Could not recover job.") + continue + } + } +} diff --git a/environment/manager_mock.go b/environment/manager_mock.go index 8065491..24367e1 100644 --- a/environment/manager_mock.go +++ b/environment/manager_mock.go @@ -5,6 +5,8 @@ package environment import ( mock "github.com/stretchr/testify/mock" dto "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" + + runner "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" ) // ManagerMock is an autogenerated mock type for the Manager type @@ -13,18 +15,18 @@ type ManagerMock struct { } // CreateOrUpdate provides a mock function with given fields: id, request -func (_m *ManagerMock) CreateOrUpdate(id string, request dto.ExecutionEnvironmentRequest) (bool, error) { +func (_m *ManagerMock) CreateOrUpdate(id runner.EnvironmentID, request dto.ExecutionEnvironmentRequest) (bool, error) { ret := _m.Called(id, request) var r0 bool - if rf, ok := ret.Get(0).(func(string, dto.ExecutionEnvironmentRequest) bool); ok { + if rf, ok := ret.Get(0).(func(runner.EnvironmentID, dto.ExecutionEnvironmentRequest) bool); ok { r0 = rf(id, request) } else { r0 = ret.Get(0).(bool) } var r1 error - if rf, ok := ret.Get(1).(func(string, dto.ExecutionEnvironmentRequest) error); ok { + if rf, ok := ret.Get(1).(func(runner.EnvironmentID, dto.ExecutionEnvironmentRequest) error); ok { r1 = rf(id, request) } else { r1 = ret.Error(1) @@ -37,8 +39,3 @@ func (_m *ManagerMock) CreateOrUpdate(id string, request dto.ExecutionEnvironmen func (_m *ManagerMock) Delete(id string) { _m.Called(id) } - -// Load provides a mock function with given fields: -func (_m *ManagerMock) Load() { - _m.Called() -} diff --git a/environment/manager_test.go b/environment/manager_test.go index 390c7f1..5145a3e 100644 --- a/environment/manager_test.go +++ b/environment/manager_test.go @@ -8,8 +8,6 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" - "math" - "strconv" "testing" ) @@ -28,7 +26,6 @@ func TestCreateOrUpdateTestSuite(t *testing.T) { func (s *CreateOrUpdateTestSuite) SetupTest() { s.runnerManagerMock = runner.ManagerMock{} - s.runnerManagerMock.On("registerEnvironment", mock.Anything, mock.Anything).Return(nil) s.apiMock = nomad.ExecutorAPIMock{} s.request = dto.ExecutionEnvironmentRequest{ @@ -49,38 +46,23 @@ func (s *CreateOrUpdateTestSuite) SetupTest() { } } -func (s *CreateOrUpdateTestSuite) mockEnvironmentExists(exists bool) { - s.runnerManagerMock.On("EnvironmentExists", mock.AnythingOfType("EnvironmentID")).Return(exists) -} - func (s *CreateOrUpdateTestSuite) mockCreateOrUpdateEnvironment(exists bool) *mock.Call { return s.runnerManagerMock.On("CreateOrUpdateEnvironment", - mock.AnythingOfType("EnvironmentID"), mock.AnythingOfType("uint")). + mock.AnythingOfType("EnvironmentID"), mock.AnythingOfType("uint"), mock.AnythingOfType("*api.Job")). Return(!exists, nil) } func (s *CreateOrUpdateTestSuite) createJobForRequest() *nomadApi.Job { - return createDefaultJob(s.manager.defaultJob, tests.DefaultEnvironmentIDAsString, + return createTemplateJob(s.manager.defaultJob, tests.DefaultEnvironmentIDAsInteger, s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit, s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts) } -func (s *CreateOrUpdateTestSuite) TestFailsOnInvalidID() { - _, err := s.manager.CreateOrUpdate("invalid-id", s.request) - s.Error(err) -} - -func (s *CreateOrUpdateTestSuite) TestFailsOnTooLargeID() { - tooLargeIntStr := strconv.Itoa(math.MaxInt64) + "0" - _, err := s.manager.CreateOrUpdate(tooLargeIntStr, s.request) - s.Error(err) -} - func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsRegistersCorrectJob() { s.mockCreateOrUpdateEnvironment(true) expectedJob := s.createJobForRequest() - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsString, s.request) + created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) s.NoError(err) s.False(created) s.apiMock.AssertCalled(s.T(), "RegisterNomadJob", expectedJob) @@ -90,7 +72,7 @@ func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsOccurredErrorIsPassed s.mockCreateOrUpdateEnvironment(true) s.registerNomadJobMockCall.Return("", tests.ErrDefault) - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsString, s.request) + created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) s.False(created) s.Equal(tests.ErrDefault, err) } @@ -98,7 +80,7 @@ func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsOccurredErrorIsPassed func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsReturnsFalse() { s.mockCreateOrUpdateEnvironment(true) - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsString, s.request) + created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) s.NoError(err) s.False(created) } @@ -108,7 +90,7 @@ func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrec expectedJob := s.createJobForRequest() - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsString, s.request) + created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) s.NoError(err) s.True(created) s.apiMock.AssertCalled(s.T(), "RegisterNomadJob", expectedJob) @@ -117,18 +99,20 @@ func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrec func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrectEnvironment() { s.mockCreateOrUpdateEnvironment(false) - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsString, s.request) + expectedJob := s.createJobForRequest() + created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) s.True(created) s.NoError(err) s.runnerManagerMock.AssertCalled(s.T(), "CreateOrUpdateEnvironment", - runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request.PrewarmingPoolSize) + runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request.PrewarmingPoolSize, expectedJob) } func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistOccurredErrorIsPassedAndNoEnvironmentRegistered() { s.mockCreateOrUpdateEnvironment(false) s.registerNomadJobMockCall.Return("", tests.ErrDefault) - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsString, s.request) + created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) s.False(created) s.Equal(tests.ErrDefault, err) + s.runnerManagerMock.AssertNotCalled(s.T(), "CreateOrUpdateEnvironment") } diff --git a/main.go b/main.go index 2810ff1..13a7db1 100644 --- a/main.go +++ b/main.go @@ -46,11 +46,11 @@ func initServer() *http.Server { log.WithError(err).WithField("nomad url", config.Config.NomadAPIURL()).Fatal("Error parsing the nomad url") } - runnerManager, err := runner.NewNomadRunnerManager(nomadAPIClient, context.Background()) + runnerManager := runner.NewNomadRunnerManager(nomadAPIClient, context.Background()) + environmentManager, err := environment.NewNomadEnvironmentManager(runnerManager, nomadAPIClient) if err != nil { - log.WithError(err).Fatal("Error creating new Nomad runner manager") + log.WithError(err).Fatal("Error creating new Nomad environment manager") } - environmentManager := environment.NewNomadEnvironmentManager(runnerManager, nomadAPIClient) return &http.Server{ Addr: config.Config.PoseidonAPIURL().Host, diff --git a/nomad/api_querier.go b/nomad/api_querier.go index 082c6fd..5f437bc 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -9,7 +9,7 @@ import ( ) var ( - ErrNoAllocationsFound = errors.New("no allocation found") + ErrorNoAllocationFound = errors.New("no allocation found") ) // apiQuerier provides access to the Nomad functionality. @@ -36,8 +36,8 @@ type apiQuerier interface { // listJobs loads all jobs with the specified prefix. listJobs(prefix string) (allocationListStub []*nomadApi.JobListStub, err error) - // jobInfo returns the job of the given jobID. - jobInfo(jobID string) (job *nomadApi.Job, err error) + // job returns the job of the given jobID. + job(jobID string) (job *nomadApi.Job, err error) // RegisterNomadJob registers a job with Nomad. // It returns the evaluation ID that can be used when listening to the Nomad event stream. @@ -72,12 +72,12 @@ func (nc *nomadAPIClient) DeleteRunner(runnerID string) (err error) { return } -func (nc *nomadAPIClient) Execute(jobID string, +func (nc *nomadAPIClient) Execute(runnerID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout, stderr io.Writer) (int, error) { - allocations, _, err := nc.client.Jobs().Allocations(jobID, false, nil) + allocations, _, err := nc.client.Jobs().Allocations(runnerID, false, nil) if len(allocations) == 0 { - return 1, ErrNoAllocationsFound + return 1, ErrorNoAllocationFound } allocation, _, err := nc.client.Allocations().Info(allocations[0].ID, nil) if err != nil { diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index f7458fc..2d49f07 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -194,7 +194,7 @@ func (_m *apiQuerierMock) init(nomadURL *url.URL, nomadNamespace string) error { } // jobInfo provides a mock function with given fields: jobID -func (_m *apiQuerierMock) jobInfo(jobID string) (*api.Job, error) { +func (_m *apiQuerierMock) job(jobID string) (*api.Job, error) { ret := _m.Called(jobID) var r0 *api.Job diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 96c23c7..c549e5b 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -212,7 +212,7 @@ func (_m *ExecutorAPIMock) LoadRunners(jobID string) ([]string, error) { } // LoadTemplateJob provides a mock function with given fields: environmentID -func (_m *ExecutorAPIMock) LoadTemplateJob(environmentID string) (*api.Job, error) { +func (_m *ExecutorAPIMock) LoadEnvironmentTemplate(environmentID string) (*api.Job, error) { ret := _m.Called(environmentID) var r0 *api.Job @@ -326,7 +326,7 @@ func (_m *ExecutorAPIMock) init(nomadURL *url.URL, nomadNamespace string) error } // jobInfo provides a mock function with given fields: jobID -func (_m *ExecutorAPIMock) jobInfo(jobID string) (*api.Job, error) { +func (_m *ExecutorAPIMock) job(jobID string) (*api.Job, error) { ret := _m.Called(jobID) var r0 *api.Job diff --git a/nomad/job.go b/nomad/job.go index 2392eb8..f561eed 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -1,8 +1,10 @@ package nomad import ( + "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" + "strconv" "strings" ) @@ -10,12 +12,11 @@ const ( TaskGroupName = "default-group" TaskName = "default-task" ConfigTaskGroupName = "config" - ConfigTaskName = "config" + DummyTaskName = "dummy" defaultRunnerJobID = "default" - runnerJobIDFormat = "%s-%s" DefaultTaskDriver = "docker" - DefaultConfigTaskDriver = "exec" - DefaultConfigTaskCommand = "whoami" + DefaultDummyTaskDriver = "exec" + DefaultTaskCommand = "true" ConfigMetaEnvironmentKey = "environment" ConfigMetaUsedKey = "used" ConfigMetaUsedValue = "true" @@ -23,19 +24,42 @@ const ( ConfigMetaPoolSizeKey = "prewarmingPoolSize" ) -func DefaultJobID(id string) string { +var ( + ErrorInvalidJobID = errors.New("invalid job id") + ErrorConfigTaskGroupNotFound = errors.New("config task group not found in job") +) + +// RunnerJobID creates the job id. This requires an environment id and a runner id. +func RunnerJobID(environmentID, runnerID string) string { + return fmt.Sprintf("%s-%s", environmentID, runnerID) +} + +// EnvironmentIDFromJobID returns the environment id that is part of the passed job id. +func EnvironmentIDFromJobID(jobID string) (int, error) { + parts := strings.Split(jobID, "-") + if len(parts) == 0 { + return 0, fmt.Errorf("empty job id: %w", ErrorInvalidJobID) + } + environmentID, err := strconv.Atoi(parts[0]) + if err != nil { + return 0, fmt.Errorf("invalid environment id par %v: %w", err, ErrorInvalidJobID) + } + return environmentID, nil +} + +// TemplateJobID creates the environment specific id of the template job. +func TemplateJobID(id string) string { return RunnerJobID(id, defaultRunnerJobID) } -func RunnerJobID(environmentID, runnerID string) string { - return fmt.Sprintf(runnerJobIDFormat, environmentID, runnerID) -} - -func IsDefaultJobID(jobID string) bool { +// IsEnvironmentTemplateID checks if the passed job id belongs to a template job. +func IsEnvironmentTemplateID(jobID string) bool { parts := strings.Split(jobID, "-") return len(parts) == 2 && parts[1] == defaultRunnerJobID } +// FindConfigTaskGroup returns the config task group of a job. +// The config task group should be included in all jobs. func FindConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { for _, tg := range job.TaskGroups { if *tg.Name == ConfigTaskGroupName { @@ -45,17 +69,13 @@ func FindConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { return nil } -func EnvironmentIDFromJobID(jobID string) string { - parts := strings.Split(jobID, "-") - if len(parts) == 0 { - return "" +func SetMetaConfigValue(job *nomadApi.Job, key, value string) error { + configTaskGroup := FindConfigTaskGroup(job) + if configTaskGroup == nil { + return ErrorConfigTaskGroupNotFound } - return parts[0] -} - -func (nc *nomadAPIClient) jobInfo(jobID string) (job *nomadApi.Job, err error) { - job, _, err = nc.client.Jobs().Info(jobID, nil) - return + configTaskGroup.Meta[key] = value + return nil } // LoadJobList loads the list of jobs from the Nomad api. @@ -81,3 +101,8 @@ func (nc *nomadAPIClient) SetJobScale(jobID string, count uint, reason string) ( _, _, err = nc.client.Jobs().Scale(jobID, TaskGroupName, &intCount, reason, false, nil, nil) return } + +func (nc *nomadAPIClient) job(jobID string) (job *nomadApi.Job, err error) { + job, _, err = nc.client.Jobs().Info(jobID, nil) + return +} diff --git a/nomad/nomad.go b/nomad/nomad.go index a7eaa3b..9702808 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -16,9 +16,8 @@ import ( var ( log = logging.GetLogger("nomad") ErrorExecutorCommunicationFailed = errors.New("communication with executor failed") - errEvaluation = errors.New("evaluation could not complete") - errPlacingAllocations = errors.New("failed to place all allocations") - errFindingTaskGroup = errors.New("no task group found") + ErrorEvaluation = errors.New("evaluation could not complete") + ErrorPlacingAllocations = errors.New("failed to place all allocations") ) type AllocationProcessor func(*nomadApi.Allocation) @@ -30,10 +29,12 @@ type ExecutorAPI interface { // LoadAllJobs loads all existing jobs independent of the environment or if it is a template job. LoadAllJobs() ([]*nomadApi.Job, error) - // LoadRunners loads all jobs of the specified environment which are running and not about to get stopped. + // LoadRunners loads all runners of the specified environment which are running and not about to get stopped. LoadRunners(environmentID string) (runnerIds []string, err error) - LoadTemplateJob(environmentID string) (*nomadApi.Job, error) + // LoadEnvironmentTemplate loads the template job of the specified environment. + // Based on the template job new runners can be created. + LoadEnvironmentTemplate(environmentID string) (*nomadApi.Job, error) // MonitorEvaluation monitors the given evaluation ID. // It waits until the evaluation reaches one of the states complete, canceled or failed. @@ -88,8 +89,8 @@ func (a *APIClient) LoadRunners(environmentID string) (runnerIDs []string, err e return runnerIDs, nil } -func (a *APIClient) LoadTemplateJob(environmentID string) (*nomadApi.Job, error) { - job, err := a.jobInfo(DefaultJobID(environmentID)) +func (a *APIClient) LoadEnvironmentTemplate(environmentID string) (*nomadApi.Job, error) { + job, err := a.job(TemplateJobID(environmentID)) if err != nil { return nil, fmt.Errorf("failed loading template job: %w", err) } @@ -214,10 +215,10 @@ func handleAllocationEvent(startTime int64, pendingAllocations map[string]bool, func checkEvaluation(eval *nomadApi.Evaluation) (err error) { if len(eval.FailedTGAllocs) == 0 { if eval.Status != structs.EvalStatusComplete { - err = fmt.Errorf("%w: %q", errEvaluation, eval.Status) + err = fmt.Errorf("%w: %q", ErrorEvaluation, eval.Status) } } else { - err = fmt.Errorf("evaluation %q finished with status %q but %w", eval.ID, eval.Status, errPlacingAllocations) + err = fmt.Errorf("evaluation %q finished with status %q but %w", eval.ID, eval.Status, ErrorPlacingAllocations) for taskGroup, metrics := range eval.FailedTGAllocs { err = fmt.Errorf("%w\n%s: %#v", err, taskGroup, metrics) } @@ -229,15 +230,14 @@ func checkEvaluation(eval *nomadApi.Evaluation) (err error) { } func (a *APIClient) MarkRunnerAsUsed(runnerID string) error { - job, err := a.jobInfo(runnerID) + job, err := a.job(runnerID) if err != nil { return fmt.Errorf("couldn't retrieve job info: %w", err) } - var taskGroup = FindConfigTaskGroup(job) - if taskGroup == nil { - return errFindingTaskGroup + err = SetMetaConfigValue(job, ConfigMetaUsedKey, ConfigMetaUsedValue) + if err != nil { + return fmt.Errorf("couldn't update runner in job as used: %w", err) } - taskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUsedValue _, err = a.RegisterNomadJob(job) if err != nil { return fmt.Errorf("couldn't update runner config: %w", err) @@ -253,7 +253,7 @@ func (a *APIClient) LoadAllJobs() ([]*nomadApi.Job, error) { jobs := make([]*nomadApi.Job, 0, len(jobStubs)) for _, jobStub := range jobStubs { - job, err := a.apiQuerier.jobInfo(jobStub.ID) + job, err := a.apiQuerier.job(jobStub.ID) if err != nil { return []*nomadApi.Job{}, fmt.Errorf("couldn't load job info for job %v: %w", jobStub.ID, err) } diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index f728675..1dfd4f3 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -38,15 +38,15 @@ type LoadRunnersTestSuite struct { } func (s *LoadRunnersTestSuite) SetupTest() { - s.jobId = "1d-0f-v3ry-sp3c14l-j0b" + s.jobId = tests.DefaultJobID s.mock = &apiQuerierMock{} s.nomadApiClient = APIClient{apiQuerier: s.mock} - s.availableRunner = newJobListStub("s0m3-r4nd0m-1d", structs.JobStatusRunning, 1) - s.anotherAvailableRunner = newJobListStub("s0m3-s1m1l4r-1d", structs.JobStatusRunning, 1) - s.pendingRunner = newJobListStub("4n0th3r-1d", structs.JobStatusPending, 0) - s.deadRunner = newJobListStub("my-1d", structs.JobStatusDead, 0) + s.availableRunner = newJobListStub(tests.DefaultJobID, structs.JobStatusRunning, 1) + s.anotherAvailableRunner = newJobListStub(tests.AnotherJobID, structs.JobStatusRunning, 1) + s.pendingRunner = newJobListStub(tests.DefaultJobID+"-1", structs.JobStatusPending, 0) + s.deadRunner = newJobListStub(tests.AnotherJobID+"-1", structs.JobStatusDead, 0) } func newJobListStub(id, status string, amountRunning int) *nomadApi.JobListStub { diff --git a/runner/manager.go b/runner/manager.go index 95939f5..ad883b8 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/google/uuid" nomadApi "github.com/hashicorp/nomad/api" - "github.com/hashicorp/nomad/nomad/structs" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "strconv" @@ -23,6 +22,11 @@ var ( type EnvironmentID int +func NewEnvironmentID(id string) (EnvironmentID, error) { + environment, err := strconv.Atoi(id) + return EnvironmentID(environment), err +} + func (e EnvironmentID) toString() string { return strconv.Itoa(int(e)) } @@ -33,8 +37,8 @@ type NomadJobID string // runners to new clients and ensure no runner is used twice. type Manager interface { // CreateOrUpdateEnvironment creates the given environment if it does not exist. Otherwise, it updates - // the existing environment and all runners. - CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) (bool, error) + // the existing environment and all runners. Iff a new Environment has been created, it returns true. + CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, teplateJob *nomadApi.Job) (bool, error) // Claim returns a new runner. // It makes sure that the runner is not in use yet and returns an error if no runner could be provided. @@ -47,6 +51,17 @@ type Manager interface { // Return signals that the runner is no longer used by the caller and can be claimed by someone else. // The runner is deleted or cleaned up for reuse depending on the used executor. Return(r Runner) error + + // ScaleAllEnvironments checks for all environments if enough runners are created. + ScaleAllEnvironments() error + + // RecoverEnvironment adds a recovered Environment to the internal structure. + // This is intended to recover environments after a restart. + RecoverEnvironment(id EnvironmentID, templateJob *nomadApi.Job, desiredIdleRunnersCount uint) + + // RecoverRunner adds a recovered runner to the internal structure. + // This is intended to recover runners after a restart. + RecoverRunner(id EnvironmentID, job *nomadApi.Job, isUsed bool) } type NomadRunnerManager struct { @@ -58,18 +73,14 @@ type NomadRunnerManager struct { // NewNomadRunnerManager creates a new runner manager that keeps track of all runners. // It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad. // If you cancel the context the background synchronization will be stopped. -func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) (*NomadRunnerManager, error) { +func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager { m := &NomadRunnerManager{ apiClient, - NewLocalNomadJobStorage(), + NewLocalNomadEnvironmentStorage(), NewLocalRunnerStorage(), } - err := m.loadExistingEnvironments() - if err != nil { - return nil, err - } go m.updateRunners(ctx) - return m, nil + return m } type NomadEnvironment struct { @@ -83,47 +94,52 @@ func (j *NomadEnvironment) ID() EnvironmentID { return j.environmentID } -func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) (bool, error) { +func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) (bool, error) { _, ok := m.environments.Get(id) if !ok { - return true, m.registerEnvironment(id, desiredIdleRunnersCount) + return true, m.registerEnvironment(id, desiredIdleRunnersCount, templateJob) } - return false, m.updateEnvironment(id, desiredIdleRunnersCount) + return false, m.updateEnvironment(id, desiredIdleRunnersCount, templateJob) } -func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint) error { - templateJob, err := m.apiClient.LoadTemplateJob(environmentID.toString()) - if err != nil { - return fmt.Errorf("couldn't register environment: %w", err) - } - +func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) error { m.environments.Add(&NomadEnvironment{ environmentID, NewLocalRunnerStorage(), desiredIdleRunnersCount, templateJob, }) - err = m.scaleEnvironment(environmentID) + err := m.scaleEnvironment(environmentID) if err != nil { return fmt.Errorf("couldn't upscale environment %w", err) } return nil } -func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) error { +// updateEnvironment updates all runners of the specified environment. This is required as attributes like the +// CPULimit or MemoryMB could be changed in the new template job. +func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, newTemplateJob *nomadApi.Job) error { environment, ok := m.environments.Get(id) if !ok { return ErrUnknownExecutionEnvironment } environment.desiredIdleRunnersCount = desiredIdleRunnersCount - - templateJob, err := m.apiClient.LoadTemplateJob(id.toString()) + environment.templateJob = newTemplateJob + err := nomad.SetMetaConfigValue(newTemplateJob, nomad.ConfigMetaPoolSizeKey, strconv.Itoa(int(desiredIdleRunnersCount))) if err != nil { - return fmt.Errorf("update environment couldn't load template job: %w", err) + return fmt.Errorf("update environment couldn't update template environment: %w", err) } - environment.templateJob = templateJob - runners, err := m.apiClient.LoadRunners(id.toString()) + err = m.updateRunnerSpecs(id, newTemplateJob) + if err != nil { + return err + } + + return m.scaleEnvironment(id) +} + +func (m *NomadRunnerManager) updateRunnerSpecs(environmentID EnvironmentID, templateJob *nomadApi.Job) error { + runners, err := m.apiClient.LoadRunners(environmentID.toString()) if err != nil { return fmt.Errorf("update environment couldn't load runners: %w", err) } @@ -131,7 +147,7 @@ func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunn for _, id := range runners { // avoid taking the address of the loop variable runnerID := id - updatedRunnerJob := *environment.templateJob + updatedRunnerJob := *templateJob updatedRunnerJob.ID = &runnerID updatedRunnerJob.Name = &runnerID _, err := m.apiClient.RegisterNomadJob(&updatedRunnerJob) @@ -143,7 +159,7 @@ func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunn errorResult := strings.Join(occurredErrors, "\n") return fmt.Errorf("%d errors occurred when updating environment: %s", len(occurredErrors), errorResult) } - return m.scaleEnvironment(id) + return nil } func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) { @@ -186,6 +202,52 @@ func (m *NomadRunnerManager) Return(r Runner) (err error) { return } +func (m *NomadRunnerManager) ScaleAllEnvironments() error { + for _, environmentID := range m.environments.List() { + err := m.scaleEnvironment(environmentID) + if err != nil { + return fmt.Errorf("can not scale up: %w", err) + } + } + return nil +} + +func (m *NomadRunnerManager) RecoverEnvironment(id EnvironmentID, templateJob *nomadApi.Job, + desiredIdleRunnersCount uint) { + _, ok := m.environments.Get(id) + if ok { + log.Error("Recovering existing environment.") + return + } + environment := &NomadEnvironment{ + environmentID: id, + idleRunners: NewLocalRunnerStorage(), + } + m.environments.Add(environment) + log.WithField("environmentID", environment.environmentID).Info("Added recovered environment") + environment.desiredIdleRunnersCount = desiredIdleRunnersCount + environment.templateJob = templateJob +} + +func (m *NomadRunnerManager) RecoverRunner(id EnvironmentID, job *nomadApi.Job, isUsed bool) { + environment, ok := m.environments.Get(id) + if !ok { + log.Error("Environment missing. Can not recover runner") + return + } + + log.WithField("jobID", *job.ID). + WithField("environmentID", environment.environmentID). + Info("Added idle runner") + + newJob := NewNomadJob(*job.ID, m.apiClient) + if isUsed { + m.usedRunners.Add(newJob) + } else { + environment.idleRunners.Add(newJob) + } +} + func (m *NomadRunnerManager) updateRunners(ctx context.Context) { retries := 0 for ctx.Err() == nil { @@ -199,17 +261,17 @@ func (m *NomadRunnerManager) updateRunners(ctx context.Context) { func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { log.WithField("id", alloc.JobID).Debug("Runner started") - if nomad.IsDefaultJobID(alloc.JobID) { + if nomad.IsEnvironmentTemplateID(alloc.JobID) { return } - environmentID := nomad.EnvironmentIDFromJobID(alloc.JobID) - intEnvironmentID, err := strconv.Atoi(environmentID) + environmentID, err := nomad.EnvironmentIDFromJobID(alloc.JobID) if err != nil { + log.WithError(err).Warn("Allocation could not be added") return } - job, ok := m.environments.Get(EnvironmentID(intEnvironmentID)) + job, ok := m.environments.Get(EnvironmentID(environmentID)) if ok { job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient)) } @@ -218,14 +280,14 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) { log.WithField("id", alloc.JobID).Debug("Runner stopped") - environmentID := nomad.EnvironmentIDFromJobID(alloc.JobID) - intEnvironmentID, err := strconv.Atoi(environmentID) + environmentID, err := nomad.EnvironmentIDFromJobID(alloc.JobID) if err != nil { + log.WithError(err).Warn("Stopped allocation can not be handled") return } m.usedRunners.Delete(alloc.JobID) - job, ok := m.environments.Get(EnvironmentID(intEnvironmentID)) + job, ok := m.environments.Get(EnvironmentID(environmentID)) if ok { job.idleRunners.Delete(alloc.JobID) } @@ -240,7 +302,7 @@ func (m *NomadRunnerManager) scaleEnvironment(id EnvironmentID) error { required := int(environment.desiredIdleRunnersCount) - environment.idleRunners.Length() - log.WithField("required", required).Debug("Scaling environment") + log.WithField("runnersRequired", required).WithField("id", id).Debug("Scaling environment") for i := 0; i < required; i++ { err := m.createRunner(environment) @@ -264,7 +326,7 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error { evalID, err := m.apiClient.RegisterNomadJob(&template) if err != nil { - return fmt.Errorf("couldn't register Nomad job: %w", err) + return fmt.Errorf("couldn't register Nomad environment: %w", err) } err = m.apiClient.MonitorEvaluation(evalID, context.Background()) if err != nil { @@ -272,98 +334,3 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error { } return nil } - -func (m *NomadRunnerManager) unusedRunners( - environmentID EnvironmentID, fetchedRunnerIds []string) (newRunners []Runner) { - newRunners = make([]Runner, 0) - job, ok := m.environments.Get(environmentID) - if !ok { - // the environment does not exist, so it won't have any unused runners - return - } - for _, runnerID := range fetchedRunnerIds { - _, ok := m.usedRunners.Get(runnerID) - if !ok { - _, ok = job.idleRunners.Get(runnerID) - if !ok { - newRunners = append(newRunners, NewNomadJob(runnerID, m.apiClient)) - } - } - } - return newRunners -} - -func (m *NomadRunnerManager) loadExistingEnvironments() error { - jobs, err := m.apiClient.LoadAllJobs() - if err != nil { - return fmt.Errorf("can't load template jobs: %w", err) - } - - for _, job := range jobs { - m.loadExistingJob(job) - } - - for _, environmentID := range m.environments.List() { - err := m.scaleEnvironment(environmentID) - if err != nil { - return fmt.Errorf("can not scale up: %w", err) - } - } - - return nil -} - -func (m *NomadRunnerManager) loadExistingJob(job *nomadApi.Job) { - if *job.Status != structs.JobStatusRunning { - return - } - - jobLogger := log.WithField("jobID", *job.ID) - - configTaskGroup := nomad.FindConfigTaskGroup(job) - if configTaskGroup == nil { - jobLogger.Info("Couldn't find config task group in job, skipping ...") - return - } - - if configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue { - m.usedRunners.Add(NewNomadJob(*job.ID, m.apiClient)) - jobLogger.Info("Added job to usedRunners") - return - } - - environmentID := configTaskGroup.Meta[nomad.ConfigMetaEnvironmentKey] - environmentIDInt, err := strconv.Atoi(environmentID) - if err != nil { - jobLogger.WithField("environmentID", environmentID). - WithError(err). - Error("Couldn't convert environment id of template job to int") - return - } - - environment, ok := m.environments.Get(EnvironmentID(environmentIDInt)) - if !ok { - desiredIdleRunnersCount, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaPoolSizeKey]) - if err != nil { - jobLogger.WithError(err).Error("Couldn't convert pool size to int") - return - } - - environment = &NomadEnvironment{ - environmentID: EnvironmentID(environmentIDInt), - idleRunners: NewLocalRunnerStorage(), - desiredIdleRunnersCount: uint(desiredIdleRunnersCount), - } - m.environments.Add(environment) - log.WithField("environmentID", environment.environmentID).Info("Added existing environment") - } - - if nomad.IsDefaultJobID(*job.ID) { - environment.templateJob = job - } else { - log.WithField("jobID", *job.ID). - WithField("environmentID", environment.environmentID). - Info("Added idle runner") - environment.idleRunners.Add(NewNomadJob(*job.ID, m.apiClient)) - } -} diff --git a/runner/manager_mock.go b/runner/manager_mock.go index a763a30..90e62f2 100644 --- a/runner/manager_mock.go +++ b/runner/manager_mock.go @@ -2,7 +2,10 @@ package runner -import mock "github.com/stretchr/testify/mock" +import ( + api "github.com/hashicorp/nomad/api" + mock "github.com/stretchr/testify/mock" +) // ManagerMock is an autogenerated mock type for the Manager type type ManagerMock struct { @@ -32,20 +35,20 @@ func (_m *ManagerMock) Claim(id EnvironmentID) (Runner, error) { return r0, r1 } -// CreateOrUpdateEnvironment provides a mock function with given fields: id, desiredIdleRunnersCount -func (_m *ManagerMock) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) (bool, error) { - ret := _m.Called(id, desiredIdleRunnersCount) +// CreateOrUpdateEnvironment provides a mock function with given fields: id, desiredIdleRunnersCount, teplateJob +func (_m *ManagerMock) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, teplateJob *api.Job) (bool, error) { + ret := _m.Called(id, desiredIdleRunnersCount, teplateJob) var r0 bool - if rf, ok := ret.Get(0).(func(EnvironmentID, uint) bool); ok { - r0 = rf(id, desiredIdleRunnersCount) + if rf, ok := ret.Get(0).(func(EnvironmentID, uint, *api.Job) bool); ok { + r0 = rf(id, desiredIdleRunnersCount, teplateJob) } else { r0 = ret.Get(0).(bool) } var r1 error - if rf, ok := ret.Get(1).(func(EnvironmentID, uint) error); ok { - r1 = rf(id, desiredIdleRunnersCount) + if rf, ok := ret.Get(1).(func(EnvironmentID, uint, *api.Job) error); ok { + r1 = rf(id, desiredIdleRunnersCount, teplateJob) } else { r1 = ret.Error(1) } @@ -76,6 +79,16 @@ func (_m *ManagerMock) Get(runnerID string) (Runner, error) { return r0, r1 } +// RecoverEnvironment provides a mock function with given fields: id, templateJob, desiredIdleRunnersCount +func (_m *ManagerMock) RecoverEnvironment(id EnvironmentID, templateJob *api.Job, desiredIdleRunnersCount uint) { + _m.Called(id, templateJob, desiredIdleRunnersCount) +} + +// RecoverRunner provides a mock function with given fields: id, environment, isUsed +func (_m *ManagerMock) RecoverRunner(id EnvironmentID, job *api.Job, isUsed bool) { + _m.Called(id, job, isUsed) +} + // Return provides a mock function with given fields: r func (_m *ManagerMock) Return(r Runner) error { ret := _m.Called(r) @@ -89,3 +102,17 @@ func (_m *ManagerMock) Return(r Runner) error { return r0 } + +// ScaleAllEnvironments provides a mock function with given fields: +func (_m *ManagerMock) ScaleAllEnvironments() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/runner/manager_test.go b/runner/manager_test.go index 98011a0..2c28986 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -35,10 +35,7 @@ func (s *ManagerTestSuite) SetupTest() { // Instantly closed context to manually start the update process in some cases ctx, cancel := context.WithCancel(context.Background()) cancel() - var err error - - s.nomadRunnerManager, err = NewNomadRunnerManager(s.apiMock, ctx) - s.Require().NoError(err) + s.nomadRunnerManager = NewNomadRunnerManager(s.apiMock, ctx) s.exerciseRunner = NewRunner(tests.DefaultRunnerID) s.registerDefaultEnvironment() @@ -57,13 +54,13 @@ func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []strin apiMock.On("LoadRunners", tests.DefaultJobID).Return(returnedRunnerIds, nil) apiMock.On("JobScale", tests.DefaultJobID).Return(uint(len(returnedRunnerIds)), nil) apiMock.On("SetJobScale", tests.DefaultJobID, mock.AnythingOfType("uint"), "Runner Requested").Return(nil) - apiMock.On("LoadTemplateJob", mock.AnythingOfType("string")).Return(&nomadApi.Job{}, nil) + apiMock.On("LoadEnvironmentTemplate", mock.AnythingOfType("string")).Return(&nomadApi.Job{}, nil) apiMock.On("RegisterNomadJob", mock.Anything).Return("", nil) apiMock.On("MonitorEvaluation", mock.Anything, mock.Anything).Return(nil) } func (s *ManagerTestSuite) registerDefaultEnvironment() { - err := s.nomadRunnerManager.registerEnvironment(defaultEnvironmentID, 0) + err := s.nomadRunnerManager.registerEnvironment(defaultEnvironmentID, 0, &nomadApi.Job{}) s.Require().NoError(err) } @@ -77,7 +74,7 @@ func (s *ManagerTestSuite) waitForRunnerRefresh() { } func (s *ManagerTestSuite) TestRegisterEnvironmentAddsNewJob() { - err := s.nomadRunnerManager.registerEnvironment(anotherEnvironmentID, defaultDesiredRunnersCount) + err := s.nomadRunnerManager.registerEnvironment(anotherEnvironmentID, defaultDesiredRunnersCount, &nomadApi.Job{}) s.Require().NoError(err) job, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) s.True(ok) @@ -193,11 +190,11 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() { func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { allocation := &nomadApi.Allocation{ID: tests.DefaultRunnerID} - defaultJob, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) + environment, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) s.Require().True(ok) - allocation.JobID = defaultJob.environmentID.toString() + allocation.JobID = environment.environmentID.toString() - _, ok = defaultJob.idleRunners.Get(allocation.ID) + _, ok = environment.idleRunners.Get(allocation.ID) s.Require().False(ok) modifyMockedCall(s.apiMock, "WatchAllocations", func(call *mock.Call) { @@ -214,18 +211,17 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { go s.nomadRunnerManager.updateRunners(ctx) <-time.After(10 * time.Millisecond) - _, ok = defaultJob.idleRunners.Get(allocation.ID) + _, ok = environment.idleRunners.Get(allocation.JobID) s.True(ok) } func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { - allocation := &nomadApi.Allocation{ID: tests.DefaultRunnerID} - defaultJob, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) + allocation := &nomadApi.Allocation{JobID: tests.DefaultJobID} + environment, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) s.Require().True(ok) - allocation.JobID = defaultJob.environmentID.toString() - testRunner := NewRunner(allocation.ID) - defaultJob.idleRunners.Add(testRunner) + testRunner := NewRunner(allocation.JobID) + environment.idleRunners.Add(testRunner) s.nomadRunnerManager.usedRunners.Add(testRunner) modifyMockedCall(s.apiMock, "WatchAllocations", func(call *mock.Call) { @@ -242,9 +238,9 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { go s.nomadRunnerManager.updateRunners(ctx) <-time.After(10 * time.Millisecond) - _, ok = defaultJob.idleRunners.Get(allocation.ID) + _, ok = environment.idleRunners.Get(allocation.JobID) s.False(ok) - _, ok = s.nomadRunnerManager.usedRunners.Get(allocation.ID) + _, ok = s.nomadRunnerManager.usedRunners.Get(allocation.JobID) s.False(ok) } diff --git a/runner/nomad_environment_storage.go b/runner/nomad_environment_storage.go index d294db5..ee8ddac 100644 --- a/runner/nomad_environment_storage.go +++ b/runner/nomad_environment_storage.go @@ -4,70 +4,70 @@ import ( "sync" ) -// NomadEnvironmentStorage is an interface for storing NomadJobs. +// NomadEnvironmentStorage is an interface for storing Nomad environments. type NomadEnvironmentStorage interface { // List returns all keys of environments stored in this storage. List() []EnvironmentID - // Add adds a job to the storage. - // It overwrites the old job if one with the same id was already stored. - Add(job *NomadEnvironment) + // Add adds an environment to the storage. + // It overwrites the old environment if one with the same id was already stored. + Add(environment *NomadEnvironment) - // Get returns a job from the storage. - // Iff the job does not exist in the store, ok will be false. - Get(id EnvironmentID) (job *NomadEnvironment, ok bool) + // Get returns an environment from the storage. + // Iff the environment does not exist in the store, ok will be false. + Get(id EnvironmentID) (environment *NomadEnvironment, ok bool) - // Delete deletes the job with the passed id from the storage. It does nothing if no job with the id is present in - // the storage. + // Delete deletes the environment with the passed id from the storage. It does nothing if no environment with the id + // is present in the storage. Delete(id EnvironmentID) // Length returns the number of currently stored environments in the storage. Length() int } -// localNomadJobStorage stores NomadEnvironment objects in the local application memory. -type localNomadJobStorage struct { +// localNomadEnvironmentStorage stores NomadEnvironment objects in the local application memory. +type localNomadEnvironmentStorage struct { sync.RWMutex - jobs map[EnvironmentID]*NomadEnvironment + environments map[EnvironmentID]*NomadEnvironment } -// NewLocalNomadJobStorage responds with an empty localNomadJobStorage. +// NewLocalNomadEnvironmentStorage responds with an empty localNomadEnvironmentStorage. // This implementation stores the data thread-safe in the local application memory. -func NewLocalNomadJobStorage() *localNomadJobStorage { - return &localNomadJobStorage{ - jobs: make(map[EnvironmentID]*NomadEnvironment), +func NewLocalNomadEnvironmentStorage() *localNomadEnvironmentStorage { + return &localNomadEnvironmentStorage{ + environments: make(map[EnvironmentID]*NomadEnvironment), } } -func (s *localNomadJobStorage) List() []EnvironmentID { - keys := make([]EnvironmentID, 0, len(s.jobs)) - for k := range s.jobs { +func (s *localNomadEnvironmentStorage) List() []EnvironmentID { + keys := make([]EnvironmentID, 0, len(s.environments)) + for k := range s.environments { keys = append(keys, k) } return keys } -func (s *localNomadJobStorage) Add(job *NomadEnvironment) { +func (s *localNomadEnvironmentStorage) Add(environment *NomadEnvironment) { s.Lock() defer s.Unlock() - s.jobs[job.ID()] = job + s.environments[environment.ID()] = environment } -func (s *localNomadJobStorage) Get(id EnvironmentID) (job *NomadEnvironment, ok bool) { +func (s *localNomadEnvironmentStorage) Get(id EnvironmentID) (environment *NomadEnvironment, ok bool) { s.RLock() defer s.RUnlock() - job, ok = s.jobs[id] + environment, ok = s.environments[id] return } -func (s *localNomadJobStorage) Delete(id EnvironmentID) { +func (s *localNomadEnvironmentStorage) Delete(id EnvironmentID) { s.Lock() defer s.Unlock() - delete(s.jobs, id) + delete(s.environments, id) } -func (s *localNomadJobStorage) Length() int { +func (s *localNomadEnvironmentStorage) Length() int { s.RLock() defer s.RUnlock() - return len(s.jobs) + return len(s.environments) } diff --git a/runner/nomad_environment_storage_test.go b/runner/nomad_environment_storage_test.go index 0da9e76..ae4bcc0 100644 --- a/runner/nomad_environment_storage_test.go +++ b/runner/nomad_environment_storage_test.go @@ -6,71 +6,71 @@ import ( "testing" ) -func TestJobStoreTestSuite(t *testing.T) { - suite.Run(t, new(JobStoreTestSuite)) +func TestEnvironmentStoreTestSuite(t *testing.T) { + suite.Run(t, new(EnvironmentStoreTestSuite)) } -type JobStoreTestSuite struct { +type EnvironmentStoreTestSuite struct { suite.Suite - jobStorage *localNomadJobStorage - job *NomadEnvironment + environmentStorage *localNomadEnvironmentStorage + environment *NomadEnvironment } -func (s *JobStoreTestSuite) SetupTest() { - s.jobStorage = NewLocalNomadJobStorage() - s.job = &NomadEnvironment{environmentID: defaultEnvironmentID} +func (s *EnvironmentStoreTestSuite) SetupTest() { + s.environmentStorage = NewLocalNomadEnvironmentStorage() + s.environment = &NomadEnvironment{environmentID: defaultEnvironmentID} } -func (s *JobStoreTestSuite) TestAddedJobCanBeRetrieved() { - s.jobStorage.Add(s.job) - retrievedJob, ok := s.jobStorage.Get(s.job.ID()) +func (s *EnvironmentStoreTestSuite) TestAddedEnvironmentCanBeRetrieved() { + s.environmentStorage.Add(s.environment) + retrievedEnvironment, ok := s.environmentStorage.Get(s.environment.ID()) s.True(ok, "A saved runner should be retrievable") - s.Equal(s.job, retrievedJob) + s.Equal(s.environment, retrievedEnvironment) } -func (s *JobStoreTestSuite) TestJobWithSameIdOverwritesOldOne() { - otherJobWithSameID := &NomadEnvironment{environmentID: defaultEnvironmentID} - otherJobWithSameID.templateJob = &nomadApi.Job{} - s.NotEqual(s.job, otherJobWithSameID) +func (s *EnvironmentStoreTestSuite) TestEnvironmentWithSameIdOverwritesOldOne() { + otherEnvironmentWithSameID := &NomadEnvironment{environmentID: defaultEnvironmentID} + otherEnvironmentWithSameID.templateJob = &nomadApi.Job{} + s.NotEqual(s.environment, otherEnvironmentWithSameID) - s.jobStorage.Add(s.job) - s.jobStorage.Add(otherJobWithSameID) - retrievedJob, _ := s.jobStorage.Get(s.job.ID()) - s.NotEqual(s.job, retrievedJob) - s.Equal(otherJobWithSameID, retrievedJob) + s.environmentStorage.Add(s.environment) + s.environmentStorage.Add(otherEnvironmentWithSameID) + retrievedEnvironment, _ := s.environmentStorage.Get(s.environment.ID()) + s.NotEqual(s.environment, retrievedEnvironment) + s.Equal(otherEnvironmentWithSameID, retrievedEnvironment) } -func (s *JobStoreTestSuite) TestDeletedJobIsNotAccessible() { - s.jobStorage.Add(s.job) - s.jobStorage.Delete(s.job.ID()) - retrievedRunner, ok := s.jobStorage.Get(s.job.ID()) +func (s *EnvironmentStoreTestSuite) TestDeletedEnvironmentIsNotAccessible() { + s.environmentStorage.Add(s.environment) + s.environmentStorage.Delete(s.environment.ID()) + retrievedRunner, ok := s.environmentStorage.Get(s.environment.ID()) s.Nil(retrievedRunner) s.False(ok, "A deleted runner should not be accessible") } -func (s *JobStoreTestSuite) TestLenOfEmptyPoolIsZero() { - s.Equal(0, s.jobStorage.Length()) +func (s *EnvironmentStoreTestSuite) TestLenOfEmptyPoolIsZero() { + s.Equal(0, s.environmentStorage.Length()) } -func (s *JobStoreTestSuite) TestLenChangesOnStoreContentChange() { - s.Run("len increases when job is added", func() { - s.jobStorage.Add(s.job) - s.Equal(1, s.jobStorage.Length()) +func (s *EnvironmentStoreTestSuite) TestLenChangesOnStoreContentChange() { + s.Run("len increases when environment is added", func() { + s.environmentStorage.Add(s.environment) + s.Equal(1, s.environmentStorage.Length()) }) - s.Run("len does not increase when job with same id is added", func() { - s.jobStorage.Add(s.job) - s.Equal(1, s.jobStorage.Length()) + s.Run("len does not increase when environment with same id is added", func() { + s.environmentStorage.Add(s.environment) + s.Equal(1, s.environmentStorage.Length()) }) - s.Run("len increases again when different job is added", func() { - anotherJob := &NomadEnvironment{environmentID: anotherEnvironmentID} - s.jobStorage.Add(anotherJob) - s.Equal(2, s.jobStorage.Length()) + s.Run("len increases again when different environment is added", func() { + anotherEnvironment := &NomadEnvironment{environmentID: anotherEnvironmentID} + s.environmentStorage.Add(anotherEnvironment) + s.Equal(2, s.environmentStorage.Length()) }) - s.Run("len decreases when job is deleted", func() { - s.jobStorage.Delete(s.job.ID()) - s.Equal(1, s.jobStorage.Length()) + s.Run("len decreases when environment is deleted", func() { + s.environmentStorage.Delete(s.environment.ID()) + s.Equal(1, s.environmentStorage.Length()) }) } diff --git a/runner/runner.go b/runner/runner.go index fa4934f..c27b295 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -50,12 +50,11 @@ type Runner interface { UpdateFileSystem(request *dto.UpdateFileSystemRequest) error } -// NomadJob is an abstraction to communicate with Nomad jobs. +// NomadJob is an abstraction to communicate with Nomad environments. type NomadJob struct { ExecutionStorage - id string - allocID string - api nomad.ExecutorAPI + id string + api nomad.ExecutorAPI } // NewNomadJob creates a new NomadJob with the provided id. diff --git a/runner/runner_test.go b/runner/runner_test.go index 85d29b0..5a268bf 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -20,19 +20,19 @@ import ( ) func TestIdIsStored(t *testing.T) { - runner := NewNomadJob("42", nil) - assert.Equal(t, "42", runner.Id()) + runner := NewNomadJob(tests.DefaultJobID, nil) + assert.Equal(t, tests.DefaultJobID, runner.Id()) } func TestMarshalRunner(t *testing.T) { - runner := NewNomadJob("42", nil) + runner := NewNomadJob(tests.DefaultJobID, nil) marshal, err := json.Marshal(runner) assert.NoError(t, err) - assert.Equal(t, "{\"runnerId\":\"42\"}", string(marshal)) + assert.Equal(t, "{\"runnerId\":\""+tests.DefaultJobID+"\"}", string(marshal)) } func TestExecutionRequestIsStored(t *testing.T) { - runner := NewNomadJob("42", nil) + runner := NewNomadJob(tests.DefaultJobID, nil) executionRequest := &dto.ExecutionRequest{ Command: "command", TimeLimit: 10, @@ -47,7 +47,7 @@ func TestExecutionRequestIsStored(t *testing.T) { } func TestNewContextReturnsNewContextWithRunner(t *testing.T) { - runner := NewNomadJob("testRunner", nil) + runner := NewNomadJob(tests.DefaultRunnerID, nil) ctx := context.Background() newCtx := NewContext(ctx, runner) storedRunner := newCtx.Value(runnerContextKey).(Runner) @@ -57,7 +57,7 @@ func TestNewContextReturnsNewContextWithRunner(t *testing.T) { } func TestFromContextReturnsRunner(t *testing.T) { - runner := NewNomadJob("testRunner", nil) + runner := NewNomadJob(tests.DefaultRunnerID, nil) ctx := NewContext(context.Background(), runner) storedRunner, ok := FromContext(ctx) diff --git a/tests/constants.go b/tests/constants.go index 3d6187b..2753c6e 100644 --- a/tests/constants.go +++ b/tests/constants.go @@ -13,8 +13,10 @@ const ( DefaultEnvironmentIDAsString = "0" AnotherEnvironmentIDAsInteger = 42 AnotherEnvironmentIDAsString = "42" - DefaultJobID = DefaultEnvironmentIDAsString - AnotherJobID = AnotherEnvironmentIDAsString + DefaultUUID = "MY-DEFAULT-RANDOM-UUID" + AnotherUUID = "another-uuid-43" + DefaultJobID = DefaultEnvironmentIDAsString + "-" + DefaultUUID + AnotherJobID = AnotherEnvironmentIDAsString + "-" + AnotherUUID DefaultRunnerID = DefaultJobID AnotherRunnerID = AnotherJobID DefaultExecutionID = "s0m3-3x3cu710n-1d" diff --git a/tests/e2e/e2e_test.go b/tests/e2e/e2e_test.go index 264ac8a..0567df7 100644 --- a/tests/e2e/e2e_test.go +++ b/tests/e2e/e2e_test.go @@ -1,6 +1,7 @@ package e2e import ( + "flag" nomadApi "github.com/hashicorp/nomad/api" "github.com/stretchr/testify/suite" "gitlab.hpi.de/codeocean/codemoon/poseidon/api" @@ -22,9 +23,10 @@ import ( */ var ( - log = logging.GetLogger("e2e") - nomadClient *nomadApi.Client - nomadNamespace string + log = logging.GetLogger("e2e") + testDockerImage = flag.String("dockerImage", "", "Docker image to use in E2E tests") + nomadClient *nomadApi.Client + nomadNamespace string ) type E2ETestSuite struct { @@ -68,20 +70,24 @@ func TestMain(m *testing.M) { } func createDefaultEnvironment() { + if *testDockerImage == "" { + log.Fatal("You must specify the -dockerImage flag!") + } + path := helpers.BuildURL(api.BasePath, api.EnvironmentsPath, tests.DefaultEnvironmentIDAsString) request := dto.ExecutionEnvironmentRequest{ PrewarmingPoolSize: 10, CPULimit: 100, MemoryLimit: 100, - Image: "drp.codemoon.xopic.de/openhpi/co_execenv_python:3.8", + Image: *testDockerImage, NetworkAccess: false, ExposedPorts: nil, } resp, err := helpers.HttpPutJSON(path, request) if err != nil || resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusNoContent { - log.Fatal("Couldn't create default environment for e2e tests") + log.WithError(err).Fatal("Couldn't create default environment for e2e tests") } err = resp.Body.Close() if err != nil { diff --git a/tests/e2e/environments_test.go b/tests/e2e/environments_test.go index f5da93c..ebe6e46 100644 --- a/tests/e2e/environments_test.go +++ b/tests/e2e/environments_test.go @@ -139,7 +139,7 @@ func validateJob(t *testing.T, expected dto.ExecutionEnvironmentRequest) { func findNomadJob(t *testing.T, jobID string) *nomadApi.Job { t.Helper() - job, _, err := nomadClient.Jobs().Info(nomad.DefaultJobID(jobID), nil) + job, _, err := nomadClient.Jobs().Info(nomad.TemplateJobID(jobID), nil) if err != nil { t.Fatalf("Error retrieving Nomad job: %v", err) }