diff --git a/api/runners.go b/api/runners.go index e405bd4..761c541 100644 --- a/api/runners.go +++ b/api/runners.go @@ -52,6 +52,7 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req if err == runner.ErrUnknownExecutionEnvironment { writeNotFound(writer, err) } else if err == runner.ErrNoRunnersAvailable { + log.WithField("environment", environmentId).Warn("No runners available") writeInternalServerError(writer, err, dto.ErrorNomadOverload) } else { writeInternalServerError(writer, err, dto.ErrorUnknown) diff --git a/api/websocket.go b/api/websocket.go index 8599af7..615ffba 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -128,9 +128,7 @@ type webSocketProxy struct { // upgradeConnection upgrades a connection to a websocket and returns a webSocketProxy for this connection. func upgradeConnection(writer http.ResponseWriter, request *http.Request) (webSocketConnection, error) { - connUpgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { - return true - }} + connUpgrader := websocket.Upgrader{} connection, err := connUpgrader.Upgrade(writer, request, nil) if err != nil { log.WithError(err).Warn("Connection upgrade failed") diff --git a/ci/demo-job.tpl.nomad b/ci/demo-job.tpl.nomad index 5e7f53a..dcfef67 100644 --- a/ci/demo-job.tpl.nomad +++ b/ci/demo-job.tpl.nomad @@ -1,6 +1,6 @@ // This job is used by the e2e tests as a demo job. -job "python" { +job "0-default" { datacenters = ["dc1"] type = "batch" namespace = "${NOMAD_NAMESPACE}" diff --git a/environment/default-job.hcl b/environment/default-job.hcl index 0dd6c2d..de439fc 100644 --- a/environment/default-job.hcl +++ b/environment/default-job.hcl @@ -1,6 +1,6 @@ // This is the default job configuration that is used when no path to another default configuration is given -job "python" { +job "0-default" { datacenters = ["dc1"] type = "batch" @@ -50,4 +50,31 @@ job "python" { } } } + + group "config" { + // We want to store whether a task is in use in order to recover from a downtime. + // Without a separate config task, marking a task as used would result in a restart of that task, + // as the meta information is passed to the container as environment variables. + count = 0 + task "config" { + driver = "exec" + config { + command = "whoami" + } + logs { + max_files = 1 + max_file_size = 1 + } + resources { + // minimum values + cpu = 1 + memory = 10 + } + } + meta { + environment = "0" + used = "false" + prewarmingPoolSize = "1" + } + } } diff --git a/environment/job.go b/environment/job.go index bb45e93..a3c2fdf 100644 --- a/environment/job.go +++ b/environment/job.go @@ -9,10 +9,6 @@ import ( "strconv" ) -const ( - DefaultTaskDriver = "docker" -) - // defaultJobHCL holds our default job in HCL format. // The default job is used when creating new job and provides // common settings that all the jobs share. @@ -22,13 +18,13 @@ var defaultJobHCL string // registerDefaultJob creates a Nomad job based on the default job configuration and the given parameters. // It registers the job with Nomad and waits until the registration completes. func (m *NomadEnvironmentManager) registerDefaultJob( - id string, + environmentID string, prewarmingPoolSize, cpuLimit, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) error { - // TODO: store prewarming pool size in job meta information - job := createJob(m.defaultJob, nomad.DefaultJobID(id), prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + job := createDefaultJob(m.defaultJob, environmentID, prewarmingPoolSize, + cpuLimit, memoryLimit, image, networkAccess, exposedPorts) evalID, err := m.api.RegisterNomadJob(job) if err != nil { return err @@ -36,20 +32,21 @@ func (m *NomadEnvironmentManager) registerDefaultJob( return m.api.MonitorEvaluation(evalID, context.Background()) } -func createJob( +func createDefaultJob( defaultJob nomadApi.Job, - id string, + environmentID string, prewarmingPoolSize, cpuLimit, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) *nomadApi.Job { - job := defaultJob - job.ID = &id - job.Name = &id + defaultJobID := nomad.DefaultJobID(environmentID) + job.ID = &defaultJobID + job.Name = &defaultJobID var taskGroup = createTaskGroup(&job, nomad.TaskGroupName, prewarmingPoolSize) configureTask(taskGroup, nomad.TaskName, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + storeConfiguration(&job, environmentID, prewarmingPoolSize) return &job } @@ -137,16 +134,16 @@ func configureTask( exposedPorts []uint16) { var task *nomadApi.Task if len(taskGroup.Tasks) == 0 { - task = nomadApi.NewTask(name, DefaultTaskDriver) + task = nomadApi.NewTask(name, nomad.DefaultTaskDriver) taskGroup.Tasks = []*nomadApi.Task{task} } else { task = taskGroup.Tasks[0] task.Name = name } - integerCpuLimit := int(cpuLimit) + integerCPULimit := int(cpuLimit) integerMemoryLimit := int(memoryLimit) task.Resources = &nomadApi.Resources{ - CPU: &integerCpuLimit, + CPU: &integerCPULimit, MemoryMB: &integerMemoryLimit, } @@ -157,3 +154,44 @@ func configureTask( configureNetwork(taskGroup, networkAccess, exposedPorts) } + +func storeConfiguration(job *nomadApi.Job, id string, prewarmingPoolSize uint) { + taskGroup := getConfigTaskGroup(job) + checkForDummyTask(taskGroup) + + if taskGroup.Meta == nil { + taskGroup.Meta = make(map[string]string) + } + taskGroup.Meta[nomad.ConfigMetaEnvironmentKey] = id + taskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUnusedValue + taskGroup.Meta[nomad.ConfigMetaPoolSizeKey] = strconv.Itoa(int(prewarmingPoolSize)) +} + +func getConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { + taskGroup := nomad.FindConfigTaskGroup(job) + if taskGroup == nil { + taskGroup = nomadApi.NewTaskGroup(nomad.ConfigTaskName, 0) + } + return taskGroup +} + +// checkForDummyTask ensures that a dummy task is in the task group so that the group is accepted by Nomad. +func checkForDummyTask(taskGroup *nomadApi.TaskGroup) { + var task *nomadApi.Task + for _, t := range taskGroup.Tasks { + if t.Name == nomad.ConfigTaskName { + task = t + break + } + } + + if task == nil { + task = nomadApi.NewTask(nomad.ConfigTaskName, nomad.DefaultConfigTaskDriver) + taskGroup.Tasks = append(taskGroup.Tasks, task) + } + + if task.Config == nil { + task.Config = make(map[string]interface{}) + } + task.Config["command"] = nomad.DefaultConfigTaskCommand +} diff --git a/environment/job_test.go b/environment/job_test.go index 690eaa1..9680a80 100644 --- a/environment/job_test.go +++ b/environment/job_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" + "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "testing" ) @@ -52,8 +53,9 @@ func createTestResources() *nomadApi.Resources { } func createTestJob() (*nomadApi.Job, *nomadApi.Job) { - base := nomadApi.NewBatchJob("python-job", "python-job", "region-name", 100) - job := nomadApi.NewBatchJob("python-job", "python-job", "region-name", 100) + jobID := nomad.DefaultJobID(tests.DefaultEnvironmentIDAsString) + base := nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) + job := nomadApi.NewBatchJob(jobID, jobID, "region-name", 100) task := createTestTask() task.Name = nomad.TaskName image := "python:latest" @@ -204,7 +206,7 @@ func TestConfigureTaskWhenNoTaskExists(t *testing.T) { expectedResources := createTestResources() expectedTaskGroup := *taskGroup - expectedTask := nomadApi.NewTask("task", DefaultTaskDriver) + expectedTask := nomadApi.NewTask("task", nomad.DefaultTaskDriver) expectedTask.Resources = expectedResources expectedImage := "python:latest" expectedTask.Config = map[string]interface{}{"image": expectedImage, "network_mode": "none"} @@ -246,9 +248,9 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) { func TestCreateJobSetsAllGivenArguments(t *testing.T) { testJob, base := createTestJob() manager := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.APIClient{}, *base} - job := createJob( + job := createDefaultJob( manager.defaultJob, - *testJob.ID, + tests.DefaultEnvironmentIDAsString, uint(*testJob.TaskGroups[0].Count), uint(*testJob.TaskGroups[0].Tasks[0].Resources.CPU), uint(*testJob.TaskGroups[0].Tasks[0].Resources.MemoryMB), diff --git a/environment/manager.go b/environment/manager.go index 389474b..756d740 100644 --- a/environment/manager.go +++ b/environment/manager.go @@ -13,10 +13,6 @@ var log = logging.GetLogger("environment") // Manager encapsulates API calls to the executor API for creation and deletion of execution environments. type Manager interface { - // Load fetches all already created execution environments from the executor and registers them at the runner manager. - // It should be called during the startup process (e.g. on creation of the Manager). - Load() - // CreateOrUpdate creates/updates an execution environment on the executor. // Iff the job was created, the returned boolean is true and the returned error is nil. CreateOrUpdate( @@ -30,7 +26,6 @@ type Manager interface { func NewNomadEnvironmentManager(runnerManager runner.Manager, apiClient nomad.ExecutorAPI) *NomadEnvironmentManager { environmentManager := &NomadEnvironmentManager{runnerManager, apiClient, *parseJob(defaultJobHCL)} - environmentManager.Load() return environmentManager } @@ -66,11 +61,3 @@ func (m *NomadEnvironmentManager) CreateOrUpdate( func (m *NomadEnvironmentManager) Delete(id string) { } - -func (m *NomadEnvironmentManager) Load() { - // ToDo: remove create default execution environment for debugging purposes - _, err := m.runnerManager.CreateOrUpdateEnvironment(runner.EnvironmentID(0), 5) - if err != nil { - return - } -} diff --git a/environment/manager_test.go b/environment/manager_test.go index 7bd9278..390c7f1 100644 --- a/environment/manager_test.go +++ b/environment/manager_test.go @@ -60,7 +60,7 @@ func (s *CreateOrUpdateTestSuite) mockCreateOrUpdateEnvironment(exists bool) *mo } func (s *CreateOrUpdateTestSuite) createJobForRequest() *nomadApi.Job { - return createJob(s.manager.defaultJob, nomad.DefaultJobID(tests.DefaultEnvironmentIdAsString), + return createDefaultJob(s.manager.defaultJob, tests.DefaultEnvironmentIDAsString, s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit, s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts) } @@ -121,7 +121,7 @@ func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrec s.True(created) s.NoError(err) s.runnerManagerMock.AssertCalled(s.T(), "CreateOrUpdateEnvironment", - runner.EnvironmentID(tests.DefaultEnvironmentIdAsInteger), s.request.PrewarmingPoolSize) + runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request.PrewarmingPoolSize) } func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistOccurredErrorIsPassedAndNoEnvironmentRegistered() { @@ -130,5 +130,5 @@ func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistOccurredErrorIs s.registerNomadJobMockCall.Return("", tests.ErrDefault) created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsString, s.request) s.False(created) - s.Equal(tests.DefaultError, err) + s.Equal(tests.ErrDefault, err) } diff --git a/main.go b/main.go index 2f7fdfc..2810ff1 100644 --- a/main.go +++ b/main.go @@ -46,7 +46,10 @@ func initServer() *http.Server { log.WithError(err).WithField("nomad url", config.Config.NomadAPIURL()).Fatal("Error parsing the nomad url") } - runnerManager := runner.NewNomadRunnerManager(nomadAPIClient, context.Background()) + runnerManager, err := runner.NewNomadRunnerManager(nomadAPIClient, context.Background()) + if err != nil { + log.WithError(err).Fatal("Error creating new Nomad runner manager") + } environmentManager := environment.NewNomadEnvironmentManager(runnerManager, nomadAPIClient) return &http.Server{ diff --git a/nomad/api_querier.go b/nomad/api_querier.go index a63d361..082c6fd 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -8,7 +8,9 @@ import ( "net/url" ) -var ErrNoAllocationsFound = errors.New("no allocation found") +var ( + ErrNoAllocationsFound = errors.New("no allocation found") +) // apiQuerier provides access to the Nomad functionality. type apiQuerier interface { @@ -51,9 +53,8 @@ type apiQuerier interface { // nomadAPIClient implements the nomadApiQuerier interface and provides access to a real Nomad API. type nomadAPIClient struct { - client *nomadApi.Client - namespace string - queryOptions *nomadApi.QueryOptions // ToDo: Remove + client *nomadApi.Client + namespace string } func (nc *nomadAPIClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { @@ -63,15 +64,11 @@ func (nc *nomadAPIClient) init(nomadURL *url.URL, nomadNamespace string) (err er Namespace: nomadNamespace, }) nc.namespace = nomadNamespace - nc.queryOptions = &nomadApi.QueryOptions{ - Namespace: nc.namespace, - } return err } func (nc *nomadAPIClient) DeleteRunner(runnerID string) (err error) { - // ToDo: Fix Namespace - _, _, err = nc.client.Jobs().Deregister(runnerID, true, nc.queryOptions) + _, _, err = nc.client.Jobs().Deregister(runnerID, true, nc.writeOptions()) return } @@ -89,7 +86,7 @@ func (nc *nomadAPIClient) Execute(jobID string, return nc.client.Allocations().Exec(ctx, allocation, TaskName, tty, command, stdin, stdout, stderr, nil, nil) } -func (nc *nomadApiClient) listJobs(prefix string) (jobs []*nomadApi.JobListStub, err error) { +func (nc *nomadAPIClient) listJobs(prefix string) (jobs []*nomadApi.JobListStub, err error) { q := nomadApi.QueryOptions{ Namespace: nc.namespace, Prefix: prefix, @@ -120,7 +117,7 @@ func (nc *nomadAPIClient) EvaluationStream(evalID string, ctx context.Context) ( nomadApi.TopicEvaluation: {evalID}, }, 0, - nc.queryOptions) + nc.queryOptions()) return } @@ -131,6 +128,18 @@ func (nc *nomadAPIClient) AllocationStream(ctx context.Context) (stream <-chan * nomadApi.TopicAllocation: {}, }, 0, - nc.queryOptions) + nc.queryOptions()) return } + +func (nc *nomadAPIClient) queryOptions() *nomadApi.QueryOptions { + return &nomadApi.QueryOptions{ + Namespace: nc.namespace, + } +} + +func (nc *nomadAPIClient) writeOptions() *nomadApi.WriteOptions { + return &nomadApi.WriteOptions{ + Namespace: nc.namespace, + } +} diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 0a001bb..96c23c7 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -142,6 +142,29 @@ func (_m *ExecutorAPIMock) JobScale(jobId string) (uint, error) { return r0, r1 } +// LoadAllJobs provides a mock function with given fields: +func (_m *ExecutorAPIMock) LoadAllJobs() ([]*api.Job, error) { + ret := _m.Called() + + var r0 []*api.Job + if rf, ok := ret.Get(0).(func() []*api.Job); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*api.Job) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LoadJobList provides a mock function with given fields: func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) { ret := _m.Called() @@ -188,6 +211,43 @@ func (_m *ExecutorAPIMock) LoadRunners(jobID string) ([]string, error) { return r0, r1 } +// LoadTemplateJob provides a mock function with given fields: environmentID +func (_m *ExecutorAPIMock) LoadTemplateJob(environmentID string) (*api.Job, error) { + ret := _m.Called(environmentID) + + var r0 *api.Job + if rf, ok := ret.Get(0).(func(string) *api.Job); ok { + r0 = rf(environmentID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*api.Job) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(environmentID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MarkRunnerAsUsed provides a mock function with given fields: runnerID +func (_m *ExecutorAPIMock) MarkRunnerAsUsed(runnerID string) error { + ret := _m.Called(runnerID) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(runnerID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // MonitorEvaluation provides a mock function with given fields: evaluationID, ctx func (_m *ExecutorAPIMock) MonitorEvaluation(evaluationID string, ctx context.Context) error { ret := _m.Called(evaluationID, ctx) @@ -289,7 +349,7 @@ func (_m *ExecutorAPIMock) jobInfo(jobID string) (*api.Job, error) { } // listJobs provides a mock function with given fields: prefix -func (_m *ExecutorApiMock) listJobs(prefix string) ([]*api.JobListStub, error) { +func (_m *ExecutorAPIMock) listJobs(prefix string) ([]*api.JobListStub, error) { ret := _m.Called(prefix) var r0 []*api.JobListStub diff --git a/nomad/job.go b/nomad/job.go index 7ce85cd..2392eb8 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -1,17 +1,56 @@ package nomad import ( + "fmt" nomadApi "github.com/hashicorp/nomad/api" + "strings" ) const ( - TaskGroupName = "default-group" - TaskName = "default-task" - DefaultJobIDFormat = "%s-default" + TaskGroupName = "default-group" + TaskName = "default-task" + ConfigTaskGroupName = "config" + ConfigTaskName = "config" + defaultRunnerJobID = "default" + runnerJobIDFormat = "%s-%s" + DefaultTaskDriver = "docker" + DefaultConfigTaskDriver = "exec" + DefaultConfigTaskCommand = "whoami" + ConfigMetaEnvironmentKey = "environment" + ConfigMetaUsedKey = "used" + ConfigMetaUsedValue = "true" + ConfigMetaUnusedValue = "false" + ConfigMetaPoolSizeKey = "prewarmingPoolSize" ) func DefaultJobID(id string) string { - return fmt.Sprintf(DefaultJobIDFormat, id) + return RunnerJobID(id, defaultRunnerJobID) +} + +func RunnerJobID(environmentID, runnerID string) string { + return fmt.Sprintf(runnerJobIDFormat, environmentID, runnerID) +} + +func IsDefaultJobID(jobID string) bool { + parts := strings.Split(jobID, "-") + return len(parts) == 2 && parts[1] == defaultRunnerJobID +} + +func FindConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup { + for _, tg := range job.TaskGroups { + if *tg.Name == ConfigTaskGroupName { + return tg + } + } + return nil +} + +func EnvironmentIDFromJobID(jobID string) string { + parts := strings.Split(jobID, "-") + if len(parts) == 0 { + return "" + } + return parts[0] } func (nc *nomadAPIClient) jobInfo(jobID string) (job *nomadApi.Job, err error) { @@ -21,13 +60,13 @@ func (nc *nomadAPIClient) jobInfo(jobID string) (job *nomadApi.Job, err error) { // LoadJobList loads the list of jobs from the Nomad api. func (nc *nomadAPIClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { - list, _, err = nc.client.Jobs().List(nc.queryOptions) + list, _, err = nc.client.Jobs().List(nc.queryOptions()) return } // JobScale returns the scale of the passed job. func (nc *nomadAPIClient) JobScale(jobID string) (jobScale uint, err error) { - status, _, err := nc.client.Jobs().ScaleStatus(jobID, nc.queryOptions) + status, _, err := nc.client.Jobs().ScaleStatus(jobID, nc.queryOptions()) if err != nil { return } diff --git a/nomad/nomad.go b/nomad/nomad.go index 7880fe0..a7eaa3b 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -18,6 +18,7 @@ var ( ErrorExecutorCommunicationFailed = errors.New("communication with executor failed") errEvaluation = errors.New("evaluation could not complete") errPlacingAllocations = errors.New("failed to place all allocations") + errFindingTaskGroup = errors.New("no task group found") ) type AllocationProcessor func(*nomadApi.Allocation) @@ -26,6 +27,9 @@ type AllocationProcessor func(*nomadApi.Allocation) type ExecutorAPI interface { apiQuerier + // LoadAllJobs loads all existing jobs independent of the environment or if it is a template job. + LoadAllJobs() ([]*nomadApi.Job, error) + // LoadRunners loads all jobs of the specified environment which are running and not about to get stopped. LoadRunners(environmentID string) (runnerIds []string, err error) @@ -46,6 +50,9 @@ type ExecutorAPI interface { // If tty is true, the command will run with a tty. ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout, stderr io.Writer) (int, error) + + // MarkRunnerAsUsed marks the runner with the given ID as used. + MarkRunnerAsUsed(runnerID string) error } // APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real @@ -221,6 +228,40 @@ func checkEvaluation(eval *nomadApi.Evaluation) (err error) { return err } +func (a *APIClient) MarkRunnerAsUsed(runnerID string) error { + job, err := a.jobInfo(runnerID) + if err != nil { + return fmt.Errorf("couldn't retrieve job info: %w", err) + } + var taskGroup = FindConfigTaskGroup(job) + if taskGroup == nil { + return errFindingTaskGroup + } + taskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUsedValue + _, err = a.RegisterNomadJob(job) + if err != nil { + return fmt.Errorf("couldn't update runner config: %w", err) + } + return nil +} + +func (a *APIClient) LoadAllJobs() ([]*nomadApi.Job, error) { + jobStubs, err := a.LoadJobList() + if err != nil { + return []*nomadApi.Job{}, fmt.Errorf("couldn't load jobs: %w", err) + } + + jobs := make([]*nomadApi.Job, 0, len(jobStubs)) + for _, jobStub := range jobStubs { + job, err := a.apiQuerier.jobInfo(jobStub.ID) + if err != nil { + return []*nomadApi.Job{}, fmt.Errorf("couldn't load job info for job %v: %w", jobStub.ID, err) + } + jobs = append(jobs, job) + } + return jobs, nil +} + // nullReader is a struct that implements the io.Reader interface and returns nothing when reading // from it. type nullReader struct{} diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index 1a5d2d5..f728675 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -62,11 +62,11 @@ func newJobListStub(id, status string, amountRunning int) *nomadApi.JobListStub func (s *LoadRunnersTestSuite) TestErrorOfUnderlyingApiCallIsPropagated() { s.mock.On("listJobs", mock.AnythingOfType("string")). - Return(nil, tests.DefaultError) + Return(nil, tests.ErrDefault) - returnedIds, err := s.nomadApiClient.LoadRunners(suite.jobId) + returnedIds, err := s.nomadApiClient.LoadRunners(s.jobId) s.Nil(returnedIds) - s.Equal(tests.DefaultError, err) + s.Equal(tests.ErrDefault, err) } func (s *LoadRunnersTestSuite) TestReturnsNoErrorWhenUnderlyingApiCallDoesNot() { @@ -79,7 +79,7 @@ func (s *LoadRunnersTestSuite) TestReturnsNoErrorWhenUnderlyingApiCallDoesNot() func (s *LoadRunnersTestSuite) TestAvailableRunnerIsReturned() { s.mock.On("listJobs", mock.AnythingOfType("string")). - Return([]*nomadApi.JobListStub{suite.availableRunner}, nil) + Return([]*nomadApi.JobListStub{s.availableRunner}, nil) returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) s.Len(returnedIds, 1) diff --git a/runner/manager.go b/runner/manager.go index 1e3d405..95939f5 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/google/uuid" nomadApi "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/structs" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "strconv" @@ -20,8 +21,6 @@ var ( ErrRunnerNotFound = errors.New("no runner found with this id") ) -const runnerNameFormat = "%s-%s" - type EnvironmentID int func (e EnvironmentID) toString() string { @@ -35,7 +34,7 @@ type NomadJobID string type Manager interface { // CreateOrUpdateEnvironment creates the given environment if it does not exist. Otherwise, it updates // the existing environment and all runners. - CreateOrUpdateEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint) (bool, error) + CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) (bool, error) // Claim returns a new runner. // It makes sure that the runner is not in use yet and returns an error if no runner could be provided. @@ -59,14 +58,18 @@ type NomadRunnerManager struct { // NewNomadRunnerManager creates a new runner manager that keeps track of all runners. // It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad. // If you cancel the context the background synchronization will be stopped. -func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager { +func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) (*NomadRunnerManager, error) { m := &NomadRunnerManager{ apiClient, NewLocalNomadJobStorage(), NewLocalRunnerStorage(), } + err := m.loadExistingEnvironments() + if err != nil { + return nil, err + } go m.updateRunners(ctx) - return m + return m, nil } type NomadEnvironment struct { @@ -140,7 +143,7 @@ func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunn errorResult := strings.Join(occurredErrors, "\n") return fmt.Errorf("%d errors occurred when updating environment: %s", len(occurredErrors), errorResult) } - return nil + return m.scaleEnvironment(id) } func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) { @@ -153,10 +156,16 @@ func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) return nil, ErrNoRunnersAvailable } m.usedRunners.Add(runner) - err := m.scaleEnvironment(environmentID) + err := m.apiClient.MarkRunnerAsUsed(runner.Id()) if err != nil { - return nil, fmt.Errorf("can not scale up: %w", err) + return nil, fmt.Errorf("can't mark runner as used: %w", err) } + + err = m.scaleEnvironment(environmentID) + if err != nil { + log.WithError(err).WithField("environmentID", environmentID).Error("Couldn't scale environment") + } + return runner, nil } @@ -188,31 +197,37 @@ func (m *NomadRunnerManager) updateRunners(ctx context.Context) { } func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { - log.WithField("id", alloc.ID).Debug("Allocation started") + log.WithField("id", alloc.JobID).Debug("Runner started") - intJobID, err := strconv.Atoi(alloc.JobID) + if nomad.IsDefaultJobID(alloc.JobID) { + return + } + + environmentID := nomad.EnvironmentIDFromJobID(alloc.JobID) + intEnvironmentID, err := strconv.Atoi(environmentID) if err != nil { return } - job, ok := m.environments.Get(EnvironmentID(intJobID)) + job, ok := m.environments.Get(EnvironmentID(intEnvironmentID)) if ok { - job.idleRunners.Add(NewNomadAllocation(alloc.ID, m.apiClient)) + job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient)) } } func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) { - log.WithField("id", alloc.ID).Debug("Allocation stopped") + log.WithField("id", alloc.JobID).Debug("Runner stopped") - intJobID, err := strconv.Atoi(alloc.JobID) + environmentID := nomad.EnvironmentIDFromJobID(alloc.JobID) + intEnvironmentID, err := strconv.Atoi(environmentID) if err != nil { return } - m.usedRunners.Delete(alloc.ID) - job, ok := m.environments.Get(EnvironmentID(intJobID)) + m.usedRunners.Delete(alloc.JobID) + job, ok := m.environments.Get(EnvironmentID(intEnvironmentID)) if ok { - job.idleRunners.Delete(alloc.ID) + job.idleRunners.Delete(alloc.JobID) } } @@ -224,6 +239,9 @@ func (m *NomadRunnerManager) scaleEnvironment(id EnvironmentID) error { } required := int(environment.desiredIdleRunnersCount) - environment.idleRunners.Length() + + log.WithField("required", required).Debug("Scaling environment") + for i := 0; i < required; i++ { err := m.createRunner(environment) if err != nil { @@ -238,7 +256,7 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error { if err != nil { return fmt.Errorf("failed generating runner id") } - newRunnerID := fmt.Sprintf(runnerNameFormat, environment.ID().toString(), newUUID.String()) + newRunnerID := nomad.RunnerJobID(environment.ID().toString(), newUUID.String()) template := *environment.templateJob template.ID = &newRunnerID @@ -252,11 +270,11 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error { if err != nil { return fmt.Errorf("couldn't monitor evaluation: %w", err) } - environment.idleRunners.Add(NewNomadJob(newRunnerID, m.apiClient)) return nil } -func (m *NomadRunnerManager) unusedRunners(environmentID EnvironmentID, fetchedRunnerIds []string) (newRunners []Runner) { +func (m *NomadRunnerManager) unusedRunners( + environmentID EnvironmentID, fetchedRunnerIds []string) (newRunners []Runner) { newRunners = make([]Runner, 0) job, ok := m.environments.Get(environmentID) if !ok { @@ -272,5 +290,80 @@ func (m *NomadRunnerManager) unusedRunners(environmentID EnvironmentID, fetchedR } } } - return + return newRunners +} + +func (m *NomadRunnerManager) loadExistingEnvironments() error { + jobs, err := m.apiClient.LoadAllJobs() + if err != nil { + return fmt.Errorf("can't load template jobs: %w", err) + } + + for _, job := range jobs { + m.loadExistingJob(job) + } + + for _, environmentID := range m.environments.List() { + err := m.scaleEnvironment(environmentID) + if err != nil { + return fmt.Errorf("can not scale up: %w", err) + } + } + + return nil +} + +func (m *NomadRunnerManager) loadExistingJob(job *nomadApi.Job) { + if *job.Status != structs.JobStatusRunning { + return + } + + jobLogger := log.WithField("jobID", *job.ID) + + configTaskGroup := nomad.FindConfigTaskGroup(job) + if configTaskGroup == nil { + jobLogger.Info("Couldn't find config task group in job, skipping ...") + return + } + + if configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue { + m.usedRunners.Add(NewNomadJob(*job.ID, m.apiClient)) + jobLogger.Info("Added job to usedRunners") + return + } + + environmentID := configTaskGroup.Meta[nomad.ConfigMetaEnvironmentKey] + environmentIDInt, err := strconv.Atoi(environmentID) + if err != nil { + jobLogger.WithField("environmentID", environmentID). + WithError(err). + Error("Couldn't convert environment id of template job to int") + return + } + + environment, ok := m.environments.Get(EnvironmentID(environmentIDInt)) + if !ok { + desiredIdleRunnersCount, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaPoolSizeKey]) + if err != nil { + jobLogger.WithError(err).Error("Couldn't convert pool size to int") + return + } + + environment = &NomadEnvironment{ + environmentID: EnvironmentID(environmentIDInt), + idleRunners: NewLocalRunnerStorage(), + desiredIdleRunnersCount: uint(desiredIdleRunnersCount), + } + m.environments.Add(environment) + log.WithField("environmentID", environment.environmentID).Info("Added existing environment") + } + + if nomad.IsDefaultJobID(*job.ID) { + environment.templateJob = job + } else { + log.WithField("jobID", *job.ID). + WithField("environmentID", environment.environmentID). + Info("Added idle runner") + environment.idleRunners.Add(NewNomadJob(*job.ID, m.apiClient)) + } } diff --git a/runner/manager_mock.go b/runner/manager_mock.go index de7b942..a763a30 100644 --- a/runner/manager_mock.go +++ b/runner/manager_mock.go @@ -10,11 +10,11 @@ type ManagerMock struct { } // Claim provides a mock function with given fields: id -func (_m *ManagerMock) Claim(id EnvironmentId) (Runner, error) { +func (_m *ManagerMock) Claim(id EnvironmentID) (Runner, error) { ret := _m.Called(id) var r0 Runner - if rf, ok := ret.Get(0).(func(EnvironmentId) Runner); ok { + if rf, ok := ret.Get(0).(func(EnvironmentID) Runner); ok { r0 = rf(id) } else { if ret.Get(0) != nil { @@ -23,7 +23,7 @@ func (_m *ManagerMock) Claim(id EnvironmentId) (Runner, error) { } var r1 error - if rf, ok := ret.Get(1).(func(EnvironmentId) error); ok { + if rf, ok := ret.Get(1).(func(EnvironmentID) error); ok { r1 = rf(id) } else { r1 = ret.Error(1) @@ -32,20 +32,20 @@ func (_m *ManagerMock) Claim(id EnvironmentId) (Runner, error) { return r0, r1 } -// CreateOrUpdateEnvironment provides a mock function with given fields: environmentId, desiredIdleRunnersCount -func (_m *ManagerMock) CreateOrUpdateEnvironment(environmentId EnvironmentId, desiredIdleRunnersCount uint) (bool, error) { - ret := _m.Called(environmentId, desiredIdleRunnersCount) +// CreateOrUpdateEnvironment provides a mock function with given fields: id, desiredIdleRunnersCount +func (_m *ManagerMock) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) (bool, error) { + ret := _m.Called(id, desiredIdleRunnersCount) var r0 bool - if rf, ok := ret.Get(0).(func(EnvironmentId, uint) bool); ok { - r0 = rf(environmentId, desiredIdleRunnersCount) + if rf, ok := ret.Get(0).(func(EnvironmentID, uint) bool); ok { + r0 = rf(id, desiredIdleRunnersCount) } else { r0 = ret.Get(0).(bool) } var r1 error - if rf, ok := ret.Get(1).(func(EnvironmentId, uint) error); ok { - r1 = rf(environmentId, desiredIdleRunnersCount) + if rf, ok := ret.Get(1).(func(EnvironmentID, uint) error); ok { + r1 = rf(id, desiredIdleRunnersCount) } else { r1 = ret.Error(1) } @@ -53,13 +53,13 @@ func (_m *ManagerMock) CreateOrUpdateEnvironment(environmentId EnvironmentId, de return r0, r1 } -// Get provides a mock function with given fields: runnerId -func (_m *ManagerMock) Get(runnerId string) (Runner, error) { - ret := _m.Called(runnerId) +// Get provides a mock function with given fields: runnerID +func (_m *ManagerMock) Get(runnerID string) (Runner, error) { + ret := _m.Called(runnerID) var r0 Runner if rf, ok := ret.Get(0).(func(string) Runner); ok { - r0 = rf(runnerId) + r0 = rf(runnerID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(Runner) @@ -68,7 +68,7 @@ func (_m *ManagerMock) Get(runnerId string) (Runner, error) { var r1 error if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(runnerId) + r1 = rf(runnerID) } else { r1 = ret.Error(1) } diff --git a/runner/manager_test.go b/runner/manager_test.go index 0b55460..98011a0 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -31,13 +31,16 @@ type ManagerTestSuite struct { func (s *ManagerTestSuite) SetupTest() { s.apiMock = &nomad.ExecutorAPIMock{} + mockRunnerQueries(s.apiMock, []string{}) // Instantly closed context to manually start the update process in some cases ctx, cancel := context.WithCancel(context.Background()) cancel() - s.nomadRunnerManager = NewNomadRunnerManager(s.apiMock, ctx) + var err error + + s.nomadRunnerManager, err = NewNomadRunnerManager(s.apiMock, ctx) + s.Require().NoError(err) s.exerciseRunner = NewRunner(tests.DefaultRunnerID) - mockRunnerQueries(s.apiMock, []string{}) s.registerDefaultEnvironment() } @@ -49,6 +52,8 @@ func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []strin <-time.After(10 * time.Minute) // 10 minutes is the default test timeout call.ReturnArguments = mock.Arguments{nil} }) + apiMock.On("LoadAllJobs").Return([]*nomadApi.Job{}, nil) + apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string")).Return(nil) apiMock.On("LoadRunners", tests.DefaultJobID).Return(returnedRunnerIds, nil) apiMock.On("JobScale", tests.DefaultJobID).Return(uint(len(returnedRunnerIds)), nil) apiMock.On("SetJobScale", tests.DefaultJobID, mock.AnythingOfType("uint"), "Runner Requested").Return(nil) @@ -58,7 +63,7 @@ func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []strin } func (s *ManagerTestSuite) registerDefaultEnvironment() { - err := s.nomadRunnerManager.registerEnvironment(defaultEnvironmentId, 0) + err := s.nomadRunnerManager.registerEnvironment(defaultEnvironmentID, 0) s.Require().NoError(err) } @@ -72,9 +77,9 @@ func (s *ManagerTestSuite) waitForRunnerRefresh() { } func (s *ManagerTestSuite) TestRegisterEnvironmentAddsNewJob() { - err := s.nomadRunnerManager.registerEnvironment(anotherEnvironmentId, defaultDesiredRunnersCount) + err := s.nomadRunnerManager.registerEnvironment(anotherEnvironmentID, defaultDesiredRunnersCount) s.Require().NoError(err) - job, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentId) + job, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) s.True(ok) s.NotNil(job) } @@ -125,7 +130,7 @@ func (s *ManagerTestSuite) TestClaimThrowsAnErrorIfNoRunnersAvailable() { func (s *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() { s.AddIdleRunnerForDefaultEnvironment(s.exerciseRunner) - receivedRunner, _ := s.nomadRunnerManager.Claim(defaultEnvironmentId) + receivedRunner, _ := s.nomadRunnerManager.Claim(defaultEnvironmentID) savedRunner, ok := s.nomadRunnerManager.usedRunners.Get(receivedRunner.Id()) s.True(ok) s.Equal(savedRunner, receivedRunner) @@ -188,9 +193,9 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() { func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { allocation := &nomadApi.Allocation{ID: tests.DefaultRunnerID} - defaultJob, ok := s.nomadRunnerManager.jobs.Get(defaultEnvironmentID) + defaultJob, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) s.Require().True(ok) - allocation.JobID = string(defaultJob.jobID) + allocation.JobID = defaultJob.environmentID.toString() _, ok = defaultJob.idleRunners.Get(allocation.ID) s.Require().False(ok) @@ -215,9 +220,9 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { allocation := &nomadApi.Allocation{ID: tests.DefaultRunnerID} - defaultJob, ok := s.nomadRunnerManager.jobs.Get(defaultEnvironmentID) + defaultJob, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) s.Require().True(ok) - allocation.JobID = string(defaultJob.jobID) + allocation.JobID = defaultJob.environmentID.toString() testRunner := NewRunner(allocation.ID) defaultJob.idleRunners.Add(testRunner) diff --git a/runner/nomad_environment_storage.go b/runner/nomad_environment_storage.go index a3a38d4..d294db5 100644 --- a/runner/nomad_environment_storage.go +++ b/runner/nomad_environment_storage.go @@ -6,6 +6,9 @@ import ( // NomadEnvironmentStorage is an interface for storing NomadJobs. type NomadEnvironmentStorage interface { + // List returns all keys of environments stored in this storage. + List() []EnvironmentID + // Add adds a job to the storage. // It overwrites the old job if one with the same id was already stored. Add(job *NomadEnvironment) @@ -36,6 +39,14 @@ func NewLocalNomadJobStorage() *localNomadJobStorage { } } +func (s *localNomadJobStorage) List() []EnvironmentID { + keys := make([]EnvironmentID, 0, len(s.jobs)) + for k := range s.jobs { + keys = append(keys, k) + } + return keys +} + func (s *localNomadJobStorage) Add(job *NomadEnvironment) { s.Lock() defer s.Unlock() diff --git a/runner/nomad_environment_storage_test.go b/runner/nomad_environment_storage_test.go index 80e1bbe..0da9e76 100644 --- a/runner/nomad_environment_storage_test.go +++ b/runner/nomad_environment_storage_test.go @@ -16,9 +16,9 @@ type JobStoreTestSuite struct { job *NomadEnvironment } -func (suite *JobStoreTestSuite) SetupTest() { - suite.jobStorage = NewLocalNomadJobStorage() - suite.job = &NomadEnvironment{environmentID: defaultEnvironmentId} +func (s *JobStoreTestSuite) SetupTest() { + s.jobStorage = NewLocalNomadJobStorage() + s.job = &NomadEnvironment{environmentID: defaultEnvironmentID} } func (s *JobStoreTestSuite) TestAddedJobCanBeRetrieved() { @@ -28,10 +28,10 @@ func (s *JobStoreTestSuite) TestAddedJobCanBeRetrieved() { s.Equal(s.job, retrievedJob) } -func (suite *JobStoreTestSuite) TestJobWithSameIdOverwritesOldOne() { - otherJobWithSameID := &NomadEnvironment{environmentID: defaultEnvironmentId} +func (s *JobStoreTestSuite) TestJobWithSameIdOverwritesOldOne() { + otherJobWithSameID := &NomadEnvironment{environmentID: defaultEnvironmentID} otherJobWithSameID.templateJob = &nomadApi.Job{} - suite.NotEqual(suite.job, otherJobWithSameID) + s.NotEqual(s.job, otherJobWithSameID) s.jobStorage.Add(s.job) s.jobStorage.Add(otherJobWithSameID) diff --git a/runner/runner.go b/runner/runner.go index b189ac4..fa4934f 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -58,11 +58,6 @@ type NomadJob struct { api nomad.ExecutorAPI } -// NewRunner creates a new runner with the provided id. -func NewRunner(id string) Runner { - return NewNomadJob(id, nil) -} - // NewNomadJob creates a new NomadJob with the provided id. func NewNomadJob(id string, apiClient nomad.ExecutorAPI) *NomadJob { return &NomadJob{ diff --git a/runner/runner_test.go b/runner/runner_test.go index eea70c9..85d29b0 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -254,5 +254,5 @@ func (s *UpdateFileSystemTestSuite) readFilesFromTarArchive(tarArchive io.Reader // NewRunner creates a new runner with the provided id. func NewRunner(id string) Runner { - return NewNomadAllocation(id, nil) + return NewNomadJob(id, nil) } diff --git a/tests/e2e/e2e_test.go b/tests/e2e/e2e_test.go index 9d8706e..264ac8a 100644 --- a/tests/e2e/e2e_test.go +++ b/tests/e2e/e2e_test.go @@ -3,8 +3,13 @@ package e2e import ( nomadApi "github.com/hashicorp/nomad/api" "github.com/stretchr/testify/suite" + "gitlab.hpi.de/codeocean/codemoon/poseidon/api" + "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/config" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" + "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" + "gitlab.hpi.de/codeocean/codemoon/poseidon/tests/helpers" + "net/http" "os" "testing" "time" @@ -51,8 +56,35 @@ func TestMain(m *testing.M) { if err != nil { log.WithError(err).Fatal("Could not create Nomad client") } - // ToDo: Add Nomad job here when it is possible to create execution environments. See #26. log.Info("Test Run") + createDefaultEnvironment() + + // wait for environment to become ready + <-time.After(10 * time.Second) + code := m.Run() + cleanupJobsForEnvironment(&testing.T{}, "0") os.Exit(code) } + +func createDefaultEnvironment() { + path := helpers.BuildURL(api.BasePath, api.EnvironmentsPath, tests.DefaultEnvironmentIDAsString) + + request := dto.ExecutionEnvironmentRequest{ + PrewarmingPoolSize: 10, + CPULimit: 100, + MemoryLimit: 100, + Image: "drp.codemoon.xopic.de/openhpi/co_execenv_python:3.8", + NetworkAccess: false, + ExposedPorts: nil, + } + + resp, err := helpers.HttpPutJSON(path, request) + if err != nil || resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusNoContent { + log.Fatal("Couldn't create default environment for e2e tests") + } + err = resp.Body.Close() + if err != nil { + log.Fatal("Failed closing body") + } +} diff --git a/tests/e2e/environments_test.go b/tests/e2e/environments_test.go index d483b71..f5da93c 100644 --- a/tests/e2e/environments_test.go +++ b/tests/e2e/environments_test.go @@ -6,8 +6,10 @@ import ( "github.com/stretchr/testify/require" "gitlab.hpi.de/codeocean/codemoon/poseidon/api" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests/helpers" + "io" "net/http" "strings" "testing" @@ -67,10 +69,22 @@ func TestCreateOrUpdateEnvironment(t *testing.T) { validateJob(t, request) }) - _, _, err := nomadClient.Jobs().DeregisterOpts( - tests.AnotherEnvironmentIDAsString, &nomadApi.DeregisterOptions{Purge: true}, nil) + cleanupJobsForEnvironment(t, tests.AnotherEnvironmentIDAsString) +} + +func cleanupJobsForEnvironment(t *testing.T, environmentID string) { + t.Helper() + + jobListStub, _, err := nomadClient.Jobs().List(&nomadApi.QueryOptions{Prefix: environmentID}) if err != nil { - t.Fatalf("Error when removing test job %v", err) + t.Fatalf("Error when listing test jobs: %v", err) + } + + for _, j := range jobListStub { + _, _, err := nomadClient.Jobs().DeregisterOpts(j.ID, &nomadApi.DeregisterOptions{Purge: true}, nil) + if err != nil { + t.Fatalf("Error when removing test job %v", err) + } } } @@ -81,7 +95,9 @@ func assertPutReturnsStatusAndZeroContent(t *testing.T, path string, require.Nil(t, err) assert.Equal(t, status, resp.StatusCode) assert.Equal(t, int64(0), resp.ContentLength) - + content, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Empty(t, string(content)) _ = resp.Body.Close() } @@ -91,7 +107,7 @@ func validateJob(t *testing.T, expected dto.ExecutionEnvironmentRequest) { assertEqualValueStringPointer(t, nomadNamespace, job.Namespace) assertEqualValueStringPointer(t, "batch", job.Type) - require.Equal(t, 1, len(job.TaskGroups)) + require.Equal(t, 2, len(job.TaskGroups)) taskGroup := job.TaskGroups[0] require.NotNil(t, taskGroup.Count) @@ -123,7 +139,7 @@ func validateJob(t *testing.T, expected dto.ExecutionEnvironmentRequest) { func findNomadJob(t *testing.T, jobID string) *nomadApi.Job { t.Helper() - job, _, err := nomadClient.Jobs().Info(jobID, nil) + job, _, err := nomadClient.Jobs().Info(nomad.DefaultJobID(jobID), nil) if err != nil { t.Fatalf("Error retrieving Nomad job: %v", err) }