Introduce reason for destroying runner
in order to return a specific error for OOM Killed Executions.
committed by Sebastian Serth
parent b3fedf274c
commit 6a1677dea0
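
In short, the destroy reason is threaded through the whole stack: the Nomad event handlers detect an OOM-killed allocation from its task events and pass a sentinel error to the OnDeleted callback, the runner manager masks internal reasons and destroys the runner with runner.ErrOOMKilled, the runner records the reason in its context and reports it as the execution's exit error, and the WebSocket output writer finally forwards the exact error string to CodeOcean.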
@@ -84,6 +84,8 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) {
     switch {
     case errors.Is(info.Err, context.DeadlineExceeded) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout):
         cw.send(&dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout})
+    case errors.Is(info.Err, runner.ErrOOMKilled):
+        cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: runner.ErrOOMKilled.Error()})
     case info.Err != nil:
         errorMessage := "Error executing the request"
         log.WithContext(cw.ctx).WithError(info.Err).Warn(errorMessage)

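CodeOcean matches on the exact data string of this message, runner.ErrOOMKilled.Error() ("the allocation was OOM Killed"); the test case added below pins that string down.
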
@@ -60,6 +60,9 @@ func TestCodeOceanOutputWriter_SendExitInfo(t *testing.T) {
             dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout}},
         {"Error", &runner.ExitInfo{Err: websocket.ErrCloseSent},
             dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: "Error executing the request"}},
+        // CodeOcean expects this exact string in case of a OOM Killed runner.
+        {"Specific data for OOM Killed runner", &runner.ExitInfo{Err: runner.ErrOOMKilled},
+            dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: "the allocation was OOM Killed"}},
         {"Exit", &runner.ExitInfo{Code: 21},
             dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: 21}},
     }

@@ -196,3 +196,20 @@ func partOfJobID(id string, part uint) (dto.EnvironmentID, error) {
     }
     return dto.EnvironmentID(environmentID), nil
 }
+
+func isOOMKilled(alloc *nomadApi.Allocation) bool {
+    state, ok := alloc.TaskStates[TaskName]
+    if !ok {
+        return false
+    }
+
+    var oomKilledCount uint64
+    for _, event := range state.Events {
+        if oomString, ok := event.Details["oom_killed"]; ok {
+            if oom, err := strconv.ParseBool(oomString); err == nil && oom {
+                oomKilledCount++
+            }
+        }
+    }
+    return oomKilledCount >= state.Restarts
+}

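The helper counts the task events whose oom_killed detail parses to true and reports an OOM kill only if that count reaches the number of restarts, i.e. every restart of the task was caused by an OOM kill. A minimal, self-contained sketch of the detail parsing it relies on (the surrounding program is illustrative; only the "oom_killed" key and strconv.ParseBool come from the diff):

    package main

    import (
        "fmt"
        "strconv"
    )

    func main() {
        // Nomad reports OOM kills as a string detail on task events.
        // strconv.ParseBool accepts "true", "1", "t", ... and returns an error otherwise.
        details := map[string]string{"oom_killed": "true"}
        oom, err := strconv.ParseBool(details["oom_killed"])
        fmt.Println(oom, err) // true <nil>
    }
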
@@ -141,3 +141,18 @@ func TestEnvironmentIDFromRunnerID(t *testing.T) {
     _, err = EnvironmentIDFromRunnerID("")
     assert.Error(t, err)
 }
+
+func TestOOMKilledAllocation(t *testing.T) {
+    event := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}}
+    state := nomadApi.TaskState{Restarts: 2, Events: []*nomadApi.TaskEvent{&event}}
+    alloc := nomadApi.Allocation{TaskStates: map[string]*nomadApi.TaskState{TaskName: &state}}
+    assert.False(t, isOOMKilled(&alloc))
+
+    event2 := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "false"}}
+    alloc.TaskStates[TaskName].Events = []*nomadApi.TaskEvent{&event, &event2}
+    assert.False(t, isOOMKilled(&alloc))
+
+    event3 := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}}
+    alloc.TaskStates[TaskName].Events = []*nomadApi.TaskEvent{&event, &event2, &event3}
+    assert.True(t, isOOMKilled(&alloc))
+}

@@ -21,12 +21,21 @@ import (
 )
 
 var (
     log = logging.GetLogger("nomad")
     ErrorExecutorCommunicationFailed = errors.New("communication with executor failed")
     ErrorEvaluation = errors.New("evaluation could not complete")
     ErrorPlacingAllocations = errors.New("failed to place all allocations")
     ErrorLoadingJob = errors.New("failed to load job")
     ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found")
+    ErrorOOMKilled RunnerDeletedReason = errors.New("the allocation was OOM Killed")
+    ErrorAllocationRescheduled RunnerDeletedReason = errors.New("the allocation was rescheduled")
+    ErrorAllocationStopped RunnerDeletedReason = errors.New("the allocation was stopped")
+    ErrorAllocationStoppedUnexpectedly RunnerDeletedReason = fmt.Errorf("%w unexpectedly", ErrorAllocationStopped)
+    ErrorAllocationRescheduledUnexpectedly RunnerDeletedReason = fmt.Errorf(
+        "%w correctly but rescheduled", ErrorAllocationStopped)
+    // ErrorAllocationCompleted is for reporting the reason for the stopped allocation.
+    // We do not consider it as an error but add it anyway for a complete reporting.
+    ErrorAllocationCompleted RunnerDeletedReason = errors.New("the allocation completed")
 )
 
 // resultChannelWriteTimeout is to detect the error when more element are written into a channel than expected.

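All of these reasons are sentinel errors; the two "unexpectedly" variants wrap ErrorAllocationStopped with %w, so an errors.Is check against the base sentinel still matches them. A small standalone sketch of that wrapping pattern (the variable names here are placeholders, not the package's identifiers):

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        stopped := errors.New("the allocation was stopped")
        stoppedUnexpectedly := fmt.Errorf("%w unexpectedly", stopped)

        // %w keeps the wrapped sentinel matchable through the error chain.
        fmt.Println(errors.Is(stoppedUnexpectedly, stopped)) // true
    }
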
@@ -37,7 +46,8 @@ type AllocationProcessing struct {
     OnNew NewAllocationProcessor
     OnDeleted DeletedAllocationProcessor
 }
-type DeletedAllocationProcessor func(jobID string) (removedByPoseidon bool)
+type RunnerDeletedReason error
+type DeletedAllocationProcessor func(jobID string, RunnerDeletedReason error) (removedByPoseidon bool)
 type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration)
 
 type allocationData struct {

@@ -191,7 +201,7 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context)
     go func() {
         err = a.WatchEventStream(ctx, &AllocationProcessing{
             OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
-            OnDeleted: func(_ string) bool { return false },
+            OnDeleted: func(_ string, _ error) bool { return false },
         })
         cancel() // cancel the waiting for an evaluation result if watching the event stream ends.
     }()

@@ -411,11 +421,15 @@ func handlePendingAllocationEvent(alloc *nomadApi.Allocation, allocData *allocat
     case structs.AllocDesiredStatusRun:
         if allocData != nil {
             // Handle Allocation restart.
-            callbacks.OnDeleted(alloc.JobID)
+            var reason error
+            if isOOMKilled(alloc) {
+                reason = ErrorOOMKilled
+            }
+            callbacks.OnDeleted(alloc.JobID, reason)
         } else if alloc.PreviousAllocation != "" {
             // Handle Runner (/Container) re-allocations.
             if prevData, ok := allocations.Get(alloc.PreviousAllocation); ok {
-                if removedByPoseidon := callbacks.OnDeleted(prevData.jobID); removedByPoseidon {
+                if removedByPoseidon := callbacks.OnDeleted(prevData.jobID, ErrorAllocationRescheduled); removedByPoseidon {
                     // This case handles a race condition between the overdue runner inactivity timeout and the rescheduling of a
                     // lost allocation. The race condition leads first to the rescheduling of the runner, but right after to it
                     // being stopped. Instead of reporting an unexpected stop of the pending runner, we just not start tracking it.

@@ -466,7 +480,7 @@ func handleCompleteAllocationEvent(alloc *nomadApi.Allocation, _ *allocationData
     case structs.AllocDesiredStatusRun:
         log.WithField("alloc", alloc).Warn("Complete allocation desires to run")
     case structs.AllocDesiredStatusStop:
-        callbacks.OnDeleted(alloc.JobID)
+        callbacks.OnDeleted(alloc.JobID, ErrorAllocationCompleted)
         allocations.Delete(alloc.ID)
     default:
         log.WithField("alloc", alloc).Warn("Other Desired Status")

@@ -477,32 +491,40 @@ func handleCompleteAllocationEvent(alloc *nomadApi.Allocation, _ *allocationData
 func handleFailedAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
     allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
     // The stop is expected when the allocation desired to stop even before it failed.
-    expectedStop := allocData.allocDesiredStatus == structs.AllocDesiredStatusStop
-    handleStoppingAllocationEvent(alloc, allocations, callbacks, expectedStop)
+    reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop
+    handleStoppingAllocationEvent(alloc, allocations, callbacks, reschedulingExpected)
 }
 
 // handleLostAllocationEvent logs only the last of the multiple lost events.
 func handleLostAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
     allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
     // The stop is expected when the allocation desired to stop even before it got lost.
-    expectedStop := allocData.allocDesiredStatus == structs.AllocDesiredStatusStop
-    handleStoppingAllocationEvent(alloc, allocations, callbacks, expectedStop)
+    reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop
+    handleStoppingAllocationEvent(alloc, allocations, callbacks, reschedulingExpected)
 }
 
 func handleStoppingAllocationEvent(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData],
-    callbacks *AllocationProcessing, expectedStop bool) {
+    callbacks *AllocationProcessing, reschedulingExpected bool) {
+    replacementAllocationScheduled := alloc.NextAllocation != ""
+    correctRescheduling := reschedulingExpected == replacementAllocationScheduled
+
     removedByPoseidon := false
-    if alloc.NextAllocation == "" {
-        removedByPoseidon = callbacks.OnDeleted(alloc.JobID)
+    if !replacementAllocationScheduled {
+        var reason error
+        if correctRescheduling {
+            reason = ErrorAllocationStoppedUnexpectedly
+        } else {
+            reason = ErrorAllocationRescheduledUnexpectedly
+        }
+        removedByPoseidon = callbacks.OnDeleted(alloc.JobID, reason)
         allocations.Delete(alloc.ID)
     }
 
     entry := log.WithField("job", alloc.JobID)
-    replacementAllocationScheduled := alloc.NextAllocation != ""
-    if !removedByPoseidon && expectedStop == replacementAllocationScheduled {
+    if !removedByPoseidon && !correctRescheduling {
         entry.WithField("alloc", alloc).Warn("Unexpected Allocation Stopping / Restarting")
     } else {
-        entry.Debug("Expected Allocation Stopping / Restarting")
+        entry.Trace("Expected Allocation Stopping / Restarting")
     }
 }

@@ -26,7 +26,7 @@ import (
 var (
     noopAllocationProcessing = &AllocationProcessing{
         OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
-        OnDeleted: func(_ string) bool { return false },
+        OnDeleted: func(_ string, _ error) bool { return false },
     }
     ErrUnexpectedEOF = errors.New("unexpected EOF")
 )

@@ -588,7 +588,7 @@ func TestHandleAllocationEvent_IgnoresReschedulesForStoppedJobs(t *testing.T) {
 
     err := handleAllocationEvent(time.Now().UnixNano(), allocations, &rescheduledEvent, &AllocationProcessing{
         OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
-        OnDeleted: func(_ string) bool { return true },
+        OnDeleted: func(_ string, _ error) bool { return true },
     })
     require.NoError(t, err)
 

@@ -596,6 +596,28 @@ func TestHandleAllocationEvent_IgnoresReschedulesForStoppedJobs(t *testing.T) {
     assert.False(t, ok)
 }
 
+func TestHandleAllocationEvent_ReportsOOMKilledStatus(t *testing.T) {
+    restartedAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
+    event := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}}
+    state := nomadApi.TaskState{Restarts: 1, Events: []*nomadApi.TaskEvent{&event}}
+    restartedAllocation.TaskStates = map[string]*nomadApi.TaskState{TaskName: &state}
+    restartedEvent := eventForAllocation(t, restartedAllocation)
+
+    allocations := storage.NewLocalStorage[*allocationData]()
+    allocations.Add(restartedAllocation.ID, &allocationData{jobID: restartedAllocation.JobID})
+
+    var reason error
+    err := handleAllocationEvent(time.Now().UnixNano(), allocations, &restartedEvent, &AllocationProcessing{
+        OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
+        OnDeleted: func(_ string, r error) bool {
+            reason = r
+            return true
+        },
+    })
+    require.NoError(t, err)
+    assert.ErrorIs(t, reason, ErrorOOMKilled)
+}
+
 func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) {
     apiMock := &apiQuerierMock{}
     apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault)

@@ -638,7 +660,7 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events,
         OnNew: func(alloc *nomadApi.Allocation, _ time.Duration) {
             newAllocations = append(newAllocations, alloc)
         },
-        OnDeleted: func(jobID string) bool {
+        OnDeleted: func(jobID string, _ error) bool {
             deletedAllocations = append(deletedAllocations, jobID)
             return false
         },

|
@ -61,7 +61,7 @@ func NewAWSFunctionWorkload(
|
|||||||
workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
|
workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
|
||||||
monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute, ctx)
|
monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute, ctx)
|
||||||
workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error {
|
workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error {
|
||||||
return workload.Destroy(false)
|
return workload.Destroy(nil)
|
||||||
})
|
})
|
||||||
return workload, nil
|
return workload, nil
|
||||||
}
|
}
|
||||||
@@ -136,7 +136,7 @@ func (w *AWSFunctionWorkload) GetFileContent(_ string, _ http.ResponseWriter, _
     return dto.ErrNotSupported
 }
 
-func (w *AWSFunctionWorkload) Destroy(_ bool) error {
+func (w *AWSFunctionWorkload) Destroy(_ DestroyReason) error {
     w.cancel()
     if err := w.onDestroy(w); err != nil {
         return fmt.Errorf("error while destroying aws runner: %w", err)

@@ -147,7 +147,8 @@ func TestAWSFunctionWorkload_Destroy(t *testing.T) {
     })
     require.NoError(t, err)
 
-    err = r.Destroy(false)
+    var reason error
+    err = r.Destroy(reason)
     assert.NoError(t, err)
     assert.True(t, hasDestroyBeenCalled)
 }

@@ -31,7 +31,7 @@ const (
     TimerExpired TimerState = 2
 )
 
-var ErrorRunnerInactivityTimeout = errors.New("runner inactivity timeout exceeded")
+var ErrorRunnerInactivityTimeout DestroyReason = errors.New("runner inactivity timeout exceeded")
 
 type InactivityTimerImplementation struct {
     timer *time.Timer

@@ -71,7 +71,7 @@ func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int
 
 func (m *NomadRunnerManager) Return(r Runner) error {
     m.usedRunners.Delete(r.ID())
-    err := r.Destroy(false)
+    err := r.Destroy(ErrDestroyedByAPIRequest)
     if err != nil {
         err = fmt.Errorf("cannot return runner: %w", err)
     }

@@ -174,7 +174,7 @@ func monitorAllocationStartupDuration(startup time.Duration, runnerID string, en
 }
 
 // onAllocationStopped is the callback for when Nomad stopped an allocation.
-func (m *NomadRunnerManager) onAllocationStopped(runnerID string) (alreadyRemoved bool) {
+func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error) (alreadyRemoved bool) {
     log.WithField(dto.KeyRunnerID, runnerID).Debug("Runner stopped")
 
     if nomad.IsEnvironmentTemplateID(runnerID) {

|
|||||||
|
|
||||||
r, stillActive := m.usedRunners.Get(runnerID)
|
r, stillActive := m.usedRunners.Get(runnerID)
|
||||||
if stillActive {
|
if stillActive {
|
||||||
|
// Mask the internal stop reason because the runner might disclose/forward it to CodeOcean/externally.
|
||||||
|
switch {
|
||||||
|
case errors.Is(reason, nomad.ErrorOOMKilled):
|
||||||
|
reason = ErrOOMKilled
|
||||||
|
default:
|
||||||
|
log.WithField(dto.KeyRunnerID, runnerID).WithField("reason", reason).Debug("Internal reason for allocation stop")
|
||||||
|
reason = ErrAllocationStopped
|
||||||
|
}
|
||||||
|
|
||||||
m.usedRunners.Delete(runnerID)
|
m.usedRunners.Delete(runnerID)
|
||||||
if err := r.Destroy(true); err != nil {
|
if err := r.Destroy(reason); err != nil {
|
||||||
log.WithError(err).Warn("Runner of stopped allocation cannot be destroyed")
|
log.WithError(err).Warn("Runner of stopped allocation cannot be destroyed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
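The masking matters because the reason is handed to the runner's Destroy and may surface in CodeOcean-facing output. Since the runner package declares ErrOOMKilled as nomad.ErrorOOMKilled (see the var block further down), the OOM case passes through unchanged, while every other internal reason is collapsed into the generic ErrAllocationStopped and only logged at debug level.
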
@@ -259,7 +259,8 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() {
     s.Require().True(ok)
     mockIdleRunners(environment.(*ExecutionEnvironmentMock))
 
-    testRunner := NewNomadJob(allocation.JobID, nil, nil, s.nomadRunnerManager.onRunnerDestroyed)
+    testRunner := NewNomadJob(allocation.JobID, nil, s.apiMock, s.nomadRunnerManager.onRunnerDestroyed)
+    s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
     environment.AddRunner(testRunner)
     s.nomadRunnerManager.usedRunners.Add(testRunner.ID(), testRunner)
 

@@ -267,7 +268,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() {
     call.Run(func(args mock.Arguments) {
         callbacks, ok := args.Get(1).(*nomad.AllocationProcessing)
         s.Require().True(ok)
-        callbacks.OnDeleted(allocation.JobID)
+        callbacks.OnDeleted(allocation.JobID, nil)
         call.ReturnArguments = mock.Arguments{nil}
     })
 })

@@ -400,7 +401,7 @@ func testStoppedInactivityTimer(s *ManagerTestSuite, stopAllocation bool) {
     s.Require().False(runnerDestroyed)
 
     if stopAllocation {
-        alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(runner.ID())
+        alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(runner.ID(), nil)
         s.False(alreadyRemoved)
     }
 

@@ -25,6 +25,8 @@ import (
 const (
     // runnerContextKey is the key used to store runners in context.Context.
     runnerContextKey dto.ContextKey = "runner"
+    // destroyReasonContextKey is the key used to store the reason of the destruction in the context.Context.
+    destroyReasonContextKey dto.ContextKey = "destroyReason"
     // SIGQUIT is the character that causes a tty to send the SIGQUIT signal to the controlled process.
     SIGQUIT = 0x1c
     // executionTimeoutGracePeriod is the time to wait after sending a SIGQUIT signal to a timed out execution.

@@ -36,9 +38,13 @@ const (
 )
 
 var (
     ErrorUnknownExecution = errors.New("unknown execution")
     ErrorFileCopyFailed = errors.New("file copy failed")
     ErrFileNotFound = errors.New("file not found or insufficient permissions")
+    ErrAllocationStopped DestroyReason = errors.New("the allocation stopped")
+    ErrOOMKilled DestroyReason = nomad.ErrorOOMKilled
+    ErrDestroyedByAPIRequest DestroyReason = errors.New("the client wants to stop the runner")
+    ErrCannotStopExecution DestroyReason = errors.New("the execution did not stop after SIGQUIT")
 )
 
 // NomadJob is an abstraction to communicate with Nomad environments.

||||||
@ -71,7 +77,7 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping,
|
|||||||
job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
|
job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
|
||||||
monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id), time.Minute, ctx)
|
monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id), time.Minute, ctx)
|
||||||
job.InactivityTimer = NewInactivityTimer(job, func(r Runner) error {
|
job.InactivityTimer = NewInactivityTimer(job, func(r Runner) error {
|
||||||
err := r.Destroy(false)
|
err := r.Destroy(ErrorRunnerInactivityTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("NomadJob: %w", err)
|
err = fmt.Errorf("NomadJob: %w", err)
|
||||||
}
|
}
|
||||||
@@ -230,14 +236,15 @@ func (r *NomadJob) GetFileContent(
     return nil
 }
 
-func (r *NomadJob) Destroy(local bool) (err error) {
+func (r *NomadJob) Destroy(reason DestroyReason) (err error) {
+    r.ctx = context.WithValue(r.ctx, destroyReasonContextKey, reason)
     r.cancel()
     r.StopTimeout()
     if r.onDestroy != nil {
         err = r.onDestroy(r)
     }
 
-    if !local && err == nil {
+    if err == nil && (!errors.Is(reason, ErrAllocationStopped) || !errors.Is(reason, ErrOOMKilled)) {
         err = util.RetryExponential(time.Second, func() (err error) {
             if err = r.api.DeleteJob(r.ID()); err != nil {
                 err = fmt.Errorf("error deleting runner in Nomad: %w", err)

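Storing the reason in the runner's context is what lets handleExitOrContextDone (next hunk) recover it after the context has been cancelled and report it as the execution's exit error, instead of the previously hard-coded inactivity timeout.
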
@@ -290,14 +297,22 @@ func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute co
     }
 
     err := ctx.Err()
-    if r.TimeoutPassed() {
-        err = ErrorRunnerInactivityTimeout
+    if reason, ok := r.ctx.Value(destroyReasonContextKey).(error); ok {
+        err = reason
+        if r.TimeoutPassed() && !errors.Is(err, ErrorRunnerInactivityTimeout) {
+            log.WithError(err).Warn("Wrong destroy reason for expired runner")
+        }
     }
 
     // From this time on the WebSocket connection to the client is closed in /internal/api/websocket.go
     // waitForExit. Input can still be sent to the executor.
     exit <- ExitInfo{255, err}
 
+    // This condition prevents further interaction with a stopped / dead allocation.
+    if errors.Is(err, ErrAllocationStopped) || errors.Is(err, ErrOOMKilled) {
+        return
+    }
+
     // This injects the SIGQUIT character into the stdin. This character is parsed by the tty line discipline
     // (tty has to be true) and converted to a SIGQUIT signal sent to the foreground process attached to the tty.
     // By default, SIGQUIT causes the process to terminate and produces a core dump. Processes can catch this signal

@@ -318,8 +333,8 @@ func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute co
     case <-exitInternal:
         log.WithContext(ctx).Debug("Execution terminated after SIGQUIT")
     case <-time.After(executionTimeoutGracePeriod):
-        log.WithContext(ctx).Info("Execution did not quit after SIGQUIT")
-        if err := r.Destroy(false); err != nil {
+        log.WithContext(ctx).Info(ErrCannotStopExecution.Error())
+        if err := r.Destroy(ErrCannotStopExecution); err != nil {
             log.WithContext(ctx).Error("Error when destroying runner")
         }
     }

@@ -114,12 +114,14 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() {
     s.manager = &ManagerMock{}
     s.manager.On("Return", mock.Anything).Return(nil)
 
+    ctx, cancel := context.WithCancel(context.Background())
     s.runner = &NomadJob{
         executions: storage.NewLocalStorage[*dto.ExecutionRequest](),
         InactivityTimer: s.timer,
         id: tests.DefaultRunnerID,
         api: s.apiMock,
-        ctx: context.Background(),
+        ctx: ctx,
+        cancel: cancel,
     }
 }
 

@@ -235,12 +237,30 @@ func (s *ExecuteInteractivelyTestSuite) TestExitHasTimeoutErrorIfRunnerTimesOut(
     executionRequest := &dto.ExecutionRequest{}
     s.runner.StoreExecution(defaultExecutionID, executionRequest)
 
-    exitChannel, cancel, err := s.runner.ExecuteInteractively(
+    exitChannel, _, err := s.runner.ExecuteInteractively(
         defaultExecutionID, &nullio.ReadWriter{}, nil, nil, context.Background())
     s.Require().NoError(err)
-    cancel()
+    err = s.runner.Destroy(ErrorRunnerInactivityTimeout)
+    s.Require().NoError(err)
     exit := <-exitChannel
-    s.Equal(ErrorRunnerInactivityTimeout, exit.Err)
+    s.ErrorIs(exit.Err, ErrorRunnerInactivityTimeout)
+}
+
+func (s *ExecuteInteractivelyTestSuite) TestDestroyReasonIsPassedToExecution() {
+    s.mockedExecuteCommandCall.Run(func(args mock.Arguments) {
+        select {}
+    }).Return(0, nil)
+    s.mockedTimeoutPassedCall.Return(true)
+    executionRequest := &dto.ExecutionRequest{}
+    s.runner.StoreExecution(defaultExecutionID, executionRequest)
+
+    exitChannel, _, err := s.runner.ExecuteInteractively(
+        defaultExecutionID, &nullio.ReadWriter{}, nil, nil, context.Background())
+    s.Require().NoError(err)
+    err = s.runner.Destroy(ErrOOMKilled)
+    s.Require().NoError(err)
+    exit := <-exitChannel
+    s.ErrorIs(exit.Err, ErrOOMKilled)
 }
 
 func TestUpdateFileSystemTestSuite(t *testing.T) {

|
@ -17,6 +17,9 @@ type ExitInfo struct {
|
|||||||
|
|
||||||
type DestroyRunnerHandler = func(r Runner) error
|
type DestroyRunnerHandler = func(r Runner) error
|
||||||
|
|
||||||
|
// DestroyReason specifies errors that are expected as reason for destroying a runner.
|
||||||
|
type DestroyReason error
|
||||||
|
|
||||||
type Runner interface {
|
type Runner interface {
|
||||||
InactivityTimer
|
InactivityTimer
|
||||||
|
|
||||||
@ -59,8 +62,8 @@ type Runner interface {
|
|||||||
GetFileContent(path string, content http.ResponseWriter, privilegedExecution bool, ctx context.Context) error
|
GetFileContent(path string, content http.ResponseWriter, privilegedExecution bool, ctx context.Context) error
|
||||||
|
|
||||||
// Destroy destroys the Runner in Nomad.
|
// Destroy destroys the Runner in Nomad.
|
||||||
// Iff local is true, the destruction will not be propagated to external systems.
|
// Depending on the reason special cases of the Destruction will be handled.
|
||||||
Destroy(local bool) error
|
Destroy(reason DestroyReason) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewContext creates a context containing a runner.
|
// NewContext creates a context containing a runner.
|
||||||
|
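With this interface change every call site states why a runner is destroyed: Return passes ErrDestroyedByAPIRequest, the inactivity timer passes ErrorRunnerInactivityTimeout, the SIGQUIT grace-period handler passes ErrCannotStopExecution, and stopped allocations receive the masked reason from the manager. Downstream code can then distinguish the cases with errors.Is instead of interpreting a boolean flag.
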
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.30.1. DO NOT EDIT.
+// Code generated by mockery v2.30.16. DO NOT EDIT.
 
 package runner
 

@@ -20,13 +20,13 @@ type RunnerMock struct {
     mock.Mock
 }
 
-// Destroy provides a mock function with given fields: local
-func (_m *RunnerMock) Destroy(local bool) error {
-    ret := _m.Called(local)
+// Destroy provides a mock function with given fields: reason
+func (_m *RunnerMock) Destroy(reason DestroyReason) error {
+    ret := _m.Called(reason)
 
     var r0 error
-    if rf, ok := ret.Get(0).(func(bool) error); ok {
-        r0 = rf(local)
+    if rf, ok := ret.Get(0).(func(DestroyReason) error); ok {
+        r0 = rf(reason)
     } else {
         r0 = ret.Error(0)
     }