From c7d59810e5e05f6386b66f50a1362b5216b6765d Mon Sep 17 00:00:00 2001 From: sirkrypt0 <22522058+sirkrypt0@users.noreply.github.com> Date: Tue, 8 Jun 2021 14:42:35 +0200 Subject: [PATCH] Use Nomad jobs as runners instead of allocations As we can't control which allocations are destroyed when downscaling a job, we decided to use Nomad jobs as our runners. Thus for each runner we prewarm for an environment, a corresponding job is created in Nomad. We create a default job that serves as a template for the runners. Using this, already existing execution environments can easily be restored, once Poseidon is restarted. --- api/runners_test.go | 4 +- api/websocket.go | 4 +- api/websocket_test.go | 2 +- environment/job.go | 9 +- environment/job_test.go | 7 +- environment/manager.go | 26 ++-- environment/manager_mock.go | 2 +- environment/manager_test.go | 9 +- nomad/api_querier.go | 41 +++--- nomad/api_querier_mock.go | 49 +++++-- nomad/executor_api_mock.go | 49 +++++-- nomad/job.go | 14 +- nomad/nomad.go | 29 +++-- nomad/nomad_test.go | 81 +++++------- runner/manager.go | 123 ++++++++++-------- runner/manager_test.go | 59 +++------ ...torage.go => nomad_environment_storage.go} | 20 +-- ...t.go => nomad_environment_storage_test.go} | 21 ++- runner/runner.go | 32 +++-- runner/runner_test.go | 18 +-- 20 files changed, 333 insertions(+), 266 deletions(-) rename runner/{nomad_job_storage.go => nomad_environment_storage.go} (67%) rename runner/{nomad_job_storage_test.go => nomad_environment_storage_test.go} (73%) diff --git a/api/runners_test.go b/api/runners_test.go index a663a4c..2610506 100644 --- a/api/runners_test.go +++ b/api/runners_test.go @@ -29,7 +29,7 @@ type MiddlewareTestSuite struct { func (s *MiddlewareTestSuite) SetupTest() { s.manager = &runner.ManagerMock{} - s.runner = runner.NewNomadAllocation("runner", nil) + s.runner = runner.NewNomadJob("runner", nil) s.capturedRunner = nil s.runnerRequest = func(runnerId string) *http.Request { path, err := s.router.Get("test-runner-id").URL(RunnerIdKey, runnerId) @@ -92,7 +92,7 @@ type RunnerRouteTestSuite struct { func (s *RunnerRouteTestSuite) SetupTest() { s.runnerManager = &runner.ManagerMock{} s.router = NewRouter(s.runnerManager, nil) - s.runner = runner.NewNomadAllocation("some-id", nil) + s.runner = runner.NewNomadJob("some-id", nil) s.executionId = "execution-id" s.runner.Add(s.executionId, &dto.ExecutionRequest{}) s.runnerManager.On("Get", s.runner.Id()).Return(s.runner, nil) diff --git a/api/websocket.go b/api/websocket.go index 615ffba..8599af7 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -128,7 +128,9 @@ type webSocketProxy struct { // upgradeConnection upgrades a connection to a websocket and returns a webSocketProxy for this connection. func upgradeConnection(writer http.ResponseWriter, request *http.Request) (webSocketConnection, error) { - connUpgrader := websocket.Upgrader{} + connUpgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { + return true + }} connection, err := connUpgrader.Upgrade(writer, request, nil) if err != nil { log.WithError(err).Warn("Connection upgrade failed") diff --git a/api/websocket_test.go b/api/websocket_test.go index 5b6a362..9db00ea 100644 --- a/api/websocket_test.go +++ b/api/websocket_test.go @@ -309,7 +309,7 @@ func TestRawToCodeOceanWriter(t *testing.T) { func newNomadAllocationWithMockedApiClient(runnerId string) (r runner.Runner, mock *nomad.ExecutorAPIMock) { mock = &nomad.ExecutorAPIMock{} - r = runner.NewNomadAllocation(runnerId, mock) + r = runner.NewNomadJob(runnerId, mock) return } diff --git a/environment/job.go b/environment/job.go index ba8642a..bb45e93 100644 --- a/environment/job.go +++ b/environment/job.go @@ -19,15 +19,16 @@ const ( //go:embed default-job.hcl var defaultJobHCL string -// registerJob creates a Nomad job based on the default job configuration and the given parameters. +// registerDefaultJob creates a Nomad job based on the default job configuration and the given parameters. // It registers the job with Nomad and waits until the registration completes. -func (m *NomadEnvironmentManager) registerJob( +func (m *NomadEnvironmentManager) registerDefaultJob( id string, prewarmingPoolSize, cpuLimit, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) error { - job := createJob(m.defaultJob, id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + // TODO: store prewarming pool size in job meta information + job := createJob(m.defaultJob, nomad.DefaultJobID(id), prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) evalID, err := m.api.RegisterNomadJob(job) if err != nil { return err @@ -76,7 +77,7 @@ func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *n } else { taskGroup = job.TaskGroups[0] taskGroup.Name = &name - count := int(prewarmingPoolSize) + count := 1 taskGroup.Count = &count } return taskGroup diff --git a/environment/job_test.go b/environment/job_test.go index cce0d25..690eaa1 100644 --- a/environment/job_test.go +++ b/environment/job_test.go @@ -96,7 +96,6 @@ func TestCreateTaskGroupOverwritesOptionsWhenJobHasTaskGroup(t *testing.T) { // create a new copy to avoid changing the original one as it is a pointer expectedTaskGroup := *existingTaskGroup expectedTaskGroup.Name = &newName - expectedTaskGroup.Count = &newCount assert.Equal(t, expectedTaskGroup, *taskGroup) assert.Equal(t, newTaskGroupList, job.TaskGroups, "it should not modify the jobs task group list") @@ -272,7 +271,7 @@ func TestRegisterJobWhenNomadJobRegistrationFails(t *testing.T) { defaultJob: nomadApi.Job{}, } - err := m.registerJob("id", 1, 2, 3, "image", false, []uint16{}) + err := m.registerDefaultJob("id", 1, 2, 3, "image", false, []uint16{}) assert.Equal(t, expectedErr, err) apiMock.AssertNotCalled(t, "EvaluationStream") } @@ -290,7 +289,7 @@ func TestRegisterJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) { defaultJob: nomadApi.Job{}, } - err := m.registerJob("id", 1, 2, 3, "image", false, []uint16{}) + err := m.registerDefaultJob("id", 1, 2, 3, "image", false, []uint16{}) assert.NoError(t, err) } @@ -308,6 +307,6 @@ func TestRegisterJobReturnsErrorWhenMonitoringEvaluationFails(t *testing.T) { defaultJob: nomadApi.Job{}, } - err := m.registerJob("id", 1, 2, 3, "image", false, []uint16{}) + err := m.registerDefaultJob("id", 1, 2, 3, "image", false, []uint16{}) assert.Equal(t, expectedErr, err) } diff --git a/environment/manager.go b/environment/manager.go index 88bb3a8..947fdbd 100644 --- a/environment/manager.go +++ b/environment/manager.go @@ -50,18 +50,23 @@ func (m *NomadEnvironmentManager) CreateOrUpdate( } exists := m.runnerManager.EnvironmentExists(runner.EnvironmentID(idInt)) - err = m.registerJob(id, + err = m.registerDefaultJob(id, request.PrewarmingPoolSize, request.CPULimit, request.MemoryLimit, request.Image, request.NetworkAccess, request.ExposedPorts) - if err == nil { - if !exists { - m.runnerManager.RegisterEnvironment( - runner.EnvironmentID(idInt), runner.NomadJobID(id), request.PrewarmingPoolSize) - } - return !exists, nil + if err != nil { + return false, err } - return false, err + + // TODO: If already exists, make sure to update all existing runners as well + if !exists { + err = m.runnerManager.RegisterEnvironment( + runner.EnvironmentID(idInt), request.PrewarmingPoolSize) + if err != nil { + return false, err + } + } + return !exists, nil } func (m *NomadEnvironmentManager) Delete(id string) { @@ -70,5 +75,8 @@ func (m *NomadEnvironmentManager) Delete(id string) { func (m *NomadEnvironmentManager) Load() { // ToDo: remove create default execution environment for debugging purposes - m.runnerManager.RegisterEnvironment(runner.EnvironmentID(0), "python", 5) + err := m.runnerManager.RegisterEnvironment(runner.EnvironmentID(0), 5) + if err != nil { + return + } } diff --git a/environment/manager_mock.go b/environment/manager_mock.go index 96db957..8065491 100644 --- a/environment/manager_mock.go +++ b/environment/manager_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.8.0. DO NOT EDIT. package environment diff --git a/environment/manager_test.go b/environment/manager_test.go index d86335e..8834e82 100644 --- a/environment/manager_test.go +++ b/environment/manager_test.go @@ -28,6 +28,8 @@ func TestCreateOrUpdateTestSuite(t *testing.T) { func (s *CreateOrUpdateTestSuite) SetupTest() { s.runnerManagerMock = runner.ManagerMock{} + s.runnerManagerMock.On("RegisterEnvironment", mock.Anything, mock.Anything).Return(nil) + s.apiMock = nomad.ExecutorAPIMock{} s.request = dto.ExecutionEnvironmentRequest{ PrewarmingPoolSize: 10, @@ -53,12 +55,12 @@ func (s *CreateOrUpdateTestSuite) mockEnvironmentExists(exists bool) { func (s *CreateOrUpdateTestSuite) mockRegisterEnvironment() *mock.Call { return s.runnerManagerMock.On("RegisterEnvironment", - mock.AnythingOfType("EnvironmentID"), mock.AnythingOfType("NomadJobID"), mock.AnythingOfType("uint")). + mock.AnythingOfType("EnvironmentID"), mock.AnythingOfType("uint")). Return() } func (s *CreateOrUpdateTestSuite) createJobForRequest() *nomadApi.Job { - return createJob(s.manager.defaultJob, tests.DefaultEnvironmentIDAsString, + return createJob(s.manager.defaultJob, nomad.DefaultJobID(tests.DefaultEnvironmentIdAsString), s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit, s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts) } @@ -121,8 +123,7 @@ func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrec s.True(created) s.NoError(err) s.runnerManagerMock.AssertCalled(s.T(), "RegisterEnvironment", - runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), - runner.NomadJobID(tests.DefaultEnvironmentIDAsString), + runner.EnvironmentID(tests.DefaultEnvironmentIdAsInteger), s.request.PrewarmingPoolSize) } diff --git a/nomad/api_querier.go b/nomad/api_querier.go index c579e46..a63d361 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -2,11 +2,14 @@ package nomad import ( "context" + "errors" nomadApi "github.com/hashicorp/nomad/api" "io" "net/url" ) +var ErrNoAllocationsFound = errors.New("no allocation found") + // apiQuerier provides access to the Nomad functionality. type apiQuerier interface { // init prepares an apiClient to be able to communicate to a provided Nomad API. @@ -24,12 +27,15 @@ type apiQuerier interface { // DeleteRunner deletes the runner with the given Id. DeleteRunner(runnerId string) (err error) - // Execute runs a command in the passed allocation. - Execute(allocationID string, ctx context.Context, command []string, tty bool, + // Execute runs a command in the passed job. + Execute(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout, stderr io.Writer) (int, error) - // loadRunners loads all allocations of the specified job. - loadRunners(jobId string) (allocationListStub []*nomadApi.AllocationListStub, err error) + // listJobs loads all jobs with the specified prefix. + listJobs(prefix string) (allocationListStub []*nomadApi.JobListStub, err error) + + // jobInfo returns the job of the given jobID. + jobInfo(jobID string) (job *nomadApi.Job, err error) // RegisterNomadJob registers a job with Nomad. // It returns the evaluation ID that can be used when listening to the Nomad event stream. @@ -47,7 +53,7 @@ type apiQuerier interface { type nomadAPIClient struct { client *nomadApi.Client namespace string - queryOptions *nomadApi.QueryOptions + queryOptions *nomadApi.QueryOptions // ToDo: Remove } func (nc *nomadAPIClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { @@ -64,26 +70,31 @@ func (nc *nomadAPIClient) init(nomadURL *url.URL, nomadNamespace string) (err er } func (nc *nomadAPIClient) DeleteRunner(runnerID string) (err error) { - allocation, _, err := nc.client.Allocations().Info(runnerID, nc.queryOptions) - if err != nil { - return - } - _, err = nc.client.Allocations().Stop(allocation, nil) - return err + // ToDo: Fix Namespace + _, _, err = nc.client.Jobs().Deregister(runnerID, true, nc.queryOptions) + return } -func (nc *nomadAPIClient) Execute(allocationID string, +func (nc *nomadAPIClient) Execute(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout, stderr io.Writer) (int, error) { - allocation, _, err := nc.client.Allocations().Info(allocationID, nil) + allocations, _, err := nc.client.Jobs().Allocations(jobID, false, nil) + if len(allocations) == 0 { + return 1, ErrNoAllocationsFound + } + allocation, _, err := nc.client.Allocations().Info(allocations[0].ID, nil) if err != nil { return 1, err } return nc.client.Allocations().Exec(ctx, allocation, TaskName, tty, command, stdin, stdout, stderr, nil, nil) } -func (nc *nomadAPIClient) loadRunners(jobID string) (allocationListStub []*nomadApi.AllocationListStub, err error) { - allocationListStub, _, err = nc.client.Jobs().Allocations(jobID, true, nc.queryOptions) +func (nc *nomadApiClient) listJobs(prefix string) (jobs []*nomadApi.JobListStub, err error) { + q := nomadApi.QueryOptions{ + Namespace: nc.namespace, + Prefix: prefix, + } + jobs, _, err = nc.client.Jobs().List(&q) return } diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index 9dcbe34..f7458fc 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -79,20 +79,20 @@ func (_m *apiQuerierMock) EvaluationStream(evalID string, ctx context.Context) ( return r0, r1 } -// Execute provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr -func (_m *apiQuerierMock) Execute(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { - ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr) +// Execute provides a mock function with given fields: jobID, ctx, command, tty, stdin, stdout, stderr +func (_m *apiQuerierMock) Execute(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { + ret := _m.Called(jobID, ctx, command, tty, stdin, stdout, stderr) var r0 int if rf, ok := ret.Get(0).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) int); ok { - r0 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) + r0 = rf(jobID, ctx, command, tty, stdin, stdout, stderr) } else { r0 = ret.Get(0).(int) } var r1 error if rf, ok := ret.Get(1).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) error); ok { - r1 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) + r1 = rf(jobID, ctx, command, tty, stdin, stdout, stderr) } else { r1 = ret.Error(1) } @@ -193,22 +193,45 @@ func (_m *apiQuerierMock) init(nomadURL *url.URL, nomadNamespace string) error { return r0 } -// loadRunners provides a mock function with given fields: jobId -func (_m *apiQuerierMock) loadRunners(jobId string) ([]*api.AllocationListStub, error) { - ret := _m.Called(jobId) +// jobInfo provides a mock function with given fields: jobID +func (_m *apiQuerierMock) jobInfo(jobID string) (*api.Job, error) { + ret := _m.Called(jobID) - var r0 []*api.AllocationListStub - if rf, ok := ret.Get(0).(func(string) []*api.AllocationListStub); ok { - r0 = rf(jobId) + var r0 *api.Job + if rf, ok := ret.Get(0).(func(string) *api.Job); ok { + r0 = rf(jobID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*api.AllocationListStub) + r0 = ret.Get(0).(*api.Job) } } var r1 error if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(jobId) + r1 = rf(jobID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// listJobs provides a mock function with given fields: prefix +func (_m *apiQuerierMock) listJobs(prefix string) ([]*api.JobListStub, error) { + ret := _m.Called(prefix) + + var r0 []*api.JobListStub + if rf, ok := ret.Get(0).(func(string) []*api.JobListStub); ok { + r0 = rf(prefix) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*api.JobListStub) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(prefix) } else { r1 = ret.Error(1) } diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 9adf9f8..0a001bb 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -100,20 +100,20 @@ func (_m *ExecutorAPIMock) Execute(allocationID string, ctx context.Context, com return r0, r1 } -// ExecuteCommand provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr -func (_m *ExecutorAPIMock) ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { - ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr) +// ExecuteCommand provides a mock function with given fields: jobID, ctx, command, tty, stdin, stdout, stderr +func (_m *ExecutorAPIMock) ExecuteCommand(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { + ret := _m.Called(jobID, ctx, command, tty, stdin, stdout, stderr) var r0 int if rf, ok := ret.Get(0).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) int); ok { - r0 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) + r0 = rf(jobID, ctx, command, tty, stdin, stdout, stderr) } else { r0 = ret.Get(0).(int) } var r1 error if rf, ok := ret.Get(1).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) error); ok { - r1 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) + r1 = rf(jobID, ctx, command, tty, stdin, stdout, stderr) } else { r1 = ret.Error(1) } @@ -265,22 +265,45 @@ func (_m *ExecutorAPIMock) init(nomadURL *url.URL, nomadNamespace string) error return r0 } -// loadRunners provides a mock function with given fields: jobId -func (_m *ExecutorAPIMock) loadRunners(jobId string) ([]*api.AllocationListStub, error) { - ret := _m.Called(jobId) +// jobInfo provides a mock function with given fields: jobID +func (_m *ExecutorAPIMock) jobInfo(jobID string) (*api.Job, error) { + ret := _m.Called(jobID) - var r0 []*api.AllocationListStub - if rf, ok := ret.Get(0).(func(string) []*api.AllocationListStub); ok { - r0 = rf(jobId) + var r0 *api.Job + if rf, ok := ret.Get(0).(func(string) *api.Job); ok { + r0 = rf(jobID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*api.AllocationListStub) + r0 = ret.Get(0).(*api.Job) } } var r1 error if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(jobId) + r1 = rf(jobID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// listJobs provides a mock function with given fields: prefix +func (_m *ExecutorApiMock) listJobs(prefix string) ([]*api.JobListStub, error) { + ret := _m.Called(prefix) + + var r0 []*api.JobListStub + if rf, ok := ret.Get(0).(func(string) []*api.JobListStub); ok { + r0 = rf(prefix) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*api.JobListStub) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(prefix) } else { r1 = ret.Error(1) } diff --git a/nomad/job.go b/nomad/job.go index 7570404..7ce85cd 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -5,10 +5,20 @@ import ( ) const ( - TaskGroupName = "default-group" - TaskName = "default-task" + TaskGroupName = "default-group" + TaskName = "default-task" + DefaultJobIDFormat = "%s-default" ) +func DefaultJobID(id string) string { + return fmt.Sprintf(DefaultJobIDFormat, id) +} + +func (nc *nomadAPIClient) jobInfo(jobID string) (job *nomadApi.Job, err error) { + job, _, err = nc.client.Jobs().Info(jobID, nil) + return +} + // LoadJobList loads the list of jobs from the Nomad api. func (nc *nomadAPIClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { list, _, err = nc.client.Jobs().List(nc.queryOptions) diff --git a/nomad/nomad.go b/nomad/nomad.go index f25a36f..7880fe0 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -26,8 +26,10 @@ type AllocationProcessor func(*nomadApi.Allocation) type ExecutorAPI interface { apiQuerier - // LoadRunners loads all allocations of the specified job which are running and not about to get stopped. - LoadRunners(jobID string) (runnerIds []string, err error) + // LoadRunners loads all jobs of the specified environment which are running and not about to get stopped. + LoadRunners(environmentID string) (runnerIds []string, err error) + + LoadTemplateJob(environmentID string) (*nomadApi.Job, error) // MonitorEvaluation monitors the given evaluation ID. // It waits until the evaluation reaches one of the states complete, canceled or failed. @@ -65,19 +67,26 @@ func (a *APIClient) init(nomadURL *url.URL, nomadNamespace string) error { return a.apiQuerier.init(nomadURL, nomadNamespace) } -// LoadRunners loads the allocations of the specified job. -func (a *APIClient) LoadRunners(jobID string) (runnerIds []string, err error) { - list, err := a.loadRunners(jobID) +func (a *APIClient) LoadRunners(environmentID string) (runnerIDs []string, err error) { + list, err := a.listJobs(environmentID) if err != nil { return nil, err } - for _, stub := range list { - // only add allocations which are running and not about to be stopped - if stub.ClientStatus == nomadApi.AllocClientStatusRunning && stub.DesiredStatus == nomadApi.AllocDesiredStatusRun { - runnerIds = append(runnerIds, stub.ID) + for _, jobListStub := range list { + allocationRunning := jobListStub.JobSummary.Summary[TaskGroupName].Running > 0 + if jobListStub.Status == structs.JobStatusRunning && allocationRunning { + runnerIDs = append(runnerIDs, jobListStub.ID) } } - return runnerIds, nil + return runnerIDs, nil +} + +func (a *APIClient) LoadTemplateJob(environmentID string) (*nomadApi.Job, error) { + job, err := a.jobInfo(DefaultJobID(environmentID)) + if err != nil { + return nil, fmt.Errorf("failed loading template job: %w", err) + } + return job, nil } func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) error { diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index 1a38032..1a5d2d5 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -31,10 +31,10 @@ type LoadRunnersTestSuite struct { jobId string mock *apiQuerierMock nomadApiClient APIClient - availableRunner *nomadApi.AllocationListStub - anotherAvailableRunner *nomadApi.AllocationListStub - stoppedRunner *nomadApi.AllocationListStub - stoppingRunner *nomadApi.AllocationListStub + availableRunner *nomadApi.JobListStub + anotherAvailableRunner *nomadApi.JobListStub + pendingRunner *nomadApi.JobListStub + deadRunner *nomadApi.JobListStub } func (s *LoadRunnersTestSuite) SetupTest() { @@ -43,82 +43,73 @@ func (s *LoadRunnersTestSuite) SetupTest() { s.mock = &apiQuerierMock{} s.nomadApiClient = APIClient{apiQuerier: s.mock} - s.availableRunner = &nomadApi.AllocationListStub{ - ID: "s0m3-r4nd0m-1d", - ClientStatus: nomadApi.AllocClientStatusRunning, - DesiredStatus: nomadApi.AllocDesiredStatusRun, - } + s.availableRunner = newJobListStub("s0m3-r4nd0m-1d", structs.JobStatusRunning, 1) + s.anotherAvailableRunner = newJobListStub("s0m3-s1m1l4r-1d", structs.JobStatusRunning, 1) + s.pendingRunner = newJobListStub("4n0th3r-1d", structs.JobStatusPending, 0) + s.deadRunner = newJobListStub("my-1d", structs.JobStatusDead, 0) +} - s.anotherAvailableRunner = &nomadApi.AllocationListStub{ - ID: "s0m3-s1m1l4r-1d", - ClientStatus: nomadApi.AllocClientStatusRunning, - DesiredStatus: nomadApi.AllocDesiredStatusRun, - } - - s.stoppedRunner = &nomadApi.AllocationListStub{ - ID: "4n0th3r-1d", - ClientStatus: nomadApi.AllocClientStatusComplete, - DesiredStatus: nomadApi.AllocDesiredStatusRun, - } - - s.stoppingRunner = &nomadApi.AllocationListStub{ - ID: "th1rd-1d", - ClientStatus: nomadApi.AllocClientStatusRunning, - DesiredStatus: nomadApi.AllocDesiredStatusStop, +func newJobListStub(id, status string, amountRunning int) *nomadApi.JobListStub { + return &nomadApi.JobListStub{ + ID: id, + Status: status, + JobSummary: &nomadApi.JobSummary{ + JobID: id, + Summary: map[string]nomadApi.TaskGroupSummary{TaskGroupName: {Running: amountRunning}}, + }, } } func (s *LoadRunnersTestSuite) TestErrorOfUnderlyingApiCallIsPropagated() { - errorString := "api errored" - s.mock.On("loadRunners", mock.AnythingOfType("string")). - Return(nil, errors.New(errorString)) + s.mock.On("listJobs", mock.AnythingOfType("string")). + Return(nil, tests.DefaultError) - returnedIds, err := s.nomadApiClient.LoadRunners(s.jobId) + returnedIds, err := s.nomadApiClient.LoadRunners(suite.jobId) s.Nil(returnedIds) - s.Error(err) + s.Equal(tests.DefaultError, err) } -func (s *LoadRunnersTestSuite) TestThrowsNoErrorWhenUnderlyingApiCallDoesNot() { - s.mock.On("loadRunners", mock.AnythingOfType("string")). - Return([]*nomadApi.AllocationListStub{}, nil) +func (s *LoadRunnersTestSuite) TestReturnsNoErrorWhenUnderlyingApiCallDoesNot() { + s.mock.On("listJobs", mock.AnythingOfType("string")). + Return([]*nomadApi.JobListStub{}, nil) _, err := s.nomadApiClient.LoadRunners(s.jobId) s.NoError(err) } func (s *LoadRunnersTestSuite) TestAvailableRunnerIsReturned() { - s.mock.On("loadRunners", mock.AnythingOfType("string")). - Return([]*nomadApi.AllocationListStub{s.availableRunner}, nil) + s.mock.On("listJobs", mock.AnythingOfType("string")). + Return([]*nomadApi.JobListStub{suite.availableRunner}, nil) returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) s.Len(returnedIds, 1) s.Equal(s.availableRunner.ID, returnedIds[0]) } -func (s *LoadRunnersTestSuite) TestStoppedRunnerIsNotReturned() { - s.mock.On("loadRunners", mock.AnythingOfType("string")). - Return([]*nomadApi.AllocationListStub{s.stoppedRunner}, nil) +func (s *LoadRunnersTestSuite) TestPendingRunnerIsNotReturned() { + s.mock.On("listJobs", mock.AnythingOfType("string")). + Return([]*nomadApi.JobListStub{s.pendingRunner}, nil) returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) s.Empty(returnedIds) } -func (s *LoadRunnersTestSuite) TestStoppingRunnerIsNotReturned() { - s.mock.On("loadRunners", mock.AnythingOfType("string")). - Return([]*nomadApi.AllocationListStub{s.stoppingRunner}, nil) +func (s *LoadRunnersTestSuite) TestDeadRunnerIsNotReturned() { + s.mock.On("listJobs", mock.AnythingOfType("string")). + Return([]*nomadApi.JobListStub{s.deadRunner}, nil) returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) s.Empty(returnedIds) } func (s *LoadRunnersTestSuite) TestReturnsAllAvailableRunners() { - runnersList := []*nomadApi.AllocationListStub{ + runnersList := []*nomadApi.JobListStub{ s.availableRunner, s.anotherAvailableRunner, - s.stoppedRunner, - s.stoppingRunner, + s.pendingRunner, + s.deadRunner, } - s.mock.On("loadRunners", mock.AnythingOfType("string")). + s.mock.On("listJobs", mock.AnythingOfType("string")). Return(runnersList, nil) returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) diff --git a/runner/manager.go b/runner/manager.go index 2dff852..42408ce 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -3,6 +3,8 @@ package runner import ( "context" "errors" + "fmt" + "github.com/google/uuid" nomadApi "github.com/hashicorp/nomad/api" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" @@ -17,10 +19,12 @@ var ( ErrRunnerNotFound = errors.New("no runner found with this id") ) +const runnerNameFormat = "%s-%s" + type EnvironmentID int func (e EnvironmentID) toString() string { - return string(rune(e)) + return strconv.Itoa(int(e)) } type NomadJobID string @@ -29,7 +33,7 @@ type NomadJobID string // runners to new clients and ensure no runner is used twice. type Manager interface { // RegisterEnvironment adds a new environment that should be managed. - RegisterEnvironment(id EnvironmentID, nomadJobID NomadJobID, desiredIdleRunnersCount uint) + RegisterEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) error // EnvironmentExists returns whether the environment with the given id exists. EnvironmentExists(id EnvironmentID) bool @@ -48,9 +52,9 @@ type Manager interface { } type NomadRunnerManager struct { - apiClient nomad.ExecutorAPI - jobs NomadJobStorage - usedRunners Storage + apiClient nomad.ExecutorAPI + environments NomadEnvironmentStorage + usedRunners Storage } // NewNomadRunnerManager creates a new runner manager that keeps track of all runners. @@ -66,35 +70,43 @@ func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *No return m } -type NomadJob struct { +type NomadEnvironment struct { environmentID EnvironmentID - jobID NomadJobID idleRunners Storage desiredIdleRunnersCount uint + templateJob *nomadApi.Job } -func (j *NomadJob) ID() EnvironmentID { +func (j *NomadEnvironment) ID() EnvironmentID { return j.environmentID } -func (m *NomadRunnerManager) RegisterEnvironment(environmentID EnvironmentID, nomadJobID NomadJobID, - desiredIdleRunnersCount uint) { - m.jobs.Add(&NomadJob{ +func (m *NomadRunnerManager) RegisterEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint) error { + templateJob, err := m.apiClient.LoadTemplateJob(environmentID.toString()) + if err != nil { + return fmt.Errorf("couldn't register environment: %w", err) + } + + m.environments.Add(&NomadEnvironment{ environmentID, - nomadJobID, NewLocalRunnerStorage(), desiredIdleRunnersCount, + templateJob, }) - go m.refreshEnvironment(environmentID) + err = m.scaleEnvironment(environmentID) + if err != nil { + return fmt.Errorf("couldn't upscale environment %w", err) + } + return nil } func (m *NomadRunnerManager) EnvironmentExists(id EnvironmentID) (ok bool) { - _, ok = m.jobs.Get(id) + _, ok = m.environments.Get(id) return } func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) { - job, ok := m.jobs.Get(environmentID) + job, ok := m.environments.Get(environmentID) if !ok { return nil, ErrUnknownExecutionEnvironment } @@ -103,6 +115,10 @@ func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) return nil, ErrNoRunnersAvailable } m.usedRunners.Add(runner) + err := m.scaleEnvironment(environmentID) + if err != nil { + return nil, fmt.Errorf("can not scale up: %w", err) + } return runner, nil } @@ -141,7 +157,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { return } - job, ok := m.jobs.Get(EnvironmentID(intJobID)) + job, ok := m.environments.Get(EnvironmentID(intJobID)) if ok { job.idleRunners.Add(NewNomadAllocation(alloc.ID, m.apiClient)) } @@ -156,58 +172,55 @@ func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) { } m.usedRunners.Delete(alloc.ID) - job, ok := m.jobs.Get(EnvironmentID(intJobID)) + job, ok := m.environments.Get(EnvironmentID(intJobID)) if ok { job.idleRunners.Delete(alloc.ID) } } -// Refresh Big ToDo: Improve this function!! State out that it also rescales the job; Provide context to be terminable... -func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentID) { - job, ok := m.jobs.Get(id) +// scaleEnvironment makes sure that the amount of idle runners is at least the desiredIdleRunnersCount. +func (m *NomadRunnerManager) scaleEnvironment(id EnvironmentID) error { + environment, ok := m.environments.Get(id) if !ok { - // this environment does not exist - return + return ErrUnknownExecutionEnvironment } - var lastJobScaling = 0 - for { - runners, err := m.apiClient.LoadRunners(string(job.jobID)) - if err != nil { - log.WithError(err).Printf("Failed fetching runners") - break - } - for _, r := range m.unusedRunners(id, runners) { - // ToDo: Listen on Nomad event stream - log.Printf("Adding allocation %+v", r) - job.idleRunners.Add(r) - } - jobScale, err := m.apiClient.JobScale(string(job.jobID)) + required := int(environment.desiredIdleRunnersCount) - environment.idleRunners.Length() + for i := 0; i < required; i++ { + err := m.createRunner(environment) if err != nil { - log.WithError(err).WithField("job", string(job.jobID)).Printf("Failed get allocation count") - break - } - additionallyNeededRunners := int(job.desiredIdleRunnersCount) - job.idleRunners.Length() - requiredRunnerCount := int(jobScale) - if additionallyNeededRunners > 0 { - requiredRunnerCount += additionallyNeededRunners - } - time.Sleep(50 * time.Millisecond) - if requiredRunnerCount != lastJobScaling { - log.Printf("Set job scaling %d", requiredRunnerCount) - err = m.apiClient.SetJobScale(string(job.jobID), uint(requiredRunnerCount), "Runner Requested") - if err != nil { - log.WithError(err).Printf("Failed set allocation scaling") - continue - } - lastJobScaling = requiredRunnerCount + return fmt.Errorf("couldn't create new runner: %w", err) } } + return nil } -func (m *NomadRunnerManager) unusedRunners(environmentId EnvironmentID, fetchedRunnerIds []string) (newRunners []Runner) { +func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error { + newUUID, err := uuid.NewUUID() + if err != nil { + return fmt.Errorf("failed generating runner id") + } + newRunnerID := fmt.Sprintf(runnerNameFormat, environment.ID().toString(), newUUID.String()) + + template := *environment.templateJob + template.ID = &newRunnerID + template.Name = &newRunnerID + + evalID, err := m.apiClient.RegisterNomadJob(&template) + if err != nil { + return fmt.Errorf("couldn't register Nomad job: %w", err) + } + err = m.apiClient.MonitorEvaluation(evalID, context.Background()) + if err != nil { + return fmt.Errorf("couldn't monitor evaluation: %w", err) + } + environment.idleRunners.Add(NewNomadJob(newRunnerID, m.apiClient)) + return nil +} + +func (m *NomadRunnerManager) unusedRunners(environmentID EnvironmentID, fetchedRunnerIds []string) (newRunners []Runner) { newRunners = make([]Runner, 0) - job, ok := m.jobs.Get(environmentId) + job, ok := m.environments.Get(environmentID) if !ok { // the environment does not exist, so it won't have any unused runners return @@ -217,7 +230,7 @@ func (m *NomadRunnerManager) unusedRunners(environmentId EnvironmentID, fetchedR if !ok { _, ok = job.idleRunners.Get(runnerID) if !ok { - newRunners = append(newRunners, NewNomadAllocation(runnerID, m.apiClient)) + newRunners = append(newRunners, NewNomadJob(runnerID, m.apiClient)) } } } diff --git a/runner/manager_test.go b/runner/manager_test.go index 3595ca9..219891c 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -53,14 +53,18 @@ func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []strin apiMock.On("LoadRunners", tests.DefaultJobID).Return(returnedRunnerIds, nil) apiMock.On("JobScale", tests.DefaultJobID).Return(uint(len(returnedRunnerIds)), nil) apiMock.On("SetJobScale", tests.DefaultJobID, mock.AnythingOfType("uint"), "Runner Requested").Return(nil) + apiMock.On("LoadTemplateJob", mock.AnythingOfType("string")).Return(&nomadApi.Job{}, nil) + apiMock.On("RegisterNomadJob", mock.Anything).Return("", nil) + apiMock.On("MonitorEvaluation", mock.Anything, mock.Anything).Return(nil) } func (s *ManagerTestSuite) registerDefaultEnvironment() { - s.nomadRunnerManager.RegisterEnvironment(defaultEnvironmentID, tests.DefaultJobID, defaultDesiredRunnersCount) + err := s.nomadRunnerManager.RegisterEnvironment(defaultEnvironmentId, 0) + s.Require().NoError(err) } func (s *ManagerTestSuite) AddIdleRunnerForDefaultEnvironment(r Runner) { - job, _ := s.nomadRunnerManager.jobs.Get(defaultEnvironmentID) + job, _ := s.nomadRunnerManager.environments.Get(defaultEnvironmentID) job.idleRunners.Add(r) } @@ -69,8 +73,9 @@ func (s *ManagerTestSuite) waitForRunnerRefresh() { } func (s *ManagerTestSuite) TestRegisterEnvironmentAddsNewJob() { - s.nomadRunnerManager.RegisterEnvironment(anotherEnvironmentID, tests.DefaultJobID, defaultDesiredRunnersCount) - job, ok := s.nomadRunnerManager.jobs.Get(defaultEnvironmentID) + err := s.nomadRunnerManager.RegisterEnvironment(anotherEnvironmentId, defaultDesiredRunnersCount) + s.Require().NoError(err) + job, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentId) s.True(ok) s.NotNil(job) } @@ -120,10 +125,8 @@ func (s *ManagerTestSuite) TestClaimThrowsAnErrorIfNoRunnersAvailable() { } func (s *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() { - mockRunnerQueries(s.apiMock, []string{tests.DefaultRunnerID}) - s.waitForRunnerRefresh() - receivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID) - s.Require().NoError(err) + s.AddIdleRunnerForDefaultEnvironment(s.exerciseRunner) + receivedRunner, _ := s.nomadRunnerManager.Claim(defaultEnvironmentId) savedRunner, ok := s.nomadRunnerManager.usedRunners.Get(receivedRunner.Id()) s.True(ok) s.Equal(savedRunner, receivedRunner) @@ -164,38 +167,6 @@ func (s *ManagerTestSuite) TestReturnReturnsErrorWhenApiCallFailed() { s.Error(err) } -func (s *ManagerTestSuite) TestRefreshFetchesRunners() { - mockRunnerQueries(s.apiMock, []string{tests.DefaultRunnerID}) - s.waitForRunnerRefresh() - s.apiMock.AssertCalled(s.T(), "LoadRunners", tests.DefaultJobID) -} - -func (s *ManagerTestSuite) TestNewRunnersFoundInRefreshAreAddedToIdleRunners() { - mockRunnerQueries(s.apiMock, []string{tests.DefaultRunnerID}) - s.waitForRunnerRefresh() - job, _ := s.nomadRunnerManager.jobs.Get(defaultEnvironmentID) - _, ok := job.idleRunners.Get(tests.DefaultRunnerID) - s.True(ok) -} - -func (s *ManagerTestSuite) TestRefreshScalesJob() { - mockRunnerQueries(s.apiMock, []string{tests.DefaultRunnerID}) - s.waitForRunnerRefresh() - // use one runner to necessitate rescaling - _, _ = s.nomadRunnerManager.Claim(defaultEnvironmentID) - s.waitForRunnerRefresh() - s.apiMock.AssertCalled(s.T(), "SetJobScale", tests.DefaultJobID, defaultDesiredRunnersCount, "Runner Requested") -} - -func (s *ManagerTestSuite) TestRefreshAddsRunnerToPool() { - mockRunnerQueries(s.apiMock, []string{tests.DefaultRunnerID}) - s.waitForRunnerRefresh() - job, _ := s.nomadRunnerManager.jobs.Get(defaultEnvironmentID) - poolRunner, ok := job.idleRunners.Get(tests.DefaultRunnerID) - s.True(ok) - s.Equal(tests.DefaultRunnerID, poolRunner.Id()) -} - func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() { var hook *test.Hook logger, hook := test.NewNullLogger() @@ -282,16 +253,16 @@ func modifyMockedCall(apiMock *nomad.ExecutorAPIMock, method string, modifier fu } func (s *ManagerTestSuite) TestWhenEnvironmentDoesNotExistEnvironmentExistsReturnsFalse() { - id := anotherEnvironmentID - _, ok := s.nomadRunnerManager.jobs.Get(id) + id := anotherEnvironmentId + _, ok := s.nomadRunnerManager.environments.Get(id) require.False(s.T(), ok) s.False(s.nomadRunnerManager.EnvironmentExists(id)) } func (s *ManagerTestSuite) TestWhenEnvironmentExistsEnvironmentExistsReturnsTrue() { - id := anotherEnvironmentID - s.nomadRunnerManager.jobs.Add(&NomadJob{environmentID: id}) + id := anotherEnvironmentId + s.nomadRunnerManager.environments.Add(&NomadEnvironment{environmentId: id}) exists := s.nomadRunnerManager.EnvironmentExists(id) s.True(exists) diff --git a/runner/nomad_job_storage.go b/runner/nomad_environment_storage.go similarity index 67% rename from runner/nomad_job_storage.go rename to runner/nomad_environment_storage.go index c66c1dc..a3a38d4 100644 --- a/runner/nomad_job_storage.go +++ b/runner/nomad_environment_storage.go @@ -4,45 +4,45 @@ import ( "sync" ) -// NomadJobStorage is an interface for storing NomadJobs. -type NomadJobStorage interface { +// NomadEnvironmentStorage is an interface for storing NomadJobs. +type NomadEnvironmentStorage interface { // Add adds a job to the storage. // It overwrites the old job if one with the same id was already stored. - Add(job *NomadJob) + Add(job *NomadEnvironment) // Get returns a job from the storage. // Iff the job does not exist in the store, ok will be false. - Get(id EnvironmentID) (job *NomadJob, ok bool) + Get(id EnvironmentID) (job *NomadEnvironment, ok bool) // Delete deletes the job with the passed id from the storage. It does nothing if no job with the id is present in // the storage. Delete(id EnvironmentID) - // Length returns the number of currently stored jobs in the storage. + // Length returns the number of currently stored environments in the storage. Length() int } -// localNomadJobStorage stores NomadJob objects in the local application memory. +// localNomadJobStorage stores NomadEnvironment objects in the local application memory. type localNomadJobStorage struct { sync.RWMutex - jobs map[EnvironmentID]*NomadJob + jobs map[EnvironmentID]*NomadEnvironment } // NewLocalNomadJobStorage responds with an empty localNomadJobStorage. // This implementation stores the data thread-safe in the local application memory. func NewLocalNomadJobStorage() *localNomadJobStorage { return &localNomadJobStorage{ - jobs: make(map[EnvironmentID]*NomadJob), + jobs: make(map[EnvironmentID]*NomadEnvironment), } } -func (s *localNomadJobStorage) Add(job *NomadJob) { +func (s *localNomadJobStorage) Add(job *NomadEnvironment) { s.Lock() defer s.Unlock() s.jobs[job.ID()] = job } -func (s *localNomadJobStorage) Get(id EnvironmentID) (job *NomadJob, ok bool) { +func (s *localNomadJobStorage) Get(id EnvironmentID) (job *NomadEnvironment, ok bool) { s.RLock() defer s.RUnlock() job, ok = s.jobs[id] diff --git a/runner/nomad_job_storage_test.go b/runner/nomad_environment_storage_test.go similarity index 73% rename from runner/nomad_job_storage_test.go rename to runner/nomad_environment_storage_test.go index 4feb996..80e1bbe 100644 --- a/runner/nomad_job_storage_test.go +++ b/runner/nomad_environment_storage_test.go @@ -1,8 +1,8 @@ package runner import ( + nomadApi "github.com/hashicorp/nomad/api" "github.com/stretchr/testify/suite" - "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "testing" ) @@ -13,12 +13,12 @@ func TestJobStoreTestSuite(t *testing.T) { type JobStoreTestSuite struct { suite.Suite jobStorage *localNomadJobStorage - job *NomadJob + job *NomadEnvironment } -func (s *JobStoreTestSuite) SetupTest() { - s.jobStorage = NewLocalNomadJobStorage() - s.job = &NomadJob{environmentID: defaultEnvironmentID, jobID: tests.DefaultJobID} +func (suite *JobStoreTestSuite) SetupTest() { + suite.jobStorage = NewLocalNomadJobStorage() + suite.job = &NomadEnvironment{environmentID: defaultEnvironmentId} } func (s *JobStoreTestSuite) TestAddedJobCanBeRetrieved() { @@ -28,11 +28,10 @@ func (s *JobStoreTestSuite) TestAddedJobCanBeRetrieved() { s.Equal(s.job, retrievedJob) } -func (s *JobStoreTestSuite) TestJobWithSameIdOverwritesOldOne() { - otherJobWithSameID := &NomadJob{environmentID: defaultEnvironmentID} - // assure runner is actually different - otherJobWithSameID.jobID = tests.AnotherJobID - s.NotEqual(s.job, otherJobWithSameID) +func (suite *JobStoreTestSuite) TestJobWithSameIdOverwritesOldOne() { + otherJobWithSameID := &NomadEnvironment{environmentID: defaultEnvironmentId} + otherJobWithSameID.templateJob = &nomadApi.Job{} + suite.NotEqual(suite.job, otherJobWithSameID) s.jobStorage.Add(s.job) s.jobStorage.Add(otherJobWithSameID) @@ -65,7 +64,7 @@ func (s *JobStoreTestSuite) TestLenChangesOnStoreContentChange() { }) s.Run("len increases again when different job is added", func() { - anotherJob := &NomadJob{environmentID: anotherEnvironmentID} + anotherJob := &NomadEnvironment{environmentID: anotherEnvironmentID} s.jobStorage.Add(anotherJob) s.Equal(2, s.jobStorage.Length()) }) diff --git a/runner/runner.go b/runner/runner.go index df8ced4..b189ac4 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -50,23 +50,29 @@ type Runner interface { UpdateFileSystem(request *dto.UpdateFileSystemRequest) error } -// NomadAllocation is an abstraction to communicate with Nomad allocations. -type NomadAllocation struct { +// NomadJob is an abstraction to communicate with Nomad jobs. +type NomadJob struct { ExecutionStorage - id string - api nomad.ExecutorAPI + id string + allocID string + api nomad.ExecutorAPI } -// NewNomadAllocation creates a new Nomad allocation with the provided id. -func NewNomadAllocation(id string, apiClient nomad.ExecutorAPI) *NomadAllocation { - return &NomadAllocation{ +// NewRunner creates a new runner with the provided id. +func NewRunner(id string) Runner { + return NewNomadJob(id, nil) +} + +// NewNomadJob creates a new NomadJob with the provided id. +func NewNomadJob(id string, apiClient nomad.ExecutorAPI) *NomadJob { + return &NomadJob{ id: id, api: apiClient, ExecutionStorage: NewLocalExecutionStorage(), } } -func (r *NomadAllocation) Id() string { +func (r *NomadJob) Id() string { return r.id } @@ -75,7 +81,7 @@ type ExitInfo struct { Err error } -func (r *NomadAllocation) ExecuteInteractively( +func (r *NomadJob) ExecuteInteractively( request *dto.ExecutionRequest, stdin io.Reader, stdout, stderr io.Writer, @@ -90,14 +96,14 @@ func (r *NomadAllocation) ExecuteInteractively( } exit := make(chan ExitInfo) go func() { - exitCode, err := r.api.ExecuteCommand(r.Id(), ctx, command, true, stdin, stdout, stderr) + exitCode, err := r.api.ExecuteCommand(r.id, ctx, command, true, stdin, stdout, stderr) exit <- ExitInfo{uint8(exitCode), err} close(exit) }() return exit, cancel } -func (r *NomadAllocation) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error { +func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error { var tarBuffer bytes.Buffer if err := createTarArchiveForFiles(copyRequest.Copy, &tarBuffer); err != nil { return err @@ -108,7 +114,7 @@ func (r *NomadAllocation) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequ updateFileCommand := (&dto.ExecutionRequest{Command: fileDeletionCommand + copyCommand}).FullCommand() stdOut := bytes.Buffer{} stdErr := bytes.Buffer{} - exitCode, err := r.api.ExecuteCommand(r.Id(), context.Background(), updateFileCommand, false, + exitCode, err := r.api.ExecuteCommand(r.id, context.Background(), updateFileCommand, false, &tarBuffer, &stdOut, &stdErr) if err != nil { @@ -182,7 +188,7 @@ func tarHeader(file dto.File) *tar.Header { // MarshalJSON implements json.Marshaler interface. // This exports private attributes like the id too. -func (r *NomadAllocation) MarshalJSON() ([]byte, error) { +func (r *NomadJob) MarshalJSON() ([]byte, error) { return json.Marshal(struct { ID string `json:"runnerId"` }{ diff --git a/runner/runner_test.go b/runner/runner_test.go index 46bf074..eea70c9 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -20,19 +20,19 @@ import ( ) func TestIdIsStored(t *testing.T) { - runner := NewNomadAllocation("42", nil) + runner := NewNomadJob("42", nil) assert.Equal(t, "42", runner.Id()) } func TestMarshalRunner(t *testing.T) { - runner := NewNomadAllocation("42", nil) + runner := NewNomadJob("42", nil) marshal, err := json.Marshal(runner) assert.NoError(t, err) assert.Equal(t, "{\"runnerId\":\"42\"}", string(marshal)) } func TestExecutionRequestIsStored(t *testing.T) { - runner := NewNomadAllocation("42", nil) + runner := NewNomadJob("42", nil) executionRequest := &dto.ExecutionRequest{ Command: "command", TimeLimit: 10, @@ -47,7 +47,7 @@ func TestExecutionRequestIsStored(t *testing.T) { } func TestNewContextReturnsNewContextWithRunner(t *testing.T) { - runner := NewNomadAllocation("testRunner", nil) + runner := NewNomadJob("testRunner", nil) ctx := context.Background() newCtx := NewContext(ctx, runner) storedRunner := newCtx.Value(runnerContextKey).(Runner) @@ -57,7 +57,7 @@ func TestNewContextReturnsNewContextWithRunner(t *testing.T) { } func TestFromContextReturnsRunner(t *testing.T) { - runner := NewNomadAllocation("testRunner", nil) + runner := NewNomadJob("testRunner", nil) ctx := NewContext(context.Background(), runner) storedRunner, ok := FromContext(ctx) @@ -75,7 +75,7 @@ func TestFromContextReturnsIsNotOkWhenContextHasNoRunner(t *testing.T) { func TestExecuteCallsAPI(t *testing.T) { apiMock := &nomad.ExecutorAPIMock{} apiMock.On("ExecuteCommand", mock.Anything, mock.Anything, mock.Anything, true, mock.Anything, mock.Anything, mock.Anything).Return(0, nil) - runner := NewNomadAllocation(tests.DefaultRunnerID, apiMock) + runner := NewNomadJob(tests.DefaultRunnerID, apiMock) request := &dto.ExecutionRequest{Command: "echo 'Hello World!'"} runner.ExecuteInteractively(request, nil, nil, nil) @@ -86,7 +86,7 @@ func TestExecuteCallsAPI(t *testing.T) { func TestExecuteReturnsAfterTimeout(t *testing.T) { apiMock := newApiMockWithTimeLimitHandling() - runner := NewNomadAllocation(tests.DefaultRunnerID, apiMock) + runner := NewNomadJob(tests.DefaultRunnerID, apiMock) timeLimit := 1 execution := &dto.ExecutionRequest{TimeLimit: timeLimit} @@ -124,7 +124,7 @@ func TestUpdateFileSystemTestSuite(t *testing.T) { type UpdateFileSystemTestSuite struct { suite.Suite - runner *NomadAllocation + runner *NomadJob apiMock *nomad.ExecutorAPIMock mockedExecuteCommandCall *mock.Call command []string @@ -133,7 +133,7 @@ type UpdateFileSystemTestSuite struct { func (s *UpdateFileSystemTestSuite) SetupTest() { s.apiMock = &nomad.ExecutorAPIMock{} - s.runner = NewNomadAllocation(tests.DefaultRunnerID, s.apiMock) + s.runner = NewNomadJob(tests.DefaultRunnerID, s.apiMock) s.mockedExecuteCommandCall = s.apiMock.On("ExecuteCommand", tests.DefaultRunnerID, mock.Anything, mock.Anything, false, mock.Anything, mock.Anything, mock.Anything). Run(func(args mock.Arguments) { s.command = args.Get(2).([]string)