From 465577fea6294f97f59560a9bf9f4ddb1bacd08c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= Date: Fri, 7 May 2021 14:10:12 +0200 Subject: [PATCH] Add basic websocket structure and request upgrader --- api/runners.go | 12 ++--------- api/websocket.go | 46 +++++++++++++++++++++++++++++++++++++++++++ api/websocket_test.go | 12 +++++++++++ runner/runner.go | 21 ++++++++++++++++++++ 4 files changed, 81 insertions(+), 10 deletions(-) create mode 100644 api/websocket.go create mode 100644 api/websocket_test.go diff --git a/api/runners.go b/api/runners.go index c9ca6be..62d2d33 100644 --- a/api/runners.go +++ b/api/runners.go @@ -32,12 +32,12 @@ func provideRunner(writer http.ResponseWriter, request *http.Request) { writeNotFound(writer, err) return } - runner, err := env.NextRunner() + nextRunner, err := env.NextRunner() if err != nil { writeInternalServerError(writer, err, dto.ErrorNomadOverload) return } - sendJson(writer, &dto.RunnerResponse{Id: runner.Id()}, http.StatusOK) + sendJson(writer, &dto.RunnerResponse{Id: nextRunner.Id()}, http.StatusOK) } // executeCommand takes an ExecutionRequest and stores it for a runner. @@ -80,14 +80,6 @@ func executeCommand(router *mux.Router) func(w http.ResponseWriter, r *http.Requ } } -// connectToRunner is a placeholder for now and will become the endpoint for websocket connections. -func connectToRunner(writer http.ResponseWriter, request *http.Request) { - // Todo: Execute the command, upgrade the connection to websocket and handle forwarding - executionId := request.URL.Query()[ExecutionIdKey] - log.WithField("executionId", executionId).Info("Websocket for execution requested.") - writer.WriteHeader(http.StatusNotImplemented) -} - // The findRunnerMiddleware looks up the runnerId for routes containing it // and adds the runner to the context of the request. func findRunnerMiddleware(runnerPool environment.RunnerPool) func(handler http.Handler) http.Handler { diff --git a/api/websocket.go b/api/websocket.go new file mode 100644 index 0000000..d3cedf7 --- /dev/null +++ b/api/websocket.go @@ -0,0 +1,46 @@ +package api + +import ( + "errors" + "github.com/gorilla/websocket" + "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" + "net/http" +) + +var connUpgrade = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// connectToRunner is a placeholder for now and will become the endpoint for websocket connections. +func connectToRunner(writer http.ResponseWriter, request *http.Request) { + r, ok := runner.FromContext(request.Context()) + if !ok { + log.Error("Runner not set in request context.") + writeInternalServerError(writer, errors.New("findRunnerMiddleware failure"), dto.ErrorUnknown) + return + } + executionId := request.URL.Query().Get(ExecutionIdKey) + connClient, err := connUpgrade.Upgrade(writer, request, nil) + if err != nil { + writeInternalServerError(writer, err, dto.ErrorUnknown) + return + } + defer func(connClient *websocket.Conn) { + err := connClient.Close() + if err != nil { + writeInternalServerError(writer, err, dto.ErrorUnknown) + } + }(connClient) + + // ToDo: Implement communication forwarding + err, ok = r.Execute(runner.ExecutionId(executionId)) + if !ok { + writeBadRequest(writer, errors.New("invalid Execution Id")) + return + } else if err != nil { + writeInternalServerError(writer, err, dto.ErrorUnknown) + return + } +} diff --git a/api/websocket_test.go b/api/websocket_test.go new file mode 100644 index 0000000..b465457 --- /dev/null +++ b/api/websocket_test.go @@ -0,0 +1,12 @@ +package api + +import ( + "testing" +) + +func TestInvalidExecutionId(t *testing.T) { + +} + +func TestEstablishWebsocketConnection(t *testing.T) { +} diff --git a/runner/runner.go b/runner/runner.go index a815a4c..b367c98 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -5,10 +5,14 @@ import ( "encoding/json" "github.com/google/uuid" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "gitlab.hpi.de/codeocean/codemoon/poseidon/store" "sync" + "time" ) +var log = logging.GetLogger("runner") + // Status is the type for the status of a Runner. type Status string @@ -46,6 +50,10 @@ type Runner interface { // DeleteExecution deletes the execution of the runner with the specified id. DeleteExecution(ExecutionId) + + // Execute runs one of the runners Executions by it's id + // ok will be false if the runner has no execution with the provided id + Execute(ExecutionId) (err error, ok bool) } // ExerciseRunner is an abstraction to communicate with Nomad allocations. @@ -120,6 +128,19 @@ func (r *ExerciseRunner) DeleteExecution(id ExecutionId) { delete(r.executions, id) } +func (r *ExerciseRunner) Execute(id ExecutionId) (err error, ok bool) { + execution, ok := r.Execution(id) + if !ok { + return + } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(execution.TimeLimit)*time.Second) + defer cancel() + log.WithField("Context", ctx).Printf("ToDo: Running execution") + // ToDo: Implement command execution + // r.nomadApiClient.ExecuteCommand(r.id, ctx, ) + return +} + // NewContext creates a context containing a runner. func NewContext(ctx context.Context, runner Runner) context.Context { return context.WithValue(ctx, runnerContextKey, runner)