Fix Goroutine Leak of Nomad execute command
that was triggered when [the execution timeout got exceeded, the runner got destroyed, or the WebSocket connection to CodeOcean closed] and the Allocation did not react to the SIGQUIT within the grace period.
This commit is contained in:
@ -45,8 +45,8 @@ func (s *WebSocketTestSuite) SetupTest() {
|
|||||||
|
|
||||||
// default execution
|
// default execution
|
||||||
s.executionID = tests.DefaultExecutionID
|
s.executionID = tests.DefaultExecutionID
|
||||||
s.runner.StoreExecution(s.executionID, &executionRequestHead)
|
s.runner.StoreExecution(s.executionID, &executionRequestLs)
|
||||||
mockAPIExecuteHead(s.apiMock)
|
mockAPIExecuteLs(s.apiMock)
|
||||||
|
|
||||||
runnerManager := &runner.ManagerMock{}
|
runnerManager := &runner.ManagerMock{}
|
||||||
runnerManager.On("Get", s.runner.ID()).Return(s.runner, nil)
|
runnerManager.On("Get", s.runner.ID()).Return(s.runner, nil)
|
||||||
@ -66,6 +66,8 @@ func (s *WebSocketTestSuite) TestWebsocketConnectionCanBeEstablished() {
|
|||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL.String(), nil)
|
conn, _, err := websocket.DefaultDialer.Dial(wsURL.String(), nil)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
<-time.After(tests.ShortTimeout)
|
||||||
err = conn.Close()
|
err = conn.Close()
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
}
|
}
|
||||||
@ -90,6 +92,8 @@ func (s *WebSocketTestSuite) TestWebsocketReturns400IfRequestedViaHttp() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *WebSocketTestSuite) TestWebsocketConnection() {
|
func (s *WebSocketTestSuite) TestWebsocketConnection() {
|
||||||
|
s.runner.StoreExecution(s.executionID, &executionRequestHead)
|
||||||
|
mockAPIExecuteHead(s.apiMock)
|
||||||
wsURL, err := s.webSocketURL("ws", s.runner.ID(), s.executionID)
|
wsURL, err := s.webSocketURL("ws", s.runner.ID(), s.executionID)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
connection, _, err := websocket.DefaultDialer.Dial(wsURL.String(), nil)
|
connection, _, err := websocket.DefaultDialer.Dial(wsURL.String(), nil)
|
||||||
@ -461,6 +465,7 @@ func mockAPIExecuteExitNonZero(api *nomad.ExecutorAPIMock) {
|
|||||||
func mockAPIExecute(api *nomad.ExecutorAPIMock, request *dto.ExecutionRequest,
|
func mockAPIExecute(api *nomad.ExecutorAPIMock, request *dto.ExecutionRequest,
|
||||||
run func(runnerId string, ctx context.Context, command string, tty bool,
|
run func(runnerId string, ctx context.Context, command string, tty bool,
|
||||||
stdin io.Reader, stdout, stderr io.Writer) (int, error)) {
|
stdin io.Reader, stdout, stderr io.Writer) (int, error)) {
|
||||||
|
tests.RemoveMethodFromMock(&api.Mock, "ExecuteCommand")
|
||||||
call := api.On("ExecuteCommand",
|
call := api.On("ExecuteCommand",
|
||||||
mock.AnythingOfType("string"),
|
mock.AnythingOfType("string"),
|
||||||
mock.Anything,
|
mock.Anything,
|
||||||
|
@ -281,7 +281,10 @@ func (r *NomadJob) executeCommand(ctx context.Context, command string, privilege
|
|||||||
stdin io.ReadWriter, stdout, stderr io.Writer, exit chan<- ExitInfo,
|
stdin io.ReadWriter, stdout, stderr io.Writer, exit chan<- ExitInfo,
|
||||||
) {
|
) {
|
||||||
exitCode, err := r.api.ExecuteCommand(r.id, ctx, command, true, privilegedExecution, stdin, stdout, stderr)
|
exitCode, err := r.api.ExecuteCommand(r.id, ctx, command, true, privilegedExecution, stdin, stdout, stderr)
|
||||||
exit <- ExitInfo{uint8(exitCode), err}
|
select {
|
||||||
|
case exit <- ExitInfo{uint8(exitCode), err}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute context.CancelFunc,
|
func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute context.CancelFunc,
|
||||||
|
@ -219,12 +219,8 @@ func (s *ExecuteInteractivelyTestSuite) TestSendsSignalAfterTimeout() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ExecuteInteractivelyTestSuite) TestDestroysRunnerAfterTimeoutAndSignal() {
|
func (s *ExecuteInteractivelyTestSuite) TestDestroysRunnerAfterTimeoutAndSignal() {
|
||||||
s.T().Skip("ToDo: Refactor NomadJob.executeCommand. Stuck in sending to channel") // ToDo
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
s.mockedExecuteCommandCall.Run(func(args mock.Arguments) {
|
s.mockedExecuteCommandCall.Run(func(args mock.Arguments) {
|
||||||
<-ctx.Done()
|
<-s.TestCtx.Done()
|
||||||
})
|
})
|
||||||
runnerDestroyed := false
|
runnerDestroyed := false
|
||||||
s.runner.onDestroy = func(_ Runner) error {
|
s.runner.onDestroy = func(_ Runner) error {
|
||||||
@ -255,10 +251,8 @@ func (s *ExecuteInteractivelyTestSuite) TestResetTimerGetsCalled() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ExecuteInteractivelyTestSuite) TestExitHasTimeoutErrorIfRunnerTimesOut() {
|
func (s *ExecuteInteractivelyTestSuite) TestExitHasTimeoutErrorIfRunnerTimesOut() {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
s.mockedExecuteCommandCall.Run(func(args mock.Arguments) {
|
s.mockedExecuteCommandCall.Run(func(args mock.Arguments) {
|
||||||
<-ctx.Done()
|
<-s.TestCtx.Done()
|
||||||
}).Return(0, nil)
|
}).Return(0, nil)
|
||||||
s.mockedTimeoutPassedCall.Return(true)
|
s.mockedTimeoutPassedCall.Return(true)
|
||||||
executionRequest := &dto.ExecutionRequest{}
|
executionRequest := &dto.ExecutionRequest{}
|
||||||
@ -274,12 +268,8 @@ func (s *ExecuteInteractivelyTestSuite) TestExitHasTimeoutErrorIfRunnerTimesOut(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ExecuteInteractivelyTestSuite) TestDestroyReasonIsPassedToExecution() {
|
func (s *ExecuteInteractivelyTestSuite) TestDestroyReasonIsPassedToExecution() {
|
||||||
s.T().Skip("See TestDestroysRunnerAfterTimeoutAndSignal")
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
s.mockedExecuteCommandCall.Run(func(args mock.Arguments) {
|
s.mockedExecuteCommandCall.Run(func(args mock.Arguments) {
|
||||||
<-ctx.Done()
|
<-s.TestCtx.Done()
|
||||||
}).Return(0, nil)
|
}).Return(0, nil)
|
||||||
s.mockedTimeoutPassedCall.Return(true)
|
s.mockedTimeoutPassedCall.Return(true)
|
||||||
executionRequest := &dto.ExecutionRequest{}
|
executionRequest := &dto.ExecutionRequest{}
|
||||||
|
Reference in New Issue
Block a user