diff --git a/environment/job.go b/environment/job.go deleted file mode 100644 index 4fde8a1..0000000 --- a/environment/job.go +++ /dev/null @@ -1,198 +0,0 @@ -package environment - -import ( - "context" - _ "embed" - nomadApi "github.com/hashicorp/nomad/api" - "github.com/hashicorp/nomad/jobspec2" - "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" - "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" - "strconv" -) - -// defaultJobHCL holds our default job in HCL format. -// The default job is used when creating new job and provides -// common settings that all the jobs share. -//go:embed default-job.hcl -var defaultJobHCL string - -// registerTemplateJob creates a Nomad job based on the default job configuration and the given parameters. -// It registers the job with Nomad and waits until the registration completes. -func (m *NomadEnvironmentManager) registerTemplateJob( - environmentID runner.EnvironmentID, - prewarmingPoolSize, cpuLimit, memoryLimit uint, - image string, - networkAccess bool, - exposedPorts []uint16) (*nomadApi.Job, error) { - job := createTemplateJob(m.defaultJob, environmentID, prewarmingPoolSize, - cpuLimit, memoryLimit, image, networkAccess, exposedPorts) - evalID, err := m.api.RegisterNomadJob(job) - if err != nil { - return nil, err - } - return job, m.api.MonitorEvaluation(evalID, context.Background()) -} - -func createTemplateJob( - defaultJob nomadApi.Job, - environmentID runner.EnvironmentID, - prewarmingPoolSize, cpuLimit, memoryLimit uint, - image string, - networkAccess bool, - exposedPorts []uint16) *nomadApi.Job { - job := defaultJob - templateJobID := nomad.TemplateJobID(strconv.Itoa(int(environmentID))) - job.ID = &templateJobID - job.Name = &templateJobID - - var taskGroup = createTaskGroup(&job, nomad.TaskGroupName, prewarmingPoolSize) - configureTask(taskGroup, nomad.TaskName, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) - storeConfiguration(&job, environmentID, prewarmingPoolSize) - - return &job -} - -func parseJob(jobHCL string) *nomadApi.Job { - config := jobspec2.ParseConfig{ - Body: []byte(jobHCL), - AllowFS: false, - Strict: true, - } - job, err := jobspec2.ParseWithConfig(&config) - if err != nil { - log.WithError(err).Fatal("Error parsing Nomad job") - return nil - } - - return job -} - -func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *nomadApi.TaskGroup { - var taskGroup *nomadApi.TaskGroup - if len(job.TaskGroups) == 0 { - taskGroup = nomadApi.NewTaskGroup(name, int(prewarmingPoolSize)) - job.TaskGroups = []*nomadApi.TaskGroup{taskGroup} - } else { - taskGroup = job.TaskGroups[0] - taskGroup.Name = &name - count := 1 - taskGroup.Count = &count - } - return taskGroup -} - -func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposedPorts []uint16) { - if len(taskGroup.Tasks) == 0 { - // This function is only used internally and must be called as last step when configuring the task. - // This error is not recoverable. - log.Fatal("Can't configure network before task has been configured!") - } - task := taskGroup.Tasks[0] - - if task.Config == nil { - task.Config = make(map[string]interface{}) - } - - if networkAccess { - var networkResource *nomadApi.NetworkResource - if len(taskGroup.Networks) == 0 { - networkResource = &nomadApi.NetworkResource{} - taskGroup.Networks = []*nomadApi.NetworkResource{networkResource} - } else { - networkResource = taskGroup.Networks[0] - } - // Prefer "bridge" network over "host" to have an isolated network namespace with bridged interface - // instead of joining the host network namespace. - networkResource.Mode = "bridge" - for _, portNumber := range exposedPorts { - port := nomadApi.Port{ - Label: strconv.FormatUint(uint64(portNumber), 10), - To: int(portNumber), - } - networkResource.DynamicPorts = append(networkResource.DynamicPorts, port) - } - - // Explicitly set mode to override existing settings when updating job from without to with network. - // Don't use bridge as it collides with the bridge mode above. This results in Docker using 'bridge' - // mode, meaning all allocations will be attached to the `docker0` adapter and could reach other - // non-Nomad containers attached to it. This is avoided when using Nomads bridge network mode. - task.Config["network_mode"] = "" - } else { - // Somehow, we can't set the network mode to none in the NetworkResource on task group level. - // See https://github.com/hashicorp/nomad/issues/10540 - task.Config["network_mode"] = "none" - // Explicitly set Networks to signal Nomad to remove the possibly existing networkResource - taskGroup.Networks = []*nomadApi.NetworkResource{} - } -} - -func configureTask( - taskGroup *nomadApi.TaskGroup, - name string, - cpuLimit, memoryLimit uint, - image string, - networkAccess bool, - exposedPorts []uint16) { - var task *nomadApi.Task - if len(taskGroup.Tasks) == 0 { - task = nomadApi.NewTask(name, nomad.DefaultTaskDriver) - taskGroup.Tasks = []*nomadApi.Task{task} - } else { - task = taskGroup.Tasks[0] - task.Name = name - } - integerCPULimit := int(cpuLimit) - integerMemoryLimit := int(memoryLimit) - task.Resources = &nomadApi.Resources{ - CPU: &integerCPULimit, - MemoryMB: &integerMemoryLimit, - } - - if task.Config == nil { - task.Config = make(map[string]interface{}) - } - task.Config["image"] = image - - configureNetwork(taskGroup, networkAccess, exposedPorts) -} - -func storeConfiguration(job *nomadApi.Job, id runner.EnvironmentID, prewarmingPoolSize uint) { - taskGroup := findOrCreateConfigTaskGroup(job) - - if taskGroup.Meta == nil { - taskGroup.Meta = make(map[string]string) - } - taskGroup.Meta[nomad.ConfigMetaEnvironmentKey] = strconv.Itoa(int(id)) - taskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUnusedValue - taskGroup.Meta[nomad.ConfigMetaPoolSizeKey] = strconv.Itoa(int(prewarmingPoolSize)) -} - -func findOrCreateConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { - taskGroup := nomad.FindConfigTaskGroup(job) - if taskGroup == nil { - taskGroup = nomadApi.NewTaskGroup(nomad.ConfigTaskGroupName, 0) - } - createDummyTaskIfNotPresent(taskGroup) - return taskGroup -} - -// createDummyTaskIfNotPresent ensures that a dummy task is in the task group so that the group is accepted by Nomad. -func createDummyTaskIfNotPresent(taskGroup *nomadApi.TaskGroup) { - var task *nomadApi.Task - for _, t := range taskGroup.Tasks { - if t.Name == nomad.DummyTaskName { - task = t - break - } - } - - if task == nil { - task = nomadApi.NewTask(nomad.DummyTaskName, nomad.DefaultDummyTaskDriver) - taskGroup.Tasks = append(taskGroup.Tasks, task) - } - - if task.Config == nil { - task.Config = make(map[string]interface{}) - } - task.Config["command"] = nomad.DefaultTaskCommand -} diff --git a/environment/manager.go b/environment/manager.go index e84f9db..24424be 100644 --- a/environment/manager.go +++ b/environment/manager.go @@ -1,8 +1,10 @@ package environment import ( + _ "embed" "fmt" nomadApi "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/jobspec2" "github.com/hashicorp/nomad/nomad/structs" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" @@ -11,6 +13,12 @@ import ( "strconv" ) +// defaultJobHCL holds our default job in HCL format. +// The default job is used when creating new job and provides +// common settings that all the jobs share. +//go:embed default-job.hcl +var defaultJobHCL string + var log = logging.GetLogger("environment") // Manager encapsulates API calls to the executor API for creation and deletion of execution environments. @@ -43,7 +51,7 @@ func (m *NomadEnvironmentManager) CreateOrUpdate( id runner.EnvironmentID, request dto.ExecutionEnvironmentRequest, ) (bool, error) { - templateJob, err := m.registerTemplateJob(id, + templateJob, err := m.api.RegisterTemplateJob(&m.defaultJob, int(id), request.PrewarmingPoolSize, request.CPULimit, request.MemoryLimit, request.Image, request.NetworkAccess, request.ExposedPorts) @@ -132,3 +140,18 @@ func (m *NomadEnvironmentManager) recoverJobs(jobs []*nomadApi.Job, onJob jobAdd } } } + +func parseJob(jobHCL string) *nomadApi.Job { + config := jobspec2.ParseConfig{ + Body: []byte(jobHCL), + AllowFS: false, + Strict: true, + } + job, err := jobspec2.ParseWithConfig(&config) + if err != nil { + log.WithError(err).Fatal("Error parsing Nomad job") + return nil + } + + return job +} diff --git a/environment/manager_test.go b/environment/manager_test.go index 5145a3e..355bd49 100644 --- a/environment/manager_test.go +++ b/environment/manager_test.go @@ -2,6 +2,9 @@ package environment import ( nomadApi "github.com/hashicorp/nomad/api" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" @@ -13,11 +16,11 @@ import ( type CreateOrUpdateTestSuite struct { suite.Suite - runnerManagerMock runner.ManagerMock - apiMock nomad.ExecutorAPIMock - registerNomadJobMockCall *mock.Call - request dto.ExecutionEnvironmentRequest - manager *NomadEnvironmentManager + runnerManagerMock runner.ManagerMock + apiMock nomad.ExecutorAPIMock + request dto.ExecutionEnvironmentRequest + manager *NomadEnvironmentManager + environmentID runner.EnvironmentID } func TestCreateOrUpdateTestSuite(t *testing.T) { @@ -37,82 +40,107 @@ func (s *CreateOrUpdateTestSuite) SetupTest() { ExposedPorts: nil, } - s.registerNomadJobMockCall = s.apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("eval-id", nil) - s.apiMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.AnythingOfType("*context.emptyCtx")).Return(nil) - s.manager = &NomadEnvironmentManager{ runnerManager: &s.runnerManagerMock, api: &s.apiMock, } + + s.environmentID = runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger) } -func (s *CreateOrUpdateTestSuite) mockCreateOrUpdateEnvironment(exists bool) *mock.Call { - return s.runnerManagerMock.On("CreateOrUpdateEnvironment", +func (s *CreateOrUpdateTestSuite) mockRegisterTemplateJob(job *nomadApi.Job, err error) { + s.apiMock.On("RegisterTemplateJob", + mock.AnythingOfType("*api.Job"), mock.AnythingOfType("int"), + mock.AnythingOfType("uint"), mock.AnythingOfType("uint"), mock.AnythingOfType("uint"), + mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("[]uint16")). + Return(job, err) +} + +func (s *CreateOrUpdateTestSuite) mockCreateOrUpdateEnvironment(created bool, err error) { + s.runnerManagerMock.On("CreateOrUpdateEnvironment", mock.AnythingOfType("EnvironmentID"), mock.AnythingOfType("uint"), mock.AnythingOfType("*api.Job")). - Return(!exists, nil) + Return(created, err) } func (s *CreateOrUpdateTestSuite) createJobForRequest() *nomadApi.Job { - return createTemplateJob(s.manager.defaultJob, tests.DefaultEnvironmentIDAsInteger, + return nomad.CreateTemplateJob(&s.manager.defaultJob, tests.DefaultEnvironmentIDAsInteger, s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit, s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts) } -func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsRegistersCorrectJob() { - s.mockCreateOrUpdateEnvironment(true) - expectedJob := s.createJobForRequest() +func (s *CreateOrUpdateTestSuite) TestRegistersCorrectTemplateJob() { + s.mockRegisterTemplateJob(&nomadApi.Job{}, nil) + s.mockCreateOrUpdateEnvironment(true, nil) - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) + _, err := s.manager.CreateOrUpdate(s.environmentID, s.request) s.NoError(err) - s.False(created) - s.apiMock.AssertCalled(s.T(), "RegisterNomadJob", expectedJob) + + s.apiMock.AssertCalled(s.T(), "RegisterTemplateJob", + &s.manager.defaultJob, int(s.environmentID), + s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit, + s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts) } -func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsOccurredErrorIsPassed() { - s.mockCreateOrUpdateEnvironment(true) +func (s *CreateOrUpdateTestSuite) TestReturnsErrorWhenRegisterTemplateJobReturnsError() { + s.mockRegisterTemplateJob(nil, tests.ErrDefault) - s.registerNomadJobMockCall.Return("", tests.ErrDefault) - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) - s.False(created) + created, err := s.manager.CreateOrUpdate(s.environmentID, s.request) s.Equal(tests.ErrDefault, err) -} - -func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsReturnsFalse() { - s.mockCreateOrUpdateEnvironment(true) - - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) - s.NoError(err) s.False(created) } -func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrectJob() { - s.mockCreateOrUpdateEnvironment(false) +func (s *CreateOrUpdateTestSuite) TestCreatesOrUpdatesCorrectEnvironment() { + templateJobID := tests.DefaultJobID + templateJob := &nomadApi.Job{ID: &templateJobID} + s.mockRegisterTemplateJob(templateJob, nil) + s.mockCreateOrUpdateEnvironment(true, nil) - expectedJob := s.createJobForRequest() - - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) - s.NoError(err) - s.True(created) - s.apiMock.AssertCalled(s.T(), "RegisterNomadJob", expectedJob) -} - -func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrectEnvironment() { - s.mockCreateOrUpdateEnvironment(false) - - expectedJob := s.createJobForRequest() - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) - s.True(created) + _, err := s.manager.CreateOrUpdate(s.environmentID, s.request) s.NoError(err) s.runnerManagerMock.AssertCalled(s.T(), "CreateOrUpdateEnvironment", - runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request.PrewarmingPoolSize, expectedJob) + s.environmentID, s.request.PrewarmingPoolSize, templateJob) } -func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistOccurredErrorIsPassedAndNoEnvironmentRegistered() { - s.mockCreateOrUpdateEnvironment(false) - - s.registerNomadJobMockCall.Return("", tests.ErrDefault) - created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request) - s.False(created) +func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentReturnsError() { + s.mockRegisterTemplateJob(&nomadApi.Job{}, nil) + s.mockCreateOrUpdateEnvironment(false, tests.ErrDefault) + _, err := s.manager.CreateOrUpdate(runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request) s.Equal(tests.ErrDefault, err) - s.runnerManagerMock.AssertNotCalled(s.T(), "CreateOrUpdateEnvironment") +} + +func (s *CreateOrUpdateTestSuite) TestReturnsTrueIfCreatesOrUpdateEnvironmentReturnsTrue() { + s.mockRegisterTemplateJob(&nomadApi.Job{}, nil) + s.mockCreateOrUpdateEnvironment(true, nil) + created, _ := s.manager.CreateOrUpdate(runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request) + s.True(created) +} + +func (s *CreateOrUpdateTestSuite) TestReturnsFalseIfCreatesOrUpdateEnvironmentReturnsFalse() { + s.mockRegisterTemplateJob(&nomadApi.Job{}, nil) + s.mockCreateOrUpdateEnvironment(false, nil) + created, _ := s.manager.CreateOrUpdate(runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request) + s.False(created) +} + +func TestParseJob(t *testing.T) { + exited := false + logger, hook := test.NewNullLogger() + logger.ExitFunc = func(i int) { + exited = true + } + + log = logger.WithField("pkg", "nomad") + + t.Run("parses the given default job", func(t *testing.T) { + job := parseJob(defaultJobHCL) + assert.False(t, exited) + assert.NotNil(t, job) + }) + + t.Run("fatals when given wrong job", func(t *testing.T) { + job := parseJob("") + assert.True(t, exited) + assert.Nil(t, job) + assert.Equal(t, logrus.FatalLevel, hook.LastEntry().Level) + }) } diff --git a/nomad/api_querier.go b/nomad/api_querier.go index 5f437bc..befdd19 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -143,3 +143,32 @@ func (nc *nomadAPIClient) writeOptions() *nomadApi.WriteOptions { Namespace: nc.namespace, } } + +// LoadJobList loads the list of jobs from the Nomad api. +func (nc *nomadAPIClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { + list, _, err = nc.client.Jobs().List(nc.queryOptions()) + return +} + +// JobScale returns the scale of the passed job. +func (nc *nomadAPIClient) JobScale(jobID string) (jobScale uint, err error) { + status, _, err := nc.client.Jobs().ScaleStatus(jobID, nc.queryOptions()) + if err != nil { + return + } + // ToDo: Consider counting also the placed and desired allocations + jobScale = uint(status.TaskGroups[TaskGroupName].Running) + return +} + +// SetJobScale sets the scaling count of the passed job to Nomad. +func (nc *nomadAPIClient) SetJobScale(jobID string, count uint, reason string) (err error) { + intCount := int(count) + _, _, err = nc.client.Jobs().Scale(jobID, TaskGroupName, &intCount, reason, false, nil, nil) + return +} + +func (nc *nomadAPIClient) job(jobID string) (job *nomadApi.Job, err error) { + job, _, err = nc.client.Jobs().Info(jobID, nil) + return +} diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index 2d49f07..b68a8b9 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -193,7 +193,7 @@ func (_m *apiQuerierMock) init(nomadURL *url.URL, nomadNamespace string) error { return r0 } -// jobInfo provides a mock function with given fields: jobID +// job provides a mock function with given fields: jobID func (_m *apiQuerierMock) job(jobID string) (*api.Job, error) { ret := _m.Called(jobID) diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index c549e5b..95813e2 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -165,6 +165,29 @@ func (_m *ExecutorAPIMock) LoadAllJobs() ([]*api.Job, error) { return r0, r1 } +// LoadEnvironmentTemplate provides a mock function with given fields: environmentID +func (_m *ExecutorAPIMock) LoadEnvironmentTemplate(environmentID string) (*api.Job, error) { + ret := _m.Called(environmentID) + + var r0 *api.Job + if rf, ok := ret.Get(0).(func(string) *api.Job); ok { + r0 = rf(environmentID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*api.Job) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(environmentID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LoadJobList provides a mock function with given fields: func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) { ret := _m.Called() @@ -283,6 +306,29 @@ func (_m *ExecutorAPIMock) RegisterNomadJob(job *api.Job) (string, error) { return r0, r1 } +// RegisterTemplateJob provides a mock function with given fields: defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts +func (_m *ExecutorAPIMock) RegisterTemplateJob(defaultJob *api.Job, environmentID int, prewarmingPoolSize uint, cpuLimit uint, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) (*api.Job, error) { + ret := _m.Called(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + + var r0 *api.Job + if rf, ok := ret.Get(0).(func(*api.Job, int, uint, uint, uint, string, bool, []uint16) *api.Job); ok { + r0 = rf(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*api.Job) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(*api.Job, int, uint, uint, uint, string, bool, []uint16) error); ok { + r1 = rf(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // SetJobScale provides a mock function with given fields: jobId, count, reason func (_m *ExecutorAPIMock) SetJobScale(jobId string, count uint, reason string) error { ret := _m.Called(jobId, count, reason) @@ -325,7 +371,7 @@ func (_m *ExecutorAPIMock) init(nomadURL *url.URL, nomadNamespace string) error return r0 } -// jobInfo provides a mock function with given fields: jobID +// job provides a mock function with given fields: jobID func (_m *ExecutorAPIMock) job(jobID string) (*api.Job, error) { ret := _m.Called(jobID) diff --git a/nomad/job.go b/nomad/job.go index f561eed..2369dc7 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" @@ -78,31 +79,171 @@ func SetMetaConfigValue(job *nomadApi.Job, key, value string) error { return nil } -// LoadJobList loads the list of jobs from the Nomad api. -func (nc *nomadAPIClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { - list, _, err = nc.client.Jobs().List(nc.queryOptions()) - return -} - -// JobScale returns the scale of the passed job. -func (nc *nomadAPIClient) JobScale(jobID string) (jobScale uint, err error) { - status, _, err := nc.client.Jobs().ScaleStatus(jobID, nc.queryOptions()) +// RegisterTemplateJob creates a Nomad job based on the default job configuration and the given parameters. +// It registers the job with Nomad and waits until the registration completes. +func (a *APIClient) RegisterTemplateJob( + defaultJob *nomadApi.Job, + environmentID int, + prewarmingPoolSize, cpuLimit, memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16) (*nomadApi.Job, error) { + job := CreateTemplateJob(defaultJob, environmentID, prewarmingPoolSize, + cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + evalID, err := a.apiQuerier.RegisterNomadJob(job) if err != nil { - return + return nil, fmt.Errorf("couldn't register template job: %w", err) } - // ToDo: Consider counting also the placed and desired allocations - jobScale = uint(status.TaskGroups[TaskGroupName].Running) - return + return job, a.MonitorEvaluation(evalID, context.Background()) } -// SetJobScale sets the scaling count of the passed job to Nomad. -func (nc *nomadAPIClient) SetJobScale(jobID string, count uint, reason string) (err error) { - intCount := int(count) - _, _, err = nc.client.Jobs().Scale(jobID, TaskGroupName, &intCount, reason, false, nil, nil) - return +// CreateTemplateJob creates a Nomad job based on the default job configuration and the given parameters. +// It registers the job with Nomad and waits until the registration completes. +func CreateTemplateJob( + defaultJob *nomadApi.Job, + environmentID int, + prewarmingPoolSize, cpuLimit, memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16) *nomadApi.Job { + job := *defaultJob + templateJobID := TemplateJobID(strconv.Itoa(environmentID)) + job.ID = &templateJobID + job.Name = &templateJobID + + var taskGroup = createTaskGroup(&job, TaskGroupName, prewarmingPoolSize) + configureTask(taskGroup, TaskName, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + storeConfiguration(&job, environmentID, prewarmingPoolSize) + + return &job } -func (nc *nomadAPIClient) job(jobID string) (job *nomadApi.Job, err error) { - job, _, err = nc.client.Jobs().Info(jobID, nil) - return +func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *nomadApi.TaskGroup { + var taskGroup *nomadApi.TaskGroup + if len(job.TaskGroups) == 0 { + taskGroup = nomadApi.NewTaskGroup(name, int(prewarmingPoolSize)) + job.TaskGroups = []*nomadApi.TaskGroup{taskGroup} + } else { + taskGroup = job.TaskGroups[0] + taskGroup.Name = &name + count := 1 + taskGroup.Count = &count + } + return taskGroup +} + +func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposedPorts []uint16) { + if len(taskGroup.Tasks) == 0 { + // This function is only used internally and must be called as last step when configuring the task. + // This error is not recoverable. + log.Fatal("Can't configure network before task has been configured!") + } + task := taskGroup.Tasks[0] + + if task.Config == nil { + task.Config = make(map[string]interface{}) + } + + if networkAccess { + var networkResource *nomadApi.NetworkResource + if len(taskGroup.Networks) == 0 { + networkResource = &nomadApi.NetworkResource{} + taskGroup.Networks = []*nomadApi.NetworkResource{networkResource} + } else { + networkResource = taskGroup.Networks[0] + } + // Prefer "bridge" network over "host" to have an isolated network namespace with bridged interface + // instead of joining the host network namespace. + networkResource.Mode = "bridge" + for _, portNumber := range exposedPorts { + port := nomadApi.Port{ + Label: strconv.FormatUint(uint64(portNumber), 10), + To: int(portNumber), + } + networkResource.DynamicPorts = append(networkResource.DynamicPorts, port) + } + + // Explicitly set mode to override existing settings when updating job from without to with network. + // Don't use bridge as it collides with the bridge mode above. This results in Docker using 'bridge' + // mode, meaning all allocations will be attached to the `docker0` adapter and could reach other + // non-Nomad containers attached to it. This is avoided when using Nomads bridge network mode. + task.Config["network_mode"] = "" + } else { + // Somehow, we can't set the network mode to none in the NetworkResource on task group level. + // See https://github.com/hashicorp/nomad/issues/10540 + task.Config["network_mode"] = "none" + // Explicitly set Networks to signal Nomad to remove the possibly existing networkResource + taskGroup.Networks = []*nomadApi.NetworkResource{} + } +} + +func configureTask( + taskGroup *nomadApi.TaskGroup, + name string, + cpuLimit, memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16) { + var task *nomadApi.Task + if len(taskGroup.Tasks) == 0 { + task = nomadApi.NewTask(name, DefaultTaskDriver) + taskGroup.Tasks = []*nomadApi.Task{task} + } else { + task = taskGroup.Tasks[0] + task.Name = name + } + integerCPULimit := int(cpuLimit) + integerMemoryLimit := int(memoryLimit) + task.Resources = &nomadApi.Resources{ + CPU: &integerCPULimit, + MemoryMB: &integerMemoryLimit, + } + + if task.Config == nil { + task.Config = make(map[string]interface{}) + } + task.Config["image"] = image + + configureNetwork(taskGroup, networkAccess, exposedPorts) +} + +func storeConfiguration(job *nomadApi.Job, id int, prewarmingPoolSize uint) { + taskGroup := findOrCreateConfigTaskGroup(job) + + if taskGroup.Meta == nil { + taskGroup.Meta = make(map[string]string) + } + taskGroup.Meta[ConfigMetaEnvironmentKey] = strconv.Itoa(id) + taskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUnusedValue + taskGroup.Meta[ConfigMetaPoolSizeKey] = strconv.Itoa(int(prewarmingPoolSize)) +} + +func findOrCreateConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { + taskGroup := FindConfigTaskGroup(job) + if taskGroup == nil { + taskGroup = nomadApi.NewTaskGroup(ConfigTaskGroupName, 0) + } + createDummyTaskIfNotPresent(taskGroup) + return taskGroup +} + +// createDummyTaskIfNotPresent ensures that a dummy task is in the task group so that the group is accepted by Nomad. +func createDummyTaskIfNotPresent(taskGroup *nomadApi.TaskGroup) { + var task *nomadApi.Task + for _, t := range taskGroup.Tasks { + if t.Name == DummyTaskName { + task = t + break + } + } + + if task == nil { + task = nomadApi.NewTask(DummyTaskName, DefaultDummyTaskDriver) + taskGroup.Tasks = append(taskGroup.Tasks, task) + } + + if task.Config == nil { + task.Config = make(map[string]interface{}) + } + task.Config["command"] = DefaultTaskCommand } diff --git a/environment/job_test.go b/nomad/job_test.go similarity index 76% rename from environment/job_test.go rename to nomad/job_test.go index 8f2943e..3d9db24 100644 --- a/environment/job_test.go +++ b/nomad/job_test.go @@ -1,7 +1,6 @@ -package environment +package nomad import ( - "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" "github.com/sirupsen/logrus" @@ -9,35 +8,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" - "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "testing" ) -func TestParseJob(t *testing.T) { - exited := false - logger, hook := test.NewNullLogger() - logger.ExitFunc = func(i int) { - exited = true - } - - log = logger.WithField("pkg", "nomad") - - t.Run("parses the given default job", func(t *testing.T) { - job := parseJob(defaultJobHCL) - assert.False(t, exited) - assert.NotNil(t, job) - }) - - t.Run("fatals when given wrong job", func(t *testing.T) { - job := parseJob("") - assert.True(t, exited) - assert.Nil(t, job) - assert.Equal(t, logrus.FatalLevel, hook.LastEntry().Level) - }) -} - func createTestTaskGroup() *nomadApi.TaskGroup { return nomadApi.NewTaskGroup("taskGroup", 42) } @@ -52,18 +26,18 @@ func createTestResources() *nomadApi.Resources { return &nomadApi.Resources{CPU: &expectedCPULimit, MemoryMB: &expectedMemoryLimit} } -func createTestJob() (*nomadApi.Job, *nomadApi.Job) { - jobID := nomad.TemplateJobID(tests.DefaultEnvironmentIDAsString) - base := nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) - job := nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) +func createTestJob() (job, base *nomadApi.Job) { + jobID := TemplateJobID(tests.DefaultEnvironmentIDAsString) + base = nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) + job = nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) task := createTestTask() - task.Name = nomad.TaskName + task.Name = TaskName image := "python:latest" task.Config = map[string]interface{}{"image": image} task.Config["network_mode"] = "none" task.Resources = createTestResources() taskGroup := createTestTaskGroup() - taskGroupName := nomad.TaskGroupName + taskGroupName := TaskGroupName taskGroup.Name = &taskGroupName taskGroup.Tasks = []*nomadApi.Task{task} taskGroup.Networks = []*nomadApi.NetworkResource{} @@ -206,7 +180,7 @@ func TestConfigureTaskWhenNoTaskExists(t *testing.T) { expectedResources := createTestResources() expectedTaskGroup := *taskGroup - expectedTask := nomadApi.NewTask("task", nomad.DefaultTaskDriver) + expectedTask := nomadApi.NewTask("task", DefaultTaskDriver) expectedTask.Resources = expectedResources expectedImage := "python:latest" expectedTask.Config = map[string]interface{}{"image": expectedImage, "network_mode": "none"} @@ -247,12 +221,11 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) { func TestCreateTemplateJobSetsAllGivenArguments(t *testing.T) { testJob, base := createTestJob() - manager := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.APIClient{}, *base} - testJobEnvironmentID, err := nomad.EnvironmentIDFromJobID(*testJob.ID) + testJobEnvironmentID, err := EnvironmentIDFromJobID(*testJob.ID) assert.NoError(t, err) - job := createTemplateJob( - manager.defaultJob, - runner.EnvironmentID(testJobEnvironmentID), + job := CreateTemplateJob( + base, + testJobEnvironmentID, uint(*testJob.TaskGroups[0].Count), uint(*testJob.TaskGroups[0].Tasks[0].Resources.CPU), uint(*testJob.TaskGroups[0].Tasks[0].Resources.MemoryMB), @@ -264,53 +237,51 @@ func TestCreateTemplateJobSetsAllGivenArguments(t *testing.T) { } func TestRegisterTemplateJobFailsWhenNomadJobRegistrationFails(t *testing.T) { - apiMock := nomad.ExecutorAPIMock{} - expectedErr := errors.New("test error") + apiMock := apiQuerierMock{} + expectedErr := tests.ErrDefault apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", expectedErr) - m := NomadEnvironmentManager{ - runnerManager: nil, - api: &apiMock, - defaultJob: nomadApi.Job{}, - } + apiClient := &APIClient{&apiMock} - _, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{}) - assert.Equal(t, expectedErr, err) + _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger, + 1, 2, 3, "image", false, []uint16{}) + assert.ErrorIs(t, err, expectedErr) apiMock.AssertNotCalled(t, "EvaluationStream") } func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) { - apiMock := nomad.ExecutorAPIMock{} + apiMock := apiQuerierMock{} evaluationID := "id" + stream := make(chan *nomadApi.Events) + readonlyStream := func() <-chan *nomadApi.Events { + return stream + }() + // Immediately close stream to avoid any reading from it resulting in endless wait + close(stream) + apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil) - apiMock.On("MonitorEvaluation", evaluationID, mock.AnythingOfType("*context.emptyCtx")).Return(nil) + apiMock.On("EvaluationStream", evaluationID, mock.AnythingOfType("*context.emptyCtx")). + Return(readonlyStream, nil) - m := NomadEnvironmentManager{ - runnerManager: nil, - api: &apiMock, - defaultJob: nomadApi.Job{}, - } + apiClient := &APIClient{&apiMock} - _, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{}) + _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger, + 1, 2, 3, "image", false, []uint16{}) assert.NoError(t, err) } func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing.T) { - apiMock := nomad.ExecutorAPIMock{} + apiMock := apiQuerierMock{} evaluationID := "id" - expectedErr := errors.New("test error") apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil) - apiMock.On("MonitorEvaluation", evaluationID, mock.AnythingOfType("*context.emptyCtx")).Return(expectedErr) + apiMock.On("EvaluationStream", evaluationID, mock.AnythingOfType("*context.emptyCtx")).Return(nil, tests.ErrDefault) - m := NomadEnvironmentManager{ - runnerManager: nil, - api: &apiMock, - defaultJob: nomadApi.Job{}, - } + apiClient := &APIClient{&apiMock} - _, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{}) - assert.Equal(t, expectedErr, err) + _, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger, + 1, 2, 3, "image", false, []uint16{}) + assert.ErrorIs(t, err, tests.ErrDefault) } diff --git a/nomad/nomad.go b/nomad/nomad.go index 9702808..c7351bc 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -32,6 +32,12 @@ type ExecutorAPI interface { // LoadRunners loads all runners of the specified environment which are running and not about to get stopped. LoadRunners(environmentID string) (runnerIds []string, err error) + // RegisterTemplateJob creates a template job based on the default job configuration and the given parameters. + // It registers the job and waits until the registration completes. + RegisterTemplateJob(defaultJob *nomadApi.Job, environmentID int, + prewarmingPoolSize, cpuLimit, memoryLimit uint, + image string, networkAccess bool, exposedPorts []uint16) (*nomadApi.Job, error) + // LoadEnvironmentTemplate loads the template job of the specified environment. // Based on the template job new runners can be created. LoadEnvironmentTemplate(environmentID string) (*nomadApi.Job, error) @@ -98,7 +104,7 @@ func (a *APIClient) LoadEnvironmentTemplate(environmentID string) (*nomadApi.Job } func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) error { - stream, err := a.EvaluationStream(evaluationID, ctx) + stream, err := a.apiQuerier.EvaluationStream(evaluationID, ctx) if err != nil { return fmt.Errorf("failed retrieving evaluation stream: %w", err) }