Split stdout and stderr on interactive execution
When running a command interactively, we previously would get stdout and stderr both served on stdout by Nomad. To circumvent this issue, we now start a separate execution inside the allocation to split both streams.
This commit is contained in:

committed by
Tobias Kantusch

parent
19cd4b840e
commit
f122dd9376
@ -18,12 +18,13 @@ import (
|
|||||||
var (
|
var (
|
||||||
Config = &configuration{
|
Config = &configuration{
|
||||||
Server: server{
|
Server: server{
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
Port: 7200,
|
Port: 7200,
|
||||||
Token: "",
|
Token: "",
|
||||||
TLS: false,
|
TLS: false,
|
||||||
CertFile: "",
|
CertFile: "",
|
||||||
KeyFile: "",
|
KeyFile: "",
|
||||||
|
InteractiveStderr: true,
|
||||||
},
|
},
|
||||||
Nomad: nomad{
|
Nomad: nomad{
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
@ -48,12 +49,13 @@ var (
|
|||||||
|
|
||||||
// server configures the Poseidon webserver.
|
// server configures the Poseidon webserver.
|
||||||
type server struct {
|
type server struct {
|
||||||
Address string
|
Address string
|
||||||
Port int
|
Port int
|
||||||
Token string
|
Token string
|
||||||
TLS bool
|
TLS bool
|
||||||
CertFile string
|
CertFile string
|
||||||
KeyFile string
|
KeyFile string
|
||||||
|
InteractiveStderr bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// nomad configures the used Nomad cluster.
|
// nomad configures the used Nomad cluster.
|
||||||
|
@ -12,6 +12,8 @@ server:
|
|||||||
certfile: ./poseidon.crt
|
certfile: ./poseidon.crt
|
||||||
# The path to the key file used for TLS
|
# The path to the key file used for TLS
|
||||||
keyfile: ./poseidon.key
|
keyfile: ./poseidon.key
|
||||||
|
# If true, an additional WebSocket connection will be opened to split stdout and stderr when executing interactively
|
||||||
|
interactiveStderr: true
|
||||||
|
|
||||||
# Configuration of the used Nomad cluster
|
# Configuration of the used Nomad cluster
|
||||||
nomad:
|
nomad:
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
// This is the default job configuration that is used when no path to another default configuration is given
|
// This is the default job configuration that is used when no path to another default configuration is given
|
||||||
|
|
||||||
job "default-poseidon-job" {
|
job "python" {
|
||||||
datacenters = ["dc1"]
|
datacenters = ["dc1"]
|
||||||
type = "batch"
|
type = "batch"
|
||||||
|
|
||||||
group "default-poseidon-group" {
|
group "default-group" {
|
||||||
ephemeral_disk {
|
ephemeral_disk {
|
||||||
migrate = false
|
migrate = false
|
||||||
size = 10
|
size = 10
|
||||||
@ -23,13 +23,13 @@ job "default-poseidon-job" {
|
|||||||
weight = 100
|
weight = 100
|
||||||
}
|
}
|
||||||
|
|
||||||
task "default-poseidon-task" {
|
task "default-task" {
|
||||||
driver = "docker"
|
driver = "docker"
|
||||||
kill_timeout = "0s"
|
kill_timeout = "0s"
|
||||||
kill_signal = "SIGKILL"
|
kill_signal = "SIGKILL"
|
||||||
|
|
||||||
config {
|
config {
|
||||||
image = "python:latest"
|
image = "drp.codemoon.xopic.de/openhpi/co_execenv_python:3.8"
|
||||||
command = "sleep"
|
command = "sleep"
|
||||||
args = ["infinity"]
|
args = ["infinity"]
|
||||||
network_mode = "none"
|
network_mode = "none"
|
||||||
|
@ -24,8 +24,8 @@ type apiQuerier interface {
|
|||||||
// DeleteRunner deletes the runner with the given Id.
|
// DeleteRunner deletes the runner with the given Id.
|
||||||
DeleteRunner(runnerId string) (err error)
|
DeleteRunner(runnerId string) (err error)
|
||||||
|
|
||||||
// ExecuteCommand runs a command in the passed allocation.
|
// Execute runs a command in the passed allocation.
|
||||||
ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool,
|
Execute(allocationID string, ctx context.Context, command []string, tty bool,
|
||||||
stdin io.Reader, stdout, stderr io.Writer) (int, error)
|
stdin io.Reader, stdout, stderr io.Writer) (int, error)
|
||||||
|
|
||||||
// loadRunners loads all allocations of the specified job.
|
// loadRunners loads all allocations of the specified job.
|
||||||
@ -72,7 +72,7 @@ func (nc *nomadAPIClient) DeleteRunner(runnerID string) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *nomadAPIClient) ExecuteCommand(allocationID string,
|
func (nc *nomadAPIClient) Execute(allocationID string,
|
||||||
ctx context.Context, command []string, tty bool,
|
ctx context.Context, command []string, tty bool,
|
||||||
stdin io.Reader, stdout, stderr io.Writer) (int, error) {
|
stdin io.Reader, stdout, stderr io.Writer) (int, error) {
|
||||||
allocation, _, err := nc.client.Allocations().Info(allocationID, nil)
|
allocation, _, err := nc.client.Allocations().Info(allocationID, nil)
|
||||||
|
@ -79,8 +79,8 @@ func (_m *apiQuerierMock) EvaluationStream(evalID string, ctx context.Context) (
|
|||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteCommand provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr
|
// Execute provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr
|
||||||
func (_m *apiQuerierMock) ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) {
|
func (_m *apiQuerierMock) Execute(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) {
|
||||||
ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr)
|
ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr)
|
||||||
|
|
||||||
var r0 int
|
var r0 int
|
||||||
|
@ -14,13 +14,13 @@ import (
|
|||||||
url "net/url"
|
url "net/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExecutorApiMock is an autogenerated mock type for the ExecutorAPI type
|
// ExecutorAPIMock is an autogenerated mock type for the ExecutorAPI type
|
||||||
type ExecutorApiMock struct {
|
type ExecutorAPIMock struct {
|
||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
// AllocationStream provides a mock function with given fields: ctx
|
// AllocationStream provides a mock function with given fields: ctx
|
||||||
func (_m *ExecutorApiMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) {
|
func (_m *ExecutorAPIMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) {
|
||||||
ret := _m.Called(ctx)
|
ret := _m.Called(ctx)
|
||||||
|
|
||||||
var r0 <-chan *api.Events
|
var r0 <-chan *api.Events
|
||||||
@ -43,7 +43,7 @@ func (_m *ExecutorApiMock) AllocationStream(ctx context.Context) (<-chan *api.Ev
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRunner provides a mock function with given fields: runnerId
|
// DeleteRunner provides a mock function with given fields: runnerId
|
||||||
func (_m *ExecutorApiMock) DeleteRunner(runnerId string) error {
|
func (_m *ExecutorAPIMock) DeleteRunner(runnerId string) error {
|
||||||
ret := _m.Called(runnerId)
|
ret := _m.Called(runnerId)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
@ -57,7 +57,7 @@ func (_m *ExecutorApiMock) DeleteRunner(runnerId string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// EvaluationStream provides a mock function with given fields: evalID, ctx
|
// EvaluationStream provides a mock function with given fields: evalID, ctx
|
||||||
func (_m *ExecutorApiMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) {
|
func (_m *ExecutorAPIMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) {
|
||||||
ret := _m.Called(evalID, ctx)
|
ret := _m.Called(evalID, ctx)
|
||||||
|
|
||||||
var r0 <-chan *api.Events
|
var r0 <-chan *api.Events
|
||||||
@ -79,8 +79,29 @@ func (_m *ExecutorApiMock) EvaluationStream(evalID string, ctx context.Context)
|
|||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Execute provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr
|
||||||
|
func (_m *ExecutorAPIMock) Execute(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) {
|
||||||
|
ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr)
|
||||||
|
|
||||||
|
var r0 int
|
||||||
|
if rf, ok := ret.Get(0).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) int); ok {
|
||||||
|
r0 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(int)
|
||||||
|
}
|
||||||
|
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(1).(func(string, context.Context, []string, bool, io.Reader, io.Writer, io.Writer) error); ok {
|
||||||
|
r1 = rf(allocationID, ctx, command, tty, stdin, stdout, stderr)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
// ExecuteCommand provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr
|
// ExecuteCommand provides a mock function with given fields: allocationID, ctx, command, tty, stdin, stdout, stderr
|
||||||
func (_m *ExecutorApiMock) ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) {
|
func (_m *ExecutorAPIMock) ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) {
|
||||||
ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr)
|
ret := _m.Called(allocationID, ctx, command, tty, stdin, stdout, stderr)
|
||||||
|
|
||||||
var r0 int
|
var r0 int
|
||||||
@ -101,7 +122,7 @@ func (_m *ExecutorApiMock) ExecuteCommand(allocationID string, ctx context.Conte
|
|||||||
}
|
}
|
||||||
|
|
||||||
// JobScale provides a mock function with given fields: jobId
|
// JobScale provides a mock function with given fields: jobId
|
||||||
func (_m *ExecutorApiMock) JobScale(jobId string) (uint, error) {
|
func (_m *ExecutorAPIMock) JobScale(jobId string) (uint, error) {
|
||||||
ret := _m.Called(jobId)
|
ret := _m.Called(jobId)
|
||||||
|
|
||||||
var r0 uint
|
var r0 uint
|
||||||
@ -122,7 +143,7 @@ func (_m *ExecutorApiMock) JobScale(jobId string) (uint, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// LoadJobList provides a mock function with given fields:
|
// LoadJobList provides a mock function with given fields:
|
||||||
func (_m *ExecutorApiMock) LoadJobList() ([]*api.JobListStub, error) {
|
func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) {
|
||||||
ret := _m.Called()
|
ret := _m.Called()
|
||||||
|
|
||||||
var r0 []*api.JobListStub
|
var r0 []*api.JobListStub
|
||||||
@ -144,13 +165,13 @@ func (_m *ExecutorApiMock) LoadJobList() ([]*api.JobListStub, error) {
|
|||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadRunners provides a mock function with given fields: jobId
|
// LoadRunners provides a mock function with given fields: jobID
|
||||||
func (_m *ExecutorApiMock) LoadRunners(jobId string) ([]string, error) {
|
func (_m *ExecutorAPIMock) LoadRunners(jobID string) ([]string, error) {
|
||||||
ret := _m.Called(jobId)
|
ret := _m.Called(jobID)
|
||||||
|
|
||||||
var r0 []string
|
var r0 []string
|
||||||
if rf, ok := ret.Get(0).(func(string) []string); ok {
|
if rf, ok := ret.Get(0).(func(string) []string); ok {
|
||||||
r0 = rf(jobId)
|
r0 = rf(jobID)
|
||||||
} else {
|
} else {
|
||||||
if ret.Get(0) != nil {
|
if ret.Get(0) != nil {
|
||||||
r0 = ret.Get(0).([]string)
|
r0 = ret.Get(0).([]string)
|
||||||
@ -159,7 +180,7 @@ func (_m *ExecutorApiMock) LoadRunners(jobId string) ([]string, error) {
|
|||||||
|
|
||||||
var r1 error
|
var r1 error
|
||||||
if rf, ok := ret.Get(1).(func(string) error); ok {
|
if rf, ok := ret.Get(1).(func(string) error); ok {
|
||||||
r1 = rf(jobId)
|
r1 = rf(jobID)
|
||||||
} else {
|
} else {
|
||||||
r1 = ret.Error(1)
|
r1 = ret.Error(1)
|
||||||
}
|
}
|
||||||
@ -167,13 +188,13 @@ func (_m *ExecutorApiMock) LoadRunners(jobId string) ([]string, error) {
|
|||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
// MonitorEvaluation provides a mock function with given fields: evalID, ctx
|
// MonitorEvaluation provides a mock function with given fields: evaluationID, ctx
|
||||||
func (_m *ExecutorApiMock) MonitorEvaluation(evalID string, ctx context.Context) error {
|
func (_m *ExecutorAPIMock) MonitorEvaluation(evaluationID string, ctx context.Context) error {
|
||||||
ret := _m.Called(evalID, ctx)
|
ret := _m.Called(evaluationID, ctx)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
if rf, ok := ret.Get(0).(func(string, context.Context) error); ok {
|
if rf, ok := ret.Get(0).(func(string, context.Context) error); ok {
|
||||||
r0 = rf(evalID, ctx)
|
r0 = rf(evaluationID, ctx)
|
||||||
} else {
|
} else {
|
||||||
r0 = ret.Error(0)
|
r0 = ret.Error(0)
|
||||||
}
|
}
|
||||||
@ -182,7 +203,7 @@ func (_m *ExecutorApiMock) MonitorEvaluation(evalID string, ctx context.Context)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RegisterNomadJob provides a mock function with given fields: job
|
// RegisterNomadJob provides a mock function with given fields: job
|
||||||
func (_m *ExecutorApiMock) RegisterNomadJob(job *api.Job) (string, error) {
|
func (_m *ExecutorAPIMock) RegisterNomadJob(job *api.Job) (string, error) {
|
||||||
ret := _m.Called(job)
|
ret := _m.Called(job)
|
||||||
|
|
||||||
var r0 string
|
var r0 string
|
||||||
@ -203,7 +224,7 @@ func (_m *ExecutorApiMock) RegisterNomadJob(job *api.Job) (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetJobScale provides a mock function with given fields: jobId, count, reason
|
// SetJobScale provides a mock function with given fields: jobId, count, reason
|
||||||
func (_m *ExecutorApiMock) SetJobScale(jobId string, count uint, reason string) error {
|
func (_m *ExecutorAPIMock) SetJobScale(jobId string, count uint, reason string) error {
|
||||||
ret := _m.Called(jobId, count, reason)
|
ret := _m.Called(jobId, count, reason)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
@ -217,7 +238,7 @@ func (_m *ExecutorApiMock) SetJobScale(jobId string, count uint, reason string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WatchAllocations provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation
|
// WatchAllocations provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation
|
||||||
func (_m *ExecutorApiMock) WatchAllocations(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error {
|
func (_m *ExecutorAPIMock) WatchAllocations(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error {
|
||||||
ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation)
|
ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
@ -231,7 +252,7 @@ func (_m *ExecutorApiMock) WatchAllocations(ctx context.Context, onNewAllocation
|
|||||||
}
|
}
|
||||||
|
|
||||||
// init provides a mock function with given fields: nomadURL, nomadNamespace
|
// init provides a mock function with given fields: nomadURL, nomadNamespace
|
||||||
func (_m *ExecutorApiMock) init(nomadURL *url.URL, nomadNamespace string) error {
|
func (_m *ExecutorAPIMock) init(nomadURL *url.URL, nomadNamespace string) error {
|
||||||
ret := _m.Called(nomadURL, nomadNamespace)
|
ret := _m.Called(nomadURL, nomadNamespace)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
@ -245,7 +266,7 @@ func (_m *ExecutorApiMock) init(nomadURL *url.URL, nomadNamespace string) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// loadRunners provides a mock function with given fields: jobId
|
// loadRunners provides a mock function with given fields: jobId
|
||||||
func (_m *ExecutorApiMock) loadRunners(jobId string) ([]*api.AllocationListStub, error) {
|
func (_m *ExecutorAPIMock) loadRunners(jobId string) ([]*api.AllocationListStub, error) {
|
||||||
ret := _m.Called(jobId)
|
ret := _m.Called(jobId)
|
||||||
|
|
||||||
var r0 []*api.AllocationListStub
|
var r0 []*api.AllocationListStub
|
||||||
|
@ -6,7 +6,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
nomadApi "github.com/hashicorp/nomad/api"
|
nomadApi "github.com/hashicorp/nomad/api"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/config"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
|
||||||
|
"io"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -36,6 +38,12 @@ type ExecutorAPI interface {
|
|||||||
// WatchAllocations listens on the Nomad event stream for allocation events.
|
// WatchAllocations listens on the Nomad event stream for allocation events.
|
||||||
// Depending on the incoming event, any of the given function is executed.
|
// Depending on the incoming event, any of the given function is executed.
|
||||||
WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error
|
WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error
|
||||||
|
|
||||||
|
// ExecuteCommand executes the given command in the allocation with the given id.
|
||||||
|
// It writes the output of the command to stdout/stderr and reads input from stdin.
|
||||||
|
// If tty is true, the command will run with a tty.
|
||||||
|
ExecuteCommand(allocationID string, ctx context.Context, command []string, tty bool,
|
||||||
|
stdin io.Reader, stdout, stderr io.Writer) (int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
|
// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
|
||||||
@ -203,3 +211,63 @@ func checkEvaluation(eval *nomadApi.Evaluation) (err error) {
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nullReader is a struct that implements the io.Reader interface and returns nothing when reading
|
||||||
|
// from it.
|
||||||
|
type nullReader struct{}
|
||||||
|
|
||||||
|
func (r nullReader) Read(_ []byte) (int, error) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExecuteCommand executes the given command in the given allocation.
|
||||||
|
// If tty is true, Nomad would normally write stdout and stderr of the command
|
||||||
|
// both on the stdout stream. However, if the InteractiveStderr server config option is true,
|
||||||
|
// we make sure that stdout and stderr are split correctly.
|
||||||
|
func (a *APIClient) ExecuteCommand(allocationID string,
|
||||||
|
ctx context.Context, command []string, tty bool,
|
||||||
|
stdin io.Reader, stdout, stderr io.Writer) (int, error) {
|
||||||
|
if tty && config.Config.Server.InteractiveStderr {
|
||||||
|
return a.executeCommandInteractivelyWithStderr(allocationID, ctx, command, stdin, stdout, stderr)
|
||||||
|
}
|
||||||
|
return a.apiQuerier.Execute(allocationID, ctx, command, tty, stdin, stdout, stderr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *APIClient) executeCommandInteractivelyWithStderr(allocationID string, ctx context.Context,
|
||||||
|
command []string, stdin io.Reader, stdout, stderr io.Writer) (int, error) {
|
||||||
|
// Use current nano time to make the stderr fifo kind of unique.
|
||||||
|
currentNanoTime := time.Now().UnixNano()
|
||||||
|
// We expect the command to be like []string{..., "sh", "-c", "my-command"}.
|
||||||
|
oldCommand := command[len(command)-1]
|
||||||
|
// Take the last command which is the one to be executed and wrap it to redirect stderr.
|
||||||
|
command[len(command)-1] = wrapCommandForStderrFifo(currentNanoTime, oldCommand)
|
||||||
|
|
||||||
|
stderrExitChan := make(chan int)
|
||||||
|
go func() {
|
||||||
|
// Catch stderr in separate execution.
|
||||||
|
exit, err := a.Execute(allocationID, ctx, stderrFifoCommand(currentNanoTime), true, nullReader{}, stderr, io.Discard)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error")
|
||||||
|
}
|
||||||
|
stderrExitChan <- exit
|
||||||
|
}()
|
||||||
|
|
||||||
|
exit, err := a.Execute(allocationID, ctx, command, true, stdin, stdout, io.Discard)
|
||||||
|
|
||||||
|
// Wait until the stderr catch command finished to make sure we receive all output.
|
||||||
|
<-stderrExitChan
|
||||||
|
return exit, err
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
stderrFifoCommandFormat = "mkfifo /tmp/stderr_%d.fifo && cat /tmp/stderr_%d.fifo"
|
||||||
|
stderrWrapperCommandFormat = "until [ -e /tmp/stderr_%d.fifo ]; do sleep 0.01; done; (%s) 2> /tmp/stderr_%d.fifo"
|
||||||
|
)
|
||||||
|
|
||||||
|
func stderrFifoCommand(id int64) []string {
|
||||||
|
return []string{"sh", "-c", fmt.Sprintf(stderrFifoCommandFormat, id, id)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func wrapCommandForStderrFifo(id int64, command string) string {
|
||||||
|
return fmt.Sprintf(stderrWrapperCommandFormat, id, command, id)
|
||||||
|
}
|
||||||
|
@ -184,7 +184,7 @@ func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentID) {
|
|||||||
}
|
}
|
||||||
jobScale, err := m.apiClient.JobScale(string(job.jobID))
|
jobScale, err := m.apiClient.JobScale(string(job.jobID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Printf("Failed get allocation count")
|
log.WithError(err).WithField("job", string(job.jobID)).Printf("Failed get allocation count")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
additionallyNeededRunners := int(job.desiredIdleRunnersCount) - job.idleRunners.Length()
|
additionallyNeededRunners := int(job.desiredIdleRunnersCount) - job.idleRunners.Length()
|
||||||
|
@ -112,7 +112,7 @@ func (s *E2ETestSuite) TestCopyFilesRoute() {
|
|||||||
s.Equal(http.StatusNoContent, resp.StatusCode)
|
s.Equal(http.StatusNoContent, resp.StatusCode)
|
||||||
|
|
||||||
s.Run("File content can be printed on runner", func() {
|
s.Run("File content can be printed on runner", func() {
|
||||||
s.Equal(tests.DefaultFileContent, s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName))
|
s.assertFileContent(runnerID, tests.DefaultFileName, tests.DefaultFileContent)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -134,11 +134,11 @@ func (s *E2ETestSuite) TestCopyFilesRoute() {
|
|||||||
|
|
||||||
s.Run("File content of file with relative path can be printed on runner", func() {
|
s.Run("File content of file with relative path can be printed on runner", func() {
|
||||||
// the print command is executed in the context of the default working directory of the container
|
// the print command is executed in the context of the default working directory of the container
|
||||||
s.Equal(relativeFileContent, s.PrintContentOfFileOnRunner(runnerID, relativeFilePath))
|
s.assertFileContent(runnerID, relativeFilePath, relativeFileContent)
|
||||||
})
|
})
|
||||||
|
|
||||||
s.Run("File content of file with absolute path can be printed on runner", func() {
|
s.Run("File content of file with absolute path can be printed on runner", func() {
|
||||||
s.Equal(absoluteFileContent, s.PrintContentOfFileOnRunner(runnerID, absoluteFilePath))
|
s.assertFileContent(runnerID, absoluteFilePath, absoluteFileContent)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -152,7 +152,9 @@ func (s *E2ETestSuite) TestCopyFilesRoute() {
|
|||||||
s.Equal(http.StatusNoContent, resp.StatusCode)
|
s.Equal(http.StatusNoContent, resp.StatusCode)
|
||||||
|
|
||||||
s.Run("File content can no longer be printed", func() {
|
s.Run("File content can no longer be printed", func() {
|
||||||
s.Contains(s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName), "No such file or directory")
|
stdout, stderr := s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName)
|
||||||
|
s.Equal("", stdout)
|
||||||
|
s.Contains(stderr, "No such file or directory")
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -168,7 +170,7 @@ func (s *E2ETestSuite) TestCopyFilesRoute() {
|
|||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
|
|
||||||
s.Run("File content can be printed on runner", func() {
|
s.Run("File content can be printed on runner", func() {
|
||||||
s.Equal(tests.DefaultFileContent, s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName))
|
s.assertFileContent(runnerID, tests.DefaultFileName, tests.DefaultFileContent)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -191,7 +193,7 @@ func (s *E2ETestSuite) TestCopyFilesRoute() {
|
|||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
|
|
||||||
s.Run("File content can be printed on runner", func() {
|
s.Run("File content can be printed on runner", func() {
|
||||||
s.Equal(string(newFileContent), s.PrintContentOfFileOnRunner(runnerID, tests.DefaultFileName))
|
s.assertFileContent(runnerID, tests.DefaultFileName, string(newFileContent))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -208,7 +210,13 @@ func (s *E2ETestSuite) TestCopyFilesRoute() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *E2ETestSuite) PrintContentOfFileOnRunner(runnerId string, filename string) string {
|
func (s *E2ETestSuite) assertFileContent(runnerID, fileName string, expectedContent string) {
|
||||||
|
stdout, stderr := s.PrintContentOfFileOnRunner(runnerID, fileName)
|
||||||
|
s.Equal(expectedContent, stdout)
|
||||||
|
s.Equal("", stderr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *E2ETestSuite) PrintContentOfFileOnRunner(runnerId string, filename string) (string, string) {
|
||||||
webSocketURL, _ := ProvideWebSocketURL(&s.Suite, runnerId, &dto.ExecutionRequest{Command: fmt.Sprintf("cat %s", filename)})
|
webSocketURL, _ := ProvideWebSocketURL(&s.Suite, runnerId, &dto.ExecutionRequest{Command: fmt.Sprintf("cat %s", filename)})
|
||||||
connection, _ := ConnectToWebSocket(webSocketURL)
|
connection, _ := ConnectToWebSocket(webSocketURL)
|
||||||
|
|
||||||
@ -216,6 +224,6 @@ func (s *E2ETestSuite) PrintContentOfFileOnRunner(runnerId string, filename stri
|
|||||||
s.Require().Error(err)
|
s.Require().Error(err)
|
||||||
s.Equal(&websocket.CloseError{Code: websocket.CloseNormalClosure}, err)
|
s.Equal(&websocket.CloseError{Code: websocket.CloseNormalClosure}, err)
|
||||||
|
|
||||||
stdout, _, _ := helpers.WebSocketOutputMessages(messages)
|
stdout, stderr, _ := helpers.WebSocketOutputMessages(messages)
|
||||||
return stdout
|
return stdout, stderr
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,6 @@ func (s *E2ETestSuite) TestOutputToStdout() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *E2ETestSuite) TestOutputToStderr() {
|
func (s *E2ETestSuite) TestOutputToStderr() {
|
||||||
s.T().Skip("known bug causing all output to be written to stdout (even if it should be written to stderr)")
|
|
||||||
connection, err := ProvideWebSocketConnection(&s.Suite, &dto.ExecutionRequest{Command: "cat -invalid"})
|
connection, err := ProvideWebSocketConnection(&s.Suite, &dto.ExecutionRequest{Command: "cat -invalid"})
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
@ -74,8 +73,9 @@ func (s *E2ETestSuite) TestOutputToStderr() {
|
|||||||
s.Equal(&websocket.CloseError{Code: websocket.CloseNormalClosure}, err)
|
s.Equal(&websocket.CloseError{Code: websocket.CloseNormalClosure}, err)
|
||||||
|
|
||||||
controlMessages := helpers.WebSocketControlMessages(messages)
|
controlMessages := helpers.WebSocketControlMessages(messages)
|
||||||
|
s.Require().Equal(2, len(controlMessages))
|
||||||
s.Require().Equal(&dto.WebSocketMessage{Type: dto.WebSocketMetaStart}, controlMessages[0])
|
s.Require().Equal(&dto.WebSocketMessage{Type: dto.WebSocketMetaStart}, controlMessages[0])
|
||||||
s.Require().Equal(&dto.WebSocketMessage{Type: dto.WebSocketExit}, controlMessages[1])
|
s.Require().Equal(&dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: 1}, controlMessages[1])
|
||||||
|
|
||||||
stdout, stderr, errors := helpers.WebSocketOutputMessages(messages)
|
stdout, stderr, errors := helpers.WebSocketOutputMessages(messages)
|
||||||
s.NotContains(stdout, "cat: invalid option", "Stdout should not contain the error")
|
s.NotContains(stdout, "cat: invalid option", "Stdout should not contain the error")
|
||||||
|
Reference in New Issue
Block a user