Stop stdout & stderr after timeout

Co-authored-by: Sebastian Serth <MrSerth@users.noreply.github.com>
This commit is contained in:
Maximilian Paß
2021-11-23 14:45:53 +01:00
parent 0d7e07eae0
commit a6eaa45097
2 changed files with 31 additions and 14 deletions

View File

@@ -151,23 +151,28 @@ func (cr *codeOceanToRawReader) Write(p []byte) (n int, err error) {
type rawToCodeOceanWriter struct { type rawToCodeOceanWriter struct {
proxy *webSocketProxy proxy *webSocketProxy
outputType dto.WebSocketMessageType outputType dto.WebSocketMessageType
stopped bool
} }
// Write implements the io.Writer interface. // Write implements the io.Writer interface.
// The passed data is forwarded to the WebSocket to CodeOcean. // The passed data is forwarded to the WebSocket to CodeOcean.
func (rc *rawToCodeOceanWriter) Write(p []byte) (int, error) { func (rc *rawToCodeOceanWriter) Write(p []byte) (int, error) {
if rc.stopped {
return 0, nil
}
err := rc.proxy.sendToClient(dto.WebSocketMessage{Type: rc.outputType, Data: string(p)}) err := rc.proxy.sendToClient(dto.WebSocketMessage{Type: rc.outputType, Data: string(p)})
return len(p), err return len(p), err
} }
// webSocketProxy is an encapsulation of logic for forwarding between Runners and CodeOcean. // webSocketProxy is an encapsulation of logic for forwarding between Runners and CodeOcean.
type webSocketProxy struct { type webSocketProxy struct {
userExit chan bool userExit chan bool
connection webSocketConnection connection webSocketConnection
connectionMu sync.Mutex connectionMu sync.Mutex
Stdin WebSocketReader Stdin WebSocketReader
Stdout io.Writer Stdout io.Writer
Stderr io.Writer Stderr io.Writer
cancelWebSocketWrite func()
} }
// upgradeConnection upgrades a connection to a websocket and returns a webSocketProxy for this connection. // upgradeConnection upgrades a connection to a websocket and returns a webSocketProxy for this connection.
@@ -190,8 +195,14 @@ func newWebSocketProxy(connection webSocketConnection) (*webSocketProxy, error)
Stdin: stdin, Stdin: stdin,
userExit: make(chan bool), userExit: make(chan bool),
} }
proxy.Stdout = &rawToCodeOceanWriter{proxy: proxy, outputType: dto.WebSocketOutputStdout} stdOut := &rawToCodeOceanWriter{proxy: proxy, outputType: dto.WebSocketOutputStdout}
proxy.Stderr = &rawToCodeOceanWriter{proxy: proxy, outputType: dto.WebSocketOutputStderr} stdErr := &rawToCodeOceanWriter{proxy: proxy, outputType: dto.WebSocketOutputStderr}
proxy.cancelWebSocketWrite = func() {
stdOut.stopped = true
stdErr.stopped = true
}
proxy.Stdout = stdOut
proxy.Stderr = stdErr
err := proxy.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaStart}) err := proxy.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaStart})
if err != nil { if err != nil {
@@ -216,9 +227,11 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
select { select {
case exitInfo = <-exit: case exitInfo = <-exit:
cancelInputLoop() cancelInputLoop()
wp.cancelWebSocketWrite()
log.Info("Execution returned") log.Info("Execution returned")
case <-wp.userExit: case <-wp.userExit:
cancelInputLoop() cancelInputLoop()
wp.cancelWebSocketWrite()
cancelExecution() cancelExecution()
log.Info("Client closed the connection") log.Info("Client closed the connection")
return return
@@ -265,8 +278,8 @@ func (wp *webSocketProxy) sendToClient(message dto.WebSocketMessage) error {
} }
err = wp.writeMessage(websocket.TextMessage, encodedMessage) err = wp.writeMessage(websocket.TextMessage, encodedMessage)
if err != nil { if err != nil {
errorMessage := "Error writing the exit message" errorMessage := "Error writing the message"
log.WithError(err).Warn(errorMessage) log.WithField("message", message).WithError(err).Warn(errorMessage)
wp.closeWithError(errorMessage) wp.closeWithError(errorMessage)
return fmt.Errorf("error writing WebSocket message: %w", err) return fmt.Errorf("error writing WebSocket message: %w", err)
} }

View File

@@ -224,10 +224,14 @@ func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute co
// (tty has to be true) and converted to a SIGQUIT signal sent to the foreground process attached to the tty. // (tty has to be true) and converted to a SIGQUIT signal sent to the foreground process attached to the tty.
// By default, SIGQUIT causes the process to terminate and produces a core dump. Processes can catch this signal // By default, SIGQUIT causes the process to terminate and produces a core dump. Processes can catch this signal
// and ignore it, which is why we destroy the runner if the process does not terminate after a grace period. // and ignore it, which is why we destroy the runner if the process does not terminate after a grace period.
n, err := stdin.Write([]byte{SIGQUIT}) _, err := stdin.Write([]byte{SIGQUIT})
if n != 1 { // if n != 1 {
log.WithField("runner", r.id).Warn("Could not send SIGQUIT because nothing was written") // The SIGQUIT is sent and correctly processed by the allocation. However, for an unknown
} // reason, the number of bytes written is always zero even though the error is nil.
// Hence, we disabled this sanity check here. See the MR for more details:
// https://github.com/openHPI/poseidon/pull/45#discussion_r757029024
// log.WithField("runner", r.id).Warn("Could not send SIGQUIT because nothing was written")
// }
if err != nil { if err != nil {
log.WithField("runner", r.id).WithError(err).Warn("Could not send SIGQUIT due to error") log.WithField("runner", r.id).WithError(err).Warn("Could not send SIGQUIT due to error")
} }