diff --git a/internal/environment/nomad_environment.go b/internal/environment/nomad_environment.go index 2c6e07c..cf7754b 100644 --- a/internal/environment/nomad_environment.go +++ b/internal/environment/nomad_environment.go @@ -245,7 +245,7 @@ func (n *NomadEnvironment) Sample() (runner.Runner, bool) { r, ok := n.idleRunners.Sample() if ok && n.idleRunners.Length() < n.PrewarmingPoolSize() { go func() { - err := util.RetryExponentialContext(n.ctx, func() error { return n.createRunner(false) }) + err := util.RetryExponentialWithContext(n.ctx, func() error { return n.createRunner(false) }) if err != nil { log.WithError(err).WithField(dto.KeyEnvironmentID, n.ID().ToString()). Error("Couldn't create new runner for claimed one") diff --git a/internal/environment/nomad_manager.go b/internal/environment/nomad_manager.go index 0ac67ff..f344387 100644 --- a/internal/environment/nomad_manager.go +++ b/internal/environment/nomad_manager.go @@ -44,7 +44,7 @@ func NewNomadEnvironmentManager( m := &NomadEnvironmentManager{&AbstractManager{nil, runnerManager}, apiClient, templateEnvironmentJobHCL} - if err := util.RetryExponentialContext(ctx, func() error { return m.Load() }); err != nil { + if err := util.RetryExponentialWithContext(ctx, func() error { return m.Load() }); err != nil { log.WithError(err).Error("Error recovering the execution environments") } runnerManager.Load() diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index 4e61602..fc3c7fc 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -12,6 +12,7 @@ import ( "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/util" "github.com/sirupsen/logrus" + "math" "strconv" "time" ) @@ -125,13 +126,17 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger } func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) { - retries := 0 - for ctx.Err() == nil { + err := 
util.RetryConstantAttemptsWithContext(math.MaxInt, ctx, func() error { err := m.apiClient.WatchEventStream(ctx, &nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped}) - retries += 1 - log.WithContext(ctx).WithError(err).WithField("count", retries).Errorf("Nomad Event Stream failed! Retrying...") - <-time.After(time.Second) + if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { + log.WithContext(ctx).WithError(err).Errorf("Nomad Event Stream failed! Retrying...") + err = fmt.Errorf("KeepRunnersSynced: %w", err) + } + return err + }) + if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { + log.WithContext(ctx).WithError(err).Fatal("Stopped Restarting the Nomad Event Stream") } } diff --git a/pkg/util/util.go b/pkg/util/util.go index 578b8c2..e92778c 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -2,6 +2,7 @@ package util import ( "context" + "errors" "fmt" "github.com/openHPI/poseidon/pkg/logging" "time" @@ -15,33 +16,66 @@ var ( InitialWaitingDuration = time.Second ) -// RetryExponentialAttemptsContext executes the passed function -// with exponentially increasing time in between starting at the passed sleep duration -// up to a maximum of attempts tries as long as the context is not done. 
-func RetryExponentialAttemptsContext(
-	ctx context.Context, attempts int, sleep time.Duration, f func() error) (err error) {
-	for i := 0; i < attempts; i++ {
-		if ctx.Err() != nil {
-			return fmt.Errorf("stopped retry due to: %w", ctx.Err())
-		} else if err = f(); err == nil {
-			return nil
-		} else {
-			log.WithField("count", i).WithError(err).Debug("retrying after error")
-			time.Sleep(sleep)
-			sleep *= 2
+// retryExponential wraps f: after a failure it waits sleep (doubled after each wait) or aborts once ctx is done.
+func retryExponential(ctx context.Context, sleep time.Duration, f func() error) func() error {
+	return func() error {
+		err := f()
+		if err != nil {
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("stopped retrying: %w", ctx.Err())
+			case <-time.After(sleep):
+				sleep *= 2
+			}
 		}
+		return err
+	}
+}
+
+// retryConstant wraps f: after a failure it waits the fixed duration sleep or aborts once ctx is done.
+func retryConstant(ctx context.Context, sleep time.Duration, f func() error) func() error {
+	return func() error {
+		err := f()
+		if err != nil {
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("stopped retrying: %w", ctx.Err())
+			case <-time.After(sleep):
+			}
+		}
+
+		return err
+	}
+}
+
+// retryAttempts calls f up to maxAttempts times; it stops early on success or on a context error.
+func retryAttempts(maxAttempts int, f func() error) (err error) {
+	for i := 0; i < maxAttempts; i++ {
+		if err = f(); err == nil {
+			return nil
+		} else if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
+			return err
+		}
+		log.WithField("count", i).WithError(err).Debug("retrying after error")
 	}
 	return err
 }
 
-func RetryExponentialContext(ctx context.Context, f func() error) error {
-	return RetryExponentialAttemptsContext(ctx, MaxConnectionRetriesExponential, InitialWaitingDuration, f)
-}
-
-func RetryExponentialDuration(sleep time.Duration, f func() error) error {
-	return RetryExponentialAttemptsContext(context.Background(), MaxConnectionRetriesExponential, sleep, f)
+// RetryExponentialWithContext calls f until it succeeds. After each failure it waits, starting with one second and
+// doubling every time, for up to a default maximum number of attempts or until the context is done.
+func RetryExponentialWithContext(ctx context.Context, f func() error) error {
+	return retryAttempts(MaxConnectionRetriesExponential, retryExponential(ctx, InitialWaitingDuration, f))
 }
 
+// RetryExponential calls f until it succeeds. After each failure it waits, starting with InitialWaitingDuration
+// and doubling every time, for at most MaxConnectionRetriesExponential attempts. The waits cannot be cancelled.
 func RetryExponential(f func() error) error {
-	return RetryExponentialDuration(InitialWaitingDuration, f)
+	return retryAttempts(MaxConnectionRetriesExponential,
+		retryExponential(context.Background(), InitialWaitingDuration, f))
+}
+
+// RetryConstantAttemptsWithContext calls f until it succeeds, waiting InitialWaitingDuration after each failure,
+// for at most attempts tries. If ctx is done during a wait, it stops and returns the wrapped context error.
+func RetryConstantAttemptsWithContext(attempts int, ctx context.Context, f func() error) error {
+	return retryAttempts(attempts, retryConstant(ctx, InitialWaitingDuration, f))
 }