Refactor Nomad Recovery

from an approach that loaded the runners only once at the startup
to a method that will be repeated i.e. if the Nomad Event Stream connection interrupts.
This commit is contained in:
Maximilian Paß
2023-10-23 14:36:14 +02:00
committed by Sebastian Serth
parent b2898f9183
commit 6b69a2d732
22 changed files with 211 additions and 120 deletions

View File

@ -10,9 +10,9 @@ import (
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/storage"
"github.com/openHPI/poseidon/pkg/util"
"github.com/sirupsen/logrus"
"math"
"strconv"
"time"
)
@ -30,12 +30,9 @@ type NomadRunnerManager struct {
}
// NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
// It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad.
// If you cancel the context the background synchronization will be stopped.
// KeepRunnersSynced has to be started separately.
func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
m := &NomadRunnerManager{NewAbstractManager(ctx), apiClient}
go m.keepRunnersSynced(ctx)
return m
return &NomadRunnerManager{NewAbstractManager(ctx), apiClient}
}
func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) {
@ -80,40 +77,64 @@ func (m *NomadRunnerManager) Return(r Runner) error {
return err
}
func (m *NomadRunnerManager) Load() {
// SynchronizeRunners loads all runners and keeps them synchronized (without a retry mechanism).
func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
// Load Runners
if err := m.load(); err != nil {
return fmt.Errorf("failed loading runners: %w", err)
}
// Watch for changes regarding the existing or new runners.
err := m.apiClient.WatchEventStream(ctx,
&nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped})
if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
err = fmt.Errorf("nomad Event Stream failed!: %w", err)
}
return err
}
// Load recovers all runners for all existing environments.
func (m *NomadRunnerManager) load() error {
newUsedRunners := storage.NewLocalStorage[Runner]()
for _, environment := range m.environments.List() {
environmentLogger := log.WithField(dto.KeyEnvironmentID, environment.ID().ToString())
runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
if err != nil {
environmentLogger.WithError(err).Warn("Error fetching the runner jobs")
return fmt.Errorf("failed fetching the runner jobs: %w", err)
}
for _, job := range runnerJobs {
m.loadSingleJob(job, environmentLogger, environment)
m.loadSingleJob(job, environmentLogger, environment, newUsedRunners)
}
err = environment.ApplyPrewarmingPoolSize()
if err != nil {
environmentLogger.WithError(err).Error("Couldn't scale environment")
return fmt.Errorf("couldn't scale environment: %w", err)
}
}
m.updateUsedRunners(newUsedRunners)
return nil
}
func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger *logrus.Entry,
environment ExecutionEnvironment) {
environment ExecutionEnvironment, newUsedRunners storage.Storage[Runner]) {
configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
if configTaskGroup == nil {
environmentLogger.Infof("Couldn't find config task group in job %s, skipping ...", *job.ID)
environmentLogger.Warnf("Couldn't find config task group in job %s, skipping ...", *job.ID)
return
}
isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID)
if err != nil {
environmentLogger.WithError(err).Warn("Error loading runner portMappings")
environmentLogger.WithError(err).Warn("Error loading runner portMappings, skipping ...")
return
}
newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.onRunnerDestroyed)
log.WithField("isUsed", isUsed).WithField(dto.KeyRunnerID, newJob.ID()).Debug("Recovered Runner")
if isUsed {
m.usedRunners.Add(newJob.ID(), newJob)
newUsedRunners.Add(newJob.ID(), newJob)
timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
if err != nil {
environmentLogger.WithError(err).Warn("Error loading timeout from meta values")
@ -125,18 +146,23 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger
}
}
func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
err := util.RetryConstantAttemptsWithContext(math.MaxInt, ctx, func() error {
err := m.apiClient.WatchEventStream(ctx,
&nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped})
if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
log.WithContext(ctx).WithError(err).Errorf("Nomad Event Stream failed! Retrying...")
err = fmt.Errorf("KeepRunnersSynced: %w", err)
func (m *NomadRunnerManager) updateUsedRunners(newUsedRunners storage.Storage[Runner]) {
for _, r := range m.usedRunners.List() {
var reason DestroyReason
if _, ok := newUsedRunners.Get(r.ID()); ok {
reason = ErrDestroyedAndReplaced
} else {
reason = ErrLocalDestruction
log.WithError(reason).WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered")
}
return err
})
if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
log.WithContext(ctx).WithError(err).Fatal("Stopped Restarting the Nomad Event Stream")
m.usedRunners.Delete(r.ID())
if err := r.Destroy(reason); err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, r.ID()).Warn("failed to destroy runner locally")
}
}
for _, r := range newUsedRunners.List() {
m.usedRunners.Add(r.ID(), r)
}
}