Correct behavior when WebSocket closes.
This commit is contained in:
@@ -62,13 +62,14 @@ func (cr *codeOceanToRawReader) readInputLoop(ctx context.Context) {
|
|||||||
readMessage <- true
|
readMessage <- true
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-readMessage:
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-readMessage:
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Warn("Error reading client message")
|
log.WithField("remote", cr.connection.(*websocket.Conn).UnderlyingConn().RemoteAddr()).
|
||||||
|
WithError(err).Warn("Error reading client message")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if messageType != websocket.TextMessage {
|
if messageType != websocket.TextMessage {
|
||||||
@@ -83,9 +84,9 @@ func (cr *codeOceanToRawReader) readInputLoop(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
for _, character := range message {
|
for _, character := range message {
|
||||||
select {
|
select {
|
||||||
case cr.buffer <- character:
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
case cr.buffer <- character:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -182,7 +183,6 @@ func newWebSocketProxy(connection webSocketConnection) (*webSocketProxy, error)
|
|||||||
// waitForExit waits for an exit of either the runner (when the command terminates) or the client closing the WebSocket
|
// waitForExit waits for an exit of either the runner (when the command terminates) or the client closing the WebSocket
|
||||||
// and handles WebSocket exit messages.
|
// and handles WebSocket exit messages.
|
||||||
func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecution context.CancelFunc) {
|
func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecution context.CancelFunc) {
|
||||||
defer wp.close()
|
|
||||||
cancelInputLoop := wp.Stdin.startReadInputLoop()
|
cancelInputLoop := wp.Stdin.startReadInputLoop()
|
||||||
var exitInfo runner.ExitInfo
|
var exitInfo runner.ExitInfo
|
||||||
select {
|
select {
|
||||||
@@ -198,7 +198,9 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
|
|||||||
|
|
||||||
if errors.Is(exitInfo.Err, context.DeadlineExceeded) || errors.Is(exitInfo.Err, runner.ErrorRunnerInactivityTimeout) {
|
if errors.Is(exitInfo.Err, context.DeadlineExceeded) || errors.Is(exitInfo.Err, runner.ErrorRunnerInactivityTimeout) {
|
||||||
err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout})
|
err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout})
|
||||||
if err != nil {
|
if err == nil {
|
||||||
|
wp.closeNormal()
|
||||||
|
} else {
|
||||||
log.WithError(err).Warn("Failed to send timeout message to client")
|
log.WithError(err).Warn("Failed to send timeout message to client")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -206,7 +208,9 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
|
|||||||
errorMessage := "Error executing the request"
|
errorMessage := "Error executing the request"
|
||||||
log.WithError(exitInfo.Err).Warn(errorMessage)
|
log.WithError(exitInfo.Err).Warn(errorMessage)
|
||||||
err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: errorMessage})
|
err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: errorMessage})
|
||||||
if err != nil {
|
if err == nil {
|
||||||
|
wp.closeNormal()
|
||||||
|
} else {
|
||||||
log.WithError(err).Warn("Failed to send output error message to client")
|
log.WithError(err).Warn("Failed to send output error message to client")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -218,8 +222,10 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
|
|||||||
ExitCode: exitInfo.Code,
|
ExitCode: exitInfo.Code,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithError(err).Warn("Error sending exit message")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
wp.closeNormal()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wp *webSocketProxy) sendToClient(message dto.WebSocketMessage) error {
|
func (wp *webSocketProxy) sendToClient(message dto.WebSocketMessage) error {
|
||||||
@@ -240,16 +246,15 @@ func (wp *webSocketProxy) sendToClient(message dto.WebSocketMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wp *webSocketProxy) closeWithError(message string) {
|
func (wp *webSocketProxy) closeWithError(message string) {
|
||||||
err := wp.connection.WriteMessage(websocket.CloseMessage,
|
wp.close(websocket.FormatCloseMessage(websocket.CloseInternalServerErr, message))
|
||||||
websocket.FormatCloseMessage(websocket.CloseInternalServerErr, message))
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Warn("Error during websocket close")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wp *webSocketProxy) close() {
|
func (wp *webSocketProxy) closeNormal() {
|
||||||
err := wp.connection.WriteMessage(websocket.CloseMessage,
|
wp.close(websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||||
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
}
|
||||||
|
|
||||||
|
func (wp *webSocketProxy) close(message []byte) {
|
||||||
|
err := wp.connection.WriteMessage(websocket.CloseMessage, message)
|
||||||
_ = wp.connection.Close()
|
_ = wp.connection.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Warn("Error during websocket close")
|
log.WithError(err).Warn("Error during websocket close")
|
||||||
|
Reference in New Issue
Block a user