diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index 044238d..d85fc0f 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -37,7 +37,7 @@ type AllocationProcessing struct { OnNew NewAllocationProcessor OnDeleted DeletedAllocationProcessor } -type DeletedAllocationProcessor func(jobID string) +type DeletedAllocationProcessor func(jobID string) (removedByPoseidon bool) type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration) type allocationData struct { @@ -191,7 +191,7 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) go func() { err = a.WatchEventStream(ctx, &AllocationProcessing{ OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, - OnDeleted: func(_ string) {}, + OnDeleted: func(_ string) bool { return false }, }) cancel() // cancel the waiting for an evaluation result if watching the event stream ends. }() @@ -415,7 +415,15 @@ func handlePendingAllocationEvent(alloc *nomadApi.Allocation, allocData *allocat } else if alloc.PreviousAllocation != "" { // Handle Runner (/Container) re-allocations. if prevData, ok := allocations.Get(alloc.PreviousAllocation); ok { - callbacks.OnDeleted(prevData.jobID) + if removedByPoseidon := callbacks.OnDeleted(prevData.jobID); removedByPoseidon { + // This case handles a race condition between the overdue runner inactivity timeout and the rescheduling of a + // lost allocation. The race condition first leads to the rescheduling of the runner, but shortly afterwards to it + // being stopped. Instead of reporting an unexpected stop of the pending runner, we just do not start tracking it. + return + } + // Improve: When returning in the step before, the allocation data will never be removed (data leak). + // But it also shouldn't be removed, as the current event could be received multiple times. + // If we removed the allocation data, the previous case handling wouldn't be triggered for the replicated event. 
allocations.Delete(alloc.PreviousAllocation) } else { log.WithField("alloc", alloc).Warn("Previous Allocation not found") @@ -483,14 +491,15 @@ func handleLostAllocationEvent(alloc *nomadApi.Allocation, allocData *allocation func handleStoppingAllocationEvent(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, expectedStop bool) { + removedByPoseidon := false if alloc.NextAllocation == "" { - callbacks.OnDeleted(alloc.JobID) + removedByPoseidon = callbacks.OnDeleted(alloc.JobID) allocations.Delete(alloc.ID) } entry := log.WithField("job", alloc.JobID) replacementAllocationScheduled := alloc.NextAllocation != "" - if expectedStop == replacementAllocationScheduled { + if !removedByPoseidon && expectedStop == replacementAllocationScheduled { entry.WithField("alloc", alloc).Warn("Unexpected Allocation Stopping / Restarting") } else { entry.Debug("Expected Allocation Stopping / Restarting") diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index ded934a..a8e9dcf 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -26,7 +26,7 @@ import ( var ( noopAllocationProcessing = &AllocationProcessing{ OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, - OnDeleted: func(_ string) {}, + OnDeleted: func(_ string) bool { return false }, } ErrUnexpectedEOF = errors.New("unexpected EOF") ) @@ -576,6 +576,26 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { }) } +func TestHandleAllocationEvent_IgnoresReschedulesForStoppedJobs(t *testing.T) { + startedAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + rescheduledAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + rescheduledAllocation.ID = tests.AnotherUUID + rescheduledAllocation.PreviousAllocation = startedAllocation.ID + rescheduledEvent := eventForAllocation(t, rescheduledAllocation) + + 
allocations := storage.NewLocalStorage[*allocationData]() + allocations.Add(startedAllocation.ID, &allocationData{jobID: startedAllocation.JobID}) + + err := handleAllocationEvent(time.Now().UnixNano(), allocations, &rescheduledEvent, &AllocationProcessing{ + OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, + OnDeleted: func(_ string) bool { return true }, + }) + require.NoError(t, err) + + _, ok := allocations.Get(rescheduledAllocation.ID) + assert.False(t, ok) +} + func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) { apiMock := &apiQuerierMock{} apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault) @@ -618,8 +638,9 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, OnNew: func(alloc *nomadApi.Allocation, _ time.Duration) { newAllocations = append(newAllocations, alloc) }, - OnDeleted: func(jobID string) { + OnDeleted: func(jobID string) bool { deletedAllocations = append(deletedAllocations, jobID) + return false }, } diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index 656e553..c8aed76 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -177,22 +177,26 @@ func monitorAllocationStartupDuration(startup time.Duration, runnerID string, en monitoring.WriteInfluxPoint(p) } -func (m *NomadRunnerManager) onAllocationStopped(runnerID string) { +func (m *NomadRunnerManager) onAllocationStopped(runnerID string) (alreadyRemoved bool) { log.WithField("id", runnerID).Debug("Runner stopped") if nomad.IsEnvironmentTemplateID(runnerID) { - return + return false } environmentID, err := nomad.EnvironmentIDFromRunnerID(runnerID) if err != nil { log.WithError(err).Warn("Stopped allocation can not be handled") - return + return false } + _, stillActive := m.usedRunners.Get(runnerID) m.usedRunners.Delete(runnerID) + environment, ok := m.environments.Get(environmentID.ToString()) if ok { environment.DeleteRunner(runnerID) } + 
+ return !stillActive }