Avoid concurrent writes to the websocket connection
Previously, the server sometimes crashed due to concurrent writes to the websocket connection. Now, we ensure that only one concurrent function writes to the websocket at a time by enclosing the WriteMessage function with a mutex.
This commit is contained in:

committed by
Tobias Kantusch

parent
6929169cb5
commit
e2d71a11ad
@@ -10,6 +10,7 @@ import (
|
|||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
const CodeOceanToRawReaderBufferSize = 1024
|
const CodeOceanToRawReaderBufferSize = 1024
|
||||||
@@ -161,11 +162,12 @@ func (rc *rawToCodeOceanWriter) Write(p []byte) (int, error) {
|
|||||||
|
|
||||||
// webSocketProxy is an encapsulation of logic for forwarding between Runners and CodeOcean.
|
// webSocketProxy is an encapsulation of logic for forwarding between Runners and CodeOcean.
|
||||||
type webSocketProxy struct {
|
type webSocketProxy struct {
|
||||||
userExit chan bool
|
userExit chan bool
|
||||||
connection webSocketConnection
|
connection webSocketConnection
|
||||||
Stdin WebSocketReader
|
connectionMu sync.Mutex
|
||||||
Stdout io.Writer
|
Stdin WebSocketReader
|
||||||
Stderr io.Writer
|
Stdout io.Writer
|
||||||
|
Stderr io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// upgradeConnection upgrades a connection to a websocket and returns a webSocketProxy for this connection.
|
// upgradeConnection upgrades a connection to a websocket and returns a webSocketProxy for this connection.
|
||||||
@@ -261,7 +263,7 @@ func (wp *webSocketProxy) sendToClient(message dto.WebSocketMessage) error {
|
|||||||
wp.closeWithError("Error creating message")
|
wp.closeWithError("Error creating message")
|
||||||
return fmt.Errorf("error marshaling WebSocket message: %w", err)
|
return fmt.Errorf("error marshaling WebSocket message: %w", err)
|
||||||
}
|
}
|
||||||
err = wp.connection.WriteMessage(websocket.TextMessage, encodedMessage)
|
err = wp.writeMessage(websocket.TextMessage, encodedMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorMessage := "Error writing the exit message"
|
errorMessage := "Error writing the exit message"
|
||||||
log.WithError(err).Warn(errorMessage)
|
log.WithError(err).Warn(errorMessage)
|
||||||
@@ -280,13 +282,19 @@ func (wp *webSocketProxy) closeNormal() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wp *webSocketProxy) close(message []byte) {
|
func (wp *webSocketProxy) close(message []byte) {
|
||||||
err := wp.connection.WriteMessage(websocket.CloseMessage, message)
|
err := wp.writeMessage(websocket.CloseMessage, message)
|
||||||
_ = wp.connection.Close()
|
_ = wp.connection.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Warn("Error during websocket close")
|
log.WithError(err).Warn("Error during websocket close")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wp *webSocketProxy) writeMessage(messageType int, data []byte) error {
|
||||||
|
wp.connectionMu.Lock()
|
||||||
|
defer wp.connectionMu.Unlock()
|
||||||
|
return wp.connection.WriteMessage(messageType, data) //nolint:wrapcheck // Wrap the original WriteMessage in a mutex.
|
||||||
|
}
|
||||||
|
|
||||||
// connectToRunner is the endpoint for websocket connections.
|
// connectToRunner is the endpoint for websocket connections.
|
||||||
func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) {
|
func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) {
|
||||||
targetRunner, _ := runner.FromContext(request.Context())
|
targetRunner, _ := runner.FromContext(request.Context())
|
||||||
|
Reference in New Issue
Block a user