diff --git a/internal/runner/inactivity_timer.go b/internal/runner/inactivity_timer.go new file mode 100644 index 0000000..94ffcc4 --- /dev/null +++ b/internal/runner/inactivity_timer.go @@ -0,0 +1,106 @@ +package runner + +import ( + "errors" + "sync" + "time" +) + +// InactivityTimer is a wrapper around a timer that is used to delete a a Runner after some time of inactivity. +type InactivityTimer interface { + // SetupTimeout starts the timeout after a runner gets deleted. + SetupTimeout(duration time.Duration) + + // ResetTimeout resets the current timeout so that the runner gets deleted after the time set in Setup from now. + // It does not make an already expired timer run again. + ResetTimeout() + + // StopTimeout stops the timeout but does not remove the runner. + StopTimeout() + + // TimeoutPassed returns true if the timeout expired and false otherwise. + TimeoutPassed() bool +} + +type TimerState uint8 + +const ( + TimerInactive TimerState = 0 + TimerRunning TimerState = 1 + TimerExpired TimerState = 2 +) + +var ErrorRunnerInactivityTimeout = errors.New("runner inactivity timeout exceeded") + +type InactivityTimerImplementation struct { + timer *time.Timer + duration time.Duration + state TimerState + runner Runner + manager Manager + sync.Mutex +} + +func NewInactivityTimer(runner Runner, manager Manager) InactivityTimer { + return &InactivityTimerImplementation{ + state: TimerInactive, + runner: runner, + manager: manager, + } +} + +func (t *InactivityTimerImplementation) SetupTimeout(duration time.Duration) { + t.Lock() + defer t.Unlock() + // Stop old timer if present. + if t.timer != nil { + t.timer.Stop() + } + if duration == 0 { + t.state = TimerInactive + return + } + t.state = TimerRunning + t.duration = duration + + t.timer = time.AfterFunc(duration, func() { + t.Lock() + t.state = TimerExpired + // The timer must be unlocked here already in order to avoid a deadlock with the call to StopTimout in Manager.Return. + t.Unlock() + err := t.manager.Return(t.runner) + if err != nil { + log.WithError(err).WithField("id", t.runner.ID()).Warn("Returning runner after inactivity caused an error") + } else { + log.WithField("id", t.runner.ID()).Info("Returning runner due to inactivity timeout") + } + }) +} + +func (t *InactivityTimerImplementation) ResetTimeout() { + t.Lock() + defer t.Unlock() + if t.state != TimerRunning { + // The timer has already expired or been stopped. We don't want to restart it. + return + } + if t.timer.Stop() { + t.timer.Reset(t.duration) + } else { + log.Error("Timer is in state running but stopped. This should never happen") + } +} + +func (t *InactivityTimerImplementation) StopTimeout() { + t.Lock() + defer t.Unlock() + if t.state != TimerRunning { + return + } + t.timer.Stop() + t.state = TimerInactive +} + +func (t *InactivityTimerImplementation) TimeoutPassed() bool { + return t.state == TimerExpired +} diff --git a/internal/runner/inactivity_timer_test.go b/internal/runner/inactivity_timer_test.go new file mode 100644 index 0000000..451a044 --- /dev/null +++ b/internal/runner/inactivity_timer_test.go @@ -0,0 +1,86 @@ +package runner + +import ( + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" + "testing" + "time" +) + +func TestInactivityTimerTestSuite(t *testing.T) { + suite.Run(t, new(InactivityTimerTestSuite)) +} + +type InactivityTimerTestSuite struct { + suite.Suite + runner Runner + manager *ManagerMock + returned chan bool +} + +func (s *InactivityTimerTestSuite) SetupTest() { + s.returned = make(chan bool, 1) + s.manager = &ManagerMock{} + s.manager.On("Return", mock.Anything).Run(func(_ mock.Arguments) { + s.returned <- true + }).Return(nil) + + s.runner = NewRunner(tests.DefaultRunnerID, s.manager) + + s.runner.SetupTimeout(tests.ShortTimeout) +} + +func (s *InactivityTimerTestSuite) TearDownTest() { + s.runner.StopTimeout() +} + +func (s *InactivityTimerTestSuite) TestRunnerIsReturnedAfterTimeout() { + s.True(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) +} + +func (s *InactivityTimerTestSuite) TestRunnerIsNotReturnedBeforeTimeout() { + s.False(tests.ChannelReceivesSomething(s.returned, tests.ShortTimeout/2)) +} + +func (s *InactivityTimerTestSuite) TestResetTimeoutExtendsTheDeadline() { + time.Sleep(3 * tests.ShortTimeout / 4) + s.runner.ResetTimeout() + s.False(tests.ChannelReceivesSomething(s.returned, 3*tests.ShortTimeout/4), + "Because of the reset, the timeout should not be reached by now.") + s.True(tests.ChannelReceivesSomething(s.returned, 5*tests.ShortTimeout/4), + "After reset, the timout should be reached by now.") +} + +func (s *InactivityTimerTestSuite) TestStopTimeoutStopsTimeout() { + s.runner.StopTimeout() + s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) +} + +func (s *InactivityTimerTestSuite) TestTimeoutPassedReturnsFalseBeforeDeadline() { + s.False(s.runner.TimeoutPassed()) +} + +func (s *InactivityTimerTestSuite) TestTimeoutPassedReturnsTrueAfterDeadline() { + time.Sleep(2 * tests.ShortTimeout) + s.True(s.runner.TimeoutPassed()) +} + +func (s *InactivityTimerTestSuite) TestTimerIsNotResetAfterDeadline() { + time.Sleep(2 * tests.ShortTimeout) + // We need to empty the returned channel so Return can send to it again. + tests.ChannelReceivesSomething(s.returned, 0) + s.runner.ResetTimeout() + s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) +} + +func (s *InactivityTimerTestSuite) TestSetupTimeoutStopsOldTimeout() { + s.runner.SetupTimeout(3 * tests.ShortTimeout) + s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) + s.True(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) +} + +func (s *InactivityTimerTestSuite) TestTimerIsInactiveWhenDurationIsZero() { + s.runner.SetupTimeout(0) + s.False(tests.ChannelReceivesSomething(s.returned, tests.ShortTimeout)) +} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 87eed0c..b4072a9 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -12,7 +12,6 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" "io" "strings" - "sync" "time" ) @@ -32,107 +31,7 @@ const ( executionTimeoutGracePeriod = 3 * time.Second ) -var ( - ErrorFileCopyFailed = errors.New("file copy failed") - ErrorRunnerInactivityTimeout = errors.New("runner inactivity timeout exceeded") -) - -// InactivityTimer is a wrapper around a timer that is used to delete a a Runner after some time of inactivity. -type InactivityTimer interface { - // SetupTimeout starts the timeout after a runner gets deleted. - SetupTimeout(duration time.Duration) - - // ResetTimeout resets the current timeout so that the runner gets deleted after the time set in Setup from now. - // It does not make an already expired timer run again. - ResetTimeout() - - // StopTimeout stops the timeout but does not remove the runner. - StopTimeout() - - // TimeoutPassed returns true if the timeout expired and false otherwise. - TimeoutPassed() bool -} - -type TimerState uint8 - -const ( - TimerInactive TimerState = 0 - TimerRunning TimerState = 1 - TimerExpired TimerState = 2 -) - -type InactivityTimerImplementation struct { - timer *time.Timer - duration time.Duration - state TimerState - runner Runner - manager Manager - sync.Mutex -} - -func NewInactivityTimer(runner Runner, manager Manager) InactivityTimer { - return &InactivityTimerImplementation{ - state: TimerInactive, - runner: runner, - manager: manager, - } -} - -func (t *InactivityTimerImplementation) SetupTimeout(duration time.Duration) { - t.Lock() - defer t.Unlock() - // Stop old timer if present. - if t.timer != nil { - t.timer.Stop() - } - if duration == 0 { - t.state = TimerInactive - return - } - t.state = TimerRunning - t.duration = duration - - t.timer = time.AfterFunc(duration, func() { - t.Lock() - t.state = TimerExpired - // The timer must be unlocked here already in order to avoid a deadlock with the call to StopTimout in Manager.Return. - t.Unlock() - err := t.manager.Return(t.runner) - if err != nil { - log.WithError(err).WithField("id", t.runner.ID()).Warn("Returning runner after inactivity caused an error") - } else { - log.WithField("id", t.runner.ID()).Info("Returning runner due to inactivity timeout") - } - }) -} - -func (t *InactivityTimerImplementation) ResetTimeout() { - t.Lock() - defer t.Unlock() - if t.state != TimerRunning { - // The timer has already expired or been stopped. We don't want to restart it. - return - } - if t.timer.Stop() { - t.timer.Reset(t.duration) - } else { - log.Error("Timer is in state running but stopped. This should never happen") - } -} - -func (t *InactivityTimerImplementation) StopTimeout() { - t.Lock() - defer t.Unlock() - if t.state != TimerRunning { - return - } - t.timer.Stop() - t.state = TimerInactive -} - -func (t *InactivityTimerImplementation) TimeoutPassed() bool { - return t.state == TimerExpired -} +var ErrorFileCopyFailed = errors.New("file copy failed") type Runner interface { // ID returns the id of the runner. diff --git a/internal/runner/runner_test.go b/internal/runner/runner_test.go index 81d7481..071d457 100644 --- a/internal/runner/runner_test.go +++ b/internal/runner/runner_test.go @@ -362,83 +362,6 @@ func (s *UpdateFileSystemTestSuite) readFilesFromTarArchive(tarArchive io.Reader return files } -func TestInactivityTimerTestSuite(t *testing.T) { - suite.Run(t, new(InactivityTimerTestSuite)) -} - -type InactivityTimerTestSuite struct { - suite.Suite - runner Runner - manager *ManagerMock - returned chan bool -} - -func (s *InactivityTimerTestSuite) SetupTest() { - s.returned = make(chan bool, 1) - s.manager = &ManagerMock{} - s.manager.On("Return", mock.Anything).Run(func(_ mock.Arguments) { - s.returned <- true - }).Return(nil) - - s.runner = NewRunner(tests.DefaultRunnerID, s.manager) - - s.runner.SetupTimeout(tests.ShortTimeout) -} - -func (s *InactivityTimerTestSuite) TearDownTest() { - s.runner.StopTimeout() -} - -func (s *InactivityTimerTestSuite) TestRunnerIsReturnedAfterTimeout() { - s.True(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) -} - -func (s *InactivityTimerTestSuite) TestRunnerIsNotReturnedBeforeTimeout() { - s.False(tests.ChannelReceivesSomething(s.returned, tests.ShortTimeout/2)) -} - -func (s *InactivityTimerTestSuite) TestResetTimeoutExtendsTheDeadline() { - time.Sleep(3 * tests.ShortTimeout / 4) - s.runner.ResetTimeout() - s.False(tests.ChannelReceivesSomething(s.returned, 3*tests.ShortTimeout/4), - "Because of the reset, the timeout should not be reached by now.") - s.True(tests.ChannelReceivesSomething(s.returned, 5*tests.ShortTimeout/4), - "After reset, the timout should be reached by now.") -} - -func (s *InactivityTimerTestSuite) TestStopTimeoutStopsTimeout() { - s.runner.StopTimeout() - s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) -} - -func (s *InactivityTimerTestSuite) TestTimeoutPassedReturnsFalseBeforeDeadline() { - s.False(s.runner.TimeoutPassed()) -} - -func (s *InactivityTimerTestSuite) TestTimeoutPassedReturnsTrueAfterDeadline() { - time.Sleep(2 * tests.ShortTimeout) - s.True(s.runner.TimeoutPassed()) -} - -func (s *InactivityTimerTestSuite) TestTimerIsNotResetAfterDeadline() { - time.Sleep(2 * tests.ShortTimeout) - // We need to empty the returned channel so Return can send to it again. - tests.ChannelReceivesSomething(s.returned, 0) - s.runner.ResetTimeout() - s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) -} - -func (s *InactivityTimerTestSuite) TestSetupTimeoutStopsOldTimeout() { - s.runner.SetupTimeout(3 * tests.ShortTimeout) - s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) - s.True(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout)) -} - -func (s *InactivityTimerTestSuite) TestTimerIsInactiveWhenDurationIsZero() { - s.runner.SetupTimeout(0) - s.False(tests.ChannelReceivesSomething(s.returned, tests.ShortTimeout)) -} - // NewRunner creates a new runner with the provided id and manager. func NewRunner(id string, manager Manager) Runner { return NewNomadJob(id, nil, nil, manager)