From 038d71ff5109dc2d4fad4558497af37df8b799e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Sun, 19 Mar 2023 21:50:36 +0000 Subject: [PATCH] Nomad: Handle Container re-allocation --- internal/nomad/nomad.go | 53 +++++++++++++++++--------- internal/nomad/nomad_test.go | 10 +++-- internal/runner/nomad_manager.go | 2 +- pkg/monitoring/influxdb2_middleware.go | 17 +++++---- 4 files changed, 52 insertions(+), 30 deletions(-) diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index 61580a1..a3e95bb 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -9,7 +9,9 @@ import ( "github.com/openHPI/poseidon/internal/config" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/logging" + "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/nullio" + "github.com/openHPI/poseidon/pkg/storage" "io" "strconv" "strings" @@ -36,6 +38,12 @@ type AllocationProcessoring struct { type AllocationProcessor func(*nomadApi.Allocation) type AllocationProcessorMonitored func(*nomadApi.Allocation, time.Duration) +type allocationData struct { + // allocClientStatus defines the state defined by Nomad. + allocClientStatus string + start time.Time +} + // ExecutorAPI provides access to a container orchestration solution. type ExecutorAPI interface { apiQuerier @@ -184,14 +192,15 @@ func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationP if err != nil { return fmt.Errorf("failed retrieving allocation stream: %w", err) } - pendingAllocations := make(map[string]time.Time) + // allocations contain management data for all pending and running allocations. 
+ allocations := storage.NewMonitoredLocalStorage[*allocationData](monitoring.MeasurementNomadAllocations, nil, 0, nil) handler := func(event *nomadApi.Event) (bool, error) { switch event.Topic { case nomadApi.TopicEvaluation: return false, handleEvaluationEvent(a.evaluations, event) case nomadApi.TopicAllocation: - return false, handleAllocationEvent(startTime, pendingAllocations, event, callbacks) + return false, handleAllocationEvent(startTime, allocations, event, callbacks) default: return false, nil } @@ -255,10 +264,10 @@ func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Ev // handleAllocationEvent is an event handler that processes allocation events. // If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation -// is called. The pendingAllocations map is used to store allocations that are pending but not started yet. Using the -// map the state is persisted between multiple calls of this function. -func handleAllocationEvent(startTime int64, pendingAllocations map[string]time.Time, event *nomadApi.Event, - callbacks *AllocationProcessoring) error { +// is called. The allocations storage is used to track pending and running allocations. Using the +// storage the state is persisted between multiple calls of this function. 
+func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocationData], + event *nomadApi.Event, callbacks *AllocationProcessoring) error { if event.Type != structs.TypeAllocationUpdated { return nil } @@ -278,36 +287,46 @@ func handleAllocationEvent(startTime int64, pendingAllocations map[string]time.T switch alloc.ClientStatus { case structs.AllocClientStatusPending: - handlePendingAllocationEvent(alloc, pendingAllocations) + handlePendingAllocationEvent(alloc, allocations, callbacks) case structs.AllocClientStatusRunning: - handleRunningAllocationEvent(alloc, pendingAllocations, callbacks) + handleRunningAllocationEvent(alloc, allocations, callbacks) case structs.AllocClientStatusFailed: handleFailedAllocationEvent(alloc) } return nil } -// handlePendingAllocationEvent sets flag in pendingAllocations that can be used to filter following events. -func handlePendingAllocationEvent(alloc *nomadApi.Allocation, pendingAllocations map[string]time.Time) { +// handlePendingAllocationEvent manages allocations that are currently pending. +// This allows the handling of startups and re-placements of allocations. +func handlePendingAllocationEvent(alloc *nomadApi.Allocation, + allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) { if alloc.DesiredStatus == structs.AllocDesiredStatusRun { + // Handle Runner (/Container) re-allocations. + if allocData, ok := allocations.Get(alloc.ID); ok && allocData.allocClientStatus == structs.AllocClientStatusRunning { + callbacks.OnDeleted(alloc) + } // allocation is started, wait until it runs and add to our list afterwards - pendingAllocations[alloc.ID] = time.Now() + allocations.Add(alloc.ID, &allocationData{allocClientStatus: structs.AllocClientStatusPending, start: time.Now()}) } } // handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events. 
-func handleRunningAllocationEvent(alloc *nomadApi.Allocation, pendingAllocations map[string]time.Time, - callbacks *AllocationProcessoring) { +func handleRunningAllocationEvent(alloc *nomadApi.Allocation, + allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) { switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop: callbacks.OnDeleted(alloc) + if _, ok := allocations.Get(alloc.ID); ok { + allocations.Delete(alloc.ID) + } else { + log.WithField("id", alloc.ID).Warn("Removing not listed allocation") + } case structs.AllocDesiredStatusRun: // is first event that marks the transition between pending and running? - startedAt, ok := pendingAllocations[alloc.ID] - if ok { - startupDuration := time.Since(startedAt) + if allocData, ok := allocations.Get(alloc.ID); ok && allocData.allocClientStatus == structs.AllocClientStatusPending { + startupDuration := time.Since(allocData.start) callbacks.OnNew(alloc, startupDuration) - delete(pendingAllocations, alloc.ID) + allocData.allocClientStatus = structs.AllocClientStatusRunning } } } diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index 3968309..0c288d8 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -10,6 +10,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/openHPI/poseidon/internal/config" "github.com/openHPI/poseidon/pkg/nullio" + "github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/tests" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -495,7 +496,7 @@ func TestApiClient_WatchAllocationsHandlesEvents(t *testing.T) { "it adds and deletes the allocation"}, {[]*nomadApi.Events{&startAllocationEvents, &startAllocationEvents}, []*nomadApi.Allocation{newStartedAllocation, newStartedAllocation}, - []*nomadApi.Allocation(nil), + []*nomadApi.Allocation{newPendingAllocation}, "it handles multiple events"}, } @@ -511,11 +512,12 @@ func 
TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) newPendingEvent := eventForAllocation(t, newPendingAllocation) - pendingMap := make(map[string]time.Time) - err := handleAllocationEvent(time.Now().UnixNano(), pendingMap, &newPendingEvent, noopAllocationProcessoring) + allocations := storage.NewLocalStorage[*allocationData]() + err := handleAllocationEvent( + time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring) require.NoError(t, err) - _, ok := pendingMap[newPendingAllocation.ID] + _, ok := allocations.Get(newPendingAllocation.ID) assert.True(t, ok) } diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index c0d2ab3..901d541 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -147,7 +147,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, start } if _, ok := m.usedRunners.Get(alloc.JobID); ok { - log.WithField("id", alloc.JobID).Debug("Started Runner is already in use") + log.WithField("id", alloc.JobID).WithField("states", alloc.TaskStates).Error("Started Runner is already in use") return } diff --git a/pkg/monitoring/influxdb2_middleware.go b/pkg/monitoring/influxdb2_middleware.go index e1efeba..8af7818 100644 --- a/pkg/monitoring/influxdb2_middleware.go +++ b/pkg/monitoring/influxdb2_middleware.go @@ -21,14 +21,15 @@ const ( // influxdbContextKey is a key (runner.ContextKey) to reference the influxdb data point in the request context. influxdbContextKey dto.ContextKey = "influxdb data point" // measurementPrefix allows easier filtering in influxdb. 
- measurementPrefix = "poseidon_" - measurementPoolSize = measurementPrefix + "poolsize" - MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners" - MeasurementExecutionsAWS = measurementPrefix + "aws_executions" - MeasurementExecutionsNomad = measurementPrefix + "nomad_executions" - MeasurementEnvironments = measurementPrefix + "environments" - MeasurementUsedRunner = measurementPrefix + "used_runners" - MeasurementFileDownload = measurementPrefix + "file_download" + measurementPrefix = "poseidon_" + measurementPoolSize = measurementPrefix + "poolsize" + MeasurementNomadAllocations = measurementPrefix + "nomad_allocations" + MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners" + MeasurementExecutionsAWS = measurementPrefix + "aws_executions" + MeasurementExecutionsNomad = measurementPrefix + "nomad_executions" + MeasurementEnvironments = measurementPrefix + "environments" + MeasurementUsedRunner = measurementPrefix + "used_runners" + MeasurementFileDownload = measurementPrefix + "file_download" // The keys for the monitored tags and fields.