#155 Enable stopping of the CodeOcean WebSocket read independently of writing to CodeOcean.
This commit is contained in:
@ -33,6 +33,7 @@ type WebSocketReader interface {
|
|||||||
io.Reader
|
io.Reader
|
||||||
io.Writer
|
io.Writer
|
||||||
startReadInputLoop()
|
startReadInputLoop()
|
||||||
|
stopReadInputLoop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// codeOceanToRawReader is an io.Reader implementation that provides the content of the WebSocket connection
|
// codeOceanToRawReader is an io.Reader implementation that provides the content of the WebSocket connection
|
||||||
@ -40,8 +41,9 @@ type WebSocketReader interface {
|
|||||||
type codeOceanToRawReader struct {
|
type codeOceanToRawReader struct {
|
||||||
connection webSocketConnection
|
connection webSocketConnection
|
||||||
|
|
||||||
// wsCtx is the context in that messages from CodeOcean are read.
|
// readCtx is the context in that messages from CodeOcean are read.
|
||||||
wsCtx context.Context
|
readCtx context.Context
|
||||||
|
cancelReadCtx context.CancelFunc
|
||||||
// executorCtx is the context in that messages are forwarded to the executor.
|
// executorCtx is the context in that messages are forwarded to the executor.
|
||||||
executorCtx context.Context
|
executorCtx context.Context
|
||||||
|
|
||||||
@ -58,7 +60,8 @@ type codeOceanToRawReader struct {
|
|||||||
func newCodeOceanToRawReader(connection webSocketConnection, wsCtx, executorCtx context.Context) *codeOceanToRawReader {
|
func newCodeOceanToRawReader(connection webSocketConnection, wsCtx, executorCtx context.Context) *codeOceanToRawReader {
|
||||||
return &codeOceanToRawReader{
|
return &codeOceanToRawReader{
|
||||||
connection: connection,
|
connection: connection,
|
||||||
wsCtx: wsCtx,
|
readCtx: wsCtx, // This context may be canceled before the executorCtx.
|
||||||
|
cancelReadCtx: func() {},
|
||||||
executorCtx: executorCtx,
|
executorCtx: executorCtx,
|
||||||
buffer: make(chan byte, CodeOceanToRawReaderBufferSize),
|
buffer: make(chan byte, CodeOceanToRawReaderBufferSize),
|
||||||
priorityBuffer: make(chan byte, CodeOceanToRawReaderBufferSize),
|
priorityBuffer: make(chan byte, CodeOceanToRawReaderBufferSize),
|
||||||
@ -68,9 +71,9 @@ func newCodeOceanToRawReader(connection webSocketConnection, wsCtx, executorCtx
|
|||||||
// readInputLoop reads from the WebSocket connection and buffers the user's input.
|
// readInputLoop reads from the WebSocket connection and buffers the user's input.
|
||||||
// This is necessary because input must be read for the connection to handle special messages like close and call the
|
// This is necessary because input must be read for the connection to handle special messages like close and call the
|
||||||
// CloseHandler.
|
// CloseHandler.
|
||||||
func (cr *codeOceanToRawReader) readInputLoop() {
|
func (cr *codeOceanToRawReader) readInputLoop(ctx context.Context) {
|
||||||
readMessage := make(chan bool)
|
readMessage := make(chan bool)
|
||||||
loopContext, cancelInputLoop := context.WithCancel(cr.wsCtx)
|
loopContext, cancelInputLoop := context.WithCancel(ctx)
|
||||||
defer cancelInputLoop()
|
defer cancelInputLoop()
|
||||||
readingContext, cancelNextMessage := context.WithCancel(loopContext)
|
readingContext, cancelNextMessage := context.WithCancel(loopContext)
|
||||||
defer cancelNextMessage()
|
defer cancelNextMessage()
|
||||||
@ -133,7 +136,14 @@ func handleInput(messageType int, reader io.Reader, err error, buffer chan byte,
|
|||||||
|
|
||||||
// startReadInputLoop start the read input loop asynchronously.
|
// startReadInputLoop start the read input loop asynchronously.
|
||||||
func (cr *codeOceanToRawReader) startReadInputLoop() {
|
func (cr *codeOceanToRawReader) startReadInputLoop() {
|
||||||
go cr.readInputLoop()
|
ctx, cancel := context.WithCancel(cr.readCtx)
|
||||||
|
cr.cancelReadCtx = cancel
|
||||||
|
go cr.readInputLoop(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// startReadInputLoop stops the asynchronous read input loop.
|
||||||
|
func (cr *codeOceanToRawReader) stopReadInputLoop() {
|
||||||
|
cr.cancelReadCtx()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read implements the io.Reader interface.
|
// Read implements the io.Reader interface.
|
||||||
@ -259,7 +269,8 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
|
|||||||
return
|
return
|
||||||
case exitInfo = <-exit:
|
case exitInfo = <-exit:
|
||||||
log.Info("Execution returned")
|
log.Info("Execution returned")
|
||||||
wp.cancelWebSocket()
|
wp.Stdin.stopReadInputLoop() // Here we stop reading from the client
|
||||||
|
defer wp.cancelWebSocket() // At the end of this method we stop writing to the client
|
||||||
}
|
}
|
||||||
|
|
||||||
if errors.Is(exitInfo.Err, context.DeadlineExceeded) || errors.Is(exitInfo.Err, runner.ErrorRunnerInactivityTimeout) {
|
if errors.Is(exitInfo.Err, context.DeadlineExceeded) || errors.Is(exitInfo.Err, runner.ErrorRunnerInactivityTimeout) {
|
||||||
|
Reference in New Issue
Block a user