diff --git a/internal/api/websocket.go b/internal/api/websocket.go index 8b75adb..5c31b5c 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -65,6 +65,7 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti case <-wp.ctx.Done(): log.WithContext(wp.ctx).Info("Client closed the connection") wp.Input.Stop() + wp.Output.Close(nil) cancelExecution() <-exit // /internal/runner/runner.go handleExitOrContextDone does not require client connection anymore. <-exit // The goroutine closes this channel indicating that it does not use the connection to the executor anymore. diff --git a/internal/api/ws/codeocean_writer.go b/internal/api/ws/codeocean_writer.go index 011b40f..5063b86 100644 --- a/internal/api/ws/codeocean_writer.go +++ b/internal/api/ws/codeocean_writer.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "github.com/gorilla/websocket" "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/internal/runner" @@ -16,13 +17,22 @@ const CodeOceanOutputWriterBufferSize = 64 // rawToCodeOceanWriter is a simple io.Writer implementation that just forwards the call to sendMessage. type rawToCodeOceanWriter struct { - sendMessage func(string) + outputType dto.WebSocketMessageType + sendMessage func(*dto.WebSocketMessage) + ctx context.Context } // Write implements the io.Writer interface. func (rc *rawToCodeOceanWriter) Write(p []byte) (int, error) { - rc.sendMessage(string(p)) - return len(p), nil + switch { + case rc.ctx.Err() != nil: + return 0, fmt.Errorf("CodeOceanWriter context done: %w", rc.ctx.Err()) + case len(p) == 0: + return 0, nil + default: + rc.sendMessage(&dto.WebSocketMessage{Type: rc.outputType, Data: string(p)}) + return len(p), nil + } } // WebSocketWriter is an interface that defines which data is required and which information can be passed. @@ -36,8 +46,8 @@ type WebSocketWriter interface { // It forwards the data written to stdOut or stdErr (Nomad, AWS) to the WebSocket connection (CodeOcean). type codeOceanOutputWriter struct { connection Connection - stdOut io.Writer - stdErr io.Writer + stdOut *rawToCodeOceanWriter + stdErr *rawToCodeOceanWriter queue chan *writingLoopMessage ctx context.Context } @@ -51,18 +61,22 @@ type writingLoopMessage struct { // NewCodeOceanOutputWriter provides an codeOceanOutputWriter for the time the context ctx is active. // The codeOceanOutputWriter handles all the messages defined in the websocket.schema.json (start, timeout, stdout, ..). func NewCodeOceanOutputWriter( - connection Connection, ctx context.Context, done context.CancelFunc) *codeOceanOutputWriter { + connection Connection, ctx context.Context, done context.CancelFunc) WebSocketWriter { cw := &codeOceanOutputWriter{ connection: connection, queue: make(chan *writingLoopMessage, CodeOceanOutputWriterBufferSize), ctx: ctx, } - cw.stdOut = &rawToCodeOceanWriter{sendMessage: func(s string) { - cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputStdout, Data: s}) - }} - cw.stdErr = &rawToCodeOceanWriter{sendMessage: func(s string) { - cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputStderr, Data: s}) - }} + cw.stdOut = &rawToCodeOceanWriter{ + outputType: dto.WebSocketOutputStdout, + sendMessage: cw.send, + ctx: ctx, + } + cw.stdErr = &rawToCodeOceanWriter{ + outputType: dto.WebSocketOutputStderr, + sendMessage: cw.send, + ctx: ctx, + } go cw.startWritingLoop(done) cw.send(&dto.WebSocketMessage{Type: dto.WebSocketMetaStart}) @@ -81,9 +95,13 @@ func (cw *codeOceanOutputWriter) StdErr() io.Writer { // Close forwards the kind of exit (timeout, error, normal) to CodeOcean. // This results in the closing of the WebSocket connection. +// The call of Close is mandantory! func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) { + defer func() { cw.queue <- &writingLoopMessage{done: true} }() // Mask the internal stop reason before disclosing/forwarding it externally/to CodeOcean. switch { + case info == nil: + return case info.Err == nil: cw.send(&dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: info.Code}) case errors.Is(info.Err, context.DeadlineExceeded) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout): @@ -92,7 +110,7 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) { cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: runner.ErrOOMKilled.Error()}) case errors.Is(info.Err, nomad.ErrorAllocationCompleted), errors.Is(info.Err, runner.ErrDestroyedByAPIRequest): message := "the allocation stopped as expected" - log.WithContext(cw.ctx).WithError(info.Err).Debug(message) + log.WithContext(cw.ctx).WithError(info.Err).Trace(message) cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: message}) default: errorMessage := "Error executing the request" @@ -103,15 +121,7 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) { // send forwards the passed dto.WebSocketMessage to the writing loop. func (cw *codeOceanOutputWriter) send(message *dto.WebSocketMessage) { - select { - case <-cw.ctx.Done(): - return - default: - done := message.Type == dto.WebSocketExit || - message.Type == dto.WebSocketMetaTimeout || - message.Type == dto.WebSocketOutputError - cw.queue <- &writingLoopMessage{done: done, data: message} - } + cw.queue <- &writingLoopMessage{done: false, data: message} } // startWritingLoop enables the writing loop. @@ -128,15 +138,15 @@ func (cw *codeOceanOutputWriter) startWritingLoop(writingLoopDone context.Cancel }() for { - select { - case <-cw.ctx.Done(): + message := <-cw.queue + done := true + if message.data != nil { + done = sendMessage(cw.connection, message.data, cw.ctx) + } + if done || message.done { + log.Debug("Writing loop done") + writingLoopDone() return - case message := <-cw.queue: - done := sendMessage(cw.connection, message.data, cw.ctx) - if done || message.done { - writingLoopDone() - return - } } } } diff --git a/internal/nomad/sentry_debug_writer.go b/internal/nomad/sentry_debug_writer.go index 5ed7b49..2e74575 100644 --- a/internal/nomad/sentry_debug_writer.go +++ b/internal/nomad/sentry_debug_writer.go @@ -44,6 +44,15 @@ func NewSentryDebugWriter(target io.Writer, ctx context.Context) *SentryDebugWri // Improve: Handling of a split debug messages (usually, p is exactly one debug message, not less and not more). func (s *SentryDebugWriter) Write(p []byte) (n int, err error) { + if s.Ctx.Err() != nil { + return 0, err + } + // Peaking if the target is able to write. + // If not we should not process the data (see #325). + if _, err = s.Target.Write([]byte{}); err != nil { + return 0, fmt.Errorf("SentryDebugWriter cannot write to target: %w", err) + } + if !timeDebugMessagePatternStart.Match(p) { count, err := s.Target.Write(p) if err != nil {