Fix missing idle runners.
In the context of #358 we identified that the event with the type `AllocationUpdated` and the client status `pending` is common but not always send by Nomad. With this Commit we remove the condition that limits the evaluated Nomad events to the event with the type `AllocationUpdated`. Without the condition the event of the type `PlanResult` and the status `pending` will be evaluated equally. By now, this event seems to be sent every time. This restriction led to started allocation not being registered when the `AllocationUpdated` event with client status `pending` was missing.
This commit is contained in:
@ -318,9 +318,6 @@ func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Ev
|
|||||||
// storage the state is persisted between multiple calls of this function.
|
// storage the state is persisted between multiple calls of this function.
|
||||||
func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocationData],
|
func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocationData],
|
||||||
event *nomadApi.Event, callbacks *AllocationProcessoring) error {
|
event *nomadApi.Event, callbacks *AllocationProcessoring) error {
|
||||||
if event.Type != structs.TypeAllocationUpdated {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
alloc, err := event.Allocation()
|
alloc, err := event.Allocation()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to retrieve allocation from event: %w", err)
|
return fmt.Errorf("failed to retrieve allocation from event: %w", err)
|
||||||
@ -359,12 +356,13 @@ func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocat
|
|||||||
// This allows the handling of startups and re-placements of allocations.
|
// This allows the handling of startups and re-placements of allocations.
|
||||||
func handlePendingAllocationEvent(alloc *nomadApi.Allocation,
|
func handlePendingAllocationEvent(alloc *nomadApi.Allocation,
|
||||||
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
|
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
|
||||||
log.WithField("alloc_id", alloc.ID).Debug("Handle Pending Allocation Event")
|
|
||||||
if alloc.DesiredStatus == structs.AllocDesiredStatusRun {
|
if alloc.DesiredStatus == structs.AllocDesiredStatusRun {
|
||||||
allocData, ok := allocations.Get(alloc.ID)
|
allocData, ok := allocations.Get(alloc.ID)
|
||||||
if ok && allocData.allocClientStatus != structs.AllocClientStatusRunning {
|
if ok && allocData.allocClientStatus != structs.AllocClientStatusRunning {
|
||||||
// Pending Allocation is already stored.
|
// Pending Allocation is already stored.
|
||||||
log.WithField("alloc_id", alloc.ID).Debug("Pending Allocation already stored")
|
// This happens because depending on the startup duration of the runner, we get zero, one, or more events
|
||||||
|
// notifying us that the allocation is still pending.
|
||||||
|
// We are just interested in the first event, in order to have the correct start time.
|
||||||
return
|
return
|
||||||
} else if ok {
|
} else if ok {
|
||||||
// Handle Runner (/Container) re-allocations.
|
// Handle Runner (/Container) re-allocations.
|
||||||
@ -385,7 +383,6 @@ func handlePendingAllocationEvent(alloc *nomadApi.Allocation,
|
|||||||
// handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events.
|
// handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events.
|
||||||
func handleRunningAllocationEvent(alloc *nomadApi.Allocation,
|
func handleRunningAllocationEvent(alloc *nomadApi.Allocation,
|
||||||
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
|
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
|
||||||
log.WithField("alloc_id", alloc.ID).Debug("Handle Running Allocation Event")
|
|
||||||
if alloc.DesiredStatus == structs.AllocDesiredStatusRun {
|
if alloc.DesiredStatus == structs.AllocDesiredStatusRun {
|
||||||
// is first event that marks the transition between pending and running?
|
// is first event that marks the transition between pending and running?
|
||||||
if allocData, ok := allocations.Get(alloc.ID); ok && allocData.allocClientStatus == structs.AllocClientStatusPending {
|
if allocData, ok := allocations.Get(alloc.ID); ok && allocData.allocClientStatus == structs.AllocClientStatusPending {
|
||||||
|
@ -509,16 +509,31 @@ func TestApiClient_WatchAllocationsHandlesEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) {
|
func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) {
|
||||||
newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
|
t.Run("AllocationUpdated", func(t *testing.T) {
|
||||||
newPendingEvent := eventForAllocation(t, newPendingAllocation)
|
newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
|
||||||
|
newPendingEvent := eventForAllocation(t, newPendingAllocation)
|
||||||
|
|
||||||
allocations := storage.NewLocalStorage[*allocationData]()
|
allocations := storage.NewLocalStorage[*allocationData]()
|
||||||
err := handleAllocationEvent(
|
err := handleAllocationEvent(
|
||||||
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring)
|
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, ok := allocations.Get(newPendingAllocation.ID)
|
_, ok := allocations.Get(newPendingAllocation.ID)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
|
})
|
||||||
|
t.Run("PlanResult", func(t *testing.T) {
|
||||||
|
newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
|
||||||
|
newPendingEvent := eventForAllocation(t, newPendingAllocation)
|
||||||
|
newPendingEvent.Type = structs.TypePlanResult
|
||||||
|
|
||||||
|
allocations := storage.NewLocalStorage[*allocationData]()
|
||||||
|
err := handleAllocationEvent(
|
||||||
|
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, ok := allocations.Get(newPendingAllocation.ID)
|
||||||
|
assert.True(t, ok)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) {
|
func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) {
|
||||||
|
Reference in New Issue
Block a user