diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index 13be4dd..9515d37 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -390,10 +390,10 @@ func filterDuplicateEvents(alloc *nomadApi.Allocation, allocations storage.Stora return true case !ok: // This case happens in case of an error or when an event that led to the deletion of the alloc data is duplicated. - log.WithField("alloc", alloc).Debug("Ignoring unknown allocation") + log.WithField("allocID", alloc.ID).Debug("Ignoring unknown allocation") return false case alloc.ClientStatus == allocData.allocClientStatus && alloc.DesiredStatus == allocData.allocDesiredStatus: - log.WithField("alloc", alloc).Debug("Ignoring duplicate event") + log.WithField("allocID", alloc.ID).Debug("Ignoring duplicate event") return false default: return true diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index f95d78c..9b55751 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -12,7 +12,6 @@ import ( "github.com/openHPI/poseidon/pkg/nullio" "github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/tests" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "io" @@ -241,6 +240,7 @@ func eventForEvaluation(t *testing.T, eval *nomadApi.Evaluation) nomadApi.Event // simulateNomadEventStream streams the given events sequentially to the stream channel. // It returns how many events have been processed until an error occurred. 
func simulateNomadEventStream( + ctx context.Context, stream chan<- *nomadApi.Events, errChan chan error, events []*nomadApi.Events, @@ -259,7 +259,7 @@ func simulateNomadEventStream( // Wait for last event being processed var err error select { - case <-time.After(10 * time.Millisecond): + case <-ctx.Done(): case err = <-errChan: } return eventsProcessed, err @@ -268,10 +268,10 @@ func simulateNomadEventStream( // runEvaluationMonitoring simulates events streamed from the Nomad event stream // to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine // and sequentially transfers the events from the given array to a channel simulating the stream. -func runEvaluationMonitoring(events []*nomadApi.Events) (eventsProcessed int, err error) { +func runEvaluationMonitoring(ctx context.Context, events []*nomadApi.Events) (eventsProcessed int, err error) { stream := make(chan *nomadApi.Events) errChan := asynchronouslyMonitorEvaluation(stream) - return simulateNomadEventStream(stream, errChan, events) + return simulateNomadEventStream(ctx, stream, errChan, events) } func (s *MainTestSuite) TestApiClient_MonitorEvaluationWithSuccessfulEvent() { @@ -306,7 +306,7 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationWithSuccessfulEvent() { for _, c := range cases { s.Run(c.name, func() { - eventsProcessed, err := runEvaluationMonitoring(c.streamedEvents) + eventsProcessed, err := runEvaluationMonitoring(s.TestCtx, c.streamedEvents) s.Nil(err) s.Equal(c.expectedEventsProcessed, eventsProcessed) }) @@ -347,7 +347,7 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationWithFailingEvent() { for _, c := range cases { s.Run(c.name, func() { - eventsProcessed, err := runEvaluationMonitoring(c.streamedEvents) + eventsProcessed, err := runEvaluationMonitoring(s.TestCtx, c.streamedEvents) s.Require().NotNil(err) s.Contains(err.Error(), c.expectedError.Error()) s.Equal(c.expectedEventsProcessed, eventsProcessed) @@ -363,7 +363,7 @@ func (s 
*MainTestSuite) TestApiClient_MonitorEvaluationFailsWhenFailingToDecodeE } _, err := event.Evaluation() s.Require().NotNil(err) - eventsProcessed, err := runEvaluationMonitoring([]*nomadApi.Events{{Events: []nomadApi.Event{event}}}) + eventsProcessed, err := runEvaluationMonitoring(s.TestCtx, []*nomadApi.Events{{Events: []nomadApi.Event{event}}}) s.Error(err) s.Equal(1, eventsProcessed) } @@ -431,7 +431,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsIgnoresOldAllocations() { eventForAllocation(s.T(), oldRunningAllocation), }} - assertWatchAllocation(s.T(), []*nomadApi.Events{&oldAllocationEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&oldAllocationEvents}, []*nomadApi.Allocation(nil), []string(nil)) } @@ -446,7 +446,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsIgnoresUnhandledEvents() { Type: structs.TypeNodeEvent, }, }} - assertWatchAllocation(s.T(), []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []string(nil)) + assertWatchAllocation(s, []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []string(nil)) } func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { @@ -454,7 +454,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { pendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), pendingAllocation)}} s.Run("it does not add allocation when client status is pending", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingEvents}, []*nomadApi.Allocation(nil), []string(nil)) + assertWatchAllocation(s, []*nomadApi.Events{&pendingEvents}, []*nomadApi.Allocation(nil), []string(nil)) }) startedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) @@ -463,12 +463,12 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { eventForAllocation(s.T(), pendingAllocation), eventForAllocation(s.T(), startedAllocation)}} s.Run("it adds allocation 
with matching events", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents}, []*nomadApi.Allocation{startedAllocation}, []string(nil)) }) s.Run("it skips heartbeat and adds allocation with matching events", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents}, []*nomadApi.Allocation{startedAllocation}, []string(nil)) }) @@ -481,23 +481,23 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { }} s.Run("it adds and deletes the allocation", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartStopEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartStopEvents}, []*nomadApi.Allocation{startedAllocation}, []string{stoppedAllocation.JobID}) }) s.Run("it ignores duplicate events", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingEvents, &startedEvents, &startedEvents, + assertWatchAllocation(s, []*nomadApi.Events{&pendingEvents, &startedEvents, &startedEvents, &stoppedEvents, &stoppedEvents, &stoppedEvents}, []*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID}) }) s.Run("it ignores events of unknown allocations", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&startedEvents, &startedEvents, + assertWatchAllocation(s, []*nomadApi.Events{&startedEvents, &startedEvents, &stoppedEvents, &stoppedEvents, &stoppedEvents}, []*nomadApi.Allocation(nil), []string(nil)) }) s.Run("it removes restarted allocations", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &pendingStartedEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &pendingStartedEvents}, []*nomadApi.Allocation{startedAllocation, startedAllocation}, []string{startedAllocation.JobID}) }) @@ -511,7 +511,7 @@ func (s *MainTestSuite) 
TestApiClient_WatchAllocationsUsesCallbacksForEvents() { eventForAllocation(s.T(), rescheduleAllocation), eventForAllocation(s.T(), rescheduleStartedAllocation)}} s.Run("it removes rescheduled allocations", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &rescheduleEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &rescheduleEvents}, []*nomadApi.Allocation{startedAllocation, rescheduleStartedAllocation}, []string{startedAllocation.JobID}) }) @@ -519,7 +519,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { stoppedPendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), stoppedPendingAllocation)}} s.Run("it removes stopped pending allocations", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingEvents, &stoppedPendingEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingEvents, &stoppedPendingEvents}, []*nomadApi.Allocation(nil), []string{stoppedPendingAllocation.JobID}) }) @@ -527,7 +527,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { failedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), failedAllocation)}} s.Run("it removes stopped failed allocations", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &failedEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &failedEvents}, []*nomadApi.Allocation{startedAllocation}, []string{failedAllocation.JobID}) }) @@ -535,7 +535,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { lostEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), lostAllocation)}} s.Run("it removes stopped lost allocations", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &lostEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &lostEvents}, 
[]*nomadApi.Allocation{startedAllocation}, []string{lostAllocation.JobID}) }) @@ -545,7 +545,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { eventForAllocation(s.T(), rescheduledLostAllocation)}} s.Run("it removes lost allocations not before the last restart attempt", func() { - assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &rescheduledLostEvents}, + assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &rescheduledLostEvents}, []*nomadApi.Allocation{startedAllocation}, []string(nil)) }) } @@ -598,6 +598,80 @@ func (s *MainTestSuite) TestHandleAllocationEvent_IgnoresReschedulesForStoppedJo s.False(ok) } +func (s *MainTestSuite) TestHandleAllocationEvent_RegressionTest_14_09_2023() { + jobID := "29-6f04b525-5315-11ee-af32-fa163e079f19" + a1ID := "04d86250-550c-62f9-9a21-ecdc3b38773e" + a1Starting := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + a1Starting.ID = a1ID + a1Starting.JobID = jobID + + // With this event the job is added to the idle runners + a1Running := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) + a1Running.ID = a1ID + a1Running.JobID = jobID + + // With this event the job is removed from the idle runners + a2ID := "102f282f-376a-1453-4d3d-7d4e32046acd" + a2Starting := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + a2Starting.ID = a2ID + a2Starting.PreviousAllocation = a1ID + a2Starting.JobID = jobID + + // Because the runner is neither an idle runner nor a used runner, this event triggered the now removed + // race condition handling that led to neither removing a2 from the allocations nor adding a3 to the allocations. 
+ a3ID := "0d8a8ece-cf52-2968-5a9f-e972a4150a6e" + a3Starting := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + a3Starting.ID = a3ID + a3Starting.PreviousAllocation = a2ID + a3Starting.JobID = jobID + + // a2Stopping was not ignored and led to an unexpected allocation stopping. + a2Stopping := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusStop) + a2Stopping.ID = a2ID + a2Stopping.PreviousAllocation = a1ID + a2Stopping.NextAllocation = a3ID + a2Stopping.JobID = jobID + + // a2Complete was not ignored (wrong behavior). + a2Complete := createRecentAllocation(structs.AllocClientStatusComplete, structs.AllocDesiredStatusStop) + a2Complete.ID = a2ID + a2Complete.PreviousAllocation = a1ID + a2Complete.NextAllocation = a3ID + a2Complete.JobID = jobID + + // a3Running was ignored because it was unknown (wrong behavior). + a3Running := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) + a3Running.ID = a3ID + a3Running.PreviousAllocation = a2ID + a3Running.JobID = jobID + + events := []*nomadApi.Events{{Events: []nomadApi.Event{ + eventForAllocation(s.T(), a1Starting), + eventForAllocation(s.T(), a1Running), + eventForAllocation(s.T(), a2Starting), + eventForAllocation(s.T(), a3Starting), + eventForAllocation(s.T(), a2Stopping), + eventForAllocation(s.T(), a2Complete), + eventForAllocation(s.T(), a3Running), + }}} + + idleRunner := make(map[string]bool) + callbacks := &AllocationProcessing{ + OnNew: func(alloc *nomadApi.Allocation, _ time.Duration) { + idleRunner[alloc.JobID] = true + }, + OnDeleted: func(jobID string, _ error) bool { + _, ok := idleRunner[jobID] + delete(idleRunner, jobID) + return !ok + }, + } + + _, err := runAllocationWatching(s, events, callbacks) + s.NoError(err) + s.True(idleRunner[jobID]) +} + func (s *MainTestSuite) TestHandleAllocationEvent_ReportsOOMKilledStatus() { restartedAllocation := 
createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) event := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}} @@ -641,21 +715,21 @@ func (s *MainTestSuite) TestAPIClient_WatchAllocations() { s.Require().Error(err) events := []*nomadApi.Events{{Events: []nomadApi.Event{event}}, {}} - eventsProcessed, err := runAllocationWatching(s.T(), events, noopAllocationProcessing) + eventsProcessed, err := runAllocationWatching(s, events, noopAllocationProcessing) s.Error(err) s.Equal(1, eventsProcessed) } func (s *MainTestSuite) TestAPIClient_WatchAllocationsReturnsErrorOnUnexpectedEOF() { events := []*nomadApi.Events{{Err: ErrUnexpectedEOF}, {}} - eventsProcessed, err := runAllocationWatching(s.T(), events, noopAllocationProcessing) + eventsProcessed, err := runAllocationWatching(s, events, noopAllocationProcessing) s.Error(err) s.Equal(1, eventsProcessed) } -func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, +func assertWatchAllocation(s *MainTestSuite, events []*nomadApi.Events, expectedNewAllocations []*nomadApi.Allocation, expectedDeletedAllocations []string) { - t.Helper() + s.T().Helper() var newAllocations []*nomadApi.Allocation var deletedAllocations []string callbacks := &AllocationProcessing{ @@ -668,23 +742,23 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, }, } - eventsProcessed, err := runAllocationWatching(t, events, callbacks) - assert.NoError(t, err) - assert.Equal(t, len(events), eventsProcessed) + eventsProcessed, err := runAllocationWatching(s, events, callbacks) + s.NoError(err) + s.Equal(len(events), eventsProcessed) - assert.Equal(t, expectedNewAllocations, newAllocations) - assert.Equal(t, expectedDeletedAllocations, deletedAllocations) + s.Equal(expectedNewAllocations, newAllocations) + s.Equal(expectedDeletedAllocations, deletedAllocations) } // runAllocationWatching simulates events streamed from the Nomad event stream // to the MonitorEvaluation method. 
It starts the MonitorEvaluation function as a goroutine // and sequentially transfers the events from the given array to a channel simulating the stream. -func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *AllocationProcessing) ( +func runAllocationWatching(s *MainTestSuite, events []*nomadApi.Events, callbacks *AllocationProcessing) ( eventsProcessed int, err error) { - t.Helper() + s.T().Helper() stream := make(chan *nomadApi.Events) errChan := asynchronouslyWatchAllocations(stream, callbacks) - return simulateNomadEventStream(stream, errChan, events) + return simulateNomadEventStream(s.TestCtx, stream, errChan, events) } // asynchronouslyMonitorEvaluation creates an APIClient with mocked Nomad API and