Implement review suggestions
Improve logging, constants for routes, RWMutex for ExerciseRunners, use url.URL for websocket url building
This commit is contained in:
@ -28,12 +28,12 @@ func sendJson(writer http.ResponseWriter, content interface{}, httpStatusCode in
|
||||
return
|
||||
}
|
||||
if _, err = writer.Write(response); err != nil {
|
||||
log.Printf("Error writing JSON to response: %v\n", err)
|
||||
log.WithError(err).Error("Could not write JSON response")
|
||||
http.Error(writer, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
func parseRequestBodyJSON(writer http.ResponseWriter, request *http.Request, structure interface{}) error {
|
||||
func parseJSONRequestBody(writer http.ResponseWriter, request *http.Request, structure interface{}) error {
|
||||
if err := json.NewDecoder(request.Body).Decode(structure); err != nil {
|
||||
writeBadRequest(writer, err)
|
||||
return err
|
||||
|
@ -1,6 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
||||
@ -9,13 +10,21 @@ import (
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/environment/pool"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
const (
|
||||
ExecutePath = "/execute"
|
||||
WebsocketPath = "/websocket"
|
||||
RunnerIdKey = "runnerId"
|
||||
ExecutionIdKey = "executionId"
|
||||
)
|
||||
|
||||
// provideRunner tries to respond with the id of a runner
|
||||
// This runner is then reserved for future use
|
||||
func provideRunner(writer http.ResponseWriter, request *http.Request) {
|
||||
runnerRequest := new(dto.RunnerRequest)
|
||||
if err := parseRequestBodyJSON(writer, request, runnerRequest); err != nil {
|
||||
if err := parseJSONRequestBody(writer, request, runnerRequest); err != nil {
|
||||
return
|
||||
}
|
||||
environment, err := environment.GetExecutionEnvironment(runnerRequest.ExecutionEnvironmentId)
|
||||
@ -36,7 +45,7 @@ func provideRunner(writer http.ResponseWriter, request *http.Request) {
|
||||
func executeCommand(router *mux.Router) func(w http.ResponseWriter, r *http.Request) {
|
||||
return func(writer http.ResponseWriter, request *http.Request) {
|
||||
executionRequest := new(dto.ExecutionRequest)
|
||||
if err := parseRequestBodyJSON(writer, request, executionRequest); err != nil {
|
||||
if err := parseJSONRequestBody(writer, request, executionRequest); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -48,30 +57,39 @@ func executeCommand(router *mux.Router) func(w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
r, ok := runner.FromContext(request.Context())
|
||||
if !ok {
|
||||
log.Fatal("Expected runner in context! Something must be broken ...")
|
||||
log.Error("Runner not set in request context.")
|
||||
writeInternalServerError(writer, errors.New("findRunnerMiddleware failure"), dto.ErrorUnknown)
|
||||
return
|
||||
}
|
||||
|
||||
path, err := router.Get(WebsocketPath).URL(RunnerIdKey, r.Id())
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not create runner websocket URL.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
return
|
||||
}
|
||||
id, err := r.AddExecution(*executionRequest)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not store execution.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
return
|
||||
}
|
||||
websocketUrl := url.URL{
|
||||
Scheme: scheme,
|
||||
Host: request.Host,
|
||||
Path: path.String(),
|
||||
RawQuery: fmt.Sprintf("%s=%s", ExecutionIdKey, id),
|
||||
}
|
||||
|
||||
path, err := router.Get("runner-websocket").URL("runnerId", r.Id())
|
||||
if err != nil {
|
||||
log.Printf("Error creating runner websocket URL %v", err)
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
return
|
||||
}
|
||||
websocketUrl := fmt.Sprintf("%s://%s%s?executionId=%s", scheme, request.Host, path, id)
|
||||
sendJson(writer, &dto.WebsocketResponse{WebsocketUrl: websocketUrl}, http.StatusOK)
|
||||
sendJson(writer, &dto.WebsocketResponse{WebsocketUrl: websocketUrl.String()}, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// connectToRunner is a placeholder for now and will become the endpoint for websocket connections.
|
||||
func connectToRunner(writer http.ResponseWriter, request *http.Request) {
|
||||
// Todo: Execute the command, upgrade the connection to websocket and handle forwarding
|
||||
executionId := request.URL.Query()["executionId"]
|
||||
log.Printf("Websocket for execution %s requested", executionId)
|
||||
executionId := request.URL.Query()[ExecutionIdKey]
|
||||
log.WithField("executionId", executionId).Info("Websocket for execution requested.")
|
||||
writer.WriteHeader(http.StatusNotImplemented)
|
||||
}
|
||||
|
||||
@ -81,7 +99,7 @@ func findRunnerMiddleware(runnerPool pool.RunnerPool) func(handler http.Handler)
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
// Find runner
|
||||
runnerId := mux.Vars(request)["runnerId"]
|
||||
runnerId := mux.Vars(request)[RunnerIdKey]
|
||||
r, ok := runnerPool.GetRunner(runnerId)
|
||||
if !ok {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
@ -96,8 +114,8 @@ func findRunnerMiddleware(runnerPool pool.RunnerPool) func(handler http.Handler)
|
||||
|
||||
func registerRunnerRoutes(router *mux.Router, runnerPool pool.RunnerPool) {
|
||||
router.HandleFunc("", provideRunner).Methods(http.MethodPost)
|
||||
runnerRouter := router.PathPrefix("/{runnerId}").Subrouter()
|
||||
runnerRouter := router.PathPrefix(fmt.Sprintf("/{%s}", RunnerIdKey)).Subrouter()
|
||||
runnerRouter.Use(findRunnerMiddleware(runnerPool))
|
||||
runnerRouter.HandleFunc("/execute", executeCommand(runnerRouter)).Methods(http.MethodPost).Name("runner-execute")
|
||||
runnerRouter.HandleFunc("/websocket", connectToRunner).Methods(http.MethodGet).Name("runner-websocket")
|
||||
runnerRouter.HandleFunc(ExecutePath, executeCommand(runnerRouter)).Methods(http.MethodPost).Name(ExecutePath)
|
||||
runnerRouter.HandleFunc(WebsocketPath, connectToRunner).Methods(http.MethodGet).Name(WebsocketPath)
|
||||
}
|
||||
|
@ -3,12 +3,12 @@ package api
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/environment/pool"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
@ -19,7 +19,6 @@ import (
|
||||
func TestFindRunnerMiddleware(t *testing.T) {
|
||||
runnerPool := pool.NewLocalRunnerPool()
|
||||
var capturedRunner runner.Runner
|
||||
|
||||
testRunner := runner.NewExerciseRunner("testRunner")
|
||||
runnerPool.AddRunner(testRunner)
|
||||
|
||||
@ -32,18 +31,16 @@ func TestFindRunnerMiddleware(t *testing.T) {
|
||||
writer.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
router := mux.NewRouter()
|
||||
router.Use(findRunnerMiddleware(runnerPool))
|
||||
router.HandleFunc("/test/{runnerId}", testRunnerIdRoute).Name("test-runner-id")
|
||||
router.HandleFunc(fmt.Sprintf("/test/{%s}", RunnerIdKey), testRunnerIdRoute).Name("test-runner-id")
|
||||
|
||||
testRunnerRequest := func(t *testing.T, runnerId string) *http.Request {
|
||||
path, err := router.Get("test-runner-id").URL("runnerId", runnerId)
|
||||
path, err := router.Get("test-runner-id").URL(RunnerIdKey, runnerId)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
request, err := http.NewRequest(
|
||||
http.MethodPost, path.String(), nil)
|
||||
request, err := http.NewRequest(http.MethodPost, path.String(), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -70,15 +67,13 @@ func TestFindRunnerMiddleware(t *testing.T) {
|
||||
|
||||
func TestExecuteRoute(t *testing.T) {
|
||||
runnerPool := pool.NewLocalRunnerPool()
|
||||
|
||||
router := NewRouter(runnerPool)
|
||||
|
||||
testRunner := runner.NewExerciseRunner("testRunner")
|
||||
runnerPool.AddRunner(testRunner)
|
||||
|
||||
path, err := router.Get("runner-execute").URL("runnerId", testRunner.Id())
|
||||
path, err := router.Get(ExecutePath).URL(RunnerIdKey, testRunner.Id())
|
||||
if err != nil {
|
||||
t.Fatal("Could not construct execute url")
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Run("valid request", func(t *testing.T) {
|
||||
@ -99,53 +94,36 @@ func TestExecuteRoute(t *testing.T) {
|
||||
|
||||
router.ServeHTTP(recorder, request)
|
||||
|
||||
responseBody, err := io.ReadAll(recorder.Result().Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var websocketResponse dto.WebsocketResponse
|
||||
err = json.Unmarshal(responseBody, &websocketResponse)
|
||||
err = json.NewDecoder(recorder.Result().Body).Decode(&websocketResponse)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Run("returns 200", func(t *testing.T) {
|
||||
assert.Equal(t, http.StatusOK, recorder.Code)
|
||||
})
|
||||
assert.Equal(t, http.StatusOK, recorder.Code)
|
||||
|
||||
t.Run("creates an execution request for the runner", func(t *testing.T) {
|
||||
url, err := url.Parse(websocketResponse.WebsocketUrl)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
executionId := url.Query().Get("executionId")
|
||||
executionId := url.Query().Get(ExecutionIdKey)
|
||||
storedExecutionRequest, ok := testRunner.Execution(runner.ExecutionId(executionId))
|
||||
|
||||
assert.True(t, ok, "No execution request with this id")
|
||||
assert.True(t, ok, "No execution request with this id: ", executionId)
|
||||
assert.Equal(t, executionRequest, storedExecutionRequest)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("invalid request", func(t *testing.T) {
|
||||
recorder := httptest.NewRecorder()
|
||||
|
||||
body := ""
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, path.String(), strings.NewReader(body))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
router.ServeHTTP(recorder, request)
|
||||
|
||||
_, err = io.ReadAll(recorder.Result().Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Run("returns 400", func(t *testing.T) {
|
||||
assert.Equal(t, http.StatusBadRequest, recorder.Code)
|
||||
})
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, recorder.Code)
|
||||
})
|
||||
}
|
||||
|
2
main.go
2
main.go
@ -79,8 +79,6 @@ func main() {
|
||||
environment.DebugInit(runnerPool, nomadAPIClient)
|
||||
|
||||
server := initServer(runnerPool)
|
||||
log.WithField("address", server.Addr).Info("Starting server")
|
||||
|
||||
go runServer(server)
|
||||
shutdownOnOSSignal(server)
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ type Status string
|
||||
// ContextKey is the type for keys in a request context.
|
||||
type ContextKey string
|
||||
|
||||
// ExecutionId is an id for a execution in a Runner.
|
||||
// ExecutionId is an id for an execution in a Runner.
|
||||
type ExecutionId string
|
||||
|
||||
const (
|
||||
@ -38,7 +38,7 @@ type Runner interface {
|
||||
Id() string
|
||||
|
||||
// Execution looks up an ExecutionId for the runner and returns the associated RunnerRequest.
|
||||
// If this request does not exits, ok is false, else true.
|
||||
// If this request does not exit, ok is false, else true.
|
||||
Execution(ExecutionId) (request dto.ExecutionRequest, ok bool)
|
||||
|
||||
// AddExecution saves the supplied ExecutionRequest for the runner and returns an ExecutionId to retrieve it again.
|
||||
@ -50,7 +50,7 @@ type Runner interface {
|
||||
|
||||
// ExerciseRunner is an abstraction to communicate with Nomad allocations.
|
||||
type ExerciseRunner struct {
|
||||
sync.Mutex
|
||||
sync.RWMutex
|
||||
id string
|
||||
status Status
|
||||
ch chan bool
|
||||
@ -62,7 +62,6 @@ func NewExerciseRunner(id string) *ExerciseRunner {
|
||||
return &ExerciseRunner{
|
||||
id: id,
|
||||
status: StatusReady,
|
||||
Mutex: sync.Mutex{},
|
||||
ch: make(chan bool),
|
||||
executions: make(map[ExecutionId]dto.ExecutionRequest),
|
||||
}
|
||||
@ -87,8 +86,8 @@ func (r *ExerciseRunner) SetStatus(status Status) {
|
||||
}
|
||||
|
||||
func (r *ExerciseRunner) Status() Status {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
return r.status
|
||||
}
|
||||
|
||||
@ -97,8 +96,8 @@ func (r *ExerciseRunner) Id() string {
|
||||
}
|
||||
|
||||
func (r *ExerciseRunner) Execution(id ExecutionId) (executionRequest dto.ExecutionRequest, ok bool) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
executionRequest, ok = r.executions[id]
|
||||
return
|
||||
}
|
||||
|
Reference in New Issue
Block a user