diff --git a/internal/api/ws/codeocean_writer.go b/internal/api/ws/codeocean_writer.go index 8fa7946..47396a2 100644 --- a/internal/api/ws/codeocean_writer.go +++ b/internal/api/ws/codeocean_writer.go @@ -95,7 +95,7 @@ func (cw *codeOceanOutputWriter) StdErr() io.Writer { // Close forwards the kind of exit (timeout, error, normal) to CodeOcean. // This results in the closing of the WebSocket connection. -// The call of Close is mandantory! +// The call of Close is mandatory! func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) { defer func() { cw.queue <- &writingLoopMessage{done: true} }() // Mask the internal stop reason before disclosing/forwarding it externally/to CodeOcean. @@ -104,7 +104,7 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) { return case info.Err == nil: cw.send(&dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: info.Code}) - case errors.Is(info.Err, context.DeadlineExceeded) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout): + case errors.Is(info.Err, runner.ErrorExecutionTimeout) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout): cw.send(&dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout}) case errors.Is(info.Err, runner.ErrOOMKilled): cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: dto.ErrOOMKilled.Error()}) diff --git a/internal/environment/nomad_manager.go b/internal/environment/nomad_manager.go index ee01e0b..c3a7a88 100644 --- a/internal/environment/nomad_manager.go +++ b/internal/environment/nomad_manager.go @@ -3,7 +3,6 @@ package environment import ( "context" _ "embed" - "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" @@ -14,6 +13,7 @@ import ( "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/util" + "github.com/sirupsen/logrus" "math" "os" "time" @@ -187,14 +187,17 @@ func (m *NomadEnvironmentManager) 
KeepEnvironmentsSynced(synchronizeRunners func return nil }) - if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { - log.WithContext(ctx).WithError(err).Fatal("Stopped KeepEnvironmentsSynced") + level := logrus.InfoLevel + if err != nil && ctx.Err() == nil { + level = logrus.FatalLevel } + log.WithContext(ctx).WithError(err).Log(level, "Stopped KeepEnvironmentsSynced") } // Load recovers all environments from the Jobs in Nomad. // As it replaces the environments the idle runners stored are not tracked anymore. func (m *NomadEnvironmentManager) load() error { + log.Info("Loading environments") // We have to destroy the environments first as otherwise they would just be replaced and old goroutines might stay running. for _, environment := range m.runnerManager.ListEnvironments() { err := environment.Delete(runner.ErrDestroyedAndReplaced) diff --git a/internal/runner/inactivity_timer.go b/internal/runner/inactivity_timer.go index 902f496..39b9aab 100644 --- a/internal/runner/inactivity_timer.go +++ b/internal/runner/inactivity_timer.go @@ -31,7 +31,10 @@ const ( TimerExpired TimerState = 2 ) -var ErrorRunnerInactivityTimeout DestroyReason = errors.New("runner inactivity timeout exceeded") +var ( + ErrorRunnerInactivityTimeout DestroyReason = errors.New("runner inactivity timeout exceeded") + ErrorExecutionTimeout = errors.New("execution timeout exceeded") +) type InactivityTimerImplementation struct { timer *time.Timer diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index d24474f..a579fd1 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -79,16 +79,17 @@ func (m *NomadRunnerManager) Return(r Runner) error { // SynchronizeRunners loads all runners and keeps them synchronized (without a retry mechanism). 
func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error { - // Load Runners + log.Info("Loading runners") if err := m.load(); err != nil { return fmt.Errorf("failed loading runners: %w", err) } // Watch for changes regarding the existing or new runners. + log.Info("Watching Event Stream") err := m.apiClient.WatchEventStream(ctx, &nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped}) - if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { + if err != nil && ctx.Err() == nil { err = fmt.Errorf("nomad Event Stream failed!: %w", err) } return err @@ -97,7 +98,6 @@ func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error { // Load recovers all runners for all existing environments. func (m *NomadRunnerManager) load() error { newUsedRunners := storage.NewLocalStorage[Runner]() - for _, environment := range m.environments.List() { environmentLogger := log.WithField(dto.KeyEnvironmentID, environment.ID().ToString()) diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 1cc5999..ef1ead7 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -342,6 +342,9 @@ func (r *NomadJob) handleExit(exitInfo ExitInfo, exitInternal <-chan ExitInfo, e func (r *NomadJob) handleContextDone(exitInternal <-chan ExitInfo, exit chan<- ExitInfo, stdin io.ReadWriter, ctx context.Context) { err := ctx.Err() + if errors.Is(err, context.DeadlineExceeded) { + err = ErrorExecutionTimeout + } // If instead errors.Is(err, context.Canceled), the user likely disconnected from the execution.
if reason, ok := r.ctx.Value(destroyReasonContextKey).(error); ok { err = reason if r.TimeoutPassed() && !errors.Is(err, ErrorRunnerInactivityTimeout) { diff --git a/pkg/util/util.go b/pkg/util/util.go index e92778c..32bf11f 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -3,7 +3,6 @@ package util import ( "context" "errors" - "fmt" "github.com/openHPI/poseidon/pkg/logging" "time" ) @@ -14,6 +13,7 @@ var ( MaxConnectionRetriesExponential = 18 // InitialWaitingDuration is the default initial duration of waiting after a failed time. InitialWaitingDuration = time.Second + ErrRetryContextDone = errors.New("the retry context is done") ) func retryExponential(ctx context.Context, sleep time.Duration, f func() error) func() error { @@ -23,6 +23,7 @@ func retryExponential(ctx context.Context, sleep time.Duration, f func() error) if err != nil { select { case <-ctx.Done(): + err = ErrRetryContextDone case <-time.After(sleep): sleep *= 2 } @@ -39,7 +40,7 @@ func retryConstant(ctx context.Context, sleep time.Duration, f func() error) fun if err != nil { select { case <-ctx.Done(): - return fmt.Errorf("stopped retrying: %w", ctx.Err()) + return ErrRetryContextDone case <-time.After(sleep): } } @@ -53,7 +54,7 @@ func retryAttempts(maxAttempts int, f func() error) (err error) { err = f() if err == nil { return nil - } else if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + } else if errors.Is(err, ErrRetryContextDone) { return err } log.WithField("count", i).WithError(err).Debug("retrying after error")