From 3afe8ddb6699038bd557d7304ef9ea1eb250b6b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Sun, 19 Jun 2022 22:27:05 +0200 Subject: [PATCH] #155 Enable stopping of the CodeOcean WebSocket read independently of writing to CodeOcean. --- internal/api/websocket.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/internal/api/websocket.go b/internal/api/websocket.go index e04a912..1eb93db 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -33,6 +33,7 @@ type WebSocketReader interface { io.Reader io.Writer startReadInputLoop() + stopReadInputLoop() } // codeOceanToRawReader is an io.Reader implementation that provides the content of the WebSocket connection @@ -40,8 +41,9 @@ type WebSocketReader interface { type codeOceanToRawReader struct { connection webSocketConnection - // wsCtx is the context in that messages from CodeOcean are read. - wsCtx context.Context + // readCtx is the context in that messages from CodeOcean are read. + readCtx context.Context + cancelReadCtx context.CancelFunc // executorCtx is the context in that messages are forwarded to the executor. executorCtx context.Context @@ -58,7 +60,8 @@ type codeOceanToRawReader struct { func newCodeOceanToRawReader(connection webSocketConnection, wsCtx, executorCtx context.Context) *codeOceanToRawReader { return &codeOceanToRawReader{ connection: connection, - wsCtx: wsCtx, + readCtx: wsCtx, // This context may be canceled before the executorCtx. + cancelReadCtx: func() {}, executorCtx: executorCtx, buffer: make(chan byte, CodeOceanToRawReaderBufferSize), priorityBuffer: make(chan byte, CodeOceanToRawReaderBufferSize), @@ -68,9 +71,9 @@ func newCodeOceanToRawReader(connection webSocketConnection, wsCtx, executorCtx // readInputLoop reads from the WebSocket connection and buffers the user's input. // This is necessary because input must be read for the connection to handle special messages like close and call the // CloseHandler. -func (cr *codeOceanToRawReader) readInputLoop() { +func (cr *codeOceanToRawReader) readInputLoop(ctx context.Context) { readMessage := make(chan bool) - loopContext, cancelInputLoop := context.WithCancel(cr.wsCtx) + loopContext, cancelInputLoop := context.WithCancel(ctx) defer cancelInputLoop() readingContext, cancelNextMessage := context.WithCancel(loopContext) defer cancelNextMessage() @@ -133,7 +136,14 @@ func handleInput(messageType int, reader io.Reader, err error, buffer chan byte, // startReadInputLoop start the read input loop asynchronously. func (cr *codeOceanToRawReader) startReadInputLoop() { - go cr.readInputLoop() + ctx, cancel := context.WithCancel(cr.readCtx) + cr.cancelReadCtx = cancel + go cr.readInputLoop(ctx) +} + +// startReadInputLoop stops the asynchronous read input loop. +func (cr *codeOceanToRawReader) stopReadInputLoop() { + cr.cancelReadCtx() } // Read implements the io.Reader interface. @@ -259,7 +269,8 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti return case exitInfo = <-exit: log.Info("Execution returned") - wp.cancelWebSocket() + wp.Stdin.stopReadInputLoop() // Here we stop reading from the client + defer wp.cancelWebSocket() // At the end of this method we stop writing to the client } if errors.Is(exitInfo.Err, context.DeadlineExceeded) || errors.Is(exitInfo.Err, runner.ErrorRunnerInactivityTimeout) {