From b620d0fad7b18755b87849c3620f4e3f0e7f5158 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Thu, 1 Jun 2023 01:40:37 +0100 Subject: [PATCH] Introduce Allocation State Tracking in order to break down the current state and evaluate if it is invalid. --- internal/nomad/executor_api_mock.go | 4 +- internal/nomad/nomad.go | 218 +++++++++++++++++--------- internal/nomad/nomad_test.go | 181 ++++++++++++--------- internal/runner/nomad_manager.go | 14 +- internal/runner/nomad_manager_test.go | 6 +- 5 files changed, 266 insertions(+), 157 deletions(-) diff --git a/internal/nomad/executor_api_mock.go b/internal/nomad/executor_api_mock.go index 02b8bbc..bc63be1 100644 --- a/internal/nomad/executor_api_mock.go +++ b/internal/nomad/executor_api_mock.go @@ -343,11 +343,11 @@ func (_m *ExecutorAPIMock) SetJobScale(jobID string, count uint, reason string) } // WatchEventStream provides a mock function with given fields: ctx, callbacks -func (_m *ExecutorAPIMock) WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error { +func (_m *ExecutorAPIMock) WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error { ret := _m.Called(ctx, callbacks) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *AllocationProcessoring) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, *AllocationProcessing) error); ok { r0 = rf(ctx, callbacks) } else { r0 = ret.Error(0) diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index 93cc393..044238d 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -32,19 +32,21 @@ var ( // resultChannelWriteTimeout is to detect the error when more element are written into a channel than expected. const resultChannelWriteTimeout = 10 * time.Millisecond -// AllocationProcessoring includes the callbacks to interact with allcoation events. -type AllocationProcessoring struct { - OnNew AllocationProcessorMonitored - OnDeleted AllocationProcessor +// AllocationProcessing includes the callbacks to interact with allocation events. +type AllocationProcessing struct { + OnNew NewAllocationProcessor + OnDeleted DeletedAllocationProcessor } -type AllocationProcessor func(*nomadApi.Allocation) -type AllocationProcessorMonitored func(*nomadApi.Allocation, time.Duration) +type DeletedAllocationProcessor func(jobID string) +type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration) type allocationData struct { // allocClientStatus defines the state defined by Nomad. allocClientStatus string - jobID string - start time.Time + // allocDesiredStatus defines if the allocation wants to be running or being stopped. + allocDesiredStatus string + jobID string + start time.Time // Just debugging information allocNomadNode string } @@ -79,7 +81,7 @@ type ExecutorAPI interface { // WatchEventStream listens on the Nomad event stream for allocation and evaluation events. // Depending on the incoming event, any of the given function is executed. // Do not run multiple times simultaneously. - WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error + WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error // ExecuteCommand executes the given command in the job/runner with the given id. // It writes the output of the command to stdout/stderr and reads input from stdin. @@ -187,9 +189,9 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) defer cancel() // cancel the WatchEventStream when the evaluation result was read. go func() { - err = a.WatchEventStream(ctx, &AllocationProcessoring{ + err = a.WatchEventStream(ctx, &AllocationProcessing{ OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, - OnDeleted: func(_ *nomadApi.Allocation) {}, + OnDeleted: func(_ string) {}, }) cancel() // cancel the waiting for an evaluation result if watching the event stream ends. }() @@ -204,7 +206,7 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) } } -func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error { +func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error { startTime := time.Now().UnixNano() stream, err := a.EventStream(ctx) if err != nil { @@ -253,10 +255,11 @@ func (a *APIClient) initializeAllocations(environmentID dto.EnvironmentID) { case stub.ClientStatus == structs.AllocClientStatusPending || stub.ClientStatus == structs.AllocClientStatusRunning: log.WithField("jobID", stub.JobID).WithField("status", stub.ClientStatus).Debug("Recovered Allocation") a.allocations.Add(stub.ID, &allocationData{ - allocClientStatus: stub.ClientStatus, - jobID: stub.JobID, - start: time.Unix(0, stub.CreateTime), - allocNomadNode: stub.NodeName, + allocClientStatus: stub.ClientStatus, + allocDesiredStatus: stub.DesiredStatus, + jobID: stub.JobID, + start: time.Unix(0, stub.CreateTime), + allocNomadNode: stub.NodeName, }) } } @@ -318,7 +321,7 @@ func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Ev // is called. The allocations storage is used to track pending and running allocations. Using the // storage the state is persisted between multiple calls of this function. func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocationData], - event *nomadApi.Event, callbacks *AllocationProcessoring) error { + event *nomadApi.Event, callbacks *AllocationProcessing) error { alloc, err := event.Allocation() if err != nil { return fmt.Errorf("failed to retrieve allocation from event: %w", err) @@ -329,6 +332,8 @@ func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocat log.WithField("alloc_id", alloc.ID). WithField("ClientStatus", alloc.ClientStatus). WithField("DesiredStatus", alloc.DesiredStatus). + WithField("PrevAllocation", alloc.PreviousAllocation). + WithField("NextAllocation", alloc.NextAllocation). Debug("Handle Allocation Event") // When starting the API and listening on the Nomad event stream we might get events that already @@ -338,95 +343,158 @@ func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocat return nil } + if valid := filterDuplicateEvents(alloc, allocations); !valid { + return nil + } + + allocData := updateAllocationData(alloc, allocations) + switch alloc.ClientStatus { case structs.AllocClientStatusPending: - handlePendingAllocationEvent(alloc, allocations, callbacks) + handlePendingAllocationEvent(alloc, allocData, allocations, callbacks) case structs.AllocClientStatusRunning: - handleRunningAllocationEvent(alloc, allocations, callbacks) + handleRunningAllocationEvent(alloc, allocData, allocations, callbacks) case structs.AllocClientStatusComplete: - handleCompleteAllocationEvent(alloc, allocations, callbacks) + handleCompleteAllocationEvent(alloc, allocData, allocations, callbacks) case structs.AllocClientStatusFailed: - handleFailedAllocationEvent(alloc) + handleFailedAllocationEvent(alloc, allocData, allocations, callbacks) + case structs.AllocClientStatusLost: + handleLostAllocationEvent(alloc, allocData, allocations, callbacks) default: log.WithField("alloc", alloc).Warn("Other Client Status") } return nil } +// filterDuplicateEvents identifies duplicate events or events of unknown allocations. +func filterDuplicateEvents(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData]) (valid bool) { + newAllocationExpected := alloc.ClientStatus == structs.AllocClientStatusPending && + alloc.DesiredStatus == structs.AllocDesiredStatusRun + allocData, ok := allocations.Get(alloc.ID) + + switch { + case !ok && newAllocationExpected: + return true + case !ok: + // This case happens in case of an error or when an event that led to the deletion of the alloc data is duplicated. + log.WithField("alloc", alloc).Trace("Ignoring unknown allocation") + return false + case alloc.ClientStatus == allocData.allocClientStatus && alloc.DesiredStatus == allocData.allocDesiredStatus: + log.WithField("alloc", alloc).Trace("Ignoring duplicate event") + return false + default: + return true + } +} + +// updateAllocationData updates the allocation tracking data according to the passed alloc. +// The allocation data before this allocation update is returned. +func updateAllocationData( + alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData]) (previous *allocationData) { + allocData, ok := allocations.Get(alloc.ID) + if ok { + data := *allocData + previous = &data + + allocData.allocClientStatus = alloc.ClientStatus + allocData.allocDesiredStatus = alloc.DesiredStatus + allocations.Add(alloc.ID, allocData) + } + return previous +} + // handlePendingAllocationEvent manages allocation that are currently pending. // This allows the handling of startups and re-placements of allocations. -func handlePendingAllocationEvent(alloc *nomadApi.Allocation, - allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) { - if alloc.DesiredStatus == structs.AllocDesiredStatusRun { - allocData, ok := allocations.Get(alloc.ID) - if ok && allocData.allocClientStatus != structs.AllocClientStatusRunning { - // Pending Allocation is already stored. - // This happens because depending on the startup duration of the runner, we get zero, one, or more events - // notifying us that the allocation is still pending. - // We are just interested in the first event, in order to have the correct start time. - return - } else if ok { +func handlePendingAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData, + allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) { + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusRun: + if allocData != nil { + // Handle Allocation restart. + callbacks.OnDeleted(alloc.JobID) + } else if alloc.PreviousAllocation != "" { // Handle Runner (/Container) re-allocations. - callbacks.OnDeleted(alloc) + if prevData, ok := allocations.Get(alloc.PreviousAllocation); ok { + callbacks.OnDeleted(prevData.jobID) + allocations.Delete(alloc.PreviousAllocation) + } else { + log.WithField("alloc", alloc).Warn("Previous Allocation not found") + } } + // Store Pending Allocation - Allocation gets started, wait until it runs. allocations.Add(alloc.ID, &allocationData{ - allocClientStatus: structs.AllocClientStatusPending, - jobID: alloc.JobID, - start: time.Now(), - allocNomadNode: alloc.NodeName, + allocClientStatus: alloc.ClientStatus, + allocDesiredStatus: alloc.DesiredStatus, + jobID: alloc.JobID, + start: time.Now(), + allocNomadNode: alloc.NodeName, }) - } else { + case structs.AllocDesiredStatusStop: + handleStoppingAllocationEvent(alloc, allocations, callbacks, false) + default: log.WithField("alloc", alloc).Warn("Other Desired Status") } } // handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events. -func handleRunningAllocationEvent(alloc *nomadApi.Allocation, - allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) { - if alloc.DesiredStatus == structs.AllocDesiredStatusRun { - // is first event that marks the transition between pending and running? - if allocData, ok := allocations.Get(alloc.ID); ok && allocData.allocClientStatus == structs.AllocClientStatusPending { - startupDuration := time.Since(allocData.start) - callbacks.OnNew(alloc, startupDuration) - allocData.allocClientStatus = structs.AllocClientStatusRunning - } - } -} - -// handleCompleteAllocationEvent handles allocations that stopped. -func handleCompleteAllocationEvent(alloc *nomadApi.Allocation, - allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) { - if alloc.DesiredStatus == structs.AllocDesiredStatusStop { - if _, ok := allocations.Get(alloc.ID); ok { - callbacks.OnDeleted(alloc) - allocations.Delete(alloc.ID) - } - } else { +func handleRunningAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData, + _ storage.Storage[*allocationData], callbacks *AllocationProcessing) { + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusRun: + startupDuration := time.Since(allocData.start) + callbacks.OnNew(alloc, startupDuration) + case structs.AllocDesiredStatusStop: + // It is normal that running allocations will stop. We will handle it when it is stopped. + default: log.WithField("alloc", alloc).Warn("Other Desired Status") } } -// handleFailedAllocationEvent logs only the first of the multiple failure events. -func handleFailedAllocationEvent(alloc *nomadApi.Allocation) { - if alloc.FollowupEvalID == "" && alloc.PreviousAllocation == "" { - log.WithField("job", alloc.JobID). - WithField("reason", failureDisplayMessage(alloc)). - WithField("alloc", alloc). - Warn("Allocation failure") +// handleCompleteAllocationEvent handles allocations that stopped. +func handleCompleteAllocationEvent(alloc *nomadApi.Allocation, _ *allocationData, + allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) { + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusRun: + log.WithField("alloc", alloc).Warn("Complete allocation desires to run") + case structs.AllocDesiredStatusStop: + callbacks.OnDeleted(alloc.JobID) + allocations.Delete(alloc.ID) + default: + log.WithField("alloc", alloc).Warn("Other Desired Status") } } -// failureDisplayMessage parses the DisplayMessage of a failed allocation. -func failureDisplayMessage(alloc *nomadApi.Allocation) (msg string) { - for _, state := range alloc.TaskStates { - for _, event := range state.Events { - if event.FailsTask { - return event.DisplayMessage - } - } +// handleFailedAllocationEvent logs only the last of the multiple failure events. +func handleFailedAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData, + allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) { + // The stop is expected when the allocation desired to stop even before it failed. + expectedStop := allocData.allocDesiredStatus == structs.AllocDesiredStatusStop + handleStoppingAllocationEvent(alloc, allocations, callbacks, expectedStop) +} + +// handleLostAllocationEvent logs only the last of the multiple lost events. +func handleLostAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData, + allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) { + // The stop is expected when the allocation desired to stop even before it got lost. + expectedStop := allocData.allocDesiredStatus == structs.AllocDesiredStatusStop + handleStoppingAllocationEvent(alloc, allocations, callbacks, expectedStop) +} + +func handleStoppingAllocationEvent(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData], + callbacks *AllocationProcessing, expectedStop bool) { + if alloc.NextAllocation == "" { + callbacks.OnDeleted(alloc.JobID) + allocations.Delete(alloc.ID) + } + + entry := log.WithField("job", alloc.JobID) + replacementAllocationScheduled := alloc.NextAllocation != "" + if expectedStop == replacementAllocationScheduled { + entry.WithField("alloc", alloc).Warn("Unexpected Allocation Stopping / Restarting") + } else { + entry.Debug("Expected Allocation Stopping / Restarting") } - return "" } // checkEvaluation checks whether the given evaluation failed. diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index 90ad725..ded934a 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -24,9 +24,9 @@ import ( ) var ( - noopAllocationProcessoring = &AllocationProcessoring{ + noopAllocationProcessing = &AllocationProcessing{ OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, - OnDeleted: func(_ *nomadApi.Allocation) {}, + OnDeleted: func(_ string) {}, } ErrUnexpectedEOF = errors.New("unexpected EOF") ) @@ -431,7 +431,7 @@ func TestApiClient_WatchAllocationsIgnoresOldAllocations(t *testing.T) { }} assertWatchAllocation(t, []*nomadApi.Events{&oldAllocationEvents}, - []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil)) + []*nomadApi.Allocation(nil), []string(nil)) } func createOldAllocation(clientStatus, desiredStatus string) *nomadApi.Allocation { @@ -445,67 +445,107 @@ func TestApiClient_WatchAllocationsIgnoresUnhandledEvents(t *testing.T) { Type: structs.TypeNodeEvent, }, }} - assertWatchAllocation(t, []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil)) - - planEvents := nomadApi.Events{Events: []nomadApi.Event{ - { - Topic: nomadApi.TopicAllocation, - Type: structs.TypePlanResult, - }, - }} - assertWatchAllocation(t, []*nomadApi.Events{&planEvents}, []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil)) + assertWatchAllocation(t, []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []string(nil)) } -func TestApiClient_WatchAllocationsHandlesEvents(t *testing.T) { - newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) - pendingAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, newPendingAllocation)}} +func TestApiClient_WatchAllocationsUsesCallbacksForEvents(t *testing.T) { + pendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + pendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, pendingAllocation)}} - newStartedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) - startAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{ - eventForAllocation(t, newPendingAllocation), - eventForAllocation(t, newStartedAllocation), + t.Run("it does not add allocation when client status is pending", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingEvents}, []*nomadApi.Allocation(nil), []string(nil)) + }) + + startedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) + startedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, startedAllocation)}} + pendingStartedEvents := nomadApi.Events{Events: []nomadApi.Event{ + eventForAllocation(t, pendingAllocation), eventForAllocation(t, startedAllocation)}} + + t.Run("it adds allocation with matching events", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents}, + []*nomadApi.Allocation{startedAllocation}, []string(nil)) + }) + + t.Run("it skips heartbeat and adds allocation with matching events", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents}, + []*nomadApi.Allocation{startedAllocation}, []string(nil)) + }) + + stoppedAllocation := createRecentAllocation(structs.AllocClientStatusComplete, structs.AllocDesiredStatusStop) + stoppedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, stoppedAllocation)}} + pendingStartStopEvents := nomadApi.Events{Events: []nomadApi.Event{ + eventForAllocation(t, pendingAllocation), + eventForAllocation(t, startedAllocation), + eventForAllocation(t, stoppedAllocation), }} - newStoppedAllocation := createRecentAllocation(structs.AllocClientStatusComplete, structs.AllocDesiredStatusStop) - stopAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{ - eventForAllocation(t, newPendingAllocation), - eventForAllocation(t, newStartedAllocation), - eventForAllocation(t, newStoppedAllocation), - }} + t.Run("it adds and deletes the allocation", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingStartStopEvents}, + []*nomadApi.Allocation{startedAllocation}, []string{stoppedAllocation.JobID}) + }) - var cases = []struct { - streamedEvents []*nomadApi.Events - expectedNewAllocations []*nomadApi.Allocation - expectedDeletedAllocations []*nomadApi.Allocation - name string - }{ - {[]*nomadApi.Events{&pendingAllocationEvents}, - []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil), - "it does not add allocation when client status is pending"}, - {[]*nomadApi.Events{&startAllocationEvents}, - []*nomadApi.Allocation{newStartedAllocation}, - []*nomadApi.Allocation(nil), - "it adds allocation with matching events"}, - {[]*nomadApi.Events{{}, &startAllocationEvents}, - []*nomadApi.Allocation{newStartedAllocation}, - []*nomadApi.Allocation(nil), - "it skips heartbeat and adds allocation with matching events"}, - {[]*nomadApi.Events{&stopAllocationEvents}, - []*nomadApi.Allocation{newStartedAllocation}, - []*nomadApi.Allocation{newStoppedAllocation}, - "it adds and deletes the allocation"}, - {[]*nomadApi.Events{&startAllocationEvents, &startAllocationEvents}, - []*nomadApi.Allocation{newStartedAllocation, newStartedAllocation}, - []*nomadApi.Allocation{newPendingAllocation}, - "it handles multiple events"}, - } + t.Run("it ignores duplicate events", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingEvents, &startedEvents, &startedEvents, + &stoppedEvents, &stoppedEvents, &stoppedEvents}, + []*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID}) + }) - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - assertWatchAllocation(t, c.streamedEvents, - c.expectedNewAllocations, c.expectedDeletedAllocations) - }) - } + t.Run("it ignores events of unknown allocations", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&startedEvents, &startedEvents, + &stoppedEvents, &stoppedEvents, &stoppedEvents}, []*nomadApi.Allocation(nil), []string(nil)) + }) + + t.Run("it removes restarted allocations", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &pendingStartedEvents}, + []*nomadApi.Allocation{startedAllocation, startedAllocation}, []string{startedAllocation.JobID}) + }) + + rescheduleAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + rescheduleAllocation.ID = tests.AnotherUUID + rescheduleAllocation.PreviousAllocation = pendingAllocation.ID + rescheduleStartedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) + rescheduleStartedAllocation.ID = tests.AnotherUUID + rescheduleAllocation.PreviousAllocation = pendingAllocation.ID + rescheduleEvents := nomadApi.Events{Events: []nomadApi.Event{ + eventForAllocation(t, rescheduleAllocation), eventForAllocation(t, rescheduleStartedAllocation)}} + + t.Run("it removes rescheduled allocations", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &rescheduleEvents}, + []*nomadApi.Allocation{startedAllocation, rescheduleStartedAllocation}, []string{startedAllocation.JobID}) + }) + + stoppedPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusStop) + stoppedPendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, stoppedPendingAllocation)}} + + t.Run("it removes stopped pending allocations", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingEvents, &stoppedPendingEvents}, + []*nomadApi.Allocation(nil), []string{stoppedPendingAllocation.JobID}) + }) + + failedAllocation := createRecentAllocation(structs.AllocClientStatusFailed, structs.AllocDesiredStatusStop) + failedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, failedAllocation)}} + + t.Run("it removes stopped failed allocations", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &failedEvents}, + []*nomadApi.Allocation{startedAllocation}, []string{failedAllocation.JobID}) + }) + + lostAllocation := createRecentAllocation(structs.AllocClientStatusLost, structs.AllocDesiredStatusStop) + lostEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, lostAllocation)}} + + t.Run("it removes stopped lost allocations", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &lostEvents}, + []*nomadApi.Allocation{startedAllocation}, []string{lostAllocation.JobID}) + }) + + rescheduledLostAllocation := createRecentAllocation(structs.AllocClientStatusLost, structs.AllocDesiredStatusStop) + rescheduledLostAllocation.NextAllocation = tests.AnotherUUID + rescheduledLostEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, rescheduledLostAllocation)}} + + t.Run("it removes lost allocations not before the last restart attempt", func(t *testing.T) { + assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &rescheduledLostEvents}, + []*nomadApi.Allocation{startedAllocation}, []string(nil)) + }) } func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { @@ -515,7 +555,7 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { allocations := storage.NewLocalStorage[*allocationData]() err := handleAllocationEvent( - time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring) + time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessing) require.NoError(t, err) _, ok := allocations.Get(newPendingAllocation.ID) @@ -528,7 +568,7 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { allocations := storage.NewLocalStorage[*allocationData]() err := handleAllocationEvent( - time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring) + time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessing) require.NoError(t, err) _, ok := allocations.Get(newPendingAllocation.ID) @@ -541,7 +581,7 @@ func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetri apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault) apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false} - err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessoring) + err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessing) assert.ErrorIs(t, err, tests.ErrDefault) } @@ -557,29 +597,29 @@ func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationCannotBeRetrievedWi require.Error(t, err) events := []*nomadApi.Events{{Events: []nomadApi.Event{event}}, {}} - eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessoring) + eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessing) assert.Error(t, err) assert.Equal(t, 1, eventsProcessed) } func TestAPIClient_WatchAllocationsReturnsErrorOnUnexpectedEOF(t *testing.T) { events := []*nomadApi.Events{{Err: ErrUnexpectedEOF}, {}} - eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessoring) + eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessing) assert.Error(t, err) assert.Equal(t, 1, eventsProcessed) } func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, - expectedNewAllocations, expectedDeletedAllocations []*nomadApi.Allocation) { + expectedNewAllocations []*nomadApi.Allocation, expectedDeletedAllocations []string) { t.Helper() var newAllocations []*nomadApi.Allocation - var deletedAllocations []*nomadApi.Allocation - callbacks := &AllocationProcessoring{ + var deletedAllocations []string + callbacks := &AllocationProcessing{ OnNew: func(alloc *nomadApi.Allocation, _ time.Duration) { newAllocations = append(newAllocations, alloc) }, - OnDeleted: func(alloc *nomadApi.Allocation) { - deletedAllocations = append(deletedAllocations, alloc) + OnDeleted: func(jobID string) { + deletedAllocations = append(deletedAllocations, jobID) }, } @@ -594,7 +634,7 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, // runAllocationWatching simulates events streamed from the Nomad event stream // to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine // and sequentially transfers the events from the given array to a channel simulating the stream. -func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *AllocationProcessoring) ( +func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *AllocationProcessing) ( eventsProcessed int, err error) { t.Helper() stream := make(chan *nomadApi.Events) @@ -606,7 +646,7 @@ func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *A // runs the MonitorEvaluation method in a goroutine. The mock returns a read-only // version of the given stream to simulate an event stream gotten from the real // Nomad API. -func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *AllocationProcessoring) chan error { +func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *AllocationProcessing) chan error { ctx := context.Background() // We can only get a read-only channel once we return it from a function. readOnlyStream := func() <-chan *nomadApi.Events { return stream }() @@ -644,7 +684,8 @@ func eventForAllocation(t *testing.T, alloc *nomadApi.Allocation) nomadApi.Event func createAllocation(modifyTime int64, clientStatus, desiredStatus string) *nomadApi.Allocation { return &nomadApi.Allocation{ - ID: tests.DefaultRunnerID, + ID: tests.DefaultUUID, + JobID: tests.DefaultRunnerID, ModifyTime: modifyTime, ClientStatus: clientStatus, DesiredStatus: desiredStatus, diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index def4cbd..656e553 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -133,7 +133,7 @@ func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) { retries := 0 for ctx.Err() == nil { err := m.apiClient.WatchEventStream(ctx, - &nomad.AllocationProcessoring{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped}) + &nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped}) retries += 1 log.WithContext(ctx).WithError(err).Errorf("Stopped updating the runners! Retry %v", retries) <-time.After(time.Second) @@ -177,22 +177,22 @@ func monitorAllocationStartupDuration(startup time.Duration, runnerID string, en monitoring.WriteInfluxPoint(p) } -func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) { - log.WithField("id", alloc.JobID).Debug("Runner stopped") +func (m *NomadRunnerManager) onAllocationStopped(runnerID string) { + log.WithField("id", runnerID).Debug("Runner stopped") - if nomad.IsEnvironmentTemplateID(alloc.JobID) { + if nomad.IsEnvironmentTemplateID(runnerID) { return } - environmentID, err := nomad.EnvironmentIDFromRunnerID(alloc.JobID) + environmentID, err := nomad.EnvironmentIDFromRunnerID(runnerID) if err != nil { log.WithError(err).Warn("Stopped allocation can not be handled") return } - m.usedRunners.Delete(alloc.JobID) + m.usedRunners.Delete(runnerID) environment, ok := m.environments.Get(environmentID.ToString()) if ok { - environment.DeleteRunner(alloc.JobID) + environment.DeleteRunner(runnerID) } } diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 6056a72..4af0f70 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -233,7 +233,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) { call.Run(func(args mock.Arguments) { - callbacks, ok := args.Get(1).(*nomad.AllocationProcessoring) + callbacks, ok := args.Get(1).(*nomad.AllocationProcessing) s.Require().True(ok) callbacks.OnNew(allocation, 0) call.ReturnArguments = mock.Arguments{nil} @@ -261,9 +261,9 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) { call.Run(func(args mock.Arguments) { - callbacks, ok := args.Get(1).(*nomad.AllocationProcessoring) + callbacks, ok := args.Get(1).(*nomad.AllocationProcessing) s.Require().True(ok) - callbacks.OnDeleted(allocation) + callbacks.OnDeleted(allocation.JobID) call.ReturnArguments = mock.Arguments{nil} }) })