Nomad: Handle Container re-allocation
This commit is contained in:

committed by
Sebastian Serth

parent
e0db1bafe8
commit
038d71ff51
@ -9,7 +9,9 @@ import (
|
|||||||
"github.com/openHPI/poseidon/internal/config"
|
"github.com/openHPI/poseidon/internal/config"
|
||||||
"github.com/openHPI/poseidon/pkg/dto"
|
"github.com/openHPI/poseidon/pkg/dto"
|
||||||
"github.com/openHPI/poseidon/pkg/logging"
|
"github.com/openHPI/poseidon/pkg/logging"
|
||||||
|
"github.com/openHPI/poseidon/pkg/monitoring"
|
||||||
"github.com/openHPI/poseidon/pkg/nullio"
|
"github.com/openHPI/poseidon/pkg/nullio"
|
||||||
|
"github.com/openHPI/poseidon/pkg/storage"
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -36,6 +38,12 @@ type AllocationProcessoring struct {
|
|||||||
type AllocationProcessor func(*nomadApi.Allocation)
|
type AllocationProcessor func(*nomadApi.Allocation)
|
||||||
type AllocationProcessorMonitored func(*nomadApi.Allocation, time.Duration)
|
type AllocationProcessorMonitored func(*nomadApi.Allocation, time.Duration)
|
||||||
|
|
||||||
|
type allocationData struct {
|
||||||
|
// allocClientStatus holds the client status of the allocation as reported by Nomad.
|
||||||
|
allocClientStatus string
|
||||||
|
start time.Time
|
||||||
|
}
|
||||||
|
|
||||||
// ExecutorAPI provides access to a container orchestration solution.
|
// ExecutorAPI provides access to a container orchestration solution.
|
||||||
type ExecutorAPI interface {
|
type ExecutorAPI interface {
|
||||||
apiQuerier
|
apiQuerier
|
||||||
@ -184,14 +192,15 @@ func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationP
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed retrieving allocation stream: %w", err)
|
return fmt.Errorf("failed retrieving allocation stream: %w", err)
|
||||||
}
|
}
|
||||||
pendingAllocations := make(map[string]time.Time)
|
// allocations contain management data for all pending and running allocations.
|
||||||
|
allocations := storage.NewMonitoredLocalStorage[*allocationData](monitoring.MeasurementNomadAllocations, nil, 0, nil)
|
||||||
|
|
||||||
handler := func(event *nomadApi.Event) (bool, error) {
|
handler := func(event *nomadApi.Event) (bool, error) {
|
||||||
switch event.Topic {
|
switch event.Topic {
|
||||||
case nomadApi.TopicEvaluation:
|
case nomadApi.TopicEvaluation:
|
||||||
return false, handleEvaluationEvent(a.evaluations, event)
|
return false, handleEvaluationEvent(a.evaluations, event)
|
||||||
case nomadApi.TopicAllocation:
|
case nomadApi.TopicAllocation:
|
||||||
return false, handleAllocationEvent(startTime, pendingAllocations, event, callbacks)
|
return false, handleAllocationEvent(startTime, allocations, event, callbacks)
|
||||||
default:
|
default:
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -255,10 +264,10 @@ func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Ev
|
|||||||
|
|
||||||
// handleAllocationEvent is an event handler that processes allocation events.
|
// handleAllocationEvent is an event handler that processes allocation events.
|
||||||
// If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation
|
// If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation
|
||||||
// is called. The pendingAllocations map is used to store allocations that are pending but not started yet. Using the
|
// is called. The allocations storage is used to track pending and running allocations. Using the
|
||||||
// map the state is persisted between multiple calls of this function.
|
// storage the state is persisted between multiple calls of this function.
|
||||||
func handleAllocationEvent(startTime int64, pendingAllocations map[string]time.Time, event *nomadApi.Event,
|
func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocationData],
|
||||||
callbacks *AllocationProcessoring) error {
|
event *nomadApi.Event, callbacks *AllocationProcessoring) error {
|
||||||
if event.Type != structs.TypeAllocationUpdated {
|
if event.Type != structs.TypeAllocationUpdated {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -278,36 +287,46 @@ func handleAllocationEvent(startTime int64, pendingAllocations map[string]time.T
|
|||||||
|
|
||||||
switch alloc.ClientStatus {
|
switch alloc.ClientStatus {
|
||||||
case structs.AllocClientStatusPending:
|
case structs.AllocClientStatusPending:
|
||||||
handlePendingAllocationEvent(alloc, pendingAllocations)
|
handlePendingAllocationEvent(alloc, allocations, callbacks)
|
||||||
case structs.AllocClientStatusRunning:
|
case structs.AllocClientStatusRunning:
|
||||||
handleRunningAllocationEvent(alloc, pendingAllocations, callbacks)
|
handleRunningAllocationEvent(alloc, allocations, callbacks)
|
||||||
case structs.AllocClientStatusFailed:
|
case structs.AllocClientStatusFailed:
|
||||||
handleFailedAllocationEvent(alloc)
|
handleFailedAllocationEvent(alloc)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handlePendingAllocationEvent sets flag in pendingAllocations that can be used to filter following events.
|
// handlePendingAllocationEvent manages allocations that are currently pending.
|
||||||
func handlePendingAllocationEvent(alloc *nomadApi.Allocation, pendingAllocations map[string]time.Time) {
|
// This allows the handling of startups and re-placements of allocations.
|
||||||
|
func handlePendingAllocationEvent(alloc *nomadApi.Allocation,
|
||||||
|
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
|
||||||
if alloc.DesiredStatus == structs.AllocDesiredStatusRun {
|
if alloc.DesiredStatus == structs.AllocDesiredStatusRun {
|
||||||
|
// Handle Runner (/Container) re-allocations.
|
||||||
|
if allocData, ok := allocations.Get(alloc.ID); ok && allocData.allocClientStatus == structs.AllocClientStatusRunning {
|
||||||
|
callbacks.OnDeleted(alloc)
|
||||||
|
}
|
||||||
// allocation is started, wait until it runs and add to our list afterwards
|
// allocation is started, wait until it runs and add to our list afterwards
|
||||||
pendingAllocations[alloc.ID] = time.Now()
|
allocations.Add(alloc.ID, &allocationData{allocClientStatus: structs.AllocClientStatusPending, start: time.Now()})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events.
|
// handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events.
|
||||||
func handleRunningAllocationEvent(alloc *nomadApi.Allocation, pendingAllocations map[string]time.Time,
|
func handleRunningAllocationEvent(alloc *nomadApi.Allocation,
|
||||||
callbacks *AllocationProcessoring) {
|
allocations storage.Storage[*allocationData], callbacks *AllocationProcessoring) {
|
||||||
switch alloc.DesiredStatus {
|
switch alloc.DesiredStatus {
|
||||||
case structs.AllocDesiredStatusStop:
|
case structs.AllocDesiredStatusStop:
|
||||||
callbacks.OnDeleted(alloc)
|
callbacks.OnDeleted(alloc)
|
||||||
|
if _, ok := allocations.Get(alloc.ID); ok {
|
||||||
|
allocations.Delete(alloc.ID)
|
||||||
|
} else {
|
||||||
|
log.WithField("id", alloc.ID).Warn("Removing not listed allocation")
|
||||||
|
}
|
||||||
case structs.AllocDesiredStatusRun:
|
case structs.AllocDesiredStatusRun:
|
||||||
// Is this the first event that marks the transition between pending and running?
|
// Is this the first event that marks the transition between pending and running?
|
||||||
startedAt, ok := pendingAllocations[alloc.ID]
|
if allocData, ok := allocations.Get(alloc.ID); ok && allocData.allocClientStatus == structs.AllocClientStatusPending {
|
||||||
if ok {
|
startupDuration := time.Since(allocData.start)
|
||||||
startupDuration := time.Since(startedAt)
|
|
||||||
callbacks.OnNew(alloc, startupDuration)
|
callbacks.OnNew(alloc, startupDuration)
|
||||||
delete(pendingAllocations, alloc.ID)
|
allocData.allocClientStatus = structs.AllocClientStatusRunning
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
"github.com/openHPI/poseidon/internal/config"
|
"github.com/openHPI/poseidon/internal/config"
|
||||||
"github.com/openHPI/poseidon/pkg/nullio"
|
"github.com/openHPI/poseidon/pkg/nullio"
|
||||||
|
"github.com/openHPI/poseidon/pkg/storage"
|
||||||
"github.com/openHPI/poseidon/tests"
|
"github.com/openHPI/poseidon/tests"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
@ -495,7 +496,7 @@ func TestApiClient_WatchAllocationsHandlesEvents(t *testing.T) {
|
|||||||
"it adds and deletes the allocation"},
|
"it adds and deletes the allocation"},
|
||||||
{[]*nomadApi.Events{&startAllocationEvents, &startAllocationEvents},
|
{[]*nomadApi.Events{&startAllocationEvents, &startAllocationEvents},
|
||||||
[]*nomadApi.Allocation{newStartedAllocation, newStartedAllocation},
|
[]*nomadApi.Allocation{newStartedAllocation, newStartedAllocation},
|
||||||
[]*nomadApi.Allocation(nil),
|
[]*nomadApi.Allocation{newPendingAllocation},
|
||||||
"it handles multiple events"},
|
"it handles multiple events"},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -511,11 +512,12 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) {
|
|||||||
newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
|
newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
|
||||||
newPendingEvent := eventForAllocation(t, newPendingAllocation)
|
newPendingEvent := eventForAllocation(t, newPendingAllocation)
|
||||||
|
|
||||||
pendingMap := make(map[string]time.Time)
|
allocations := storage.NewLocalStorage[*allocationData]()
|
||||||
err := handleAllocationEvent(time.Now().UnixNano(), pendingMap, &newPendingEvent, noopAllocationProcessoring)
|
err := handleAllocationEvent(
|
||||||
|
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessoring)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, ok := pendingMap[newPendingAllocation.ID]
|
_, ok := allocations.Get(newPendingAllocation.ID)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,7 +147,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, start
|
|||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := m.usedRunners.Get(alloc.JobID); ok {
|
if _, ok := m.usedRunners.Get(alloc.JobID); ok {
|
||||||
log.WithField("id", alloc.JobID).Debug("Started Runner is already in use")
|
log.WithField("id", alloc.JobID).WithField("states", alloc.TaskStates).Error("Started Runner is already in use")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,14 +21,15 @@ const (
|
|||||||
// influxdbContextKey is a key (runner.ContextKey) to reference the influxdb data point in the request context.
|
// influxdbContextKey is a key (runner.ContextKey) to reference the influxdb data point in the request context.
|
||||||
influxdbContextKey dto.ContextKey = "influxdb data point"
|
influxdbContextKey dto.ContextKey = "influxdb data point"
|
||||||
// measurementPrefix allows easier filtering in influxdb.
|
// measurementPrefix allows easier filtering in influxdb.
|
||||||
measurementPrefix = "poseidon_"
|
measurementPrefix = "poseidon_"
|
||||||
measurementPoolSize = measurementPrefix + "poolsize"
|
measurementPoolSize = measurementPrefix + "poolsize"
|
||||||
MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners"
|
MeasurementNomadAllocations = measurementPrefix + "nomad_allocations"
|
||||||
MeasurementExecutionsAWS = measurementPrefix + "aws_executions"
|
MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners"
|
||||||
MeasurementExecutionsNomad = measurementPrefix + "nomad_executions"
|
MeasurementExecutionsAWS = measurementPrefix + "aws_executions"
|
||||||
MeasurementEnvironments = measurementPrefix + "environments"
|
MeasurementExecutionsNomad = measurementPrefix + "nomad_executions"
|
||||||
MeasurementUsedRunner = measurementPrefix + "used_runners"
|
MeasurementEnvironments = measurementPrefix + "environments"
|
||||||
MeasurementFileDownload = measurementPrefix + "file_download"
|
MeasurementUsedRunner = measurementPrefix + "used_runners"
|
||||||
|
MeasurementFileDownload = measurementPrefix + "file_download"
|
||||||
|
|
||||||
// The keys for the monitored tags and fields.
|
// The keys for the monitored tags and fields.
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user