diff --git a/internal/api/websocket_test.go b/internal/api/websocket_test.go index 8c20540..2a94ef3 100644 --- a/internal/api/websocket_test.go +++ b/internal/api/websocket_test.go @@ -312,7 +312,7 @@ func newNomadAllocationWithMockedAPIClient(runnerID string) (runner.Runner, *nom executorAPIMock := &nomad.ExecutorAPIMock{} manager := &runner.ManagerMock{} manager.On("Return", mock.Anything).Return(nil) - r := runner.NewNomadJob(runnerID, nil, executorAPIMock, manager.Return) + r := runner.NewNomadJob(runnerID, nil, executorAPIMock, nil) return r, executorAPIMock } @@ -332,7 +332,7 @@ func newRunnerWithNotMockedRunnerManager(t *testing.T, apiMock *nomad.ExecutorAP server := httptest.NewServer(router) runnerID := tests.DefaultRunnerID - runnerJob := runner.NewNomadJob(runnerID, nil, apiMock, runnerManager.Return) + runnerJob := runner.NewNomadJob(runnerID, nil, apiMock, nil) e, err := environment.NewNomadEnvironment(0, apiMock, "job \"template-0\" {}") require.NoError(t, err) eID, err := nomad.EnvironmentIDFromRunnerID(runnerID) diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go index 6324406..fff9329 100644 --- a/internal/runner/aws_runner.go +++ b/internal/runner/aws_runner.go @@ -61,7 +61,7 @@ func NewAWSFunctionWorkload( workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute, ctx) workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error { - return workload.Destroy() + return workload.Destroy(false) }) return workload, nil } @@ -136,7 +136,7 @@ func (w *AWSFunctionWorkload) GetFileContent(_ string, _ http.ResponseWriter, _ return dto.ErrNotSupported } -func (w *AWSFunctionWorkload) Destroy() error { +func (w *AWSFunctionWorkload) Destroy(_ bool) error { w.cancel() if err := w.onDestroy(w); err != nil { return fmt.Errorf("error while destroying aws runner: %w", err) diff --git a/internal/runner/aws_runner_test.go b/internal/runner/aws_runner_test.go index 34f23d7..e5416a9 100644 --- a/internal/runner/aws_runner_test.go +++ b/internal/runner/aws_runner_test.go @@ -147,7 +147,7 @@ func TestAWSFunctionWorkload_Destroy(t *testing.T) { }) require.NoError(t, err) - err = r.Destroy() + err = r.Destroy(false) assert.NoError(t, err) assert.True(t, hasDestroyBeenCalled) } diff --git a/internal/runner/inactivity_timer_test.go b/internal/runner/inactivity_timer_test.go index 41c5389..2cbfd7f 100644 --- a/internal/runner/inactivity_timer_test.go +++ b/internal/runner/inactivity_timer_test.go @@ -1,8 +1,8 @@ package runner import ( + "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/tests" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "testing" "time" @@ -15,18 +15,17 @@ func TestInactivityTimerTestSuite(t *testing.T) { type InactivityTimerTestSuite struct { suite.Suite runner Runner - manager *ManagerMock returned chan bool } func (s *InactivityTimerTestSuite) SetupTest() { s.returned = make(chan bool, 1) - s.manager = &ManagerMock{} - s.manager.On("Return", mock.Anything).Run(func(_ mock.Arguments) { + apiMock := &nomad.ExecutorAPIMock{} + apiMock.On("DeleteJob", tests.DefaultRunnerID).Return(nil) + s.runner = NewNomadJob(tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error { s.returned <- true - }).Return(nil) - - s.runner = NewRunner(tests.DefaultRunnerID, s.manager) + return nil + }) s.runner.SetupTimeout(tests.ShortTimeout) } diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index 8e5afbd..02c729d 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -71,17 +71,11 @@ func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int func (m *NomadRunnerManager) Return(r Runner) error { m.usedRunners.Delete(r.ID()) - r.StopTimeout() - err := util.RetryExponential(time.Second, func() (err error) { - if err = m.apiClient.DeleteJob(r.ID()); err != nil { - err = fmt.Errorf("error deleting runner in Nomad: %w", err) - } - return - }) + err := r.Destroy(false) if err != nil { - return fmt.Errorf("%w", err) + err = fmt.Errorf("cannot return runner: %w", err) } - return nil + return err } func (m *NomadRunnerManager) Load() { @@ -114,7 +108,7 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger environmentLogger.WithError(err).Warn("Error loading runner portMappings") return } - newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.Return) + newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.onRunnerDestroyed) log.WithField("isUsed", isUsed).WithField(dto.KeyRunnerID, newJob.ID()).Debug("Recovered Runner") if isUsed { m.usedRunners.Add(newJob.ID(), newJob) @@ -140,6 +134,7 @@ func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) { } } +// onAllocationAdded is the callback for when Nomad started an allocation. func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, startup time.Duration) { log.WithField(dto.KeyRunnerID, alloc.JobID).WithField("startupDuration", startup).Debug("Runner started") @@ -165,7 +160,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, start if alloc.AllocatedResources != nil { mappedPorts = alloc.AllocatedResources.Shared.Ports } - environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m.Return)) + environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m.onRunnerDestroyed)) monitorAllocationStartupDuration(startup, alloc.JobID, environmentID) } } @@ -178,6 +173,7 @@ func monitorAllocationStartupDuration(startup time.Duration, runnerID string, en monitoring.WriteInfluxPoint(p) } +// onAllocationStopped is the callback for when Nomad stopped an allocation. func (m *NomadRunnerManager) onAllocationStopped(runnerID string) (alreadyRemoved bool) { log.WithField(dto.KeyRunnerID, runnerID).Debug("Runner stopped") @@ -193,8 +189,10 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string) (alreadyRemove r, stillActive := m.usedRunners.Get(runnerID) if stillActive { - r.StopTimeout() m.usedRunners.Delete(runnerID) + if err := r.Destroy(true); err != nil { + log.WithError(err).Warn("Runner of stopped allocation cannot be destroyed") + } } environment, ok := m.environments.Get(environmentID.ToString()) @@ -204,3 +202,15 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string) (alreadyRemove return !stillActive } + +// onRunnerDestroyed is the callback when the runner destroys itself. +// The main use of this callback is to remove the runner from the used runners, when its timeout exceeds. +func (m *NomadRunnerManager) onRunnerDestroyed(r Runner) error { + m.usedRunners.Delete(r.ID()) + + environment, ok := m.environments.Get(r.Environment().ToString()) + if ok { + environment.DeleteRunner(r.ID()) + } + return nil +} diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 427ec19..792d8ef 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -40,7 +40,7 @@ func (s *ManagerTestSuite) SetupTest() { cancel() s.nomadRunnerManager = NewNomadRunnerManager(s.apiMock, ctx) - s.exerciseRunner = NewRunner(tests.DefaultRunnerID, s.nomadRunnerManager) + s.exerciseRunner = NewNomadJob(tests.DefaultRunnerID, nil, s.apiMock, s.nomadRunnerManager.onRunnerDestroyed) s.exerciseEnvironment = createBasicEnvironmentMock(defaultEnvironmentID) s.nomadRunnerManager.StoreEnvironment(s.exerciseEnvironment) } @@ -130,7 +130,7 @@ func (s *ManagerTestSuite) TestClaimReturnsNoRunnerOfDifferentEnvironment() { func (s *ManagerTestSuite) TestClaimDoesNotReturnTheSameRunnerTwice() { s.exerciseEnvironment.On("Sample", mock.Anything).Return(s.exerciseRunner, true).Once() s.exerciseEnvironment.On("Sample", mock.Anything). - Return(NewRunner(tests.AnotherRunnerID, s.nomadRunnerManager), true).Once() + Return(NewNomadJob(tests.AnotherRunnerID, nil, nil, s.nomadRunnerManager.onRunnerDestroyed), true).Once() firstReceivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID, defaultInactivityTimeout) s.NoError(err) @@ -150,6 +150,7 @@ func (s *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() { func (s *ManagerTestSuite) TestClaimRemovesRunnerWhenMarkAsUsedFails() { s.exerciseEnvironment.On("Sample", mock.Anything).Return(s.exerciseRunner, true) + s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return() s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) util.MaxConnectionRetriesExponential = 1 modifyMockedCall(s.apiMock, "MarkRunnerAsUsed", func(call *mock.Call) { @@ -181,6 +182,7 @@ func (s *ManagerTestSuite) TestGetReturnsErrorIfRunnerNotFound() { func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() { s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) + s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return() s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner.ID(), s.exerciseRunner) err := s.nomadRunnerManager.Return(s.exerciseRunner) s.Nil(err) @@ -190,6 +192,7 @@ func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() { func (s *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() { s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) + s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return() err := s.nomadRunnerManager.Return(s.exerciseRunner) s.Nil(err) s.apiMock.AssertCalled(s.T(), "DeleteJob", s.exerciseRunner.ID()) @@ -197,6 +200,7 @@ func (s *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() { func (s *ManagerTestSuite) TestReturnReturnsErrorWhenApiCallFailed() { s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(tests.ErrDefault) + s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return() err := s.nomadRunnerManager.Return(s.exerciseRunner) s.Error(err) } @@ -255,7 +259,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { s.Require().True(ok) mockIdleRunners(environment.(*ExecutionEnvironmentMock)) - testRunner := NewRunner(allocation.JobID, s.nomadRunnerManager) + testRunner := NewNomadJob(allocation.JobID, nil, nil, s.nomadRunnerManager.onRunnerDestroyed) environment.AddRunner(testRunner) s.nomadRunnerManager.usedRunners.Add(testRunner.ID(), testRunner) @@ -378,20 +382,22 @@ func (s *ManagerTestSuite) TestOnAllocationStopped() { func testStoppedInactivityTimer(s *ManagerTestSuite, stopAllocation bool) { s.T().Helper() + s.apiMock.On("DeleteJob", tests.DefaultRunnerID).Return(nil) + environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsString) s.Require().True(ok) mockIdleRunners(environment.(*ExecutionEnvironmentMock)) - inactivityTimerCalled := false + runnerDestroyed := false environment.AddRunner(NewNomadJob(tests.DefaultRunnerID, []nomadApi.PortMapping{}, s.apiMock, func(r Runner) error { - inactivityTimerCalled = true + runnerDestroyed = true return nil })) runner, err := s.nomadRunnerManager.Claim(defaultEnvironmentID, 1) s.Require().NoError(err) s.Require().False(runner.TimeoutPassed()) - s.Require().False(inactivityTimerCalled) + s.Require().False(runnerDestroyed) if stopAllocation { alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(runner.ID()) @@ -400,7 +406,7 @@ func testStoppedInactivityTimer(s *ManagerTestSuite, stopAllocation bool) { <-time.After(time.Second + tests.ShortTimeout) s.NotEqual(stopAllocation, runner.TimeoutPassed()) - s.NotEqual(stopAllocation, inactivityTimerCalled) + s.True(runnerDestroyed) } func TestNomadRunnerManager_Load(t *testing.T) { @@ -449,6 +455,7 @@ func TestNomadRunnerManager_Load(t *testing.T) { runnerManager.usedRunners.Purge() t.Run("Restart timeout of used runner", func(t *testing.T) { apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) + environmentMock.On("DeleteRunner", mock.AnythingOfType("string")).Once().Return() timeout := 1 _, job := helpers.CreateTemplateJob() diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 068a47f..eb18467 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -15,6 +15,7 @@ import ( "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/nullio" "github.com/openHPI/poseidon/pkg/storage" + "github.com/openHPI/poseidon/pkg/util" "io" "net/http" "strings" @@ -69,7 +70,13 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping, } job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id), time.Minute, ctx) - job.InactivityTimer = NewInactivityTimer(job, onDestroy) + job.InactivityTimer = NewInactivityTimer(job, func(r Runner) error { + err := r.Destroy(false) + if err != nil { + err = fmt.Errorf("NomadJob: %w", err) + } + return err + }) return job } @@ -120,7 +127,7 @@ func (r *NomadJob) ExecuteInteractively( // We have to handle three contexts // - requestCtx: The context of the http request (including Sentry data) - // - r.ctx: The context of the runner (runner timeout) + // - r.ctx: The context of the runner (runner timeout, or runner destroyed) // - executionCtx: The context of the execution (execution timeout) // -> The executionCtx cancel that might be triggered (when the client connection breaks) @@ -223,12 +230,26 @@ func (r *NomadJob) GetFileContent( return nil } -func (r *NomadJob) Destroy() error { +func (r *NomadJob) Destroy(local bool) (err error) { r.cancel() - if err := r.onDestroy(r); err != nil { - return fmt.Errorf("error while destroying runner: %w", err) + r.StopTimeout() + if r.onDestroy != nil { + err = r.onDestroy(r) } - return nil + + if !local && err == nil { + err = util.RetryExponential(time.Second, func() (err error) { + if err = r.api.DeleteJob(r.ID()); err != nil { + err = fmt.Errorf("error deleting runner in Nomad: %w", err) + } + return + }) + } + + if err != nil { + err = fmt.Errorf("cannot destroy runner: %w", err) + } + return err } func prepareExecution(request *dto.ExecutionRequest, environmentCtx context.Context) ( @@ -291,7 +312,7 @@ func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute co log.WithContext(ctx).Debug("Execution terminated after SIGQUIT") case <-time.After(executionTimeoutGracePeriod): log.WithContext(ctx).Info("Execution did not quit after SIGQUIT") - if err := r.Destroy(); err != nil { + if err := r.Destroy(false); err != nil { log.WithContext(ctx).Error("Error when destroying runner") } } diff --git a/internal/runner/nomad_runner_test.go b/internal/runner/nomad_runner_test.go index f49caff..c8c219b 100644 --- a/internal/runner/nomad_runner_test.go +++ b/internal/runner/nomad_runner_test.go @@ -87,15 +87,6 @@ func TestFromContextReturnsIsNotOkWhenContextHasNoRunner(t *testing.T) { assert.False(t, ok) } -func TestDestroyReturnsRunner(t *testing.T) { - manager := &ManagerMock{} - manager.On("Return", mock.Anything).Return(nil) - runner := NewRunner(tests.DefaultRunnerID, manager) - err := runner.Destroy() - assert.NoError(t, err) - manager.AssertCalled(t, "Return", runner) -} - func TestExecuteInteractivelyTestSuite(t *testing.T) { suite.Run(t, new(ExecuteInteractivelyTestSuite)) } @@ -115,7 +106,9 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() { s.mockedExecuteCommandCall = s.apiMock.On("ExecuteCommand", mock.Anything, mock.Anything, mock.Anything, true, false, mock.Anything, mock.Anything, mock.Anything). Return(0, nil) + s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) s.timer = &InactivityTimerMock{} + s.timer.On("StopTimeout").Return() s.timer.On("ResetTimeout").Return() s.mockedTimeoutPassedCall = s.timer.On("TimeoutPassed").Return(false) s.manager = &ManagerMock{} @@ -126,7 +119,6 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() { InactivityTimer: s.timer, id: tests.DefaultRunnerID, api: s.apiMock, - onDestroy: s.manager.Return, ctx: context.Background(), } } @@ -207,15 +199,24 @@ func (s *ExecuteInteractivelyTestSuite) TestDestroysRunnerAfterTimeoutAndSignal( s.mockedExecuteCommandCall.Run(func(args mock.Arguments) { select {} }) + runnerDestroyed := false + s.runner.onDestroy = func(_ Runner) error { + runnerDestroyed = true + return nil + } timeLimit := 1 executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit} s.runner.cancel = func() {} s.runner.StoreExecution(defaultExecutionID, executionRequest) + _, _, err := s.runner.ExecuteInteractively( defaultExecutionID, bytes.NewBuffer(make([]byte, 1)), nil, nil, context.Background()) s.Require().NoError(err) + <-time.After(executionTimeoutGracePeriod + time.Duration(timeLimit)*time.Second + tests.ShortTimeout) - s.manager.AssertCalled(s.T(), "Return", s.runner) + s.manager.AssertNotCalled(s.T(), "Return", s.runner) + s.apiMock.AssertCalled(s.T(), "DeleteJob", s.runner.ID()) + s.True(runnerDestroyed) } func (s *ExecuteInteractivelyTestSuite) TestResetTimerGetsCalled() { @@ -395,17 +396,6 @@ func (s *UpdateFileSystemTestSuite) readFilesFromTarArchive(tarArchive io.Reader return files } -// NewRunner creates a new runner with the provided id and manager. -func NewRunner(id string, manager Accessor) Runner { - var handler DestroyRunnerHandler - if manager != nil { - handler = manager.Return - } else { - handler = func(_ Runner) error { return nil } - } - return NewNomadJob(id, nil, nil, handler) -} - func (s *UpdateFileSystemTestSuite) TestGetFileContentReturnsErrorIfExitCodeIsNotZero() { s.mockedExecuteCommandCall.RunFn = nil s.mockedExecuteCommandCall.Return(1, nil) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 7212dfd..305762c 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -59,7 +59,8 @@ type Runner interface { GetFileContent(path string, content http.ResponseWriter, privilegedExecution bool, ctx context.Context) error // Destroy destroys the Runner in Nomad. - Destroy() error + // Iff local is true, the destruction will not be propagated to external systems. + Destroy(local bool) error } // NewContext creates a context containing a runner. diff --git a/internal/runner/runner_mock.go b/internal/runner/runner_mock.go index a40a796..533a855 100644 --- a/internal/runner/runner_mock.go +++ b/internal/runner/runner_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.16.0. DO NOT EDIT. +// Code generated by mockery v2.30.1. DO NOT EDIT. package runner @@ -20,13 +20,13 @@ type RunnerMock struct { mock.Mock } -// Destroy provides a mock function with given fields: -func (_m *RunnerMock) Destroy() error { - ret := _m.Called() +// Destroy provides a mock function with given fields: local +func (_m *RunnerMock) Destroy(local bool) error { + ret := _m.Called(local) var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(bool) error); ok { + r0 = rf(local) } else { r0 = ret.Error(0) } @@ -53,6 +53,11 @@ func (_m *RunnerMock) ExecuteInteractively(id string, stdin io.ReadWriter, stdou ret := _m.Called(id, stdin, stdout, stderr, ctx) var r0 <-chan ExitInfo + var r1 context.CancelFunc + var r2 error + if rf, ok := ret.Get(0).(func(string, io.ReadWriter, io.Writer, io.Writer, context.Context) (<-chan ExitInfo, context.CancelFunc, error)); ok { + return rf(id, stdin, stdout, stderr, ctx) + } if rf, ok := ret.Get(0).(func(string, io.ReadWriter, io.Writer, io.Writer, context.Context) <-chan ExitInfo); ok { r0 = rf(id, stdin, stdout, stderr, ctx) } else { @@ -61,7 +66,6 @@ func (_m *RunnerMock) ExecuteInteractively(id string, stdin io.ReadWriter, stdou } } - var r1 context.CancelFunc if rf, ok := ret.Get(1).(func(string, io.ReadWriter, io.Writer, io.Writer, context.Context) context.CancelFunc); ok { r1 = rf(id, stdin, stdout, stderr, ctx) } else { @@ -70,7 +74,6 @@ func (_m *RunnerMock) ExecuteInteractively(id string, stdin io.ReadWriter, stdou } } - var r2 error if rf, ok := ret.Get(2).(func(string, io.ReadWriter, io.Writer, io.Writer, context.Context) error); ok { r2 = rf(id, stdin, stdout, stderr, ctx) } else { @@ -200,13 +203,12 @@ func (_m *RunnerMock) UpdateFileSystem(request *dto.UpdateFileSystemRequest, ctx return r0 } -type mockConstructorTestingTNewRunnerMock interface { +// NewRunnerMock creates a new instance of RunnerMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRunnerMock(t interface { mock.TestingT Cleanup(func()) -} - -// NewRunnerMock creates a new instance of RunnerMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewRunnerMock(t mockConstructorTestingTNewRunnerMock) *RunnerMock { +}) *RunnerMock { mock := &RunnerMock{} mock.Mock.Test(t)