Add inactivity timeout for runners.
By removing runners after a specified timeout they no longer stay around indefinitely and block Nomads capacities. The timeout can be set individually per runner when requesting the provide route. If it is set to 0, the runner is never removed automatically. The timeout is reset when activity is detected. Currently that is when something gets executed or the filesystem gets modified.
This commit is contained in:
@ -9,6 +9,7 @@ import (
|
|||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -49,16 +50,19 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req
|
|||||||
environmentId := runner.EnvironmentID(runnerRequest.ExecutionEnvironmentId)
|
environmentId := runner.EnvironmentID(runnerRequest.ExecutionEnvironmentId)
|
||||||
nextRunner, err := r.manager.Claim(environmentId)
|
nextRunner, err := r.manager.Claim(environmentId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == runner.ErrUnknownExecutionEnvironment {
|
switch err {
|
||||||
|
case runner.ErrUnknownExecutionEnvironment:
|
||||||
writeNotFound(writer, err)
|
writeNotFound(writer, err)
|
||||||
} else if err == runner.ErrNoRunnersAvailable {
|
case runner.ErrNoRunnersAvailable:
|
||||||
log.WithField("environment", environmentId).Warn("No runners available")
|
log.WithField("environment", environmentId).Warn("No runners available")
|
||||||
writeInternalServerError(writer, err, dto.ErrorNomadOverload)
|
writeInternalServerError(writer, err, dto.ErrorNomadOverload)
|
||||||
} else {
|
default:
|
||||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
timeout := time.Duration(runnerRequest.InactivityTimeout) * time.Second
|
||||||
|
nextRunner.SetupTimeout(timeout, nextRunner, r.manager)
|
||||||
|
|
||||||
sendJson(writer, &dto.RunnerResponse{Id: nextRunner.Id()}, http.StatusOK)
|
sendJson(writer, &dto.RunnerResponse{Id: nextRunner.Id()}, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
@ -186,11 +186,8 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if exitInfo.Err == context.DeadlineExceeded {
|
if errors.Is(exitInfo.Err, context.DeadlineExceeded) || errors.Is(exitInfo.Err, runner.ErrorRunnerInactivityTimeout) {
|
||||||
err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout})
|
_ = wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout})
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
} else if exitInfo.Err != nil {
|
} else if exitInfo.Err != nil {
|
||||||
errorMessage := "Error executing the request"
|
errorMessage := "Error executing the request"
|
||||||
|
@ -63,8 +63,8 @@ type ExecutorAPI interface {
|
|||||||
ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool,
|
ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool,
|
||||||
stdin io.Reader, stdout, stderr io.Writer) (int, error)
|
stdin io.Reader, stdout, stderr io.Writer) (int, error)
|
||||||
|
|
||||||
// MarkRunnerAsUsed marks the runner with the given ID as used.
|
// MarkRunnerAsUsed marks the runner with the given ID as used. It also stores the timeout duration in the metadata.
|
||||||
MarkRunnerAsUsed(runnerID string) error
|
MarkRunnerAsUsed(runnerID string, duration int) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
|
// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
|
||||||
|
@ -185,6 +185,8 @@ func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error)
|
|||||||
return nil, fmt.Errorf("can't mark runner as used: %w", err)
|
return nil, fmt.Errorf("can't mark runner as used: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
runner.SetupTimeout(time.Duration(duration) * time.Second)
|
||||||
|
|
||||||
err = m.scaleEnvironment(environmentID)
|
err = m.scaleEnvironment(environmentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("environmentID", environmentID).Error("Couldn't scale environment")
|
log.WithError(err).WithField("environmentID", environmentID).Error("Couldn't scale environment")
|
||||||
@ -202,6 +204,7 @@ func (m *NomadRunnerManager) Get(runnerID string) (Runner, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *NomadRunnerManager) Return(r Runner) (err error) {
|
func (m *NomadRunnerManager) Return(r Runner) (err error) {
|
||||||
|
r.StopTimeout()
|
||||||
err = m.apiClient.DeleteRunner(r.Id())
|
err = m.apiClient.DeleteRunner(r.Id())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -234,9 +237,15 @@ func (m *NomadRunnerManager) Load() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
|
isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
|
||||||
newJob := NewNomadJob(*job.ID, m.apiClient)
|
newJob := NewNomadJob(*job.ID, m.apiClient, m)
|
||||||
if isUsed {
|
if isUsed {
|
||||||
m.usedRunners.Add(newJob)
|
m.usedRunners.Add(newJob)
|
||||||
|
timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Warn("Error loading timeout from meta values")
|
||||||
|
} else {
|
||||||
|
newJob.SetupTimeout(time.Duration(timeout) * time.Second)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
environment.idleRunners.Add(newJob)
|
environment.idleRunners.Add(newJob)
|
||||||
}
|
}
|
||||||
@ -273,7 +282,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
|
|||||||
|
|
||||||
job, ok := m.environments.Get(environmentID)
|
job, ok := m.environments.Get(environmentID)
|
||||||
if ok {
|
if ok {
|
||||||
job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient))
|
job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient, m))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
116
runner/runner.go
116
runner/runner.go
@ -11,6 +11,7 @@ import (
|
|||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -26,14 +27,113 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrorFileCopyFailed = errors.New("file copy failed")
|
ErrorFileCopyFailed = errors.New("file copy failed")
|
||||||
|
ErrorRunnerInactivityTimeout = errors.New("runner inactivity timeout exceeded")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// InactivityTimer is a wrapper around a timer that is used to delete a a Runner after some time of inactivity.
|
||||||
|
type InactivityTimer interface {
|
||||||
|
// SetupTimeout starts the timeout after a runner gets deleted.
|
||||||
|
SetupTimeout(duration time.Duration)
|
||||||
|
|
||||||
|
// ResetTimeout resets the current timeout so that the runner gets deleted after the time set in Setup from now.
|
||||||
|
// It does not make an already expired timer run again.
|
||||||
|
ResetTimeout()
|
||||||
|
|
||||||
|
// StopTimeout stops the timeout but does not remove the runner.
|
||||||
|
StopTimeout()
|
||||||
|
|
||||||
|
// TimeoutPassed returns true if the timeout expired and false otherwise.
|
||||||
|
TimeoutPassed() bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type TimerState uint8
|
||||||
|
|
||||||
|
const (
|
||||||
|
TimerInactive TimerState = 0
|
||||||
|
TimerRunning TimerState = 1
|
||||||
|
TimerExpired TimerState = 2
|
||||||
|
)
|
||||||
|
|
||||||
|
type InactivityTimerImplementation struct {
|
||||||
|
timer *time.Timer
|
||||||
|
duration time.Duration
|
||||||
|
state TimerState
|
||||||
|
runner Runner
|
||||||
|
manager Manager
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewInactivityTimer(runner Runner, manager Manager) InactivityTimer {
|
||||||
|
return &InactivityTimerImplementation{
|
||||||
|
state: TimerInactive,
|
||||||
|
runner: runner,
|
||||||
|
manager: manager,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *InactivityTimerImplementation) SetupTimeout(duration time.Duration) {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
// Stop old timer if present.
|
||||||
|
if t.timer != nil {
|
||||||
|
t.timer.Stop()
|
||||||
|
}
|
||||||
|
if duration == 0 {
|
||||||
|
t.state = TimerInactive
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.state = TimerRunning
|
||||||
|
t.duration = duration
|
||||||
|
|
||||||
|
t.timer = time.AfterFunc(duration, func() {
|
||||||
|
t.Lock()
|
||||||
|
t.state = TimerExpired
|
||||||
|
// The timer must be unlocked here already in order to avoid a deadlock with the call to StopTimout in Manager.Return.
|
||||||
|
t.Unlock()
|
||||||
|
err := t.manager.Return(t.runner)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).WithField("id", t.runner.Id()).Warn("Returning runner after inactivity caused an error")
|
||||||
|
} else {
|
||||||
|
log.WithField("id", t.runner.Id()).Info("Returning runner due to inactivity timeout")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *InactivityTimerImplementation) ResetTimeout() {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
if t.state != TimerRunning {
|
||||||
|
// The timer has already expired or been stopped. We don't want to restart it.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if t.timer.Stop() {
|
||||||
|
t.timer.Reset(t.duration)
|
||||||
|
} else {
|
||||||
|
log.Error("Timer is in state running but stopped. This should never happen")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *InactivityTimerImplementation) StopTimeout() {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
if t.state != TimerRunning {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.timer.Stop()
|
||||||
|
t.state = TimerInactive
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *InactivityTimerImplementation) TimeoutPassed() bool {
|
||||||
|
return t.state == TimerExpired
|
||||||
|
}
|
||||||
|
|
||||||
type Runner interface {
|
type Runner interface {
|
||||||
// Id returns the id of the runner.
|
// Id returns the id of the runner.
|
||||||
Id() string
|
Id() string
|
||||||
|
|
||||||
ExecutionStorage
|
ExecutionStorage
|
||||||
|
InactivityTimer
|
||||||
|
|
||||||
// ExecuteInteractively runs the given execution request and forwards from and to the given reader and writers.
|
// ExecuteInteractively runs the given execution request and forwards from and to the given reader and writers.
|
||||||
// An ExitInfo is sent to the exit channel on command completion.
|
// An ExitInfo is sent to the exit channel on command completion.
|
||||||
@ -53,17 +153,20 @@ type Runner interface {
|
|||||||
// NomadJob is an abstraction to communicate with Nomad environments.
|
// NomadJob is an abstraction to communicate with Nomad environments.
|
||||||
type NomadJob struct {
|
type NomadJob struct {
|
||||||
ExecutionStorage
|
ExecutionStorage
|
||||||
|
InactivityTimer
|
||||||
id string
|
id string
|
||||||
api nomad.ExecutorAPI
|
api nomad.ExecutorAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNomadJob creates a new NomadJob with the provided id.
|
// NewNomadJob creates a new NomadJob with the provided id.
|
||||||
func NewNomadJob(id string, apiClient nomad.ExecutorAPI) *NomadJob {
|
func NewNomadJob(id string, apiClient nomad.ExecutorAPI, manager Manager) *NomadJob {
|
||||||
return &NomadJob{
|
job := &NomadJob{
|
||||||
id: id,
|
id: id,
|
||||||
api: apiClient,
|
api: apiClient,
|
||||||
ExecutionStorage: NewLocalExecutionStorage(),
|
ExecutionStorage: NewLocalExecutionStorage(),
|
||||||
}
|
}
|
||||||
|
job.InactivityTimer = NewInactivityTimer(job, manager)
|
||||||
|
return job
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *NomadJob) Id() string {
|
func (r *NomadJob) Id() string {
|
||||||
@ -80,6 +183,8 @@ func (r *NomadJob) ExecuteInteractively(
|
|||||||
stdin io.Reader,
|
stdin io.Reader,
|
||||||
stdout, stderr io.Writer,
|
stdout, stderr io.Writer,
|
||||||
) (<-chan ExitInfo, context.CancelFunc) {
|
) (<-chan ExitInfo, context.CancelFunc) {
|
||||||
|
r.ResetTimeout()
|
||||||
|
|
||||||
command := request.FullCommand()
|
command := request.FullCommand()
|
||||||
var ctx context.Context
|
var ctx context.Context
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
@ -91,6 +196,9 @@ func (r *NomadJob) ExecuteInteractively(
|
|||||||
exit := make(chan ExitInfo)
|
exit := make(chan ExitInfo)
|
||||||
go func() {
|
go func() {
|
||||||
exitCode, err := r.api.ExecuteCommand(r.id, ctx, command, true, stdin, stdout, stderr)
|
exitCode, err := r.api.ExecuteCommand(r.id, ctx, command, true, stdin, stdout, stderr)
|
||||||
|
if err == nil && r.TimeoutPassed() {
|
||||||
|
err = ErrorRunnerInactivityTimeout
|
||||||
|
}
|
||||||
exit <- ExitInfo{uint8(exitCode), err}
|
exit <- ExitInfo{uint8(exitCode), err}
|
||||||
close(exit)
|
close(exit)
|
||||||
}()
|
}()
|
||||||
@ -98,6 +206,8 @@ func (r *NomadJob) ExecuteInteractively(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error {
|
func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error {
|
||||||
|
r.ResetTimeout()
|
||||||
|
|
||||||
var tarBuffer bytes.Buffer
|
var tarBuffer bytes.Buffer
|
||||||
if err := createTarArchiveForFiles(copyRequest.Copy, &tarBuffer); err != nil {
|
if err := createTarArchiveForFiles(copyRequest.Copy, &tarBuffer); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -80,8 +80,8 @@ func TestExecuteCallsAPI(t *testing.T) {
|
|||||||
request := &dto.ExecutionRequest{Command: "echo 'Hello World!'"}
|
request := &dto.ExecutionRequest{Command: "echo 'Hello World!'"}
|
||||||
runner.ExecuteInteractively(request, nil, nil, nil)
|
runner.ExecuteInteractively(request, nil, nil, nil)
|
||||||
|
|
||||||
<-time.After(50 * time.Millisecond)
|
time.Sleep(tests.ShortTimeout)
|
||||||
apiMock.AssertCalled(t, "ExecuteCommand", tests.DefaultRunnerID, mock.Anything, request.FullCommand(), true, mock.Anything, mock.Anything, mock.Anything)
|
s.apiMock.AssertCalled(s.T(), "ExecuteCommand", tests.DefaultRunnerID, mock.Anything, request.FullCommand(), true, mock.Anything, mock.Anything, mock.Anything)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestExecuteReturnsAfterTimeout(t *testing.T) {
|
func TestExecuteReturnsAfterTimeout(t *testing.T) {
|
||||||
@ -252,6 +252,81 @@ func (s *UpdateFileSystemTestSuite) readFilesFromTarArchive(tarArchive io.Reader
|
|||||||
return files
|
return files
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInactivityTimerTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(InactivityTimerTestSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
type InactivityTimerTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
runner Runner
|
||||||
|
manager *ManagerMock
|
||||||
|
returned chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) SetupTest() {
|
||||||
|
s.returned = make(chan bool)
|
||||||
|
s.runner = NewRunner(tests.DefaultRunnerID)
|
||||||
|
s.manager = &ManagerMock{}
|
||||||
|
s.manager.On("Return", mock.Anything).Run(func(_ mock.Arguments) {
|
||||||
|
s.returned <- true
|
||||||
|
}).Return(nil)
|
||||||
|
|
||||||
|
s.runner.SetupTimeout(tests.ShortTimeout, s.runner, s.manager)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TearDownTest() {
|
||||||
|
s.runner.StopTimeout()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestRunnerIsReturnedAfterTimeout() {
|
||||||
|
s.True(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestRunnerIsNotReturnedBeforeTimeout() {
|
||||||
|
s.False(tests.ChannelReceivesSomething(s.returned, tests.ShortTimeout/2))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestResetTimeoutExtendsTheDeadline() {
|
||||||
|
time.Sleep(2 * tests.ShortTimeout / 4)
|
||||||
|
s.runner.ResetTimeout()
|
||||||
|
s.False(tests.ChannelReceivesSomething(s.returned, 3*tests.ShortTimeout/4),
|
||||||
|
"Because of the reset, the timeout should not be reached by now.")
|
||||||
|
s.True(tests.ChannelReceivesSomething(s.returned, 5*tests.ShortTimeout/4),
|
||||||
|
"After reset, the timout should be reached by now.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestStopTimeoutStopsTimeout() {
|
||||||
|
s.runner.StopTimeout()
|
||||||
|
s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestTimeoutPassedReturnsFalseBeforeDeadline() {
|
||||||
|
s.False(s.runner.TimeoutPassed())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestTimeoutPassedReturnsTrueAfterDeadline() {
|
||||||
|
time.Sleep(2 * tests.ShortTimeout)
|
||||||
|
s.True(s.runner.TimeoutPassed())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestTimerIsNotResetAfterDeadline() {
|
||||||
|
time.Sleep(2 * tests.ShortTimeout)
|
||||||
|
<-s.returned
|
||||||
|
s.runner.ResetTimeout()
|
||||||
|
s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestSetupTimoutStopsOldTimout() {
|
||||||
|
s.runner.SetupTimeout(3*tests.ShortTimeout, s.runner, s.manager)
|
||||||
|
s.False(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
|
||||||
|
s.True(tests.ChannelReceivesSomething(s.returned, 2*tests.ShortTimeout))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InactivityTimerTestSuite) TestTimerIsInactiveWhenDurationIsZero() {
|
||||||
|
s.runner.SetupTimeout(0, s.runner, s.manager)
|
||||||
|
s.False(tests.ChannelReceivesSomething(s.returned, tests.ShortTimeout))
|
||||||
|
}
|
||||||
|
|
||||||
// NewRunner creates a new runner with the provided id.
|
// NewRunner creates a new runner with the provided id.
|
||||||
func NewRunner(id string) Runner {
|
func NewRunner(id string) Runner {
|
||||||
return NewNomadJob(id, nil)
|
return NewNomadJob(id, nil)
|
||||||
|
Reference in New Issue
Block a user