diff --git a/api/runners.go b/api/runners.go
index 761c541..73104ff 100644
--- a/api/runners.go
+++ b/api/runners.go
@@ -9,6 +9,7 @@ import (
 	"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
 	"net/http"
 	"net/url"
+	"time"
 )
 
 const (
@@ -49,16 +50,19 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Request) {
 	environmentId := runner.EnvironmentID(runnerRequest.ExecutionEnvironmentId)
 	nextRunner, err := r.manager.Claim(environmentId)
 	if err != nil {
-		if err == runner.ErrUnknownExecutionEnvironment {
+		switch err {
+		case runner.ErrUnknownExecutionEnvironment:
 			writeNotFound(writer, err)
-		} else if err == runner.ErrNoRunnersAvailable {
+		case runner.ErrNoRunnersAvailable:
 			log.WithField("environment", environmentId).Warn("No runners available")
 			writeInternalServerError(writer, err, dto.ErrorNomadOverload)
-		} else {
+		default:
 			writeInternalServerError(writer, err, dto.ErrorUnknown)
 		}
 		return
 	}
+	timeout := time.Duration(runnerRequest.InactivityTimeout) * time.Second
+	nextRunner.SetupTimeout(timeout)
 	sendJson(writer, &dto.RunnerResponse{Id: nextRunner.Id()}, http.StatusOK)
 }
 
diff --git a/api/websocket.go b/api/websocket.go
index d05a32b..3cf0f23 100644
--- a/api/websocket.go
+++ b/api/websocket.go
@@ -186,11 +186,8 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
 		return
 	}
 
-	if exitInfo.Err == context.DeadlineExceeded {
-		err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout})
-		if err != nil {
-			return
-		}
+	if errors.Is(exitInfo.Err, context.DeadlineExceeded) || errors.Is(exitInfo.Err, runner.ErrorRunnerInactivityTimeout) {
+		_ = wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout})
 		return
 	} else if exitInfo.Err != nil {
 		errorMessage := "Error executing the request"
diff --git a/nomad/nomad.go b/nomad/nomad.go
index 4ce5a64..697aae2 100644
--- a/nomad/nomad.go
+++ b/nomad/nomad.go
@@ -63,8 +63,8 @@ type ExecutorAPI interface {
 	ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool,
 		stdin io.Reader, stdout, stderr io.Writer) (int, error)
 
-	// MarkRunnerAsUsed marks the runner with the given ID as used.
-	MarkRunnerAsUsed(runnerID string) error
+	// MarkRunnerAsUsed marks the runner with the given ID as used. It also stores the timeout duration in the metadata.
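+	// The duration is given in seconds and is read back when used runners are recovered in Load.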
+	MarkRunnerAsUsed(runnerID string, duration int) error
 }
 
 // APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
diff --git a/runner/manager.go b/runner/manager.go
index c7640e3..f0b86d6 100644
--- a/runner/manager.go
+++ b/runner/manager.go
@@ -185,6 +185,8 @@ func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error)
 		return nil, fmt.Errorf("can't mark runner as used: %w", err)
 	}
 
+	runner.SetupTimeout(time.Duration(duration) * time.Second)
+
 	err = m.scaleEnvironment(environmentID)
 	if err != nil {
 		log.WithError(err).WithField("environmentID", environmentID).Error("Couldn't scale environment")
@@ -202,6 +204,7 @@ func (m *NomadRunnerManager) Get(runnerID string) (Runner, error) {
 }
 
 func (m *NomadRunnerManager) Return(r Runner) (err error) {
+	r.StopTimeout()
 	err = m.apiClient.DeleteRunner(r.Id())
 	if err != nil {
 		return
@@ -234,9 +237,15 @@ func (m *NomadRunnerManager) Load() {
 			continue
 		}
 		isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
-		newJob := NewNomadJob(*job.ID, m.apiClient)
+		newJob := NewNomadJob(*job.ID, m.apiClient, m)
 		if isUsed {
 			m.usedRunners.Add(newJob)
+			timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
+			if err != nil {
+				log.WithError(err).Warn("Error loading timeout from meta values")
+			} else {
+				newJob.SetupTimeout(time.Duration(timeout) * time.Second)
+			}
 		} else {
 			environment.idleRunners.Add(newJob)
 		}
@@ -273,7 +282,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
 
 	job, ok := m.environments.Get(environmentID)
 	if ok {
-		job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient))
+		job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient, m))
 	}
 }
 
diff --git a/runner/runner.go b/runner/runner.go
index c27b295..239ca74 100644
--- a/runner/runner.go
+++ b/runner/runner.go
@@ -11,6 +11,7 @@ import (
 	"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
 	"io"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -26,14 +27,113 @@ const (
 )
 
 var (
-	ErrorFileCopyFailed = errors.New("file copy failed")
+	ErrorFileCopyFailed          = errors.New("file copy failed")
+	ErrorRunnerInactivityTimeout = errors.New("runner inactivity timeout exceeded")
 )
 
+// InactivityTimer is a wrapper around a timer that is used to delete a Runner after some time of inactivity.
+type InactivityTimer interface {
+	// SetupTimeout starts the timeout after which the runner gets deleted.
+	SetupTimeout(duration time.Duration)
+
+	// ResetTimeout resets the current timeout so that the runner is only deleted after the duration set in SetupTimeout, counted from now.
+	// It does not make an already expired timer run again.
+	ResetTimeout()
+
+	// StopTimeout stops the timeout but does not remove the runner.
+	StopTimeout()
+
+	// TimeoutPassed returns true if the timeout expired and false otherwise.
+	TimeoutPassed() bool
+}
+
+type TimerState uint8
+
+const (
+	TimerInactive TimerState = 0
+	TimerRunning  TimerState = 1
+	TimerExpired  TimerState = 2
+)
+
+type InactivityTimerImplementation struct {
+	timer    *time.Timer
+	duration time.Duration
+	state    TimerState
+	runner   Runner
+	manager  Manager
+	sync.Mutex
+}
+
+func NewInactivityTimer(runner Runner, manager Manager) InactivityTimer {
+	return &InactivityTimerImplementation{
+		state:   TimerInactive,
+		runner:  runner,
+		manager: manager,
+	}
+}
+
+func (t *InactivityTimerImplementation) SetupTimeout(duration time.Duration) {
+	t.Lock()
+	defer t.Unlock()
+	// Stop the old timer if present.
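+	// Otherwise the callback of a previously started timer could still fire and return the runner prematurely.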
+	if t.timer != nil {
+		t.timer.Stop()
+	}
+	if duration == 0 {
+		t.state = TimerInactive
+		return
+	}
+	t.state = TimerRunning
+	t.duration = duration
+
+	t.timer = time.AfterFunc(duration, func() {
+		t.Lock()
+		t.state = TimerExpired
+		// The timer must already be unlocked here in order to avoid a deadlock with the call to StopTimeout in Manager.Return.
+		t.Unlock()
+		err := t.manager.Return(t.runner)
+		if err != nil {
+			log.WithError(err).WithField("id", t.runner.Id()).Warn("Returning runner after inactivity caused an error")
+		} else {
+			log.WithField("id", t.runner.Id()).Info("Returning runner due to inactivity timeout")
+		}
+	})
+}
+
+func (t *InactivityTimerImplementation) ResetTimeout() {
+	t.Lock()
+	defer t.Unlock()
+	if t.state != TimerRunning {
+		// The timer has already expired or been stopped. We don't want to restart it.
+		return
+	}
+	if t.timer.Stop() {
+		t.timer.Reset(t.duration)
+	} else {
+		log.Error("Timer is in state running but stopped. This should never happen")
+	}
+}
+
+func (t *InactivityTimerImplementation) StopTimeout() {
+	t.Lock()
+	defer t.Unlock()
+	if t.state != TimerRunning {
+		return
+	}
+	t.timer.Stop()
+	t.state = TimerInactive
+}
+
+func (t *InactivityTimerImplementation) TimeoutPassed() bool {
+	return t.state == TimerExpired
+}
+
 type Runner interface {
 	// Id returns the id of the runner.
 	Id() string
 	ExecutionStorage
+	InactivityTimer
 
 	// ExecuteInteractively runs the given execution request and forwards from and to the given reader and writers.
 	// An ExitInfo is sent to the exit channel on command completion.
@@ -53,17 +153,20 @@
 // NomadJob is an abstraction to communicate with Nomad environments.
 type NomadJob struct {
 	ExecutionStorage
+	InactivityTimer
 	id  string
 	api nomad.ExecutorAPI
 }
 
 // NewNomadJob creates a new NomadJob with the provided id.
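+// The manager is used by the job's inactivity timer to return the runner once the timeout expires.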
-func NewNomadJob(id string, apiClient nomad.ExecutorAPI) *NomadJob {
-	return &NomadJob{
+func NewNomadJob(id string, apiClient nomad.ExecutorAPI, manager Manager) *NomadJob {
+	job := &NomadJob{
 		id:               id,
 		api:              apiClient,
 		ExecutionStorage: NewLocalExecutionStorage(),
 	}
+	job.InactivityTimer = NewInactivityTimer(job, manager)
+	return job
 }
 
 func (r *NomadJob) Id() string {
@@ -80,6 +183,8 @@ func (r *NomadJob) ExecuteInteractively(
 	request *dto.ExecutionRequest,
 	stdin io.Reader,
 	stdout, stderr io.Writer,
 ) (<-chan ExitInfo, context.CancelFunc) {
+	r.ResetTimeout()
+
 	command := request.FullCommand()
 	var ctx context.Context
 	var cancel context.CancelFunc
@@ -91,6 +196,9 @@
 	exit := make(chan ExitInfo)
 	go func() {
 		exitCode, err := r.api.ExecuteCommand(r.id, ctx, command, true, stdin, stdout, stderr)
+		if err == nil && r.TimeoutPassed() {
+			err = ErrorRunnerInactivityTimeout
+		}
 		exit <- ExitInfo{uint8(exitCode), err}
 		close(exit)
 	}()
@@ -98,6 +206,8 @@
 }
 
 func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error {
+	r.ResetTimeout()
+
 	var tarBuffer bytes.Buffer
 	if err := createTarArchiveForFiles(copyRequest.Copy, &tarBuffer); err != nil {
 		return err
diff --git a/runner/runner_test.go b/runner/runner_test.go
index 5a268bf..a4d66c7 100644
--- a/runner/runner_test.go
+++ b/runner/runner_test.go
@@ -80,8 +80,8 @@ func TestExecuteCallsAPI(t *testing.T) {
 	request := &dto.ExecutionRequest{Command: "echo 'Hello World!'"}
 	runner.ExecuteInteractively(request, nil, nil, nil)
 
-	<-time.After(50 * time.Millisecond)
+	time.Sleep(tests.ShortTimeout)
 	apiMock.AssertCalled(t, "ExecuteCommand", tests.DefaultRunnerID, mock.Anything, request.FullCommand(), true, mock.Anything, mock.Anything, mock.Anything)
 }
 
 func TestExecuteReturnsAfterTimeout(t *testing.T) {
@@ -252,6 +252,81 @@ func (s *UpdateFileSystemTestSuite) readFilesFromTarArchive(tarArchive io.Reader
 	return files
 }
 
+func TestInactivityTimerTestSuite(t *testing.T) {
+	suite.Run(t, new(InactivityTimerTestSuite))
+}
+
+type InactivityTimerTestSuite struct {
+	suite.Suite
+	runner   Runner
+	manager  *ManagerMock
+	returned chan bool
+}
+
+func (s *InactivityTimerTestSuite) SetupTest() {
+	s.returned = make(chan bool)
+	s.manager = &ManagerMock{}
+	s.manager.On("Return", mock.Anything).Run(func(_ mock.Arguments) {
+		s.returned <- true
+	}).Return(nil)
+	s.runner = NewNomadJob(tests.DefaultRunnerID, nil, s.manager)
+
+	s.runner.SetupTimeout(tests.ShortTimeout)
+}
+
+func (s *InactivityTimerTestSuite) TearDownTest() {
+	s.runner.StopTimeout()
+}
+
+func (s *InactivityTimerTestSuite) TestRunnerIsReturnedAfterTimeout() {
+	s.True(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
+}
+
+func (s *InactivityTimerTestSuite) TestRunnerIsNotReturnedBeforeTimeout() {
+	s.False(tests.ChannelReceivesSomething(s.returned, tests.ShortTimeout/2))
+}
+
+func (s *InactivityTimerTestSuite) TestResetTimeoutExtendsTheDeadline() {
+	time.Sleep(2 * tests.ShortTimeout / 4)
+	s.runner.ResetTimeout()
+	s.False(tests.ChannelReceivesSomething(s.returned, 3*tests.ShortTimeout/4),
+		"Because of the reset, the timeout should not be reached by now.")
+	s.True(tests.ChannelReceivesSomething(s.returned, 5*tests.ShortTimeout/4),
+		"After the reset, the timeout should be reached by now.")
+}
+
+func (s *InactivityTimerTestSuite) TestStopTimeoutStopsTimeout() {
+	s.runner.StopTimeout()
+	s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
+}
+
+func (s *InactivityTimerTestSuite) TestTimeoutPassedReturnsFalseBeforeDeadline() {
+	s.False(s.runner.TimeoutPassed())
+}
+
+func (s *InactivityTimerTestSuite) TestTimeoutPassedReturnsTrueAfterDeadline() {
+	time.Sleep(2 * tests.ShortTimeout)
+	s.True(s.runner.TimeoutPassed())
+}
+
+func (s *InactivityTimerTestSuite) TestTimerIsNotResetAfterDeadline() {
+	time.Sleep(2 * tests.ShortTimeout)
+	<-s.returned
+	s.runner.ResetTimeout()
+	s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
+}
+
+func (s *InactivityTimerTestSuite) TestSetupTimeoutStopsOldTimeout() {
+	s.runner.SetupTimeout(3 * tests.ShortTimeout)
+	s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
+	s.True(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
+}
+
+func (s *InactivityTimerTestSuite) TestTimerIsInactiveWhenDurationIsZero() {
+	s.runner.SetupTimeout(0)
+	s.False(tests.ChannelReceivesSomething(s.returned, tests.ShortTimeout))
+}
+
 // NewRunner creates a new runner with the provided id.
 func NewRunner(id string) Runner {
 	return NewNomadJob(id, nil)