From d0dd5c08cba5155b1c2191adf7718884a3c95d8c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com>
Date: Mon, 30 Oct 2023 14:51:31 +0100
Subject: [PATCH] Remove usage of context.DeadlineExceeded for internal
 decisions

context.DeadlineExceeded is widely used by other packages, so checking
for it on a wrapped error can accidentally influence internal
decisions. In this case, the retry mechanism checked whether an error
was context.DeadlineExceeded and assumed it had been created by the
internal context. This assumption was wrong.

---
Two short Go sketches illustrating the pitfall and the new sentinel
pattern follow below the diff.

 internal/api/ws/codeocean_writer.go   | 4 ++--
 internal/environment/nomad_manager.go | 9 ++++++---
 internal/runner/inactivity_timer.go   | 5 ++++-
 internal/runner/nomad_manager.go      | 6 +++---
 internal/runner/nomad_runner.go       | 3 +++
 pkg/util/util.go                      | 7 ++++---
 6 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/internal/api/ws/codeocean_writer.go b/internal/api/ws/codeocean_writer.go
index 8fa7946..47396a2 100644
--- a/internal/api/ws/codeocean_writer.go
+++ b/internal/api/ws/codeocean_writer.go
@@ -95,7 +95,7 @@ func (cw *codeOceanOutputWriter) StdErr() io.Writer {
 
 // Close forwards the kind of exit (timeout, error, normal) to CodeOcean.
 // This results in the closing of the WebSocket connection.
-// The call of Close is mandantory!
+// The call of Close is mandatory!
 func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) {
 	defer func() { cw.queue <- &writingLoopMessage{done: true} }()
 	// Mask the internal stop reason before disclosing/forwarding it externally/to CodeOcean.
@@ -104,7 +104,7 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) {
 		return
 	case info.Err == nil:
 		cw.send(&dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: info.Code})
-	case errors.Is(info.Err, context.DeadlineExceeded) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout):
+	case errors.Is(info.Err, runner.ErrorExecutionTimeout) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout):
 		cw.send(&dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout})
 	case errors.Is(info.Err, runner.ErrOOMKilled):
 		cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: dto.ErrOOMKilled.Error()})
diff --git a/internal/environment/nomad_manager.go b/internal/environment/nomad_manager.go
index ee01e0b..c3a7a88 100644
--- a/internal/environment/nomad_manager.go
+++ b/internal/environment/nomad_manager.go
@@ -3,7 +3,6 @@ package environment
 import (
 	"context"
 	_ "embed"
-	"errors"
 	"fmt"
 	nomadApi "github.com/hashicorp/nomad/api"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -14,6 +13,7 @@ import (
 	"github.com/openHPI/poseidon/pkg/monitoring"
 	"github.com/openHPI/poseidon/pkg/storage"
 	"github.com/openHPI/poseidon/pkg/util"
+	"github.com/sirupsen/logrus"
 	"math"
 	"os"
 	"time"
@@ -187,14 +187,17 @@ func (m *NomadEnvironmentManager) KeepEnvironmentsSynced(synchronizeRunners func
 		return nil
 	})
 
-	if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
-		log.WithContext(ctx).WithError(err).Fatal("Stopped KeepEnvironmentsSynced")
+	level := logrus.InfoLevel
+	if err != nil && ctx.Err() == nil {
+		level = logrus.FatalLevel
 	}
+	log.WithContext(ctx).WithError(err).Log(level, "Stopped KeepEnvironmentsSynced")
 }
 
 // Load recovers all environments from the Jobs in Nomad.
 // As it replaces the environments the idle runners stored are not tracked anymore.
 func (m *NomadEnvironmentManager) load() error {
+	log.Info("Loading environments")
 	// We have to destroy the environments first as otherwise they would just be replaced and old goroutines might stay running.
 	for _, environment := range m.runnerManager.ListEnvironments() {
 		err := environment.Delete(runner.ErrDestroyedAndReplaced)
diff --git a/internal/runner/inactivity_timer.go b/internal/runner/inactivity_timer.go
index 902f496..39b9aab 100644
--- a/internal/runner/inactivity_timer.go
+++ b/internal/runner/inactivity_timer.go
@@ -31,7 +31,10 @@ const (
 	TimerExpired TimerState = 2
 )
 
-var ErrorRunnerInactivityTimeout DestroyReason = errors.New("runner inactivity timeout exceeded")
+var (
+	ErrorRunnerInactivityTimeout DestroyReason = errors.New("runner inactivity timeout exceeded")
+	ErrorExecutionTimeout                      = errors.New("execution timeout exceeded")
+)
 
 type InactivityTimerImplementation struct {
 	timer *time.Timer
diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go
index d24474f..a579fd1 100644
--- a/internal/runner/nomad_manager.go
+++ b/internal/runner/nomad_manager.go
@@ -79,16 +79,17 @@ func (m *NomadRunnerManager) Return(r Runner) error {
 
 // SynchronizeRunners loads all runners and keeps them synchronized (without a retry mechanism).
 func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
-	// Load Runners
+	log.Info("Loading runners")
 	if err := m.load(); err != nil {
 		return fmt.Errorf("failed loading runners: %w", err)
 	}
 
 	// Watch for changes regarding the existing or new runners.
+	log.Info("Watching Event Stream")
 	err := m.apiClient.WatchEventStream(ctx,
 		&nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped})
 
-	if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
+	if err != nil && ctx.Err() == nil {
 		err = fmt.Errorf("nomad Event Stream failed!: %w", err)
 	}
 	return err
@@ -97,7 +98,6 @@ func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
 
 // Load recovers all runners for all existing environments.
 func (m *NomadRunnerManager) load() error {
 	newUsedRunners := storage.NewLocalStorage[Runner]()
-
 	for _, environment := range m.environments.List() {
 		environmentLogger := log.WithField(dto.KeyEnvironmentID, environment.ID().ToString())
diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go
index 1cc5999..ef1ead7 100644
--- a/internal/runner/nomad_runner.go
+++ b/internal/runner/nomad_runner.go
@@ -342,6 +342,9 @@ func (r *NomadJob) handleExit(exitInfo ExitInfo, exitInternal <-chan ExitInfo, e
 func (r *NomadJob) handleContextDone(exitInternal <-chan ExitInfo, exit chan<- ExitInfo,
 	stdin io.ReadWriter, ctx context.Context) {
 	err := ctx.Err()
+	if errors.Is(err, context.DeadlineExceeded) {
+		err = ErrorExecutionTimeout
+	} // for errors.Is(err, context.Canceled) the user likely disconnected from the execution.
 	if reason, ok := r.ctx.Value(destroyReasonContextKey).(error); ok {
 		err = reason
 		if r.TimeoutPassed() && !errors.Is(err, ErrorRunnerInactivityTimeout) {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index e92778c..32bf11f 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -3,7 +3,6 @@ package util
 import (
 	"context"
 	"errors"
-	"fmt"
 	"github.com/openHPI/poseidon/pkg/logging"
 	"time"
 )
@@ -14,6 +13,7 @@ var (
 	MaxConnectionRetriesExponential = 18
 	// InitialWaitingDuration is the default initial duration of waiting after a failed time.
 	InitialWaitingDuration = time.Second
+	ErrRetryContextDone    = errors.New("the retry context is done")
 )
 
 func retryExponential(ctx context.Context, sleep time.Duration, f func() error) func() error {
@@ -23,6 +23,7 @@ func retryExponential(ctx context.Context, sleep time.Duration, f func() error)
 			if err != nil {
 				select {
 				case <-ctx.Done():
+					err = ErrRetryContextDone
 				case <-time.After(sleep):
 					sleep *= 2
 				}
@@ -39,7 +40,7 @@ func retryConstant(ctx context.Context, sleep time.Duration, f func() error) fun
 			if err != nil {
 				select {
 				case <-ctx.Done():
-					return fmt.Errorf("stopped retrying: %w", ctx.Err())
+					return ErrRetryContextDone
 				case <-time.After(sleep):
 				}
 			}
@@ -53,7 +54,7 @@ func retryAttempts(maxAttempts int, f func() error) (err error) {
 		err = f()
 		if err == nil {
 			return nil
-		} else if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
+		} else if errors.Is(err, ErrRetryContextDone) {
 			return err
 		}
 		log.WithField("count", i).WithError(err).Debug("retrying after error")
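
Below is a minimal, self-contained sketch (not part of the patch) of the pitfall the commit message describes: a retry helper that matches on context.DeadlineExceeded cannot distinguish its own expired context from a deadline error wrapped by the retried operation. The names retry and flakyCall are hypothetical; only the sentinel-error pattern mirrors what the patch introduces with ErrRetryContextDone.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinel, modeled after the patch's ErrRetryContextDone.
var errRetryContextDone = errors.New("the retry context is done")

// retry calls f until it succeeds or ctx ends. It reports the ending of
// ctx via a dedicated sentinel instead of returning ctx.Err(), so callers
// cannot confuse it with a context.DeadlineExceeded produced inside f.
func retry(ctx context.Context, sleep time.Duration, f func() error) error {
	for {
		if err := f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return errRetryContextDone
		case <-time.After(sleep):
		}
	}
}

func main() {
	// flakyCall wraps context.DeadlineExceeded, exactly like a request
	// whose own (unrelated) deadline expired.
	flakyCall := func() error {
		return fmt.Errorf("request failed: %w", context.DeadlineExceeded)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	err := retry(ctx, 10*time.Millisecond, flakyCall)
	// The sentinel unambiguously means "the retry loop's own context
	// ended"; matching on context.DeadlineExceeded here would also fire
	// for flakyCall's wrapped error.
	fmt.Println(errors.Is(err, errRetryContextDone)) // true
}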
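
A second sketch, again hypothetical and standard-library only, shows the ctx.Err() == nil check the patch uses in KeepEnvironmentsSynced and SynchronizeRunners to pick the reaction: if our own context has ended, stopping is an expected shutdown; otherwise the watcher died on its own and the error is escalated.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// watch stands in for a long-running watcher such as an event stream.
func watch(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // stopped because our context ended
	case <-time.After(20 * time.Millisecond):
		return errors.New("event stream failed") // stopped on its own
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	err := watch(ctx)
	if err != nil && ctx.Err() == nil {
		// The watcher failed while our context was still alive: escalate.
		fmt.Println("fatal: stopped unexpectedly:", err)
	} else {
		// Our own context ended (shutdown), so stopping is expected.
		fmt.Println("info: stopped:", err)
	}
}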