diff --git a/config/config.go b/config/config.go index 5ffc0f8..d0879d1 100644 --- a/config/config.go +++ b/config/config.go @@ -18,12 +18,13 @@ import ( var ( Config = &configuration{ Server: server{ - Address: "127.0.0.1", - Port: 7200, - Token: "", - TLS: false, - CertFile: "", - KeyFile: "", + Address: "127.0.0.1", + Port: 7200, + Token: "", + TLS: false, + CertFile: "", + KeyFile: "", + InteractiveStderr: true, }, Nomad: nomad{ Address: "127.0.0.1", @@ -48,12 +49,13 @@ var ( // server configures the Poseidon webserver. type server struct { - Address string - Port int - Token string - TLS bool - CertFile string - KeyFile string + Address string + Port int + Token string + TLS bool + CertFile string + KeyFile string + InteractiveStderr bool } // nomad configures the used Nomad cluster. diff --git a/configuration.example.yaml b/configuration.example.yaml index 538ffeb..caca1b1 100644 --- a/configuration.example.yaml +++ b/configuration.example.yaml @@ -12,6 +12,8 @@ server: certfile: ./poseidon.crt # The path to the key file used for TLS keyfile: ./poseidon.key + # If true, an additional WebSocket connection will be opened to split stdout and stderr when executing interactively + interactiveStderr: true # Configuration of the used Nomad cluster nomad: diff --git a/environment/default-job.hcl b/environment/default-job.hcl index 1cd6390..0dd6c2d 100644 --- a/environment/default-job.hcl +++ b/environment/default-job.hcl @@ -1,10 +1,10 @@ // This is the default job configuration that is used when no path to another default configuration is given -job "default-poseidon-job" { +job "python" { datacenters = ["dc1"] type = "batch" - group "default-poseidon-group" { + group "default-group" { ephemeral_disk { migrate = false size = 10 @@ -23,13 +23,13 @@ job "default-poseidon-job" { weight = 100 } - task "default-poseidon-task" { + task "default-task" { driver = "docker" kill_timeout = "0s" kill_signal = "SIGKILL" config { - image = "python:latest" + image = "drp.codemoon.xopic.de/openhpi/co_execenv_python:3.8" command = "sleep" args = ["infinity"] network_mode = "none" diff --git a/nomad/api_querier.go b/nomad/api_querier.go index 159695d..c579e46 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -24,8 +24,8 @@ type apiQuerier interface { // DeleteRunner deletes the runner with the given Id. DeleteRunner(runnerId string) (err error) - // ExecuteCommand runs a command in the passed allocation. - ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, + // Execute runs a command in the passed allocation. + Execute(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout, stderr io.Writer) (int, error) // loadRunners loads all allocations of the specified job. @@ -72,7 +72,7 @@ func (nc *nomadAPIClient) DeleteRunner(runnerID string) (err error) { return err } -func (nc *nomadAPIClient) ExecuteCommand(allocationID string, +func (nc *nomadAPIClient) Execute(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout, stderr io.Writer) (int, error) { allocation, _, err := nc.client.Allocations().Info(allocationID, nil) diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index 36b9075..9dcbe34 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -79,8 +79,8 @@ func (_m *apiQuerierMock) EvaluationStream(evalID string, ctx context.Context) ( return r0, r1 } -// ExecuteCommand provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr -func (_m *apiQuerierMock) ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { +// Execute provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr +func (_m *apiQuerierMock) Execute(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr) var r0 int diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 536f6a4..9adf9f8 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -14,13 +14,13 @@ import ( url "net/url" ) -// ExecutorApiMock is an autogenerated mock type for the ExecutorAPI type -type ExecutorApiMock struct { +// ExecutorAPIMock is an autogenerated mock type for the ExecutorAPI type +type ExecutorAPIMock struct { mock.Mock } // AllocationStream provides a mock function with given fields: ctx -func (_m *ExecutorApiMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) { +func (_m *ExecutorAPIMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) { ret := _m.Called(ctx) var r0 <-chan *api.Events @@ -43,7 +43,7 @@ func (_m *ExecutorApiMock) AllocationStream(ctx context.Context) (<-chan *api.Ev } // DeleteRunner provides a mock function with given fields: runnerId -func (_m *ExecutorApiMock) DeleteRunner(runnerId string) error { +func (_m *ExecutorAPIMock) DeleteRunner(runnerId string) error { ret := _m.Called(runnerId) var r0 error @@ -57,7 +57,7 @@ func (_m *ExecutorApiMock) DeleteRunner(runnerId string) error { } // EvaluationStream provides a mock function with given fields: evalID, ctx -func (_m *ExecutorApiMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) { +func (_m *ExecutorAPIMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) { ret := _m.Called(evalID, ctx) var r0 <-chan *api.Events @@ -79,8 +79,29 @@ func (_m *ExecutorApiMock) EvaluationStream(evalID string, ctx context.Context) return r0, r1 } +// Execute provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr +func (_m *ExecutorAPIMock) Execute(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { + ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr) + + var r0 int + if rf, ok := ret.Get(0).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) int); ok { + r0 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) + } else { + r0 = ret.Get(0).(int) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) error); ok { + r1 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ExecuteCommand provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr -func (_m *ExecutorApiMock) ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { +func (_m *ExecutorAPIMock) ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr) var r0 int @@ -101,7 +122,7 @@ func (_m *ExecutorApiMock) ExecuteCommand(allocationID string, ctx context.Conte } // JobScale provides a mock function with given fields: jobId -func (_m *ExecutorApiMock) JobScale(jobId string) (uint, error) { +func (_m *ExecutorAPIMock) JobScale(jobId string) (uint, error) { ret := _m.Called(jobId) var r0 uint @@ -122,7 +143,7 @@ func (_m *ExecutorApiMock) JobScale(jobId string) (uint, error) { } // LoadJobList provides a mock function with given fields: -func (_m *ExecutorApiMock) LoadJobList() ([]*api.JobListStub, error) { +func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) { ret := _m.Called() var r0 []*api.JobListStub @@ -144,13 +165,13 @@ func (_m *ExecutorApiMock) LoadJobList() ([]*api.JobListStub, error) { return r0, r1 } -// LoadRunners provides a mock function with given fields: jobId -func (_m *ExecutorApiMock) LoadRunners(jobId string) ([]string, error) { - ret := _m.Called(jobId) +// LoadRunners provides a mock function with given fields: jobID +func (_m *ExecutorAPIMock) LoadRunners(jobID string) ([]string, error) { + ret := _m.Called(jobID) var r0 []string if rf, ok := ret.Get(0).(func(string) []string); ok { - r0 = rf(jobId) + r0 = rf(jobID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]string) @@ -159,7 +180,7 @@ func (_m *ExecutorApiMock) LoadRunners(jobId string) ([]string, error) { var r1 error if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(jobId) + r1 = rf(jobID) } else { r1 = ret.Error(1) } @@ -167,13 +188,13 @@ func (_m *ExecutorApiMock) LoadRunners(jobId string) ([]string, error) { return r0, r1 } -// MonitorEvaluation provides a mock function with given fields: evalID, ctx -func (_m *ExecutorApiMock) MonitorEvaluation(evalID string, ctx context.Context) error { - ret := _m.Called(evalID, ctx) +// MonitorEvaluation provides a mock function with given fields: evaluationID, ctx +func (_m *ExecutorAPIMock) MonitorEvaluation(evaluationID string, ctx context.Context) error { + ret := _m.Called(evaluationID, ctx) var r0 error if rf, ok := ret.Get(0).(func(string, context.Context) error); ok { - r0 = rf(evalID, ctx) + r0 = rf(evaluationID, ctx) } else { r0 = ret.Error(0) } @@ -182,7 +203,7 @@ func (_m *ExecutorApiMock) MonitorEvaluation(evalID string, ctx context.Context) } // RegisterNomadJob provides a mock function with given fields: job -func (_m *ExecutorApiMock) RegisterNomadJob(job *api.Job) (string, error) { +func (_m *ExecutorAPIMock) RegisterNomadJob(job *api.Job) (string, error) { ret := _m.Called(job) var r0 string @@ -203,7 +224,7 @@ func (_m *ExecutorApiMock) RegisterNomadJob(job *api.Job) (string, error) { } // SetJobScale provides a mock function with given fields: jobId, count, reason -func (_m *ExecutorApiMock) SetJobScale(jobId string, count uint, reason string) error { +func (_m *ExecutorAPIMock) SetJobScale(jobId string, count uint, reason string) error { ret := _m.Called(jobId, count, reason) var r0 error @@ -217,7 +238,7 @@ func (_m *ExecutorApiMock) SetJobScale(jobId string, count uint, reason string) } // WatchAllocations provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation -func (_m *ExecutorApiMock) WatchAllocations(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error { +func (_m *ExecutorAPIMock) WatchAllocations(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error { ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation) var r0 error @@ -231,7 +252,7 @@ func (_m *ExecutorApiMock) WatchAllocations(ctx context.Context, onNewAllocation } // init provides a mock function with given fields: nomadURL, nomadNamespace -func (_m *ExecutorApiMock) init(nomadURL *url.URL, nomadNamespace string) error { +func (_m *ExecutorAPIMock) init(nomadURL *url.URL, nomadNamespace string) error { ret := _m.Called(nomadURL, nomadNamespace) var r0 error @@ -245,7 +266,7 @@ func (_m *ExecutorApiMock) init(nomadURL *url.URL, nomadNamespace string) error } // loadRunners provides a mock function with given fields: jobId -func (_m *ExecutorApiMock) loadRunners(jobId string) ([]*api.AllocationListStub, error) { +func (_m *ExecutorAPIMock) loadRunners(jobId string) ([]*api.AllocationListStub, error) { ret := _m.Called(jobId) var r0 []*api.AllocationListStub diff --git a/nomad/nomad.go b/nomad/nomad.go index 62d71a2..a102db1 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -6,7 +6,9 @@ import ( "fmt" nomadApi "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" + "gitlab.hpi.de/codeocean/codemoon/poseidon/config" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" + "io" "net/url" "time" ) @@ -36,6 +38,12 @@ type ExecutorAPI interface { // WatchAllocations listens on the Nomad event stream for allocation events. // Depending on the incoming event, any of the given function is executed. WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error + + // ExecuteCommand executes the given command in the allocation with the given id. + // It writes the output of the command to stdout/stderr and reads input from stdin. + // If tty is true, the command will run with a tty. + ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, + stdin io.Reader, stdout, stderr io.Writer) (int, error) } // APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real @@ -203,3 +211,63 @@ func checkEvaluation(eval *nomadApi.Evaluation) (err error) { } return err } + +// nullReader is a struct that implements the io.Reader interface and returns nothing when reading +// from it. +type nullReader struct{} + +func (r nullReader) Read(_ []byte) (int, error) { + return 0, nil +} + +// ExecuteCommand executes the given command in the given allocation. +// If tty is true, Nomad would normally write stdout and stderr of the command +// both on the stdout stream. However, if the InteractiveStderr server config option is true, +// we make sure that stdout and stderr are split correctly. +func (a *APIClient) ExecuteCommand(allocationID string, + ctx context.Context, command []string, tty bool, + stdin io.Reader, stdout, stderr io.Writer) (int, error) { + if tty && config.Config.Server.InteractiveStderr { + return a.executeCommandInteractivelyWithStderr(allocationID, ctx, command, stdin, stdout, stderr) + } + return a.apiQuerier.Execute(allocationID, ctx, command, tty, stdin, stdout, stderr) +} + +func (a *APIClient) executeCommandInteractivelyWithStderr(allocationID string, ctx context.Context, + command []string, stdin io.Reader, stdout, stderr io.Writer) (int, error) { + // Use current nano time to make the stderr fifo kind of unique. + currentNanoTime := time.Now().UnixNano() + // We expect the command to be like []string{..., "sh", "-c", "my-command"}. + oldCommand := command[len(command)-1] + // Take the last command which is the one to be executed and wrap it to redirect stderr. + command[len(command)-1] = wrapCommandForStderrFifo(currentNanoTime, oldCommand) + + stderrExitChan := make(chan int) + go func() { + // Catch stderr in separate execution. + exit, err := a.Execute(allocationID, ctx, stderrFifoCommand(currentNanoTime), true, nullReader{}, stderr, io.Discard) + if err != nil { + log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error") + } + stderrExitChan <- exit + }() + + exit, err := a.Execute(allocationID, ctx, command, true, stdin, stdout, io.Discard) + + // Wait until the stderr catch command finished to make sure we receive all output. + <-stderrExitChan + return exit, err +} + +const ( + stderrFifoCommandFormat = "mkfifo /tmp/stderr_%d.fifo && cat /tmp/stderr_%d.fifo" + stderrWrapperCommandFormat = "until [ -e /tmp/stderr_%d.fifo ]; do sleep 0.01; done; (%s) 2> /tmp/stderr_%d.fifo" +) + +func stderrFifoCommand(id int64) []string { + return []string{"sh", "-c", fmt.Sprintf(stderrFifoCommandFormat, id, id)} +} + +func wrapCommandForStderrFifo(id int64, command string) string { + return fmt.Sprintf(stderrWrapperCommandFormat, id, command, id) +} diff --git a/runner/manager.go b/runner/manager.go index 41c2a0d..2dff852 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -184,7 +184,7 @@ func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentID) { } jobScale, err := m.apiClient.JobScale(string(job.jobID)) if err != nil { - log.WithError(err).Printf("Failed get allocation count") + log.WithError(err).WithField("job", string(job.jobID)).Printf("Failed get allocation count") break } additionallyNeededRunners := int(job.desiredIdleRunnersCount) - job.idleRunners.Length() diff --git a/tests/e2e/runners_test.go b/tests/e2e/runners_test.go index 0919848..28cd99c 100644 --- a/tests/e2e/runners_test.go +++ b/tests/e2e/runners_test.go @@ -112,7 +112,7 @@ func (s *E2ETestSuite) TestCopyFilesRoute() { s.Equal(http.StatusNoContent, resp.StatusCode) s.Run("File content can be printed on runner", func() { - s.Equal(tests.DefaultFileContent, s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName)) + s.assertFileContent(runnerID, tests.DefaultFileName, tests.DefaultFileContent) }) }) @@ -134,11 +134,11 @@ func (s *E2ETestSuite) TestCopyFilesRoute() { s.Run("File content of file with relative path can be printed on runner", func() { // the print command is executed in the context of the default working directory of the container - s.Equal(relativeFileContent, s.PrintContentOfFileOnRunner(runnerID, relativeFilePath)) + s.assertFileContent(runnerID, relativeFilePath, relativeFileContent) }) s.Run("File content of file with absolute path can be printed on runner", func() { - s.Equal(absoluteFileContent, s.PrintContentOfFileOnRunner(runnerID, absoluteFilePath)) + s.assertFileContent(runnerID, absoluteFilePath, absoluteFileContent) }) }) @@ -152,7 +152,9 @@ func (s *E2ETestSuite) TestCopyFilesRoute() { s.Equal(http.StatusNoContent, resp.StatusCode) s.Run("File content can no longer be printed", func() { - s.Contains(s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName), "No such file or directory") + stdout, stderr := s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName) + s.Equal("", stdout) + s.Contains(stderr, "No such file or directory") }) }) @@ -168,7 +170,7 @@ func (s *E2ETestSuite) TestCopyFilesRoute() { _ = resp.Body.Close() s.Run("File content can be printed on runner", func() { - s.Equal(tests.DefaultFileContent, s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName)) + s.assertFileContent(runnerID, tests.DefaultFileName, tests.DefaultFileContent) }) }) @@ -191,7 +193,7 @@ func (s *E2ETestSuite) TestCopyFilesRoute() { _ = resp.Body.Close() s.Run("File content can be printed on runner", func() { - s.Equal(string(newFileContent), s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName)) + s.assertFileContent(runnerID, tests.DefaultFileName, string(newFileContent)) }) }) @@ -208,7 +210,13 @@ func (s *E2ETestSuite) TestCopyFilesRoute() { }) } -func (s *E2ETestSuite) PrintContentOfFileOnRunner(runnerId string, filename string) string { +func (s *E2ETestSuite) assertFileContent(runnerID, fileName string, expectedContent string) { + stdout, stderr := s.PrintContentOfFileOnRunner(runnerID, fileName) + s.Equal(expectedContent, stdout) + s.Equal("", stderr) +} + +func (s *E2ETestSuite) PrintContentOfFileOnRunner(runnerId string, filename string) (string, string) { webSocketURL, _ := ProvideWebSocketURL(&s.Suite, runnerId, &dto.ExecutionRequest{Command: fmt.Sprintf("cat %s", filename)}) connection, _ := ConnectToWebSocket(webSocketURL) @@ -216,6 +224,6 @@ func (s *E2ETestSuite) PrintContentOfFileOnRunner(runnerId string, filename stri s.Require().Error(err) s.Equal(&websocket.CloseError{Code: websocket.CloseNormalClosure}, err) - stdout, _, _ := helpers.WebSocketOutputMessages(messages) - return stdout + stdout, stderr, _ := helpers.WebSocketOutputMessages(messages) + return stdout, stderr } diff --git a/tests/e2e/websocket_test.go b/tests/e2e/websocket_test.go index 5151535..6392ebe 100644 --- a/tests/e2e/websocket_test.go +++ b/tests/e2e/websocket_test.go @@ -65,7 +65,6 @@ func (s *E2ETestSuite) TestOutputToStdout() { } func (s *E2ETestSuite) TestOutputToStderr() { - s.T().Skip("known bug causing all output to be written to stdout (even if it should be written to stderr)") connection, err := ProvideWebSocketConnection(&s.Suite, &dto.ExecutionRequest{Command: "cat -invalid"}) s.Require().NoError(err) @@ -74,8 +73,9 @@ func (s *E2ETestSuite) TestOutputToStderr() { s.Equal(&websocket.CloseError{Code: websocket.CloseNormalClosure}, err) controlMessages := helpers.WebSocketControlMessages(messages) + s.Require().Equal(2, len(controlMessages)) s.Require().Equal(&dto.WebSocketMessage{Type: dto.WebSocketMetaStart}, controlMessages[0]) - s.Require().Equal(&dto.WebSocketMessage{Type: dto.WebSocketExit}, controlMessages[1]) + s.Require().Equal(&dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: 1}, controlMessages[1]) stdout, stderr, errors := helpers.WebSocketOutputMessages(messages) s.NotContains(stdout, "cat: invalid option", "Stdout should not contain the error")