Introduce Allocation State Tracking

in order to break down the current state and evaluate if it is invalid.
This commit is contained in:
Maximilian Paß
2023-06-01 01:40:37 +01:00
committed by Sebastian Serth
parent bcab46d746
commit b620d0fad7
5 changed files with 266 additions and 157 deletions

View File

@ -343,11 +343,11 @@ func (_m *ExecutorAPIMock) SetJobScale(jobID string, count uint, reason string)
}
// WatchEventStream provides a mock function with given fields: ctx, callbacks
func (_m *ExecutorAPIMock) WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error {
func (_m *ExecutorAPIMock) WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error {
ret := _m.Called(ctx, callbacks)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *AllocationProcessoring) error); ok {
if rf, ok := ret.Get(0).(func(context.Context, *AllocationProcessing) error); ok {
r0 = rf(ctx, callbacks)
} else {
r0 = ret.Error(0)

View File

@ -32,17 +32,19 @@ var (
// resultChannelWriteTimeout is to detect the error when more element are written into a channel than expected.
const resultChannelWriteTimeout = 10 * time.Millisecond
// AllocationProcessoring includes the callbacks to interact with allcoation events.
type AllocationProcessoring struct {
OnNew AllocationProcessorMonitored
OnDeleted AllocationProcessor
// AllocationProcessing includes the callbacks to interact with allocation events.
type AllocationProcessing struct {
OnNew NewAllocationProcessor
OnDeleted DeletedAllocationProcessor
}
type AllocationProcessor func(*nomadApi.Allocation)
type AllocationProcessorMonitored func(*nomadApi.Allocation, time.Duration)
type DeletedAllocationProcessor func(jobID string)
type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration)
type allocationData struct {
// allocClientStatus defines the state defined by Nomad.
allocClientStatus string
// allocDesiredStatus defines if the allocation wants to be running or being stopped.
allocDesiredStatus string
jobID string
start time.Time
// Just debugging information
@ -79,7 +81,7 @@ type ExecutorAPI interface {
// WatchEventStream listens on the Nomad event stream for allocation and evaluation events.
// Depending on the incoming event, any of the given function is executed.
// Do not run multiple times simultaneously.
WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error
WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error
// ExecuteCommand executes the given command in the job/runner with the given id.
// It writes the output of the command to stdout/stderr and reads input from stdin.
@ -187,9 +189,9 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context)
defer cancel() // cancel the WatchEventStream when the evaluation result was read.
go func() {
err = a.WatchEventStream(ctx, &AllocationProcessoring{
err = a.WatchEventStream(ctx, &AllocationProcessing{
OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
OnDeleted: func(_ *nomadApi.Allocation) {},
OnDeleted: func(_ string) {},
})
cancel() // cancel the waiting for an evaluation result if watching the event stream ends.
}()
@ -204,7 +206,7 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context)
}
}
func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error {
func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error {
startTime := time.Now().UnixNano()
stream, err := a.EventStream(ctx)
if err != nil {
@ -254,6 +256,7 @@ func (a *APIClient) initializeAllocations(environmentID dto.EnvironmentID) {
log.WithField("jobID", stub.JobID).WithField("status", stub.ClientStatus).Debug("Recovered Allocation")
a.allocations.Add(stub.ID, &allocationData{
allocClientStatus: stub.ClientStatus,
allocDesiredStatus: stub.DesiredStatus,
jobID: stub.JobID,
start: time.Unix(0, stub.CreateTime),
allocNomadNode: stub.NodeName,
@ -318,7 +321,7 @@ func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Ev
// is called. The allocations storage is used to track pending and running allocations. Using the
// storage the state is persisted between multiple calls of this function.
func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocationData],
event *nomadApi.Event, callbacks *AllocationProcessoring) error {
event *nomadApi.Event, callbacks *AllocationProcessing) error {
alloc, err := event.Allocation()
if err != nil {
return fmt.Errorf("failed to retrieve allocation from event: %w", err)
@ -329,6 +332,8 @@ func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocat
log.WithField("alloc_id", alloc.ID).
WithField("ClientStatus", alloc.ClientStatus).
WithField("DesiredStatus", alloc.DesiredStatus).
WithField("PrevAllocation", alloc.PreviousAllocation).
WithField("NextAllocation", alloc.NextAllocation).
Debug("Handle Allocation Event")
// When starting the API and listening on the Nomad event stream we might get events that already
@ -338,95 +343,158 @@ func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocat
return nil
}
if valid := filterDuplicateEvents(alloc, allocations); !valid {
return nil
}
allocData := updateAllocationData(alloc, allocations)
switch alloc.ClientStatus {
case structs.AllocClientStatusPending:
handlePendingAllocationEvent(alloc, allocations, callbacks)
handlePendingAllocationEvent(alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusRunning:
handleRunningAllocationEvent(alloc, allocations, callbacks)
handleRunningAllocationEvent(alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusComplete:
handleCompleteAllocationEvent(alloc, allocations, callbacks)
handleCompleteAllocationEvent(alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusFailed:
handleFailedAllocationEvent(alloc)
handleFailedAllocationEvent(alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusLost:
handleLostAllocationEvent(alloc, allocData, allocations, callbacks)
default:
log.WithField("alloc", alloc).Warn("Other Client Status")
}
return nil
}
// filterDuplicateEvents identifies duplicate events or events of unknown allocations.
func filterDuplicateEvents(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData]) (valid bool) {
newAllocationExpected := alloc.ClientStatus == structs.AllocClientStatusPending &&
alloc.DesiredStatus == structs.AllocDesiredStatusRun
allocData, ok := allocations.Get(alloc.ID)
switch {
case !ok && newAllocationExpected:
return true
case !ok:
// This case happens in case of an error or when an event that led to the deletion of the alloc data is duplicated.
log.WithField("alloc", alloc).Trace("Ignoring unknown allocation")
return false
case alloc.ClientStatus == allocData.allocClientStatus && alloc.DesiredStatus == allocData.allocDesiredStatus:
log.WithField("alloc", alloc).Trace("Ignoring duplicate event")
return false
default:
return true
}
}
// updateAllocationData updates the allocation tracking data according to the passed alloc.
// The allocation data before this allocation update is returned.
func updateAllocationData(
alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData]) (previous *allocationData) {
allocData, ok := allocations.Get(alloc.ID)
if ok {
data := *allocData
previous = &data
allocData.allocClientStatus = alloc.ClientStatus
allocData.allocDesiredStatus = alloc.DesiredStatus
allocations.Add(alloc.ID, allocData)
}
return previous
}
// handlePendingAllocationEvent manages allocation that are currently pending.
// This allows the handling of startups and re-placements of allocations.
func handlePendingAllocationEvent(alloc *nomadApi.Allocation,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
if alloc.DesiredStatus == structs.AllocDesiredStatusRun {
allocData, ok := allocations.Get(alloc.ID)
if ok && allocData.allocClientStatus != structs.AllocClientStatusRunning {
// Pending Allocation is already stored.
// This happens because depending on the startup duration of the runner, we get zero, one, or more events
// notifying us that the allocation is still pending.
// We are just interested in the first event, in order to have the correct start time.
return
} else if ok {
func handlePendingAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusRun:
if allocData != nil {
// Handle Allocation restart.
callbacks.OnDeleted(alloc.JobID)
} else if alloc.PreviousAllocation != "" {
// Handle Runner (/Container) re-allocations.
callbacks.OnDeleted(alloc)
if prevData, ok := allocations.Get(alloc.PreviousAllocation); ok {
callbacks.OnDeleted(prevData.jobID)
allocations.Delete(alloc.PreviousAllocation)
} else {
log.WithField("alloc", alloc).Warn("Previous Allocation not found")
}
}
// Store Pending Allocation - Allocation gets started, wait until it runs.
allocations.Add(alloc.ID, &allocationData{
allocClientStatus: structs.AllocClientStatusPending,
allocClientStatus: alloc.ClientStatus,
allocDesiredStatus: alloc.DesiredStatus,
jobID: alloc.JobID,
start: time.Now(),
allocNomadNode: alloc.NodeName,
})
} else {
case structs.AllocDesiredStatusStop:
handleStoppingAllocationEvent(alloc, allocations, callbacks, false)
default:
log.WithField("alloc", alloc).Warn("Other Desired Status")
}
}
// handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events.
func handleRunningAllocationEvent(alloc *nomadApi.Allocation,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
if alloc.DesiredStatus == structs.AllocDesiredStatusRun {
// is first event that marks the transition between pending and running?
if allocData, ok := allocations.Get(alloc.ID); ok && allocData.allocClientStatus == structs.AllocClientStatusPending {
func handleRunningAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
_ storage.Storage[*allocationData], callbacks *AllocationProcessing) {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusRun:
startupDuration := time.Since(allocData.start)
callbacks.OnNew(alloc, startupDuration)
allocData.allocClientStatus = structs.AllocClientStatusRunning
}
}
}
// handleCompleteAllocationEvent handles allocations that stopped.
func handleCompleteAllocationEvent(alloc *nomadApi.Allocation,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
if alloc.DesiredStatus == structs.AllocDesiredStatusStop {
if _, ok := allocations.Get(alloc.ID); ok {
callbacks.OnDeleted(alloc)
allocations.Delete(alloc.ID)
}
} else {
case structs.AllocDesiredStatusStop:
// It is normal that running allocations will stop. We will handle it when it is stopped.
default:
log.WithField("alloc", alloc).Warn("Other Desired Status")
}
}
// handleFailedAllocationEvent logs only the first of the multiple failure events.
func handleFailedAllocationEvent(alloc *nomadApi.Allocation) {
if alloc.FollowupEvalID == "" && alloc.PreviousAllocation == "" {
log.WithField("job", alloc.JobID).
WithField("reason", failureDisplayMessage(alloc)).
WithField("alloc", alloc).
Warn("Allocation failure")
// handleCompleteAllocationEvent handles allocations that stopped.
func handleCompleteAllocationEvent(alloc *nomadApi.Allocation, _ *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusRun:
log.WithField("alloc", alloc).Warn("Complete allocation desires to run")
case structs.AllocDesiredStatusStop:
callbacks.OnDeleted(alloc.JobID)
allocations.Delete(alloc.ID)
default:
log.WithField("alloc", alloc).Warn("Other Desired Status")
}
}
// failureDisplayMessage parses the DisplayMessage of a failed allocation.
func failureDisplayMessage(alloc *nomadApi.Allocation) (msg string) {
for _, state := range alloc.TaskStates {
for _, event := range state.Events {
if event.FailsTask {
return event.DisplayMessage
// handleFailedAllocationEvent logs only the last of the multiple failure events.
func handleFailedAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
// The stop is expected when the allocation desired to stop even before it failed.
expectedStop := allocData.allocDesiredStatus == structs.AllocDesiredStatusStop
handleStoppingAllocationEvent(alloc, allocations, callbacks, expectedStop)
}
// handleLostAllocationEvent logs only the last of the multiple lost events.
func handleLostAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
// The stop is expected when the allocation desired to stop even before it got lost.
expectedStop := allocData.allocDesiredStatus == structs.AllocDesiredStatusStop
handleStoppingAllocationEvent(alloc, allocations, callbacks, expectedStop)
}
func handleStoppingAllocationEvent(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData],
callbacks *AllocationProcessing, expectedStop bool) {
if alloc.NextAllocation == "" {
callbacks.OnDeleted(alloc.JobID)
allocations.Delete(alloc.ID)
}
entry := log.WithField("job", alloc.JobID)
replacementAllocationScheduled := alloc.NextAllocation != ""
if expectedStop == replacementAllocationScheduled {
entry.WithField("alloc", alloc).Warn("Unexpected Allocation Stopping / Restarting")
} else {
entry.Debug("Expected Allocation Stopping / Restarting")
}
return ""
}
// checkEvaluation checks whether the given evaluation failed.

View File

@ -24,9 +24,9 @@ import (
)
var (
noopAllocationProcessoring = &AllocationProcessoring{
noopAllocationProcessing = &AllocationProcessing{
OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
OnDeleted: func(_ *nomadApi.Allocation) {},
OnDeleted: func(_ string) {},
}
ErrUnexpectedEOF = errors.New("unexpected EOF")
)
@ -431,7 +431,7 @@ func TestApiClient_WatchAllocationsIgnoresOldAllocations(t *testing.T) {
}}
assertWatchAllocation(t, []*nomadApi.Events{&oldAllocationEvents},
[]*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil))
[]*nomadApi.Allocation(nil), []string(nil))
}
func createOldAllocation(clientStatus, desiredStatus string) *nomadApi.Allocation {
@ -445,67 +445,107 @@ func TestApiClient_WatchAllocationsIgnoresUnhandledEvents(t *testing.T) {
Type: structs.TypeNodeEvent,
},
}}
assertWatchAllocation(t, []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil))
planEvents := nomadApi.Events{Events: []nomadApi.Event{
{
Topic: nomadApi.TopicAllocation,
Type: structs.TypePlanResult,
},
}}
assertWatchAllocation(t, []*nomadApi.Events{&planEvents}, []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil))
assertWatchAllocation(t, []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []string(nil))
}
func TestApiClient_WatchAllocationsHandlesEvents(t *testing.T) {
newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
pendingAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, newPendingAllocation)}}
func TestApiClient_WatchAllocationsUsesCallbacksForEvents(t *testing.T) {
pendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
pendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, pendingAllocation)}}
newStartedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
startAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{
eventForAllocation(t, newPendingAllocation),
eventForAllocation(t, newStartedAllocation),
}}
newStoppedAllocation := createRecentAllocation(structs.AllocClientStatusComplete, structs.AllocDesiredStatusStop)
stopAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{
eventForAllocation(t, newPendingAllocation),
eventForAllocation(t, newStartedAllocation),
eventForAllocation(t, newStoppedAllocation),
}}
var cases = []struct {
streamedEvents []*nomadApi.Events
expectedNewAllocations []*nomadApi.Allocation
expectedDeletedAllocations []*nomadApi.Allocation
name string
}{
{[]*nomadApi.Events{&pendingAllocationEvents},
[]*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil),
"it does not add allocation when client status is pending"},
{[]*nomadApi.Events{&startAllocationEvents},
[]*nomadApi.Allocation{newStartedAllocation},
[]*nomadApi.Allocation(nil),
"it adds allocation with matching events"},
{[]*nomadApi.Events{{}, &startAllocationEvents},
[]*nomadApi.Allocation{newStartedAllocation},
[]*nomadApi.Allocation(nil),
"it skips heartbeat and adds allocation with matching events"},
{[]*nomadApi.Events{&stopAllocationEvents},
[]*nomadApi.Allocation{newStartedAllocation},
[]*nomadApi.Allocation{newStoppedAllocation},
"it adds and deletes the allocation"},
{[]*nomadApi.Events{&startAllocationEvents, &startAllocationEvents},
[]*nomadApi.Allocation{newStartedAllocation, newStartedAllocation},
[]*nomadApi.Allocation{newPendingAllocation},
"it handles multiple events"},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
assertWatchAllocation(t, c.streamedEvents,
c.expectedNewAllocations, c.expectedDeletedAllocations)
t.Run("it does not add allocation when client status is pending", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingEvents}, []*nomadApi.Allocation(nil), []string(nil))
})
startedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
startedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, startedAllocation)}}
pendingStartedEvents := nomadApi.Events{Events: []nomadApi.Event{
eventForAllocation(t, pendingAllocation), eventForAllocation(t, startedAllocation)}}
t.Run("it adds allocation with matching events", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents},
[]*nomadApi.Allocation{startedAllocation}, []string(nil))
})
t.Run("it skips heartbeat and adds allocation with matching events", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents},
[]*nomadApi.Allocation{startedAllocation}, []string(nil))
})
stoppedAllocation := createRecentAllocation(structs.AllocClientStatusComplete, structs.AllocDesiredStatusStop)
stoppedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, stoppedAllocation)}}
pendingStartStopEvents := nomadApi.Events{Events: []nomadApi.Event{
eventForAllocation(t, pendingAllocation),
eventForAllocation(t, startedAllocation),
eventForAllocation(t, stoppedAllocation),
}}
t.Run("it adds and deletes the allocation", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingStartStopEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{stoppedAllocation.JobID})
})
t.Run("it ignores duplicate events", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingEvents, &startedEvents, &startedEvents,
&stoppedEvents, &stoppedEvents, &stoppedEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID})
})
t.Run("it ignores events of unknown allocations", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&startedEvents, &startedEvents,
&stoppedEvents, &stoppedEvents, &stoppedEvents}, []*nomadApi.Allocation(nil), []string(nil))
})
t.Run("it removes restarted allocations", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &pendingStartedEvents},
[]*nomadApi.Allocation{startedAllocation, startedAllocation}, []string{startedAllocation.JobID})
})
rescheduleAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
rescheduleAllocation.ID = tests.AnotherUUID
rescheduleAllocation.PreviousAllocation = pendingAllocation.ID
rescheduleStartedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
rescheduleStartedAllocation.ID = tests.AnotherUUID
rescheduleAllocation.PreviousAllocation = pendingAllocation.ID
rescheduleEvents := nomadApi.Events{Events: []nomadApi.Event{
eventForAllocation(t, rescheduleAllocation), eventForAllocation(t, rescheduleStartedAllocation)}}
t.Run("it removes rescheduled allocations", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &rescheduleEvents},
[]*nomadApi.Allocation{startedAllocation, rescheduleStartedAllocation}, []string{startedAllocation.JobID})
})
stoppedPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusStop)
stoppedPendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, stoppedPendingAllocation)}}
t.Run("it removes stopped pending allocations", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingEvents, &stoppedPendingEvents},
[]*nomadApi.Allocation(nil), []string{stoppedPendingAllocation.JobID})
})
failedAllocation := createRecentAllocation(structs.AllocClientStatusFailed, structs.AllocDesiredStatusStop)
failedEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, failedAllocation)}}
t.Run("it removes stopped failed allocations", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &failedEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{failedAllocation.JobID})
})
lostAllocation := createRecentAllocation(structs.AllocClientStatusLost, structs.AllocDesiredStatusStop)
lostEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, lostAllocation)}}
t.Run("it removes stopped lost allocations", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &lostEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{lostAllocation.JobID})
})
rescheduledLostAllocation := createRecentAllocation(structs.AllocClientStatusLost, structs.AllocDesiredStatusStop)
rescheduledLostAllocation.NextAllocation = tests.AnotherUUID
rescheduledLostEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, rescheduledLostAllocation)}}
t.Run("it removes lost allocations not before the last restart attempt", func(t *testing.T) {
assertWatchAllocation(t, []*nomadApi.Events{&pendingStartedEvents, &rescheduledLostEvents},
[]*nomadApi.Allocation{startedAllocation}, []string(nil))
})
}
}
func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) {
@ -515,7 +555,7 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) {
allocations := storage.NewLocalStorage[*allocationData]()
err := handleAllocationEvent(
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring)
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessing)
require.NoError(t, err)
_, ok := allocations.Get(newPendingAllocation.ID)
@ -528,7 +568,7 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) {
allocations := storage.NewLocalStorage[*allocationData]()
err := handleAllocationEvent(
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring)
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessing)
require.NoError(t, err)
_, ok := allocations.Get(newPendingAllocation.ID)
@ -541,7 +581,7 @@ func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetri
apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault)
apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false}
err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessoring)
err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessing)
assert.ErrorIs(t, err, tests.ErrDefault)
}
@ -557,29 +597,29 @@ func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationCannotBeRetrievedWi
require.Error(t, err)
events := []*nomadApi.Events{{Events: []nomadApi.Event{event}}, {}}
eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessoring)
eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessing)
assert.Error(t, err)
assert.Equal(t, 1, eventsProcessed)
}
func TestAPIClient_WatchAllocationsReturnsErrorOnUnexpectedEOF(t *testing.T) {
events := []*nomadApi.Events{{Err: ErrUnexpectedEOF}, {}}
eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessoring)
eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessing)
assert.Error(t, err)
assert.Equal(t, 1, eventsProcessed)
}
func assertWatchAllocation(t *testing.T, events []*nomadApi.Events,
expectedNewAllocations, expectedDeletedAllocations []*nomadApi.Allocation) {
expectedNewAllocations []*nomadApi.Allocation, expectedDeletedAllocations []string) {
t.Helper()
var newAllocations []*nomadApi.Allocation
var deletedAllocations []*nomadApi.Allocation
callbacks := &AllocationProcessoring{
var deletedAllocations []string
callbacks := &AllocationProcessing{
OnNew: func(alloc *nomadApi.Allocation, _ time.Duration) {
newAllocations = append(newAllocations, alloc)
},
OnDeleted: func(alloc *nomadApi.Allocation) {
deletedAllocations = append(deletedAllocations, alloc)
OnDeleted: func(jobID string) {
deletedAllocations = append(deletedAllocations, jobID)
},
}
@ -594,7 +634,7 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events,
// runAllocationWatching simulates events streamed from the Nomad event stream
// to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine
// and sequentially transfers the events from the given array to a channel simulating the stream.
func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *AllocationProcessoring) (
func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *AllocationProcessing) (
eventsProcessed int, err error) {
t.Helper()
stream := make(chan *nomadApi.Events)
@ -606,7 +646,7 @@ func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *A
// runs the MonitorEvaluation method in a goroutine. The mock returns a read-only
// version of the given stream to simulate an event stream gotten from the real
// Nomad API.
func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *AllocationProcessoring) chan error {
func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *AllocationProcessing) chan error {
ctx := context.Background()
// We can only get a read-only channel once we return it from a function.
readOnlyStream := func() <-chan *nomadApi.Events { return stream }()
@ -644,7 +684,8 @@ func eventForAllocation(t *testing.T, alloc *nomadApi.Allocation) nomadApi.Event
func createAllocation(modifyTime int64, clientStatus, desiredStatus string) *nomadApi.Allocation {
return &nomadApi.Allocation{
ID: tests.DefaultRunnerID,
ID: tests.DefaultUUID,
JobID: tests.DefaultRunnerID,
ModifyTime: modifyTime,
ClientStatus: clientStatus,
DesiredStatus: desiredStatus,

View File

@ -133,7 +133,7 @@ func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
retries := 0
for ctx.Err() == nil {
err := m.apiClient.WatchEventStream(ctx,
&nomad.AllocationProcessoring{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped})
&nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped})
retries += 1
log.WithContext(ctx).WithError(err).Errorf("Stopped updating the runners! Retry %v", retries)
<-time.After(time.Second)
@ -177,22 +177,22 @@ func monitorAllocationStartupDuration(startup time.Duration, runnerID string, en
monitoring.WriteInfluxPoint(p)
}
func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) {
log.WithField("id", alloc.JobID).Debug("Runner stopped")
func (m *NomadRunnerManager) onAllocationStopped(runnerID string) {
log.WithField("id", runnerID).Debug("Runner stopped")
if nomad.IsEnvironmentTemplateID(alloc.JobID) {
if nomad.IsEnvironmentTemplateID(runnerID) {
return
}
environmentID, err := nomad.EnvironmentIDFromRunnerID(alloc.JobID)
environmentID, err := nomad.EnvironmentIDFromRunnerID(runnerID)
if err != nil {
log.WithError(err).Warn("Stopped allocation can not be handled")
return
}
m.usedRunners.Delete(alloc.JobID)
m.usedRunners.Delete(runnerID)
environment, ok := m.environments.Get(environmentID.ToString())
if ok {
environment.DeleteRunner(alloc.JobID)
environment.DeleteRunner(runnerID)
}
}

View File

@ -233,7 +233,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() {
modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) {
call.Run(func(args mock.Arguments) {
callbacks, ok := args.Get(1).(*nomad.AllocationProcessoring)
callbacks, ok := args.Get(1).(*nomad.AllocationProcessing)
s.Require().True(ok)
callbacks.OnNew(allocation, 0)
call.ReturnArguments = mock.Arguments{nil}
@ -261,9 +261,9 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() {
modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) {
call.Run(func(args mock.Arguments) {
callbacks, ok := args.Get(1).(*nomad.AllocationProcessoring)
callbacks, ok := args.Get(1).(*nomad.AllocationProcessing)
s.Require().True(ok)
callbacks.OnDeleted(allocation)
callbacks.OnDeleted(allocation.JobID)
call.ReturnArguments = mock.Arguments{nil}
})
})