Files
poseidon/internal/api/websocket.go
2022-08-01 11:24:56 +02:00

108 lines
3.7 KiB
Go

package api
import (
"context"
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/openHPI/poseidon/internal/api/ws"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/openHPI/poseidon/pkg/monitoring"
"net/http"
)
var ErrUnknownExecutionID = errors.New("execution id unknown")
// webSocketProxy is an encapsulation of logic for forwarding between Runners and CodeOcean.
type webSocketProxy struct {
ctx context.Context
cancel context.CancelFunc
Input ws.WebSocketReader
Output ws.WebSocketWriter
}
// upgradeConnection upgrades a connection to a websocket and returns a webSocketProxy for this connection.
func upgradeConnection(writer http.ResponseWriter, request *http.Request) (ws.Connection, error) {
connUpgrader := websocket.Upgrader{}
connection, err := connUpgrader.Upgrade(writer, request, nil)
if err != nil {
log.WithError(err).Warn("Connection upgrade failed")
return nil, fmt.Errorf("error upgrading the connection: %w", err)
}
return connection, nil
}
// newWebSocketProxy returns an initiated and started webSocketProxy.
// As this proxy is already started, a start message is send to the client.
func newWebSocketProxy(connection ws.Connection, proxyCtx context.Context) *webSocketProxy {
wsCtx, cancelWsCommunication := context.WithCancel(proxyCtx)
proxy := &webSocketProxy{
ctx: wsCtx,
cancel: cancelWsCommunication,
Input: ws.NewCodeOceanToRawReader(connection, wsCtx, proxyCtx),
Output: ws.NewCodeOceanOutputWriter(connection, wsCtx),
}
connection.SetCloseHandler(func(code int, text string) error {
log.WithField("code", code).WithField("text", text).Debug("The client closed the connection.")
cancelWsCommunication()
return nil
})
return proxy
}
// waitForExit waits for an exit of either the runner (when the command terminates) or the client closing the WebSocket
// and handles WebSocket exit messages.
func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecution context.CancelFunc) {
wp.Input.Start()
var exitInfo runner.ExitInfo
select {
case <-wp.ctx.Done():
log.Info("Client closed the connection")
wp.Input.Stop()
cancelExecution()
<-exit // /internal/runner/runner.go handleExitOrContextDone does not require client connection anymore.
<-exit // The goroutine closes this channel indicating that it does not use the connection to the executor anymore.
case exitInfo = <-exit:
log.Info("Execution returned")
wp.Input.Stop()
wp.Output.SendExitInfo(&exitInfo)
}
}
// connectToRunner is the endpoint for websocket connections.
func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) {
targetRunner, _ := runner.FromContext(request.Context())
monitoring.AddRunnerMonitoringData(request, targetRunner.ID(), targetRunner.Environment())
executionID := request.URL.Query().Get(ExecutionIDKey)
if !targetRunner.ExecutionExists(executionID) {
writeNotFound(writer, ErrUnknownExecutionID)
return
}
connection, err := upgradeConnection(writer, request)
if err != nil {
writeInternalServerError(writer, err, dto.ErrorUnknown)
return
}
proxyCtx, cancelProxy := context.WithCancel(context.Background())
defer cancelProxy()
proxy := newWebSocketProxy(connection, proxyCtx)
log.WithField("runnerId", targetRunner.ID()).
WithField("executionID", logging.RemoveNewlineSymbol(executionID)).
Info("Running execution")
exit, cancel, err := targetRunner.ExecuteInteractively(executionID,
proxy.Input, proxy.Output.StdOut(), proxy.Output.StdErr())
if err != nil {
log.WithError(err).Warn("Cannot execute request.")
return // The proxy is stopped by the defered cancel.
}
proxy.waitForExit(exit, cancel)
}