diff --git a/internal/api/websocket.go b/internal/api/websocket.go index d3b1ba3..cad2a8b 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -62,13 +62,14 @@ func (cr *codeOceanToRawReader) readInputLoop(ctx context.Context) { readMessage <- true }() select { - case <-readMessage: case <-ctx.Done(): return + case <-readMessage: } if err != nil { - log.WithError(err).Warn("Error reading client message") + log.WithField("remote", cr.connection.(*websocket.Conn).UnderlyingConn().RemoteAddr()). + WithError(err).Warn("Error reading client message") return } if messageType != websocket.TextMessage { @@ -83,9 +84,9 @@ func (cr *codeOceanToRawReader) readInputLoop(ctx context.Context) { } for _, character := range message { select { - case cr.buffer <- character: case <-ctx.Done(): return + case cr.buffer <- character: } } } @@ -182,7 +183,6 @@ func newWebSocketProxy(connection webSocketConnection) (*webSocketProxy, error) // waitForExit waits for an exit of either the runner (when the command terminates) or the client closing the WebSocket // and handles WebSocket exit messages. func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecution context.CancelFunc) { - defer wp.close() cancelInputLoop := wp.Stdin.startReadInputLoop() var exitInfo runner.ExitInfo select { @@ -198,7 +198,9 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti if errors.Is(exitInfo.Err, context.DeadlineExceeded) || errors.Is(exitInfo.Err, runner.ErrorRunnerInactivityTimeout) { err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout}) - if err != nil { + if err == nil { + wp.closeNormal() + } else { log.WithError(err).Warn("Failed to send timeout message to client") } return @@ -206,7 +208,9 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti errorMessage := "Error executing the request" log.WithError(exitInfo.Err).Warn(errorMessage) err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: errorMessage}) - if err != nil { + if err == nil { + wp.closeNormal() + } else { log.WithError(err).Warn("Failed to send output error message to client") } return @@ -218,8 +222,10 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti ExitCode: exitInfo.Code, }) if err != nil { + log.WithError(err).Warn("Error sending exit message") return } + wp.closeNormal() } func (wp *webSocketProxy) sendToClient(message dto.WebSocketMessage) error { @@ -240,16 +246,15 @@ func (wp *webSocketProxy) sendToClient(message dto.WebSocketMessage) error { } func (wp *webSocketProxy) closeWithError(message string) { - err := wp.connection.WriteMessage(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseInternalServerErr, message)) - if err != nil { - log.WithError(err).Warn("Error during websocket close") - } + wp.close(websocket.FormatCloseMessage(websocket.CloseInternalServerErr, message)) } -func (wp *webSocketProxy) close() { - err := wp.connection.WriteMessage(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) +func (wp *webSocketProxy) closeNormal() { + wp.close(websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) +} + +func (wp *webSocketProxy) close(message []byte) { + err := wp.connection.WriteMessage(websocket.CloseMessage, message) _ = wp.connection.Close() if err != nil { log.WithError(err).Warn("Error during websocket close")