diff --git a/internal/api/ws/codeocean_writer.go b/internal/api/ws/codeocean_writer.go index ddb0976..cc8c667 100644 --- a/internal/api/ws/codeocean_writer.go +++ b/internal/api/ws/codeocean_writer.go @@ -84,6 +84,8 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) { switch { case errors.Is(info.Err, context.DeadlineExceeded) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout): cw.send(&dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout}) + case errors.Is(info.Err, runner.ErrOOMKilled): + cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: runner.ErrOOMKilled.Error()}) case info.Err != nil: errorMessage := "Error executing the request" log.WithContext(cw.ctx).WithError(info.Err).Warn(errorMessage) diff --git a/internal/api/ws/codeocean_writer_test.go b/internal/api/ws/codeocean_writer_test.go index 675191e..588b37d 100644 --- a/internal/api/ws/codeocean_writer_test.go +++ b/internal/api/ws/codeocean_writer_test.go @@ -60,6 +60,9 @@ func TestCodeOceanOutputWriter_SendExitInfo(t *testing.T) { dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout}}, {"Error", &runner.ExitInfo{Err: websocket.ErrCloseSent}, dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: "Error executing the request"}}, + // CodeOcean expects this exact string in case of a OOM Killed runner. 
+ {"Specific data for OOM Killed runner", &runner.ExitInfo{Err: runner.ErrOOMKilled}, + dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: "the allocation was OOM Killed"}}, {"Exit", &runner.ExitInfo{Code: 21}, dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: 21}}, } diff --git a/internal/nomad/job.go b/internal/nomad/job.go index e65e187..ca1c442 100644 --- a/internal/nomad/job.go +++ b/internal/nomad/job.go @@ -196,3 +196,20 @@ func partOfJobID(id string, part uint) (dto.EnvironmentID, error) { } return dto.EnvironmentID(environmentID), nil } + +func isOOMKilled(alloc *nomadApi.Allocation) bool { + state, ok := alloc.TaskStates[TaskName] + if !ok { + return false + } + + var oomKilledCount uint64 + for _, event := range state.Events { + if oomString, ok := event.Details["oom_killed"]; ok { + if oom, err := strconv.ParseBool(oomString); err == nil && oom { + oomKilledCount++ + } + } + } + return oomKilledCount >= state.Restarts +} diff --git a/internal/nomad/job_test.go b/internal/nomad/job_test.go index f6ddc9f..dec0beb 100644 --- a/internal/nomad/job_test.go +++ b/internal/nomad/job_test.go @@ -141,3 +141,18 @@ func TestEnvironmentIDFromRunnerID(t *testing.T) { _, err = EnvironmentIDFromRunnerID("") assert.Error(t, err) } + +func TestOOMKilledAllocation(t *testing.T) { + event := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}} + state := nomadApi.TaskState{Restarts: 2, Events: []*nomadApi.TaskEvent{&event}} + alloc := nomadApi.Allocation{TaskStates: map[string]*nomadApi.TaskState{TaskName: &state}} + assert.False(t, isOOMKilled(&alloc)) + + event2 := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "false"}} + alloc.TaskStates[TaskName].Events = []*nomadApi.TaskEvent{&event, &event2} + assert.False(t, isOOMKilled(&alloc)) + + event3 := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}} + alloc.TaskStates[TaskName].Events = []*nomadApi.TaskEvent{&event, &event2, &event3} + assert.True(t, 
isOOMKilled(&alloc)) +} diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index 5a0feda..d8f8208 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -21,12 +21,21 @@ import ( ) var ( - log = logging.GetLogger("nomad") - ErrorExecutorCommunicationFailed = errors.New("communication with executor failed") - ErrorEvaluation = errors.New("evaluation could not complete") - ErrorPlacingAllocations = errors.New("failed to place all allocations") - ErrorLoadingJob = errors.New("failed to load job") - ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found") + log = logging.GetLogger("nomad") + ErrorExecutorCommunicationFailed = errors.New("communication with executor failed") + ErrorEvaluation = errors.New("evaluation could not complete") + ErrorPlacingAllocations = errors.New("failed to place all allocations") + ErrorLoadingJob = errors.New("failed to load job") + ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found") + ErrorOOMKilled RunnerDeletedReason = errors.New("the allocation was OOM Killed") + ErrorAllocationRescheduled RunnerDeletedReason = errors.New("the allocation was rescheduled") + ErrorAllocationStopped RunnerDeletedReason = errors.New("the allocation was stopped") + ErrorAllocationStoppedUnexpectedly RunnerDeletedReason = fmt.Errorf("%w unexpectedly", ErrorAllocationStopped) + ErrorAllocationRescheduledUnexpectedly RunnerDeletedReason = fmt.Errorf( + "%w correctly but rescheduled", ErrorAllocationStopped) + // ErrorAllocationCompleted is for reporting the reason for the stopped allocation. + // We do not consider it as an error but add it anyway for a complete reporting. + ErrorAllocationCompleted RunnerDeletedReason = errors.New("the allocation completed") ) // resultChannelWriteTimeout is to detect the error when more element are written into a channel than expected. 
@@ -37,7 +46,8 @@ type AllocationProcessing struct { OnNew NewAllocationProcessor OnDeleted DeletedAllocationProcessor } -type DeletedAllocationProcessor func(jobID string) (removedByPoseidon bool) +type RunnerDeletedReason error +type DeletedAllocationProcessor func(jobID string, RunnerDeletedReason error) (removedByPoseidon bool) type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration) type allocationData struct { @@ -191,7 +201,7 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) go func() { err = a.WatchEventStream(ctx, &AllocationProcessing{ OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, - OnDeleted: func(_ string) bool { return false }, + OnDeleted: func(_ string, _ error) bool { return false }, }) cancel() // cancel the waiting for an evaluation result if watching the event stream ends. }() @@ -411,11 +421,15 @@ func handlePendingAllocationEvent(alloc *nomadApi.Allocation, allocData *allocat case structs.AllocDesiredStatusRun: if allocData != nil { // Handle Allocation restart. - callbacks.OnDeleted(alloc.JobID) + var reason error + if isOOMKilled(alloc) { + reason = ErrorOOMKilled + } + callbacks.OnDeleted(alloc.JobID, reason) } else if alloc.PreviousAllocation != "" { // Handle Runner (/Container) re-allocations. if prevData, ok := allocations.Get(alloc.PreviousAllocation); ok { - if removedByPoseidon := callbacks.OnDeleted(prevData.jobID); removedByPoseidon { + if removedByPoseidon := callbacks.OnDeleted(prevData.jobID, ErrorAllocationRescheduled); removedByPoseidon { // This case handles a race condition between the overdue runner inactivity timeout and the rescheduling of a // lost allocation. The race condition leads first to the rescheduling of the runner, but right after to it // being stopped. Instead of reporting an unexpected stop of the pending runner, we just not start tracking it. 
@@ -466,7 +480,7 @@ func handleCompleteAllocationEvent(alloc *nomadApi.Allocation, _ *allocationData case structs.AllocDesiredStatusRun: log.WithField("alloc", alloc).Warn("Complete allocation desires to run") case structs.AllocDesiredStatusStop: - callbacks.OnDeleted(alloc.JobID) + callbacks.OnDeleted(alloc.JobID, ErrorAllocationCompleted) allocations.Delete(alloc.ID) default: log.WithField("alloc", alloc).Warn("Other Desired Status") @@ -477,32 +491,40 @@ func handleCompleteAllocationEvent(alloc *nomadApi.Allocation, _ *allocationData func handleFailedAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData, allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) { // The stop is expected when the allocation desired to stop even before it failed. - expectedStop := allocData.allocDesiredStatus == structs.AllocDesiredStatusStop - handleStoppingAllocationEvent(alloc, allocations, callbacks, expectedStop) + reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop + handleStoppingAllocationEvent(alloc, allocations, callbacks, reschedulingExpected) } // handleLostAllocationEvent logs only the last of the multiple lost events. func handleLostAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData, allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) { // The stop is expected when the allocation desired to stop even before it got lost. 
- expectedStop := allocData.allocDesiredStatus == structs.AllocDesiredStatusStop - handleStoppingAllocationEvent(alloc, allocations, callbacks, expectedStop) + reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop + handleStoppingAllocationEvent(alloc, allocations, callbacks, reschedulingExpected) } func handleStoppingAllocationEvent(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData], - callbacks *AllocationProcessing, expectedStop bool) { + callbacks *AllocationProcessing, reschedulingExpected bool) { + replacementAllocationScheduled := alloc.NextAllocation != "" + correctRescheduling := reschedulingExpected == replacementAllocationScheduled + removedByPoseidon := false - if alloc.NextAllocation == "" { - removedByPoseidon = callbacks.OnDeleted(alloc.JobID) + if !replacementAllocationScheduled { + var reason error + if correctRescheduling { + reason = ErrorAllocationStoppedUnexpectedly + } else { + reason = ErrorAllocationRescheduledUnexpectedly + } + removedByPoseidon = callbacks.OnDeleted(alloc.JobID, reason) allocations.Delete(alloc.ID) } entry := log.WithField("job", alloc.JobID) - replacementAllocationScheduled := alloc.NextAllocation != "" - if !removedByPoseidon && expectedStop == replacementAllocationScheduled { + if !removedByPoseidon && !correctRescheduling { entry.WithField("alloc", alloc).Warn("Unexpected Allocation Stopping / Restarting") } else { - entry.Debug("Expected Allocation Stopping / Restarting") + entry.Trace("Expected Allocation Stopping / Restarting") } } diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index a8e9dcf..51da0c6 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -26,7 +26,7 @@ import ( var ( noopAllocationProcessing = &AllocationProcessing{ OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, - OnDeleted: func(_ string) bool { return false }, + OnDeleted: func(_ string, _ error) bool { return false }, } 
ErrUnexpectedEOF = errors.New("unexpected EOF") ) @@ -588,7 +588,7 @@ func TestHandleAllocationEvent_IgnoresReschedulesForStoppedJobs(t *testing.T) { err := handleAllocationEvent(time.Now().UnixNano(), allocations, &rescheduledEvent, &AllocationProcessing{ OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, - OnDeleted: func(_ string) bool { return true }, + OnDeleted: func(_ string, _ error) bool { return true }, }) require.NoError(t, err) @@ -596,6 +596,28 @@ func TestHandleAllocationEvent_IgnoresReschedulesForStoppedJobs(t *testing.T) { assert.False(t, ok) } +func TestHandleAllocationEvent_ReportsOOMKilledStatus(t *testing.T) { + restartedAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + event := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}} + state := nomadApi.TaskState{Restarts: 1, Events: []*nomadApi.TaskEvent{&event}} + restartedAllocation.TaskStates = map[string]*nomadApi.TaskState{TaskName: &state} + restartedEvent := eventForAllocation(t, restartedAllocation) + + allocations := storage.NewLocalStorage[*allocationData]() + allocations.Add(restartedAllocation.ID, &allocationData{jobID: restartedAllocation.JobID}) + + var reason error + err := handleAllocationEvent(time.Now().UnixNano(), allocations, &restartedEvent, &AllocationProcessing{ + OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, + OnDeleted: func(_ string, r error) bool { + reason = r + return true + }, + }) + require.NoError(t, err) + assert.ErrorIs(t, reason, ErrorOOMKilled) +} + func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) { apiMock := &apiQuerierMock{} apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault) @@ -638,7 +660,7 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, OnNew: func(alloc *nomadApi.Allocation, _ time.Duration) { newAllocations = append(newAllocations, alloc) }, - OnDeleted: func(jobID string) bool 
{ + OnDeleted: func(jobID string, _ error) bool { deletedAllocations = append(deletedAllocations, jobID) return false }, diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go index fff9329..39e3a76 100644 --- a/internal/runner/aws_runner.go +++ b/internal/runner/aws_runner.go @@ -61,7 +61,7 @@ func NewAWSFunctionWorkload( workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute, ctx) workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error { - return workload.Destroy(false) + return workload.Destroy(nil) }) return workload, nil } @@ -136,7 +136,7 @@ func (w *AWSFunctionWorkload) GetFileContent(_ string, _ http.ResponseWriter, _ return dto.ErrNotSupported } -func (w *AWSFunctionWorkload) Destroy(_ bool) error { +func (w *AWSFunctionWorkload) Destroy(_ DestroyReason) error { w.cancel() if err := w.onDestroy(w); err != nil { return fmt.Errorf("error while destroying aws runner: %w", err) diff --git a/internal/runner/aws_runner_test.go b/internal/runner/aws_runner_test.go index e5416a9..8f2011e 100644 --- a/internal/runner/aws_runner_test.go +++ b/internal/runner/aws_runner_test.go @@ -147,7 +147,8 @@ func TestAWSFunctionWorkload_Destroy(t *testing.T) { }) require.NoError(t, err) - err = r.Destroy(false) + var reason error + err = r.Destroy(reason) assert.NoError(t, err) assert.True(t, hasDestroyBeenCalled) } diff --git a/internal/runner/inactivity_timer.go b/internal/runner/inactivity_timer.go index 7d2b07d..902f496 100644 --- a/internal/runner/inactivity_timer.go +++ b/internal/runner/inactivity_timer.go @@ -31,7 +31,7 @@ const ( TimerExpired TimerState = 2 ) -var ErrorRunnerInactivityTimeout = errors.New("runner inactivity timeout exceeded") +var ErrorRunnerInactivityTimeout DestroyReason = errors.New("runner inactivity timeout exceeded") type InactivityTimerImplementation struct { timer *time.Timer 
diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index 02c729d..e9ffec7 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -71,7 +71,7 @@ func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int func (m *NomadRunnerManager) Return(r Runner) error { m.usedRunners.Delete(r.ID()) - err := r.Destroy(false) + err := r.Destroy(ErrDestroyedByAPIRequest) if err != nil { err = fmt.Errorf("cannot return runner: %w", err) } @@ -174,7 +174,7 @@ func monitorAllocationStartupDuration(startup time.Duration, runnerID string, en } // onAllocationStopped is the callback for when Nomad stopped an allocation. -func (m *NomadRunnerManager) onAllocationStopped(runnerID string) (alreadyRemoved bool) { +func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error) (alreadyRemoved bool) { log.WithField(dto.KeyRunnerID, runnerID).Debug("Runner stopped") if nomad.IsEnvironmentTemplateID(runnerID) { @@ -189,8 +189,17 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string) (alreadyRemove r, stillActive := m.usedRunners.Get(runnerID) if stillActive { + // Mask the internal stop reason because the runner might disclose/forward it to CodeOcean/externally. 
+ switch { + case errors.Is(reason, nomad.ErrorOOMKilled): + reason = ErrOOMKilled + default: + log.WithField(dto.KeyRunnerID, runnerID).WithField("reason", reason).Debug("Internal reason for allocation stop") + reason = ErrAllocationStopped + } + m.usedRunners.Delete(runnerID) - if err := r.Destroy(true); err != nil { + if err := r.Destroy(reason); err != nil { log.WithError(err).Warn("Runner of stopped allocation cannot be destroyed") } } diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 792d8ef..acb94ff 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -259,7 +259,8 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { s.Require().True(ok) mockIdleRunners(environment.(*ExecutionEnvironmentMock)) - testRunner := NewNomadJob(allocation.JobID, nil, nil, s.nomadRunnerManager.onRunnerDestroyed) + testRunner := NewNomadJob(allocation.JobID, nil, s.apiMock, s.nomadRunnerManager.onRunnerDestroyed) + s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) environment.AddRunner(testRunner) s.nomadRunnerManager.usedRunners.Add(testRunner.ID(), testRunner) @@ -267,7 +268,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { call.Run(func(args mock.Arguments) { callbacks, ok := args.Get(1).(*nomad.AllocationProcessing) s.Require().True(ok) - callbacks.OnDeleted(allocation.JobID) + callbacks.OnDeleted(allocation.JobID, nil) call.ReturnArguments = mock.Arguments{nil} }) }) @@ -400,7 +401,7 @@ func testStoppedInactivityTimer(s *ManagerTestSuite, stopAllocation bool) { s.Require().False(runnerDestroyed) if stopAllocation { - alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(runner.ID()) + alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(runner.ID(), nil) s.False(alreadyRemoved) } diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 5edacf0..a9989b0 100644 --- 
a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -25,6 +25,8 @@ import ( const ( // runnerContextKey is the key used to store runners in context.Context. runnerContextKey dto.ContextKey = "runner" + // destroyReasonContextKey is the key used to store the reason of the destruction in the context.Context. + destroyReasonContextKey dto.ContextKey = "destroyReason" // SIGQUIT is the character that causes a tty to send the SIGQUIT signal to the controlled process. SIGQUIT = 0x1c // executionTimeoutGracePeriod is the time to wait after sending a SIGQUIT signal to a timed out execution. @@ -36,9 +38,13 @@ const ( ) var ( - ErrorUnknownExecution = errors.New("unknown execution") - ErrorFileCopyFailed = errors.New("file copy failed") - ErrFileNotFound = errors.New("file not found or insufficient permissions") + ErrorUnknownExecution = errors.New("unknown execution") + ErrorFileCopyFailed = errors.New("file copy failed") + ErrFileNotFound = errors.New("file not found or insufficient permissions") + ErrAllocationStopped DestroyReason = errors.New("the allocation stopped") + ErrOOMKilled DestroyReason = nomad.ErrorOOMKilled + ErrDestroyedByAPIRequest DestroyReason = errors.New("the client wants to stop the runner") + ErrCannotStopExecution DestroyReason = errors.New("the execution did not stop after SIGQUIT") ) // NomadJob is an abstraction to communicate with Nomad environments. 
@@ -71,7 +77,7 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping, job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id), time.Minute, ctx) job.InactivityTimer = NewInactivityTimer(job, func(r Runner) error { - err := r.Destroy(false) + err := r.Destroy(ErrorRunnerInactivityTimeout) if err != nil { err = fmt.Errorf("NomadJob: %w", err) } @@ -230,14 +236,15 @@ func (r *NomadJob) GetFileContent( return nil } -func (r *NomadJob) Destroy(local bool) (err error) { +func (r *NomadJob) Destroy(reason DestroyReason) (err error) { + r.ctx = context.WithValue(r.ctx, destroyReasonContextKey, reason) r.cancel() r.StopTimeout() if r.onDestroy != nil { err = r.onDestroy(r) } - if !local && err == nil { + if err == nil && (!errors.Is(reason, ErrAllocationStopped) && !errors.Is(reason, ErrOOMKilled)) { err = util.RetryExponential(time.Second, func() (err error) { if err = r.api.DeleteJob(r.ID()); err != nil { err = fmt.Errorf("error deleting runner in Nomad: %w", err) } @@ -290,14 +297,22 @@ func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute co } err := ctx.Err() - if r.TimeoutPassed() { - err = ErrorRunnerInactivityTimeout + if reason, ok := r.ctx.Value(destroyReasonContextKey).(error); ok { + err = reason + if r.TimeoutPassed() && !errors.Is(err, ErrorRunnerInactivityTimeout) { + log.WithError(err).Warn("Wrong destroy reason for expired runner") + } } // From this time on the WebSocket connection to the client is closed in /internal/api/websocket.go // waitForExit. Input can still be sent to the executor. exit <- ExitInfo{255, err} + // This condition prevents further interaction with a stopped / dead allocation. + if errors.Is(err, ErrAllocationStopped) || errors.Is(err, ErrOOMKilled) { + return + } + // This injects the SIGQUIT character into the stdin. 
This character is parsed by the tty line discipline // (tty has to be true) and converted to a SIGQUIT signal sent to the foreground process attached to the tty. // By default, SIGQUIT causes the process to terminate and produces a core dump. Processes can catch this signal @@ -318,8 +333,8 @@ func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute co case <-exitInternal: log.WithContext(ctx).Debug("Execution terminated after SIGQUIT") case <-time.After(executionTimeoutGracePeriod): - log.WithContext(ctx).Info("Execution did not quit after SIGQUIT") - if err := r.Destroy(false); err != nil { + log.WithContext(ctx).Info(ErrCannotStopExecution.Error()) + if err := r.Destroy(ErrCannotStopExecution); err != nil { log.WithContext(ctx).Error("Error when destroying runner") } } diff --git a/internal/runner/nomad_runner_test.go b/internal/runner/nomad_runner_test.go index cd6b6dc..d8ee6b4 100644 --- a/internal/runner/nomad_runner_test.go +++ b/internal/runner/nomad_runner_test.go @@ -114,12 +114,14 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() { s.manager = &ManagerMock{} s.manager.On("Return", mock.Anything).Return(nil) + ctx, cancel := context.WithCancel(context.Background()) s.runner = &NomadJob{ executions: storage.NewLocalStorage[*dto.ExecutionRequest](), InactivityTimer: s.timer, id: tests.DefaultRunnerID, api: s.apiMock, - ctx: context.Background(), + ctx: ctx, + cancel: cancel, } } @@ -235,12 +237,30 @@ func (s *ExecuteInteractivelyTestSuite) TestExitHasTimeoutErrorIfRunnerTimesOut( executionRequest := &dto.ExecutionRequest{} s.runner.StoreExecution(defaultExecutionID, executionRequest) - exitChannel, cancel, err := s.runner.ExecuteInteractively( + exitChannel, _, err := s.runner.ExecuteInteractively( defaultExecutionID, &nullio.ReadWriter{}, nil, nil, context.Background()) s.Require().NoError(err) - cancel() + err = s.runner.Destroy(ErrorRunnerInactivityTimeout) + s.Require().NoError(err) exit := <-exitChannel - 
s.Equal(ErrorRunnerInactivityTimeout, exit.Err) + s.ErrorIs(exit.Err, ErrorRunnerInactivityTimeout) +} + +func (s *ExecuteInteractivelyTestSuite) TestDestroyReasonIsPassedToExecution() { + s.mockedExecuteCommandCall.Run(func(args mock.Arguments) { + select {} + }).Return(0, nil) + s.mockedTimeoutPassedCall.Return(true) + executionRequest := &dto.ExecutionRequest{} + s.runner.StoreExecution(defaultExecutionID, executionRequest) + + exitChannel, _, err := s.runner.ExecuteInteractively( + defaultExecutionID, &nullio.ReadWriter{}, nil, nil, context.Background()) + s.Require().NoError(err) + err = s.runner.Destroy(ErrOOMKilled) + s.Require().NoError(err) + exit := <-exitChannel + s.ErrorIs(exit.Err, ErrOOMKilled) } func TestUpdateFileSystemTestSuite(t *testing.T) { diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 305762c..30bd5e2 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -17,6 +17,9 @@ type ExitInfo struct { type DestroyRunnerHandler = func(r Runner) error +// DestroyReason specifies errors that are expected as reason for destroying a runner. +type DestroyReason error + type Runner interface { InactivityTimer @@ -59,8 +62,8 @@ type Runner interface { GetFileContent(path string, content http.ResponseWriter, privilegedExecution bool, ctx context.Context) error // Destroy destroys the Runner in Nomad. - // Iff local is true, the destruction will not be propagated to external systems. - Destroy(local bool) error + // Depending on the reason special cases of the Destruction will be handled. + Destroy(reason DestroyReason) error } // NewContext creates a context containing a runner. diff --git a/internal/runner/runner_mock.go b/internal/runner/runner_mock.go index 533a855..03b5e36 100644 --- a/internal/runner/runner_mock.go +++ b/internal/runner/runner_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. 
package runner @@ -20,13 +20,13 @@ type RunnerMock struct { mock.Mock } -// Destroy provides a mock function with given fields: local -func (_m *RunnerMock) Destroy(local bool) error { - ret := _m.Called(local) +// Destroy provides a mock function with given fields: reason +func (_m *RunnerMock) Destroy(reason DestroyReason) error { + ret := _m.Called(reason) var r0 error - if rf, ok := ret.Get(0).(func(bool) error); ok { - r0 = rf(local) + if rf, ok := ret.Get(0).(func(DestroyReason) error); ok { + r0 = rf(reason) } else { r0 = ret.Error(0) }