From e0e254a6afbf803d0489fae93f6025cefd49d751 Mon Sep 17 00:00:00 2001 From: Konrad Hanff Date: Tue, 22 Jun 2021 11:11:33 +0200 Subject: [PATCH] Persist runner timeout in metadata To be able to restore the runner timeouts even after a Poseidon restart, the timeout is stored in the Nomad metadata. The timeout will restart, but at least the runner will be returned at all. --- api/runners.go | 6 +- api/runners_test.go | 7 +- nomad/executor_api_mock.go | 173 ++++++++++++++++--------------------- nomad/job.go | 1 + nomad/nomad.go | 7 +- runner/constants_test.go | 5 +- runner/manager.go | 8 +- runner/manager_mock.go | 26 +++--- runner/manager_test.go | 18 ++-- 9 files changed, 116 insertions(+), 135 deletions(-) diff --git a/api/runners.go b/api/runners.go index 73104ff..4b34e25 100644 --- a/api/runners.go +++ b/api/runners.go @@ -9,7 +9,6 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "net/http" "net/url" - "time" ) const ( @@ -48,7 +47,7 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req return } environmentId := runner.EnvironmentID(runnerRequest.ExecutionEnvironmentId) - nextRunner, err := r.manager.Claim(environmentId) + nextRunner, err := r.manager.Claim(environmentId, runnerRequest.InactivityTimeout) if err != nil { switch err { case runner.ErrUnknownExecutionEnvironment: @@ -61,9 +60,6 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req } return } - timeout := time.Duration(runnerRequest.InactivityTimeout) * time.Second - nextRunner.SetupTimeout(timeout, nextRunner, r.manager) - sendJson(writer, &dto.RunnerResponse{Id: nextRunner.Id()}, http.StatusOK) } diff --git a/api/runners_test.go b/api/runners_test.go index e8f8aa5..36e6d6a 100644 --- a/api/runners_test.go +++ b/api/runners_test.go @@ -123,7 +123,7 @@ func (s *ProvideRunnerTestSuite) SetupTest() { } func (s *ProvideRunnerTestSuite) TestValidRequestReturnsRunner() { - s.runnerManager.On("Claim", mock.AnythingOfType("runner.EnvironmentID")).Return(s.runner, nil) + s.runnerManager.On("Claim", mock.AnythingOfType("runner.EnvironmentID"), mock.AnythingOfType("int")).Return(s.runner, nil) recorder := httptest.NewRecorder() s.router.ServeHTTP(recorder, s.defaultRequest) @@ -149,7 +149,7 @@ func (s *ProvideRunnerTestSuite) TestInvalidRequestReturnsBadRequest() { func (s *ProvideRunnerTestSuite) TestWhenExecutionEnvironmentDoesNotExistReturnsNotFound() { s.runnerManager. - On("Claim", mock.AnythingOfType("runner.EnvironmentID")). + On("Claim", mock.AnythingOfType("runner.EnvironmentID"), mock.AnythingOfType("int")). Return(nil, runner.ErrUnknownExecutionEnvironment) recorder := httptest.NewRecorder() @@ -158,7 +158,8 @@ func (s *ProvideRunnerTestSuite) TestWhenExecutionEnvironmentDoesNotExistReturns } func (s *ProvideRunnerTestSuite) TestWhenNoRunnerAvailableReturnsNomadOverload() { - s.runnerManager.On("Claim", mock.AnythingOfType("runner.EnvironmentID")).Return(nil, runner.ErrNoRunnersAvailable) + s.runnerManager.On("Claim", mock.AnythingOfType("runner.EnvironmentID"), mock.AnythingOfType("int")). + Return(nil, runner.ErrNoRunnersAvailable) recorder := httptest.NewRecorder() s.router.ServeHTTP(recorder, s.defaultRequest) diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 40cc0d0..4026ab3 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.8.0. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package nomad @@ -79,29 +79,8 @@ func (_m *ExecutorAPIMock) EvaluationStream(evalID string, ctx context.Context) return r0, r1 } -// Execute provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr -func (_m *ExecutorAPIMock) Execute(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { - ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr) - - var r0 int - if rf, ok := ret.Get(0).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) int); ok { - r0 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) - } else { - r0 = ret.Get(0).(int) - } - - var r1 error - if rf, ok := ret.Get(1).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) error); ok { - r1 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ExecuteCommand provides a mock function with given fields: jobID, ctx, command, tty, stdin, stdout, stderr -func (_m *ExecutorAPIMock) ExecuteCommand(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { +// Execute provides a mock function with given fields: jobID, ctx, command, tty, stdin, stdout, stderr +func (_m *ExecutorAPIMock) Execute(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { ret := _m.Called(jobID, ctx, command, tty, stdin, stdout, stderr) var r0 int @@ -121,6 +100,27 @@ func (_m *ExecutorAPIMock) ExecuteCommand(jobID string, ctx context.Context, com return r0, r1 } +// ExecuteCommand provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr +func (_m *ExecutorAPIMock) ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { + ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr) + + var r0 int + if rf, ok := ret.Get(0).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) int); ok { + r0 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) + } else { + r0 = ret.Get(0).(int) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) error); ok { + r1 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // JobScale provides a mock function with given fields: jobId func (_m *ExecutorAPIMock) JobScale(jobId string) (uint, error) { ret := _m.Called(jobId) @@ -142,6 +142,29 @@ func (_m *ExecutorAPIMock) JobScale(jobId string) (uint, error) { return r0, r1 } +// LoadEnvironmentJobs provides a mock function with given fields: +func (_m *ExecutorAPIMock) LoadEnvironmentJobs() ([]*api.Job, error) { + ret := _m.Called() + + var r0 []*api.Job + if rf, ok := ret.Get(0).(func() []*api.Job); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*api.Job) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LoadJobList provides a mock function with given fields: func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) { ret := _m.Called() @@ -165,6 +188,29 @@ func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) { return r0, r1 } +// LoadRunnerIDs provides a mock function with given fields: environmentID +func (_m *ExecutorAPIMock) LoadRunnerIDs(environmentID string) ([]string, error) { + ret := _m.Called(environmentID) + + var r0 []string + if rf, ok := ret.Get(0).(func(string) []string); ok { + r0 = rf(environmentID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(environmentID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LoadRunnerJobs provides a mock function with given fields: environmentID func (_m *ExecutorAPIMock) LoadRunnerJobs(environmentID string) ([]*api.Job, error) { ret := _m.Called(environmentID) @@ -188,82 +234,13 @@ func (_m *ExecutorAPIMock) LoadRunnerJobs(environmentID string) ([]*api.Job, err return r0, r1 } -// LoadRunners provides a mock function with given fields: jobID -func (_m *ExecutorAPIMock) LoadRunnerIDs(jobID string) ([]string, error) { - ret := _m.Called(jobID) - - var r0 []string - if rf, ok := ret.Get(0).(func(string) []string); ok { - r0 = rf(jobID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]string) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(jobID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// LoadTemplateJob provides a mock function with given fields: environmentID -func (_m *ExecutorAPIMock) LoadEnvironmentTemplate(environmentID string) (*api.Job, error) { - ret := _m.Called(environmentID) - - var r0 *api.Job - if rf, ok := ret.Get(0).(func(string) *api.Job); ok { - r0 = rf(environmentID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*api.Job) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(environmentID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// LoadTemplateJobs provides a mock function with given fields: -func (_m *ExecutorAPIMock) LoadEnvironmentJobs() ([]*api.Job, error) { - ret := _m.Called() - - var r0 []*api.Job - if rf, ok := ret.Get(0).(func() []*api.Job); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*api.Job) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MarkRunnerAsUsed provides a mock function with given fields: runnerID -func (_m *ExecutorAPIMock) MarkRunnerAsUsed(runnerID string) error { - ret := _m.Called(runnerID) +// MarkRunnerAsUsed provides a mock function with given fields: runnerID, duration +func (_m *ExecutorAPIMock) MarkRunnerAsUsed(runnerID string, duration int) error { + ret := _m.Called(runnerID, duration) var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(runnerID) + if rf, ok := ret.Get(0).(func(string, int) error); ok { + r0 = rf(runnerID, duration) } else { r0 = ret.Error(0) } diff --git a/nomad/job.go b/nomad/job.go index cbb7660..5f3fa80 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -21,6 +21,7 @@ const ( ConfigMetaUsedKey = "used" ConfigMetaUsedValue = "true" ConfigMetaUnusedValue = "false" + ConfigMetaTimeoutKey = "timeout" ConfigMetaPoolSizeKey = "prewarmingPoolSize" ) diff --git a/nomad/nomad.go b/nomad/nomad.go index 460e162..14d0c0a 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -11,6 +11,7 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/util" "io" "net/url" + "strconv" "time" ) @@ -255,7 +256,7 @@ func checkEvaluation(eval *nomadApi.Evaluation) (err error) { return err } -func (a *APIClient) MarkRunnerAsUsed(runnerID string) error { +func (a *APIClient) MarkRunnerAsUsed(runnerID string, duration int) error { job, err := a.job(runnerID) if err != nil { return fmt.Errorf("couldn't retrieve job info: %w", err) @@ -264,6 +265,10 @@ func (a *APIClient) MarkRunnerAsUsed(runnerID string) error { if err != nil { return fmt.Errorf("couldn't update runner in job as used: %w", err) } + err = SetMetaConfigValue(job, ConfigMetaTimeoutKey, strconv.Itoa(duration)) + if err != nil { + return fmt.Errorf("couldn't update runner in job with timeout: %w", err) + } _, err = a.RegisterNomadJob(job) if err != nil { return fmt.Errorf("couldn't update runner config: %w", err) diff --git a/runner/constants_test.go b/runner/constants_test.go index bb5fba2..c15c0cc 100644 --- a/runner/constants_test.go +++ b/runner/constants_test.go @@ -3,6 +3,7 @@ package runner import "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" const ( - defaultEnvironmentID = EnvironmentID(tests.DefaultEnvironmentIDAsInteger) - anotherEnvironmentID = EnvironmentID(tests.AnotherEnvironmentIDAsInteger) + defaultEnvironmentID = EnvironmentID(tests.DefaultEnvironmentIDAsInteger) + anotherEnvironmentID = EnvironmentID(tests.AnotherEnvironmentIDAsInteger) + defaultInactivityTimeout = 0 ) diff --git a/runner/manager.go b/runner/manager.go index f0b86d6..22e1dbd 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -44,9 +44,9 @@ type Manager interface { CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job, scale bool) (bool, error) - // Claim returns a new runner. + // Claim returns a new runner. The runner is deleted after duration seconds if duration is not 0. // It makes sure that the runner is not in use yet and returns an error if no runner could be provided. - Claim(id EnvironmentID) (Runner, error) + Claim(id EnvironmentID, duration int) (Runner, error) // Get returns the used runner with the given runnerId. // If no runner with the given runnerId is currently used, it returns an error. @@ -170,7 +170,7 @@ func (m *NomadRunnerManager) updateRunnerSpecs(environmentID EnvironmentID, temp return occurredError } -func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) { +func (m *NomadRunnerManager) Claim(environmentID EnvironmentID, duration int) (Runner, error) { job, ok := m.environments.Get(environmentID) if !ok { return nil, ErrUnknownExecutionEnvironment @@ -180,7 +180,7 @@ func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) return nil, ErrNoRunnersAvailable } m.usedRunners.Add(runner) - err := m.apiClient.MarkRunnerAsUsed(runner.Id()) + err := m.apiClient.MarkRunnerAsUsed(runner.Id(), duration) if err != nil { return nil, fmt.Errorf("can't mark runner as used: %w", err) } diff --git a/runner/manager_mock.go b/runner/manager_mock.go index bdf5a24..6483ed1 100644 --- a/runner/manager_mock.go +++ b/runner/manager_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.8.0. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package runner @@ -12,13 +12,13 @@ type ManagerMock struct { mock.Mock } -// Claim provides a mock function with given fields: id -func (_m *ManagerMock) Claim(id EnvironmentID) (Runner, error) { - ret := _m.Called(id) +// Claim provides a mock function with given fields: id, duration +func (_m *ManagerMock) Claim(id EnvironmentID, duration int) (Runner, error) { + ret := _m.Called(id, duration) var r0 Runner - if rf, ok := ret.Get(0).(func(EnvironmentID) Runner); ok { - r0 = rf(id) + if rf, ok := ret.Get(0).(func(EnvironmentID, int) Runner); ok { + r0 = rf(id, duration) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(Runner) @@ -26,8 +26,8 @@ func (_m *ManagerMock) Claim(id EnvironmentID) (Runner, error) { } var r1 error - if rf, ok := ret.Get(1).(func(EnvironmentID) error); ok { - r1 = rf(id) + if rf, ok := ret.Get(1).(func(EnvironmentID, int) error); ok { + r1 = rf(id, duration) } else { r1 = ret.Error(1) } @@ -35,20 +35,20 @@ func (_m *ManagerMock) Claim(id EnvironmentID) (Runner, error) { return r0, r1 } -// CreateOrUpdateEnvironment provides a mock function with given fields: id, desiredIdleRunnersCount, teplateJob, scale -func (_m *ManagerMock) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, teplateJob *api.Job, scale bool) (bool, error) { - ret := _m.Called(id, desiredIdleRunnersCount, teplateJob, scale) +// CreateOrUpdateEnvironment provides a mock function with given fields: id, desiredIdleRunnersCount, templateJob, scale +func (_m *ManagerMock) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *api.Job, scale bool) (bool, error) { + ret := _m.Called(id, desiredIdleRunnersCount, templateJob, scale) var r0 bool if rf, ok := ret.Get(0).(func(EnvironmentID, uint, *api.Job, bool) bool); ok { - r0 = rf(id, desiredIdleRunnersCount, teplateJob, scale) + r0 = rf(id, desiredIdleRunnersCount, templateJob, scale) } else { r0 = ret.Get(0).(bool) } var r1 error if rf, ok := ret.Get(1).(func(EnvironmentID, uint, *api.Job, bool) error); ok { - r1 = rf(id, desiredIdleRunnersCount, teplateJob, scale) + r1 = rf(id, desiredIdleRunnersCount, templateJob, scale) } else { r1 = ret.Error(1) } diff --git a/runner/manager_test.go b/runner/manager_test.go index 833a692..2782761 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -50,7 +50,7 @@ func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []strin call.ReturnArguments = mock.Arguments{nil} }) apiMock.On("LoadEnvironmentJobs").Return([]*nomadApi.Job{}, nil) - apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string")).Return(nil) + apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil) apiMock.On("LoadRunnerIDs", tests.DefaultJobID).Return(returnedRunnerIds, nil) apiMock.On("JobScale", tests.DefaultJobID).Return(uint(len(returnedRunnerIds)), nil) apiMock.On("SetJobScale", tests.DefaultJobID, mock.AnythingOfType("uint"), "Runner Requested").Return(nil) @@ -82,28 +82,28 @@ func (s *ManagerTestSuite) TestRegisterEnvironmentAddsNewJob() { } func (s *ManagerTestSuite) TestClaimReturnsNotFoundErrorIfEnvironmentNotFound() { - runner, err := s.nomadRunnerManager.Claim(EnvironmentID(42)) + runner, err := s.nomadRunnerManager.Claim(EnvironmentID(42), defaultInactivityTimeout) s.Nil(runner) s.Equal(ErrUnknownExecutionEnvironment, err) } func (s *ManagerTestSuite) TestClaimReturnsRunnerIfAvailable() { s.AddIdleRunnerForDefaultEnvironment(s.exerciseRunner) - receivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID) + receivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID, defaultInactivityTimeout) s.NoError(err) s.Equal(s.exerciseRunner, receivedRunner) } func (s *ManagerTestSuite) TestClaimReturnsErrorIfNoRunnerAvailable() { s.waitForRunnerRefresh() - runner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID) + runner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID, defaultInactivityTimeout) s.Nil(runner) s.Equal(ErrNoRunnersAvailable, err) } func (s *ManagerTestSuite) TestClaimReturnsNoRunnerOfDifferentEnvironment() { s.AddIdleRunnerForDefaultEnvironment(s.exerciseRunner) - receivedRunner, err := s.nomadRunnerManager.Claim(anotherEnvironmentID) + receivedRunner, err := s.nomadRunnerManager.Claim(anotherEnvironmentID, defaultInactivityTimeout) s.Nil(receivedRunner) s.Error(err) } @@ -112,22 +112,22 @@ func (s *ManagerTestSuite) TestClaimDoesNotReturnTheSameRunnerTwice() { s.AddIdleRunnerForDefaultEnvironment(s.exerciseRunner) s.AddIdleRunnerForDefaultEnvironment(NewRunner(tests.AnotherRunnerID, s.nomadRunnerManager)) - firstReceivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID) + firstReceivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID, defaultInactivityTimeout) s.NoError(err) - secondReceivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID) + secondReceivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID, defaultInactivityTimeout) s.NoError(err) s.NotEqual(firstReceivedRunner, secondReceivedRunner) } func (s *ManagerTestSuite) TestClaimThrowsAnErrorIfNoRunnersAvailable() { - receivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID) + receivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID, defaultInactivityTimeout) s.Nil(receivedRunner) s.Error(err) } func (s *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() { s.AddIdleRunnerForDefaultEnvironment(s.exerciseRunner) - receivedRunner, _ := s.nomadRunnerManager.Claim(defaultEnvironmentID) + receivedRunner, _ := s.nomadRunnerManager.Claim(defaultEnvironmentID, defaultInactivityTimeout) savedRunner, ok := s.nomadRunnerManager.usedRunners.Get(receivedRunner.Id()) s.True(ok) s.Equal(savedRunner, receivedRunner)