Add regression test for the recent lost runners.

This commit is contained in:
Maximilian Paß
2023-09-17 17:58:09 +02:00
committed by Sebastian Serth
parent 39fc0f9d9d
commit 788cb0f660
2 changed files with 109 additions and 35 deletions

View File

@ -390,10 +390,10 @@ func filterDuplicateEvents(alloc *nomadApi.Allocation, allocations storage.Stora
return true return true
case !ok: case !ok:
// This case happens in case of an error or when an event that led to the deletion of the alloc data is duplicated. // This case happens in case of an error or when an event that led to the deletion of the alloc data is duplicated.
log.WithField("alloc", alloc).Debug("Ignoring unknown allocation") log.WithField("allocID", alloc.ID).Debug("Ignoring unknown allocation")
return false return false
case alloc.ClientStatus == allocData.allocClientStatus && alloc.DesiredStatus == allocData.allocDesiredStatus: case alloc.ClientStatus == allocData.allocClientStatus && alloc.DesiredStatus == allocData.allocDesiredStatus:
log.WithField("alloc", alloc).Debug("Ignoring duplicate event") log.WithField("allocID", alloc.ID).Debug("Ignoring duplicate event")
return false return false
default: default:
return true return true

View File

@ -12,7 +12,6 @@ import (
"github.com/openHPI/poseidon/pkg/nullio" "github.com/openHPI/poseidon/pkg/nullio"
"github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/storage"
"github.com/openHPI/poseidon/tests" "github.com/openHPI/poseidon/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"io" "io"
@ -241,6 +240,7 @@ func eventForEvaluation(t *testing.T, eval *nomadApi.Evaluation) nomadApi.Event
// simulateNomadEventStream streams the given events sequentially to the stream channel. // simulateNomadEventStream streams the given events sequentially to the stream channel.
// It returns how many events have been processed until an error occurred. // It returns how many events have been processed until an error occurred.
func simulateNomadEventStream( func simulateNomadEventStream(
ctx context.Context,
stream chan<- *nomadApi.Events, stream chan<- *nomadApi.Events,
errChan chan error, errChan chan error,
events []*nomadApi.Events, events []*nomadApi.Events,
@ -259,7 +259,7 @@ func simulateNomadEventStream(
// Wait for last event being processed // Wait for last event being processed
var err error var err error
select { select {
case <-time.After(10 * time.Millisecond): case <-ctx.Done():
case err = <-errChan: case err = <-errChan:
} }
return eventsProcessed, err return eventsProcessed, err
@ -268,10 +268,10 @@ func simulateNomadEventStream(
// runEvaluationMonitoring simulates events streamed from the Nomad event stream // runEvaluationMonitoring simulates events streamed from the Nomad event stream
// to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine // to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine
// and sequentially transfers the events from the given array to a channel simulating the stream. // and sequentially transfers the events from the given array to a channel simulating the stream.
func runEvaluationMonitoring(events []*nomadApi.Events) (eventsProcessed int, err error) { func runEvaluationMonitoring(ctx context.Context, events []*nomadApi.Events) (eventsProcessed int, err error) {
stream := make(chan *nomadApi.Events) stream := make(chan *nomadApi.Events)
errChan := asynchronouslyMonitorEvaluation(stream) errChan := asynchronouslyMonitorEvaluation(stream)
return simulateNomadEventStream(stream, errChan, events) return simulateNomadEventStream(ctx, stream, errChan, events)
} }
func (s *MainTestSuite) TestApiClient_MonitorEvaluationWithSuccessfulEvent() { func (s *MainTestSuite) TestApiClient_MonitorEvaluationWithSuccessfulEvent() {
@ -306,7 +306,7 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationWithSuccessfulEvent() {
for _, c := range cases { for _, c := range cases {
s.Run(c.name, func() { s.Run(c.name, func() {
eventsProcessed, err := runEvaluationMonitoring(c.streamedEvents) eventsProcessed, err := runEvaluationMonitoring(s.TestCtx, c.streamedEvents)
s.Nil(err) s.Nil(err)
s.Equal(c.expectedEventsProcessed, eventsProcessed) s.Equal(c.expectedEventsProcessed, eventsProcessed)
}) })
@ -347,7 +347,7 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationWithFailingEvent() {
for _, c := range cases { for _, c := range cases {
s.Run(c.name, func() { s.Run(c.name, func() {
eventsProcessed, err := runEvaluationMonitoring(c.streamedEvents) eventsProcessed, err := runEvaluationMonitoring(s.TestCtx, c.streamedEvents)
s.Require().NotNil(err) s.Require().NotNil(err)
s.Contains(err.Error(), c.expectedError.Error()) s.Contains(err.Error(), c.expectedError.Error())
s.Equal(c.expectedEventsProcessed, eventsProcessed) s.Equal(c.expectedEventsProcessed, eventsProcessed)
@ -363,7 +363,7 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationFailsWhenFailingToDecodeE
} }
_, err := event.Evaluation() _, err := event.Evaluation()
s.Require().NotNil(err) s.Require().NotNil(err)
eventsProcessed, err := runEvaluationMonitoring([]*nomadApi.Events{{Events: []nomadApi.Event{event}}}) eventsProcessed, err := runEvaluationMonitoring(s.TestCtx, []*nomadApi.Events{{Events: []nomadApi.Event{event}}})
s.Error(err) s.Error(err)
s.Equal(1, eventsProcessed) s.Equal(1, eventsProcessed)
} }
@ -431,7 +431,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsIgnoresOldAllocations() {
eventForAllocation(s.T(), oldRunningAllocation), eventForAllocation(s.T(), oldRunningAllocation),
}} }}
assertWatchAllocation(s.T(), []*nomadApi.Events{&oldAllocationEvents}, assertWatchAllocation(s, []*nomadApi.Events{&oldAllocationEvents},
[]*nomadApi.Allocation(nil), []string(nil)) []*nomadApi.Allocation(nil), []string(nil))
} }
@ -446,7 +446,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsIgnoresUnhandledEvents() {
Type: structs.TypeNodeEvent, Type: structs.TypeNodeEvent,
}, },
}} }}
assertWatchAllocation(s.T(), []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []string(nil)) assertWatchAllocation(s, []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []string(nil))
} }
func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() { func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
@ -454,7 +454,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
pendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), pendingAllocation)}} pendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), pendingAllocation)}}
s.Run("it does not add allocation when client status is pending", func() { s.Run("it does not add allocation when client status is pending", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingEvents}, []*nomadApi.Allocation(nil), []string(nil)) assertWatchAllocation(s, []*nomadApi.Events{&pendingEvents}, []*nomadApi.Allocation(nil), []string(nil))
}) })
startedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) startedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
@ -463,12 +463,12 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
eventForAllocation(s.T(), pendingAllocation), eventForAllocation(s.T(), startedAllocation)}} eventForAllocation(s.T(), pendingAllocation), eventForAllocation(s.T(), startedAllocation)}}
s.Run("it adds allocation with matching events", func() { s.Run("it adds allocation with matching events", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents},
[]*nomadApi.Allocation{startedAllocation}, []string(nil)) []*nomadApi.Allocation{startedAllocation}, []string(nil))
}) })
s.Run("it skips heartbeat and adds allocation with matching events", func() { s.Run("it skips heartbeat and adds allocation with matching events", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents},
[]*nomadApi.Allocation{startedAllocation}, []string(nil)) []*nomadApi.Allocation{startedAllocation}, []string(nil))
}) })
@ -481,23 +481,23 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
}} }}
s.Run("it adds and deletes the allocation", func() { s.Run("it adds and deletes the allocation", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartStopEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingStartStopEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{stoppedAllocation.JobID}) []*nomadApi.Allocation{startedAllocation}, []string{stoppedAllocation.JobID})
}) })
s.Run("it ignores duplicate events", func() { s.Run("it ignores duplicate events", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingEvents, &startedEvents, &startedEvents, assertWatchAllocation(s, []*nomadApi.Events{&pendingEvents, &startedEvents, &startedEvents,
&stoppedEvents, &stoppedEvents, &stoppedEvents}, &stoppedEvents, &stoppedEvents, &stoppedEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID}) []*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID})
}) })
s.Run("it ignores events of unknown allocations", func() { s.Run("it ignores events of unknown allocations", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&startedEvents, &startedEvents, assertWatchAllocation(s, []*nomadApi.Events{&startedEvents, &startedEvents,
&stoppedEvents, &stoppedEvents, &stoppedEvents}, []*nomadApi.Allocation(nil), []string(nil)) &stoppedEvents, &stoppedEvents, &stoppedEvents}, []*nomadApi.Allocation(nil), []string(nil))
}) })
s.Run("it removes restarted allocations", func() { s.Run("it removes restarted allocations", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &pendingStartedEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &pendingStartedEvents},
[]*nomadApi.Allocation{startedAllocation, startedAllocation}, []string{startedAllocation.JobID}) []*nomadApi.Allocation{startedAllocation, startedAllocation}, []string{startedAllocation.JobID})
}) })
@ -511,7 +511,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
eventForAllocation(s.T(), rescheduleAllocation), eventForAllocation(s.T(), rescheduleStartedAllocation)}} eventForAllocation(s.T(), rescheduleAllocation), eventForAllocation(s.T(), rescheduleStartedAllocation)}}
s.Run("it removes rescheduled allocations", func() { s.Run("it removes rescheduled allocations", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &rescheduleEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &rescheduleEvents},
[]*nomadApi.Allocation{startedAllocation, rescheduleStartedAllocation}, []string{startedAllocation.JobID}) []*nomadApi.Allocation{startedAllocation, rescheduleStartedAllocation}, []string{startedAllocation.JobID})
}) })
@ -519,7 +519,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
stoppedPendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), stoppedPendingAllocation)}} stoppedPendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), stoppedPendingAllocation)}}
s.Run("it removes stopped pending allocations", func() { s.Run("it removes stopped pending allocations", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingEvents, &stoppedPendingEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingEvents, &stoppedPendingEvents},
[]*nomadApi.Allocation(nil), []string{stoppedPendingAllocation.JobID}) []*nomadApi.Allocation(nil), []string{stoppedPendingAllocation.JobID})
}) })
@ -527,7 +527,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
failedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), failedAllocation)}} failedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), failedAllocation)}}
s.Run("it removes stopped failed allocations", func() { s.Run("it removes stopped failed allocations", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &failedEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &failedEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{failedAllocation.JobID}) []*nomadApi.Allocation{startedAllocation}, []string{failedAllocation.JobID})
}) })
@ -535,7 +535,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
lostEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), lostAllocation)}} lostEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(s.T(), lostAllocation)}}
s.Run("it removes stopped lost allocations", func() { s.Run("it removes stopped lost allocations", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &lostEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &lostEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{lostAllocation.JobID}) []*nomadApi.Allocation{startedAllocation}, []string{lostAllocation.JobID})
}) })
@ -545,7 +545,7 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
eventForAllocation(s.T(), rescheduledLostAllocation)}} eventForAllocation(s.T(), rescheduledLostAllocation)}}
s.Run("it removes lost allocations not before the last restart attempt", func() { s.Run("it removes lost allocations not before the last restart attempt", func() {
assertWatchAllocation(s.T(), []*nomadApi.Events{&pendingStartedEvents, &rescheduledLostEvents}, assertWatchAllocation(s, []*nomadApi.Events{&pendingStartedEvents, &rescheduledLostEvents},
[]*nomadApi.Allocation{startedAllocation}, []string(nil)) []*nomadApi.Allocation{startedAllocation}, []string(nil))
}) })
} }
@ -598,6 +598,80 @@ func (s *MainTestSuite) TestHandleAllocationEvent_IgnoresReschedulesForStoppedJo
s.False(ok) s.False(ok)
} }
func (s *MainTestSuite) TestHandleAllocationEvent_RegressionTest_14_09_2023() {
jobID := "29-6f04b525-5315-11ee-af32-fa163e079f19"
a1ID := "04d86250-550c-62f9-9a21-ecdc3b38773e"
a1Starting := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
a1Starting.ID = a1ID
a1Starting.JobID = jobID
// With this event the job is added to the idle runners
a1Running := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
a1Running.ID = a1ID
a1Running.JobID = jobID
// With this event the job is removed from the idle runners
a2ID := "102f282f-376a-1453-4d3d-7d4e32046acd"
a2Starting := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
a2Starting.ID = a2ID
a2Starting.PreviousAllocation = a1ID
a2Starting.JobID = jobID
// Because the runner is neither an idle runner nor an used runner, this event triggered the now removed
// race condition handling that led to neither removing a2 from the allocations nor adding a3 to the allocations.
a3ID := "0d8a8ece-cf52-2968-5a9f-e972a4150a6e"
a3Starting := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
a3Starting.ID = a3ID
a3Starting.PreviousAllocation = a2ID
a3Starting.JobID = jobID
// a2Stopping was not ignored and led to an unexpected allocation stopping.
a2Stopping := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusStop)
a2Stopping.ID = a2ID
a2Stopping.PreviousAllocation = a1ID
a2Stopping.NextAllocation = a3ID
a2Stopping.JobID = jobID
// a2Complete was not ignored (wrong behavior).
a2Complete := createRecentAllocation(structs.AllocClientStatusComplete, structs.AllocDesiredStatusStop)
a2Complete.ID = a2ID
a2Complete.PreviousAllocation = a1ID
a2Complete.NextAllocation = a3ID
a2Complete.JobID = jobID
// a3Running was ignored because it was unknown (wrong behavior).
a3Running := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
a3Running.ID = a3ID
a3Running.PreviousAllocation = a2ID
a3Running.JobID = jobID
events := []*nomadApi.Events{{Events: []nomadApi.Event{
eventForAllocation(s.T(), a1Starting),
eventForAllocation(s.T(), a1Running),
eventForAllocation(s.T(), a2Starting),
eventForAllocation(s.T(), a3Starting),
eventForAllocation(s.T(), a2Stopping),
eventForAllocation(s.T(), a2Complete),
eventForAllocation(s.T(), a3Running),
}}}
idleRunner := make(map[string]bool)
callbacks := &AllocationProcessing{
OnNew: func(alloc *nomadApi.Allocation, _ time.Duration) {
idleRunner[alloc.JobID] = true
},
OnDeleted: func(jobID string, _ error) bool {
_, ok := idleRunner[jobID]
delete(idleRunner, jobID)
return !ok
},
}
_, err := runAllocationWatching(s, events, callbacks)
s.NoError(err)
s.True(idleRunner[jobID])
}
func (s *MainTestSuite) TestHandleAllocationEvent_ReportsOOMKilledStatus() { func (s *MainTestSuite) TestHandleAllocationEvent_ReportsOOMKilledStatus() {
restartedAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) restartedAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
event := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}} event := nomadApi.TaskEvent{Details: map[string]string{"oom_killed": "true"}}
@ -641,21 +715,21 @@ func (s *MainTestSuite) TestAPIClient_WatchAllocations() {
s.Require().Error(err) s.Require().Error(err)
events := []*nomadApi.Events{{Events: []nomadApi.Event{event}}, {}} events := []*nomadApi.Events{{Events: []nomadApi.Event{event}}, {}}
eventsProcessed, err := runAllocationWatching(s.T(), events, noopAllocationProcessing) eventsProcessed, err := runAllocationWatching(s, events, noopAllocationProcessing)
s.Error(err) s.Error(err)
s.Equal(1, eventsProcessed) s.Equal(1, eventsProcessed)
} }
func (s *MainTestSuite) TestAPIClient_WatchAllocationsReturnsErrorOnUnexpectedEOF() { func (s *MainTestSuite) TestAPIClient_WatchAllocationsReturnsErrorOnUnexpectedEOF() {
events := []*nomadApi.Events{{Err: ErrUnexpectedEOF}, {}} events := []*nomadApi.Events{{Err: ErrUnexpectedEOF}, {}}
eventsProcessed, err := runAllocationWatching(s.T(), events, noopAllocationProcessing) eventsProcessed, err := runAllocationWatching(s, events, noopAllocationProcessing)
s.Error(err) s.Error(err)
s.Equal(1, eventsProcessed) s.Equal(1, eventsProcessed)
} }
func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, func assertWatchAllocation(s *MainTestSuite, events []*nomadApi.Events,
expectedNewAllocations []*nomadApi.Allocation, expectedDeletedAllocations []string) { expectedNewAllocations []*nomadApi.Allocation, expectedDeletedAllocations []string) {
t.Helper() s.T().Helper()
var newAllocations []*nomadApi.Allocation var newAllocations []*nomadApi.Allocation
var deletedAllocations []string var deletedAllocations []string
callbacks := &AllocationProcessing{ callbacks := &AllocationProcessing{
@ -668,23 +742,23 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events,
}, },
} }
eventsProcessed, err := runAllocationWatching(t, events, callbacks) eventsProcessed, err := runAllocationWatching(s, events, callbacks)
assert.NoError(t, err) s.NoError(err)
assert.Equal(t, len(events), eventsProcessed) s.Equal(len(events), eventsProcessed)
assert.Equal(t, expectedNewAllocations, newAllocations) s.Equal(expectedNewAllocations, newAllocations)
assert.Equal(t, expectedDeletedAllocations, deletedAllocations) s.Equal(expectedDeletedAllocations, deletedAllocations)
} }
// runAllocationWatching simulates events streamed from the Nomad event stream // runAllocationWatching simulates events streamed from the Nomad event stream
// to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine // to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine
// and sequentially transfers the events from the given array to a channel simulating the stream. // and sequentially transfers the events from the given array to a channel simulating the stream.
func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *AllocationProcessing) ( func runAllocationWatching(s *MainTestSuite, events []*nomadApi.Events, callbacks *AllocationProcessing) (
eventsProcessed int, err error) { eventsProcessed int, err error) {
t.Helper() s.T().Helper()
stream := make(chan *nomadApi.Events) stream := make(chan *nomadApi.Events)
errChan := asynchronouslyWatchAllocations(stream, callbacks) errChan := asynchronouslyWatchAllocations(stream, callbacks)
return simulateNomadEventStream(stream, errChan, events) return simulateNomadEventStream(s.TestCtx, stream, errChan, events)
} }
// asynchronouslyMonitorEvaluation creates an APIClient with mocked Nomad API and // asynchronouslyMonitorEvaluation creates an APIClient with mocked Nomad API and