diff --git a/Makefile b/Makefile index ddf9a95..e115212 100644 --- a/Makefile +++ b/Makefile @@ -55,7 +55,6 @@ snaked_name=$(shell sed -e "s/\([A-Z]\)/_\L\1/g" -e "s/^_//" <<< "$(name)") mock: deps ## Create/Update a mock. Example: make mock name=apiQuerier pkg=./nomad @mockery \ --name=$(name) \ - --output=$(pkg) \ --structname=$(name)Mock \ --filename=$(snaked_name)_mock.go \ --inpackage \ diff --git a/README.md b/README.md index 8164d74..decf94e 100644 --- a/README.md +++ b/README.md @@ -105,8 +105,7 @@ $ make e2e-docker DOCKER_OPTS="" ### Mocks -For mocks we use [mockery](https://github.com/vektra/mockery). To generate a mock, first navigate to the package the interface is defined in. -You can then create a mock for the interface of your choice by running +For mocks we use [mockery](https://github.com/vektra/mockery). You can create a mock for the interface of your choice by running ```bash make mock name=INTERFACE_NAME pkg=./PATH/TO/PKG @@ -120,4 +119,26 @@ For example, for an interface called `ExecutorApi` in the package `nomad`, you m make mock name=ExecutorApi pkg=./nomad ``` -If the interface changes, you can rerun this command. +If the interface changes, you can rerun this command (deleting the mock file first to avoid errors may be necessary). + +Mocks can also be generated by using mockery directly on a specific interface. To do this, first navigate to the package the interface is defined in. Then run + +```bash +mockery \ + --name=<> \ + --structname=<>Mock \ + --filename=<>Mock.go \ + --inpackage +``` + +For example, for an interface called `ExecutorApi` in the package `nomad`, you might run + +```bash +mockery \ +--name=ExecutorApi \ +--structname=ExecutorApiMock \ +--filename=ExecutorApiMock.go \ +--inpackage +``` + +Note that per default, the mocks are created in a `mocks` sub-folder. However, in some cases (if the mock implements private interface methods), it needs to be in the same package as the interface it is mocking. The `--inpackage` flag can be used to avoid creating it in a subdirectory. diff --git a/api/api.go b/api/api.go index d50f4b6..72ba33c 100644 --- a/api/api.go +++ b/api/api.go @@ -5,7 +5,7 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/api/auth" "gitlab.hpi.de/codeocean/codemoon/poseidon/environment" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" - "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "net/http" ) @@ -17,32 +17,34 @@ const ( RouteRunners = "/runners" ) -// NewRouter returns an HTTP handler (http.Handler) which can be +// NewRouter returns a *mux.Router which can be // used by the net/http package to serve the routes of our API. It // always returns a router for the newest version of our API. We // use gorilla/mux because it is more convenient than net/http, e.g. // when extracting path parameters. -func NewRouter(apiClient nomad.ExecutorApi, runnerPool environment.RunnerPool) *mux.Router { +func NewRouter(runnerManager runner.Manager, environmentManager environment.Manager) *mux.Router { router := mux.NewRouter() // this can later be restricted to a specific host with // `router.Host(...)` and to HTTPS with `router.Schemes("https")` - router = newRouterV1(router, apiClient, runnerPool) + configureV1Router(router, runnerManager, environmentManager) router.Use(logging.HTTPLoggingMiddleware) return router } -// newRouterV1 returns a sub-router containing the routes of version 1 of our API. -func newRouterV1(router *mux.Router, apiClient nomad.ExecutorApi, runnerPool environment.RunnerPool) *mux.Router { +// configureV1Router configures a given router with the routes of version 1 of our API. +func configureV1Router(router *mux.Router, runnerManager runner.Manager, environmentManager environment.Manager) { v1 := router.PathPrefix(RouteBase).Subrouter() v1.HandleFunc(RouteHealth, Health).Methods(http.MethodGet) + runnerController := &RunnerController{manager: runnerManager} + if auth.InitializeAuthentication() { // Create new authenticated subrouter. // All routes added to v1 after this require authentication. - v1 = v1.PathPrefix("").Subrouter() - v1.Use(auth.HTTPAuthenticationMiddleware) + authenticatedV1Router := v1.PathPrefix("").Subrouter() + authenticatedV1Router.Use(auth.HTTPAuthenticationMiddleware) + runnerController.ConfigureRoutes(authenticatedV1Router) + } else { + runnerController.ConfigureRoutes(v1) } - registerRunnerRoutes(v1.PathPrefix(RouteRunners).Subrouter(), apiClient, runnerPool) - - return v1 } diff --git a/api/api_test.go b/api/api_test.go index 4fc48de..58ac881 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -4,7 +4,6 @@ import ( "github.com/gorilla/mux" "github.com/stretchr/testify/assert" "gitlab.hpi.de/codeocean/codemoon/poseidon/config" - "gitlab.hpi.de/codeocean/codemoon/poseidon/environment" "net/http" "net/http/httptest" "testing" @@ -17,7 +16,7 @@ func mockHTTPHandler(writer http.ResponseWriter, _ *http.Request) { func TestNewRouterV1WithAuthenticationDisabled(t *testing.T) { config.Config.Server.Token = "" router := mux.NewRouter() - v1 := newRouterV1(router, nil, environment.NewLocalRunnerPool()) + configureV1Router(router, nil, nil) t.Run("health route is accessible", func(t *testing.T) { request, err := http.NewRequest(http.MethodGet, "/api/v1/health", nil) @@ -30,7 +29,7 @@ func TestNewRouterV1WithAuthenticationDisabled(t *testing.T) { }) t.Run("added route is accessible", func(t *testing.T) { - v1.HandleFunc("/test", mockHTTPHandler) + router.HandleFunc("/api/v1/test", mockHTTPHandler) request, err := http.NewRequest(http.MethodGet, "/api/v1/test", nil) if err != nil { t.Fatal(err) @@ -44,7 +43,7 @@ func TestNewRouterV1WithAuthenticationDisabled(t *testing.T) { func TestNewRouterV1WithAuthenticationEnabled(t *testing.T) { config.Config.Server.Token = "TestToken" router := mux.NewRouter() - v1 := newRouterV1(router, nil, environment.NewLocalRunnerPool()) + configureV1Router(router, nil, nil) t.Run("health route is accessible", func(t *testing.T) { request, err := http.NewRequest(http.MethodGet, "/api/v1/health", nil) @@ -56,9 +55,8 @@ func TestNewRouterV1WithAuthenticationEnabled(t *testing.T) { assert.Equal(t, http.StatusNoContent, recorder.Code) }) - t.Run("added route is not accessible", func(t *testing.T) { - v1.HandleFunc("/test", mockHTTPHandler) - request, err := http.NewRequest(http.MethodGet, "/api/v1/test", nil) + t.Run("protected route is not accessible", func(t *testing.T) { + request, err := http.NewRequest(http.MethodPost, "/api/v1/runners", nil) if err != nil { t.Fatal(err) } diff --git a/api/dto/dto.go b/api/dto/dto.go index 989a51e..2f1eab2 100644 --- a/api/dto/dto.go +++ b/api/dto/dto.go @@ -18,6 +18,9 @@ type RunnerResponse struct { Id string `json:"runnerId"` } +// FileCreation is the expected json structure of the request body for the copy files route. +type FileCreation struct{} + // WebsocketResponse is the expected response when creating an execution for a runner. type WebsocketResponse struct { WebsocketUrl string `json:"websocketUrl"` diff --git a/api/environments.go b/api/environments.go new file mode 100644 index 0000000..527ae0b --- /dev/null +++ b/api/environments.go @@ -0,0 +1,20 @@ +package api + +import ( + "gitlab.hpi.de/codeocean/codemoon/poseidon/environment" + "net/http" +) + +type EnvironmentController struct { + manager environment.Manager // nolint:unused,structcheck +} + +// create creates a new execution environment on the executor. +func (e *EnvironmentController) create(writer http.ResponseWriter, request *http.Request) { // nolint:unused + +} + +// delete removes an execution environment from the executor +func (e *EnvironmentController) delete(writer http.ResponseWriter, request *http.Request) { // nolint:unused + +} diff --git a/api/health.go b/api/health.go index 18dfdd8..70cd402 100644 --- a/api/health.go +++ b/api/health.go @@ -4,7 +4,8 @@ import ( "net/http" ) -// Health tries to respond that the server is alive. +// Health handles the health route. +// It tries to respond that the server is alive. // If it is not, the response won't reach the client. func Health(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusNoContent) diff --git a/api/runners.go b/api/runners.go index 102a5bf..3048cf7 100644 --- a/api/runners.go +++ b/api/runners.go @@ -1,13 +1,10 @@ package api import ( - "errors" "fmt" "github.com/gorilla/mux" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/config" - "gitlab.hpi.de/codeocean/codemoon/poseidon/environment" - "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "net/http" "net/url" @@ -21,106 +18,116 @@ const ( ExecutionIdKey = "executionId" ) -// provideRunner tries to respond with the id of a runner +type RunnerController struct { + manager runner.Manager + runnerRouter *mux.Router +} + +// ConfigureRoutes configures a given router with the runner routes of our API. +func (r *RunnerController) ConfigureRoutes(router *mux.Router) { + runnersRouter := router.PathPrefix(RouteRunners).Subrouter() + runnersRouter.HandleFunc("", r.provide).Methods(http.MethodPost) + r.runnerRouter = runnersRouter.PathPrefix(fmt.Sprintf("/{%s}", RunnerIdKey)).Subrouter() + r.runnerRouter.Use(r.findRunnerMiddleware) + r.runnerRouter.HandleFunc(ExecutePath, r.execute).Methods(http.MethodPost).Name(ExecutePath) + r.runnerRouter.HandleFunc(WebsocketPath, connectToRunner).Methods(http.MethodGet).Name(WebsocketPath) + r.runnerRouter.HandleFunc("", r.delete).Methods(http.MethodDelete).Name(DeleteRoute) +} + +// provide handles the provide runners API route. +// It tries to respond with the id of a unused runner. // This runner is then reserved for future use -func provideRunner(writer http.ResponseWriter, request *http.Request) { +func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Request) { runnerRequest := new(dto.RunnerRequest) if err := parseJSONRequestBody(writer, request, runnerRequest); err != nil { return } - executionEnvironment, err := environment.GetExecutionEnvironment(runnerRequest.ExecutionEnvironmentId) + environmentId := runner.EnvironmentId(runnerRequest.ExecutionEnvironmentId) + nextRunner, err := r.manager.Use(environmentId) if err != nil { - writeNotFound(writer, err) - return - } - nextRunner, err := executionEnvironment.NextRunner() - if err != nil { - writeInternalServerError(writer, err, dto.ErrorNomadOverload) + if err == runner.ErrUnknownExecutionEnvironment { + writeNotFound(writer, err) + } else if err == runner.ErrNoRunnersAvailable { + writeInternalServerError(writer, err, dto.ErrorNomadOverload) + } else { + writeInternalServerError(writer, err, dto.ErrorUnknown) + } return } + sendJson(writer, &dto.RunnerResponse{Id: nextRunner.Id()}, http.StatusOK) } -// executeCommand takes an ExecutionRequest and stores it for a runner. +// execute handles the execute API route. +// It takes an ExecutionRequest and stores it for a runner. // It returns a url to connect to for a websocket connection to this execution in the corresponding runner. -func executeCommand(router *mux.Router) func(w http.ResponseWriter, r *http.Request) { - return func(writer http.ResponseWriter, request *http.Request) { - executionRequest := new(dto.ExecutionRequest) - if err := parseJSONRequestBody(writer, request, executionRequest); err != nil { - return - } - - var scheme string - if config.Config.Server.TLS { - scheme = "wss" - } else { - scheme = "ws" - } - r, _ := runner.FromContext(request.Context()) - - path, err := router.Get(WebsocketPath).URL(RunnerIdKey, r.Id()) - if err != nil { - log.WithError(err).Error("Could not create runner websocket URL.") - writeInternalServerError(writer, err, dto.ErrorUnknown) - return - } - id, err := r.AddExecution(*executionRequest) - if err != nil { - log.WithError(err).Error("Could not store execution.") - writeInternalServerError(writer, err, dto.ErrorUnknown) - return - } - websocketUrl := url.URL{ - Scheme: scheme, - Host: request.Host, - Path: path.String(), - RawQuery: fmt.Sprintf("%s=%s", ExecutionIdKey, id), - } - - sendJson(writer, &dto.WebsocketResponse{WebsocketUrl: websocketUrl.String()}, http.StatusOK) +func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Request) { + executionRequest := new(dto.ExecutionRequest) + if err := parseJSONRequestBody(writer, request, executionRequest); err != nil { + return } + + var scheme string + if config.Config.Server.TLS { + scheme = "wss" + } else { + scheme = "ws" + } + targetRunner, _ := runner.FromContext(request.Context()) + + path, err := r.runnerRouter.Get(WebsocketPath).URL(RunnerIdKey, targetRunner.Id()) + if err != nil { + log.WithError(err).Error("Could not create runner websocket URL.") + writeInternalServerError(writer, err, dto.ErrorUnknown) + return + } + id, err := targetRunner.AddExecution(*executionRequest) + if err != nil { + log.WithError(err).Error("Could not store execution.") + writeInternalServerError(writer, err, dto.ErrorUnknown) + return + } + websocketUrl := url.URL{ + Scheme: scheme, + Host: request.Host, + Path: path.String(), + RawQuery: fmt.Sprintf("%s=%s", ExecutionIdKey, id), + } + + sendJson(writer, &dto.WebsocketResponse{WebsocketUrl: websocketUrl.String()}, http.StatusOK) } // The findRunnerMiddleware looks up the runnerId for routes containing it // and adds the runner to the context of the request. -func findRunnerMiddleware(runnerPool environment.RunnerPool) func(handler http.Handler) http.Handler { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - // Find runner - runnerId := mux.Vars(request)[RunnerIdKey] - r, ok := runnerPool.Get(runnerId) - if !ok { - writeNotFound(writer, errors.New("no runner with this id")) - return - } - ctx := runner.NewContext(request.Context(), r.(runner.Runner)) - requestWithRunner := request.WithContext(ctx) - next.ServeHTTP(writer, requestWithRunner) - }) - } -} - -func deleteRunner(apiClient nomad.ExecutorApi, runnerPool environment.RunnerPool) func(writer http.ResponseWriter, request *http.Request) { - return func(writer http.ResponseWriter, request *http.Request) { - targetRunner, _ := runner.FromContext(request.Context()) - - err := apiClient.DeleteRunner(targetRunner.Id()) +func (r *RunnerController) findRunnerMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + // Find runner + runnerId := mux.Vars(request)[RunnerIdKey] + r, err := r.manager.Get(runnerId) if err != nil { - writeInternalServerError(writer, err, dto.ErrorNomadInternalServerError) + writeNotFound(writer, err) return } + ctx := runner.NewContext(request.Context(), r.(runner.Runner)) + requestWithRunner := request.WithContext(ctx) + next.ServeHTTP(writer, requestWithRunner) + }) +} - runnerPool.Delete(targetRunner.Id()) +// delete handles the delete runner API route. +// It destroys the given runner on the executor and removes it from the used runners list. +func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Request) { + targetRunner, _ := runner.FromContext(request.Context()) - writer.WriteHeader(http.StatusNoContent) + err := r.manager.Return(targetRunner) + if err != nil { + if err == runner.ErrUnknownExecutionEnvironment { + writeNotFound(writer, err) + } + + writeInternalServerError(writer, err, dto.ErrorNomadInternalServerError) + return } -} -func registerRunnerRoutes(router *mux.Router, apiClient nomad.ExecutorApi, runnerPool environment.RunnerPool) { - router.HandleFunc("", provideRunner).Methods(http.MethodPost) - runnerRouter := router.PathPrefix(fmt.Sprintf("/{%s}", RunnerIdKey)).Subrouter() - runnerRouter.Use(findRunnerMiddleware(runnerPool)) - runnerRouter.HandleFunc(ExecutePath, executeCommand(runnerRouter)).Methods(http.MethodPost).Name(ExecutePath) - runnerRouter.HandleFunc(WebsocketPath, connectToRunner).Methods(http.MethodGet).Name(WebsocketPath) - runnerRouter.HandleFunc("", deleteRunner(apiClient, runnerPool)).Methods(http.MethodDelete).Name(DeleteRoute) + writer.WriteHeader(http.StatusNoContent) } diff --git a/api/runners_test.go b/api/runners_test.go index ada9bb1..6bd521f 100644 --- a/api/runners_test.go +++ b/api/runners_test.go @@ -3,15 +3,12 @@ package api import ( "bytes" "encoding/json" - "errors" "fmt" "github.com/gorilla/mux" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/environment" - "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "net/http" "net/http/httptest" @@ -20,11 +17,27 @@ import ( "testing" ) -func TestFindRunnerMiddleware(t *testing.T) { - runnerPool := environment.NewLocalRunnerPool() +type MiddlewareTestSuite struct { + suite.Suite + manager *runner.ManagerMock + router *mux.Router + runnerController *RunnerController + testRunner runner.Runner +} + +func (suite *MiddlewareTestSuite) SetupTest() { + suite.manager = &runner.ManagerMock{} + suite.router = mux.NewRouter() + suite.runnerController = &RunnerController{suite.manager, suite.router} + suite.testRunner = runner.NewRunner("runner") +} + +func TestMiddlewareTestSuite(t *testing.T) { + suite.Run(t, new(MiddlewareTestSuite)) +} + +func (suite *MiddlewareTestSuite) TestFindRunnerMiddleware() { var capturedRunner runner.Runner - testRunner := runner.NewExerciseRunner("testRunner") - runnerPool.Add(testRunner) testRunnerIdRoute := func(writer http.ResponseWriter, request *http.Request) { var ok bool @@ -35,12 +48,8 @@ func TestFindRunnerMiddleware(t *testing.T) { writer.WriteHeader(http.StatusInternalServerError) } } - router := mux.NewRouter() - router.Use(findRunnerMiddleware(runnerPool)) - router.HandleFunc(fmt.Sprintf("/test/{%s}", RunnerIdKey), testRunnerIdRoute).Name("test-runner-id") - testRunnerRequest := func(t *testing.T, runnerId string) *http.Request { - path, err := router.Get("test-runner-id").URL(RunnerIdKey, runnerId) + path, err := suite.router.Get("test-runner-id").URL(RunnerIdKey, runnerId) if err != nil { t.Fatal(err) } @@ -51,36 +60,57 @@ func TestFindRunnerMiddleware(t *testing.T) { return request } - t.Run("sets runner in context if runner exists", func(t *testing.T) { + suite.router.Use(suite.runnerController.findRunnerMiddleware) + suite.router.HandleFunc(fmt.Sprintf("/test/{%s}", RunnerIdKey), testRunnerIdRoute).Name("test-runner-id") + + suite.manager.On("Get", suite.testRunner.Id()).Return(suite.testRunner, nil) + suite.T().Run("sets runner in context if runner exists", func(t *testing.T) { capturedRunner = nil recorder := httptest.NewRecorder() - router.ServeHTTP(recorder, testRunnerRequest(t, testRunner.Id())) + suite.router.ServeHTTP(recorder, testRunnerRequest(t, suite.testRunner.Id())) assert.Equal(t, http.StatusOK, recorder.Code) - assert.Equal(t, testRunner, capturedRunner) + assert.Equal(t, suite.testRunner, capturedRunner) }) - t.Run("returns 404 if runner does not exist", func(t *testing.T) { + invalidID := "some-invalid-runner-id" + suite.manager.On("Get", invalidID).Return(nil, runner.ErrRunnerNotFound) + suite.T().Run("returns 404 if runner does not exist", func(t *testing.T) { recorder := httptest.NewRecorder() - router.ServeHTTP(recorder, testRunnerRequest(t, "some-invalid-runner-id")) + suite.router.ServeHTTP(recorder, testRunnerRequest(t, invalidID)) assert.Equal(t, http.StatusNotFound, recorder.Code) }) } -func TestExecuteRoute(t *testing.T) { - runnerPool := environment.NewLocalRunnerPool() - router := NewRouter(nil, runnerPool) - testRunner := runner.NewExerciseRunner("testRunner") - runnerPool.Add(testRunner) +func TestRunnerRouteTestSuite(t *testing.T) { + suite.Run(t, new(RunnerRouteTestSuite)) +} - path, err := router.Get(ExecutePath).URL(RunnerIdKey, testRunner.Id()) +type RunnerRouteTestSuite struct { + suite.Suite + runnerManager *runner.ManagerMock + environmentManager *environment.ManagerMock + router *mux.Router + runner runner.Runner +} + +func (suite *RunnerRouteTestSuite) SetupTest() { + suite.runnerManager = &runner.ManagerMock{} + suite.environmentManager = &environment.ManagerMock{} + suite.router = NewRouter(suite.runnerManager, suite.environmentManager) + suite.runner = runner.NewRunner("test_runner") + suite.runnerManager.On("Get", suite.runner.Id()).Return(suite.runner, nil) +} + +func (suite *RunnerRouteTestSuite) TestExecuteRoute() { + path, err := suite.router.Get(ExecutePath).URL(RunnerIdKey, suite.runner.Id()) if err != nil { - t.Fatal(err) + suite.T().Fatal() } - t.Run("valid request", func(t *testing.T) { + suite.Run("valid request", func() { recorder := httptest.NewRecorder() executionRequest := dto.ExecutionRequest{ Command: "command", @@ -89,131 +119,70 @@ func TestExecuteRoute(t *testing.T) { } body, err := json.Marshal(executionRequest) if err != nil { - t.Fatal(err) + suite.T().Fatal(err) } request, err := http.NewRequest(http.MethodPost, path.String(), bytes.NewReader(body)) if err != nil { - t.Fatal(err) + suite.T().Fatal(err) } - router.ServeHTTP(recorder, request) + suite.router.ServeHTTP(recorder, request) var websocketResponse dto.WebsocketResponse err = json.NewDecoder(recorder.Result().Body).Decode(&websocketResponse) if err != nil { - t.Fatal(err) + suite.T().Fatal(err) } - assert.Equal(t, http.StatusOK, recorder.Code) + suite.Equal(http.StatusOK, recorder.Code) - t.Run("creates an execution request for the runner", func(t *testing.T) { + suite.Run("creates an execution request for the runner", func() { url, err := url.Parse(websocketResponse.WebsocketUrl) if err != nil { - t.Fatal(err) + suite.T().Fatal(err) } executionId := url.Query().Get(ExecutionIdKey) - storedExecutionRequest, ok := testRunner.Execution(runner.ExecutionId(executionId)) + storedExecutionRequest, ok := suite.runner.Execution(runner.ExecutionId(executionId)) - assert.True(t, ok, "No execution request with this id: ", executionId) - assert.Equal(t, executionRequest, storedExecutionRequest) + suite.True(ok, "No execution request with this id: ", executionId) + suite.Equal(executionRequest, storedExecutionRequest) }) }) - t.Run("invalid request", func(t *testing.T) { + suite.Run("invalid request", func() { recorder := httptest.NewRecorder() body := "" request, err := http.NewRequest(http.MethodPost, path.String(), strings.NewReader(body)) if err != nil { - t.Fatal(err) + suite.T().Fatal(err) } - router.ServeHTTP(recorder, request) + suite.router.ServeHTTP(recorder, request) - assert.Equal(t, http.StatusBadRequest, recorder.Code) + suite.Equal(http.StatusBadRequest, recorder.Code) }) } -func TestDeleteRunnerRouteTestSuite(t *testing.T) { - suite.Run(t, new(DeleteRunnerRouteTestSuite)) -} - -type DeleteRunnerRouteTestSuite struct { - suite.Suite - runnerPool environment.RunnerPool - apiClient *nomad.ExecutorApiMock - router *mux.Router - testRunner runner.Runner - path string -} - -func (suite *DeleteRunnerRouteTestSuite) SetupTest() { - suite.runnerPool = environment.NewLocalRunnerPool() - suite.apiClient = &nomad.ExecutorApiMock{} - suite.router = NewRouter(suite.apiClient, suite.runnerPool) - - suite.testRunner = runner.NewExerciseRunner("testRunner") - suite.runnerPool.Add(suite.testRunner) - - var err error - runnerUrl, err := suite.router.Get(DeleteRoute).URL(RunnerIdKey, suite.testRunner.Id()) +func (suite *RunnerRouteTestSuite) TestDeleteRoute() { + deleteURL, err := suite.router.Get(DeleteRoute).URL(RunnerIdKey, suite.runner.Id()) if err != nil { suite.T().Fatal(err) } - suite.path = runnerUrl.String() -} + deletePath := deleteURL.String() + suite.runnerManager.On("Return", suite.runner).Return(nil) -func (suite *DeleteRunnerRouteTestSuite) TestValidRequestReturnsNoContent() { - suite.apiClient.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil) + suite.Run("valid request", func() { + recorder := httptest.NewRecorder() + request, err := http.NewRequest(http.MethodDelete, deletePath, nil) + if err != nil { + suite.T().Fatal(err) + } - recorder := httptest.NewRecorder() - request, err := http.NewRequest(http.MethodDelete, suite.path, nil) - if err != nil { - suite.T().Fatal(err) - } + suite.router.ServeHTTP(recorder, request) - suite.router.ServeHTTP(recorder, request) + suite.Equal(http.StatusNoContent, recorder.Code) - suite.Equal(http.StatusNoContent, recorder.Code) - - suite.Run("runner is deleted on nomad", func() { - suite.apiClient.AssertCalled(suite.T(), "DeleteRunner", suite.testRunner.Id()) - }) - - suite.Run("runner is deleted from runnerPool", func() { - returnedRunner, ok := suite.runnerPool.Get(suite.testRunner.Id()) - suite.Nil(returnedRunner) - suite.False(ok) + suite.Run("runner was returned to runner manager", func() { + suite.runnerManager.AssertCalled(suite.T(), "Return", suite.runner) + }) }) } - -func (suite *DeleteRunnerRouteTestSuite) TestReturnInternalServerErrorWhenApiCallToNomadFailed() { - suite.apiClient.On("DeleteRunner", mock.AnythingOfType("string")).Return(errors.New("API call failed")) - - recorder := httptest.NewRecorder() - request, err := http.NewRequest(http.MethodDelete, suite.path, nil) - if err != nil { - suite.T().Fatal(err) - } - - suite.router.ServeHTTP(recorder, request) - - suite.Equal(http.StatusInternalServerError, recorder.Code) -} - -func (suite *DeleteRunnerRouteTestSuite) TestDeleteInvalidRunnerIdReturnsNotFound() { - var err error - runnersUrl, err := suite.router.Get(DeleteRoute).URL(RunnerIdKey, "1nv4l1dID") - if err != nil { - suite.T().Fatal(err) - } - suite.path = runnersUrl.String() - - recorder := httptest.NewRecorder() - request, err := http.NewRequest(http.MethodDelete, suite.path, nil) - if err != nil { - suite.T().Fatal(err) - } - - suite.router.ServeHTTP(recorder, request) - - suite.Equal(http.StatusNotFound, recorder.Code) -} diff --git a/api/websocket_test.go b/api/websocket_test.go index cbc3837..667356a 100644 --- a/api/websocket_test.go +++ b/api/websocket_test.go @@ -2,7 +2,6 @@ package api import ( "fmt" - "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/stretchr/testify/suite" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" @@ -15,10 +14,8 @@ import ( ) type WebsocketTestSuite struct { - suite.Suite - runner runner.Runner + RunnerRouteTestSuite server *httptest.Server - router *mux.Router executionId runner.ExecutionId } @@ -26,10 +23,12 @@ func TestWebsocketTestSuite(t *testing.T) { suite.Run(t, new(WebsocketTestSuite)) } -func (suite *WebsocketTestSuite) SetupSuite() { - runnerPool := environment.NewLocalRunnerPool() - suite.runner = runner.NewExerciseRunner("testRunner") - runnerPool.Add(suite.runner) +func (suite *WebsocketTestSuite) SetupTest() { + suite.runnerManager = &runner.ManagerMock{} + suite.environmentManager = &environment.ManagerMock{} + suite.router = NewRouter(suite.runnerManager, suite.environmentManager) + suite.runner = runner.NewRunner("test_runner") + suite.runnerManager.On("Get", suite.runner.Id()).Return(suite.runner, nil) var err error suite.executionId, err = suite.runner.AddExecution(dto.ExecutionRequest{ Command: "command", @@ -38,11 +37,12 @@ func (suite *WebsocketTestSuite) SetupSuite() { }) suite.Require().NoError(err) - router := mux.NewRouter() - router.Use(findRunnerMiddleware(runnerPool)) - router.HandleFunc(fmt.Sprintf("%s/{%s}%s", RouteRunners, RunnerIdKey, WebsocketPath), connectToRunner).Methods(http.MethodGet).Name(WebsocketPath) - suite.server = httptest.NewServer(router) - suite.router = router + // router.HandleFunc(fmt.Sprintf("%s/{%s}%s", RouteRunners, RunnerIdKey, WebsocketPath), connectToRunner).Methods(http.MethodGet).Name(WebsocketPath) + suite.server = httptest.NewServer(suite.router) +} + +func (suite *WebsocketTestSuite) TearDownSuite() { + suite.server.Close() } func (suite *WebsocketTestSuite) websocketUrl(scheme, runnerId string, executionId runner.ExecutionId) (*url.URL, error) { @@ -56,10 +56,6 @@ func (suite *WebsocketTestSuite) websocketUrl(scheme, runnerId string, execution return websocketUrl, nil } -func (suite *WebsocketTestSuite) TearDownSuite() { - suite.server.Close() -} - func (suite *WebsocketTestSuite) TestWebsocketConnectionCanBeEstablished() { path, err := suite.websocketUrl("ws", suite.runner.Id(), suite.executionId) suite.Require().NoError(err) diff --git a/environment/execution_environment.go b/environment/execution_environment.go deleted file mode 100644 index a698184..0000000 --- a/environment/execution_environment.go +++ /dev/null @@ -1,106 +0,0 @@ -package environment - -import ( - "errors" - "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" - "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" - "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" - "time" -) - -var log = logging.GetLogger("execution_environment") - -// ExecutionEnvironment is a partial image of an execution environment in CodeOcean. -type ExecutionEnvironment interface { - // NextRunner gets the next available runner and marks it as running. - // If no runner is available it throws a error after a small timeout - NextRunner() (runner.Runner, error) - // Refresh fetches the runners for this execution environment and sends them to the pool. - // This function does not terminate. Instead it fetches new runners periodically. - Refresh() -} - -// NomadExecutionEnvironment is an implementation that returns a nomad specific execution environment. -// Here it is mapped on a Job in Nomad. -// The jobId has to match the only Nomad task group name! -type NomadExecutionEnvironment struct { - id int - jobId string - availableRunners chan runner.Runner - allRunners RunnerPool - nomadApiClient nomad.ExecutorApi -} - -var executionEnvironment ExecutionEnvironment - -// DebugInit initializes one execution environment so that its runners can be provided. -// ToDo: This should be replaced by a create Execution Environment route -func DebugInit(runnersPool RunnerPool, nomadApi nomad.ExecutorApi) { - executionEnvironment = &NomadExecutionEnvironment{ - id: 0, - jobId: "python", - availableRunners: make(chan runner.Runner, 5), - nomadApiClient: nomadApi, - allRunners: runnersPool, - } - go executionEnvironment.Refresh() -} - -// GetExecutionEnvironment returns a previously added ExecutionEnvironment. -// This way you can access all Runners for that environment, -func GetExecutionEnvironment(id int) (ExecutionEnvironment, error) { - // TODO: Remove hardcoded execution environment - return executionEnvironment, nil -} - -func (environment *NomadExecutionEnvironment) NextRunner() (r runner.Runner, err error) { - select { - case r = <-environment.availableRunners: - r.SetStatus(runner.StatusRunning) - return r, nil - case <-time.After(50 * time.Millisecond): - return nil, errors.New("no runners available") - } -} - -// Refresh Big ToDo: Improve this function!! State out that it also rescales the job; Provide context to be terminable... -func (environment *NomadExecutionEnvironment) Refresh() { - for { - runners, err := environment.nomadApiClient.LoadRunners(environment.jobId) - if err != nil { - log.WithError(err).Warn("Failed fetching runners") - break - } - for _, r := range environment.unusedRunners(runners) { - // ToDo: Listen on Nomad event stream - log.WithField("allocation", r).Debug("Adding allocation") - environment.allRunners.Add(r) - environment.availableRunners <- r - } - jobScale, err := environment.nomadApiClient.JobScale(environment.jobId) - if err != nil { - log.WithError(err).Warn("Failed get allocation count") - break - } - neededRunners := cap(environment.availableRunners) - len(environment.availableRunners) + 1 - runnerCount := jobScale + neededRunners - time.Sleep(50 * time.Millisecond) - log.WithField("count", runnerCount).Debug("Set job scaling") - err = environment.nomadApiClient.SetJobScale(environment.jobId, runnerCount, "Runner Requested") - if err != nil { - log.WithError(err).Warn("Failed to set allocation scaling") - continue - } - } -} - -func (environment *NomadExecutionEnvironment) unusedRunners(fetchedRunnerIds []string) (newRunners []runner.Runner) { - newRunners = make([]runner.Runner, 0) - for _, runnerId := range fetchedRunnerIds { - _, ok := environment.allRunners.Get(runnerId) - if !ok { - newRunners = append(newRunners, runner.NewExerciseRunner(runnerId)) - } - } - return -} diff --git a/environment/execution_environment_test.go b/environment/execution_environment_test.go deleted file mode 100644 index 3c28043..0000000 --- a/environment/execution_environment_test.go +++ /dev/null @@ -1,109 +0,0 @@ -package environment - -import ( - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" - "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" - "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" - "testing" - "time" -) - -const anotherRunnerId = "4n0th3r-1d" -const jobId = "4n0th3r-1d" - -func TestGetNextRunnerTestSuite(t *testing.T) { - suite.Run(t, new(GetNextRunnerTestSuite)) -} - -type GetNextRunnerTestSuite struct { - suite.Suite - nomadExecutionEnvironment *NomadExecutionEnvironment - exerciseRunner runner.Runner -} - -func (suite *GetNextRunnerTestSuite) SetupTest() { - suite.nomadExecutionEnvironment = &NomadExecutionEnvironment{ - availableRunners: make(chan runner.Runner, 50), - allRunners: NewLocalRunnerPool(), - } - suite.exerciseRunner = CreateTestRunner() -} - -func (suite *GetNextRunnerTestSuite) TestGetNextRunnerReturnsRunnerIfAvailable() { - suite.nomadExecutionEnvironment.availableRunners <- suite.exerciseRunner - - receivedRunner, err := suite.nomadExecutionEnvironment.NextRunner() - suite.NoError(err) - suite.Equal(suite.exerciseRunner, receivedRunner) -} - -func (suite *GetNextRunnerTestSuite) TestGetNextRunnerChangesStatusOfRunner() { - suite.nomadExecutionEnvironment.availableRunners <- suite.exerciseRunner - - receivedRunner, _ := suite.nomadExecutionEnvironment.NextRunner() - suite.Equal(runner.StatusRunning, receivedRunner.Status()) -} - -func (suite *GetNextRunnerTestSuite) TestGetNextRunnerDoesNotReturnTheSameRunnerTwice() { - suite.nomadExecutionEnvironment.availableRunners <- suite.exerciseRunner - suite.nomadExecutionEnvironment.availableRunners <- runner.NewExerciseRunner(anotherRunnerId) - - firstReceivedRunner, _ := suite.nomadExecutionEnvironment.NextRunner() - secondReceivedRunner, _ := suite.nomadExecutionEnvironment.NextRunner() - suite.NotEqual(firstReceivedRunner, secondReceivedRunner) -} - -func (suite *GetNextRunnerTestSuite) TestGetNextRunnerThrowsAnErrorIfNoRunnersAvailable() { - receivedRunner, err := suite.nomadExecutionEnvironment.NextRunner() - suite.Nil(receivedRunner) - suite.Error(err) -} - -func TestRefreshFetchRunners(t *testing.T) { - apiMock, environment := newRefreshMock([]string{RunnerId}, NewLocalRunnerPool()) - // ToDo: Terminate Refresh when test finished (also in other tests) - go environment.Refresh() - _, _ = environment.NextRunner() - apiMock.AssertCalled(t, "LoadRunners", jobId) -} - -func TestRefreshFetchesRunnersIntoChannel(t *testing.T) { - _, environment := newRefreshMock([]string{RunnerId}, NewLocalRunnerPool()) - go environment.Refresh() - availableRunner, _ := environment.NextRunner() - assert.Equal(t, availableRunner.Id(), RunnerId) -} - -func TestRefreshScalesJob(t *testing.T) { - apiMock, environment := newRefreshMock([]string{RunnerId}, NewLocalRunnerPool()) - go environment.Refresh() - _, _ = environment.NextRunner() - time.Sleep(100 * time.Millisecond) // ToDo: Be safe this test is not flaky - apiMock.AssertCalled(t, "SetJobScale", jobId, 52, "Runner Requested") -} - -func TestRefreshAddsRunnerToPool(t *testing.T) { - runnersInUse := NewLocalRunnerPool() - _, environment := newRefreshMock([]string{RunnerId}, runnersInUse) - go environment.Refresh() - availableRunner, _ := environment.NextRunner() - poolRunner, ok := runnersInUse.Get(availableRunner.Id()) - assert.True(t, ok) - assert.Equal(t, availableRunner, poolRunner) -} - -func newRefreshMock(returnedRunnerIds []string, allRunners RunnerPool) (apiClient *nomad.ExecutorApiMock, environment *NomadExecutionEnvironment) { - apiClient = &nomad.ExecutorApiMock{} - apiClient.On("LoadRunners", jobId).Return(returnedRunnerIds, nil) - apiClient.On("JobScale", jobId).Return(len(returnedRunnerIds), nil) - apiClient.On("SetJobScale", jobId, mock.AnythingOfType("int"), "Runner Requested").Return(nil) - environment = &NomadExecutionEnvironment{ - jobId: jobId, - availableRunners: make(chan runner.Runner, 50), - allRunners: allRunners, - nomadApiClient: apiClient, - } - return -} diff --git a/environment/manager.go b/environment/manager.go new file mode 100644 index 0000000..688c1dc --- /dev/null +++ b/environment/manager.go @@ -0,0 +1,59 @@ +package environment + +import ( + "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" +) + +// Manager encapsulates API calls to the executor API for creation and deletion of execution environments. +type Manager interface { + // Load fetches all already created execution environments from the executor and registers them at the runner manager. + // It should be called during the startup process (e.g. on creation of the Manager). + Load() + + // Create creates a new execution environment on the executor. + Create( + id string, + prewarmingPoolSize uint, + cpuLimit uint, + memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16, + ) + + // Delete remove the execution environment with the given id from the executor. + Delete(id string) +} + +func NewNomadEnvironmentManager(runnerManager runner.Manager, apiClient nomad.ExecutorApi) *NomadEnvironmentManager { + environmentManager := &NomadEnvironmentManager{runnerManager, apiClient} + environmentManager.Load() + return environmentManager +} + +type NomadEnvironmentManager struct { + runnerManager runner.Manager + api nomad.ExecutorApi +} + +func (m *NomadEnvironmentManager) Create( + id string, + prewarmingPoolSize uint, + cpuLimit uint, + memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16, +) { + +} + +func (m *NomadEnvironmentManager) Delete(id string) { + +} + +func (m *NomadEnvironmentManager) Load() { + // ToDo: remove create default execution environment for debugging purposes + m.runnerManager.RegisterEnvironment(runner.EnvironmentId(0), "python", 5) +} diff --git a/environment/manager_mock.go b/environment/manager_mock.go new file mode 100644 index 0000000..0e17c28 --- /dev/null +++ b/environment/manager_mock.go @@ -0,0 +1,25 @@ +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. + +package environment + +import mock "github.com/stretchr/testify/mock" + +// ManagerMock is an autogenerated mock type for the Manager type +type ManagerMock struct { + mock.Mock +} + +// Create provides a mock function with given fields: id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts +func (_m *ManagerMock) Create(id string, prewarmingPoolSize uint, cpuLimit uint, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) { + _m.Called(id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) +} + +// Delete provides a mock function with given fields: id +func (_m *ManagerMock) Delete(id string) { + _m.Called(id) +} + +// Load provides a mock function with given fields: +func (_m *ManagerMock) Load() { + _m.Called() +} diff --git a/environment/test_constants.go b/environment/test_constants.go deleted file mode 100644 index fd94c1a..0000000 --- a/environment/test_constants.go +++ /dev/null @@ -1,9 +0,0 @@ -package environment - -import "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" - -const RunnerId = "s0m3-r4nd0m-1d" - -func CreateTestRunner() runner.Runner { - return runner.NewExerciseRunner(RunnerId) -} diff --git a/go.mod b/go.mod index 8698162..af203bc 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 - github.com/gorilla/websocket v1.4.2 // indirect + github.com/gorilla/websocket v1.4.2 github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/nomad v1.0.4 github.com/hashicorp/nomad/api v0.0.0-20210505182403-7d5a9ecde95c diff --git a/main.go b/main.go index 5f2e4e5..8ebab8f 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/environment" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "net/http" "os" "os/signal" @@ -38,13 +39,22 @@ func runServer(server *http.Server) { } } -func initServer(apiClient nomad.ExecutorApi, runnerPool environment.RunnerPool) *http.Server { +func initServer() *http.Server { + // API initialization + nomadAPIClient, err := nomad.NewExecutorApi(config.Config.NomadAPIURL(), config.Config.Nomad.Namespace) + if err != nil { + log.WithError(err).WithField("nomad url", config.Config.NomadAPIURL()).Fatal("Error parsing the nomad url") + } + + runnerManager := runner.NewNomadRunnerManager(nomadAPIClient) + environmentManager := environment.NewNomadEnvironmentManager(runnerManager, nomadAPIClient) + return &http.Server{ Addr: config.Config.PoseidonAPIURL().Host, WriteTimeout: time.Second * 15, ReadTimeout: time.Second * 15, IdleTimeout: time.Second * 60, - Handler: api.NewRouter(apiClient, runnerPool), + Handler: api.NewRouter(runnerManager, environmentManager), } } @@ -68,17 +78,7 @@ func main() { } logging.InitializeLogging(config.Config.Logger.Level) - // API initialization - nomadAPIClient, err := nomad.NewExecutorApi(config.Config.NomadAPIURL(), config.Config.Nomad.Namespace) - if err != nil { - log.WithError(err).WithField("nomad url", config.Config.NomadAPIURL()).Fatal("Error parsing the nomad url") - } - - // ToDo: Move to create execution environment - runnerPool := environment.NewLocalRunnerPool() - environment.DebugInit(runnerPool, nomadAPIClient) - - server := initServer(nomadAPIClient, runnerPool) + server := initServer() go runServer(server) shutdownOnOSSignal(server) } diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index dc3d4a9..f9b8ccb 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -72,8 +72,8 @@ func (_m *apiQuerierMock) LoadJobList() ([]*api.JobListStub, error) { return r0, r1 } -// SetJobScaling provides a mock function with given fields: jobId, count, reason -func (_m *apiQuerierMock) SetJobScaling(jobId string, count int, reason string) error { +// SetJobScale provides a mock function with given fields: jobId, count, reason +func (_m *apiQuerierMock) SetJobScale(jobId string, count int, reason string) error { ret := _m.Called(jobId, count, reason) var r0 error @@ -122,17 +122,3 @@ func (_m *apiQuerierMock) loadRunners(jobId string) ([]*api.AllocationListStub, return r0, r1 } - -// SetJobScale provides a mock function with given fields: jobId, count, reason -func (_m *apiQuerierMock) SetJobScale(jobId string, count int, reason string) error { - ret := _m.Called(jobId, count, reason) - - var r0 error - if rf, ok := ret.Get(0).(func(string, int, string) error); ok { - r0 = rf(jobId, count, reason) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/runner/manager.go b/runner/manager.go new file mode 100644 index 0000000..042e79c --- /dev/null +++ b/runner/manager.go @@ -0,0 +1,145 @@ +package runner + +import ( + "errors" + "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" + "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "time" +) + +var ( + log = logging.GetLogger("runner") + ErrUnknownExecutionEnvironment = errors.New("execution environment not found") + ErrNoRunnersAvailable = errors.New("no runners available for this execution environment") + ErrRunnerNotFound = errors.New("no runner found with this id") +) + +// Manager keeps track of the used and unused runners of all execution environments in order to provide unused runners to new clients and ensure no runner is used twice. +type Manager interface { + // RegisterEnvironment adds a new environment for being managed. + RegisterEnvironment(environmentId EnvironmentId, nomadJobId NomadJobId, desiredIdleRunnersCount int) + + // Use returns a new runner. + // It makes sure that runner is not in use yet and returns an error if no runner could be provided. + Use(id EnvironmentId) (Runner, error) + + // Get returns the used runner with the given runnerId. + // If no runner with the given runnerId is currently used, it returns an error. + Get(runnerId string) (Runner, error) + + // Return hands back the runner. + // The runner is deleted or cleaned up for reuse depending on the used executor. + Return(r Runner) error +} + +func NewNomadRunnerManager(apiClient nomad.ExecutorApi) *NomadRunnerManager { + return &NomadRunnerManager{ + apiClient, + make(map[EnvironmentId]*NomadJob), + NewLocalRunnerPool(), + } +} + +type EnvironmentId int +type NomadJobId string + +type NomadJob struct { + jobId NomadJobId + idleRunners Pool + desiredIdleRunnersCount int +} + +type NomadRunnerManager struct { + apiClient nomad.ExecutorApi + jobs map[EnvironmentId]*NomadJob + usedRunners Pool +} + +func (m *NomadRunnerManager) RegisterEnvironment(environmentId EnvironmentId, nomadJobId NomadJobId, desiredIdleRunnersCount int) { + m.jobs[environmentId] = &NomadJob{ + nomadJobId, + NewLocalRunnerPool(), + desiredIdleRunnersCount, + } + go m.refreshEnvironment(environmentId) +} + +func (m *NomadRunnerManager) Use(environmentId EnvironmentId) (Runner, error) { + job, ok := m.jobs[environmentId] + if !ok { + return nil, ErrUnknownExecutionEnvironment + } + runner, ok := job.idleRunners.Sample() + if !ok { + return nil, ErrNoRunnersAvailable + } + m.usedRunners.Add(runner) + return runner, nil +} + +func (m *NomadRunnerManager) Get(runnerId string) (r Runner, err error) { + runner, ok := m.usedRunners.Get(runnerId) + if !ok { + return nil, ErrRunnerNotFound + } + return runner.(Runner), nil +} + +func (m *NomadRunnerManager) Return(r Runner) (err error) { + err = m.apiClient.DeleteRunner(r.Id()) + if err != nil { + return err + } + m.usedRunners.Delete(r.Id()) + return +} + +// Refresh Big ToDo: Improve this function!! State out that it also rescales the job; Provide context to be terminable... +func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentId) { + job := m.jobs[id] + lastJobScaling := -1 + for { + runners, err := m.apiClient.LoadRunners(string(job.jobId)) + if err != nil { + log.WithError(err).Printf("Failed fetching runners") + break + } + for _, r := range m.unusedRunners(id, runners) { + // ToDo: Listen on Nomad event stream + log.Printf("Adding allocation %+v", r) + + job.idleRunners.Add(r) + } + jobScale, err := m.apiClient.JobScale(string(job.jobId)) + if err != nil { + log.WithError(err).Printf("Failed get allocation count") + break + } + neededRunners := job.desiredIdleRunnersCount - job.idleRunners.Len() + 1 + runnerCount := jobScale + neededRunners + time.Sleep(50 * time.Millisecond) + if runnerCount != lastJobScaling { + log.Printf("Set job scaling %d", runnerCount) + err = m.apiClient.SetJobScale(string(job.jobId), runnerCount, "Runner Requested") + if err != nil { + log.WithError(err).Printf("Failed set allocation scaling") + continue + } + lastJobScaling = runnerCount + } + } +} + +func (m *NomadRunnerManager) unusedRunners(environmentId EnvironmentId, fetchedRunnerIds []string) (newRunners []Runner) { + newRunners = make([]Runner, 0) + for _, runnerId := range fetchedRunnerIds { + _, ok := m.usedRunners.Get(runnerId) + if !ok { + _, ok = m.jobs[environmentId].idleRunners.Get(runnerId) + if !ok { + newRunners = append(newRunners, NewRunner(runnerId)) + } + } + } + return +} diff --git a/runner/manager_mock.go b/runner/manager_mock.go new file mode 100644 index 0000000..76a9156 --- /dev/null +++ b/runner/manager_mock.go @@ -0,0 +1,75 @@ +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. + +package runner + +import mock "github.com/stretchr/testify/mock" + +// ManagerMock is an autogenerated mock type for the Manager type +type ManagerMock struct { + mock.Mock +} + +// Get provides a mock function with given fields: runnerId +func (_m *ManagerMock) Get(runnerId string) (Runner, error) { + ret := _m.Called(runnerId) + + var r0 Runner + if rf, ok := ret.Get(0).(func(string) Runner); ok { + r0 = rf(runnerId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(Runner) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(runnerId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RegisterEnvironment provides a mock function with given fields: environmentId, nomadJobId, desiredIdleRunnersCount +func (_m *ManagerMock) RegisterEnvironment(environmentId EnvironmentId, nomadJobId NomadJobId, desiredIdleRunnersCount int) { + _m.Called(environmentId, nomadJobId, desiredIdleRunnersCount) +} + +// Return provides a mock function with given fields: r +func (_m *ManagerMock) Return(r Runner) error { + ret := _m.Called(r) + + var r0 error + if rf, ok := ret.Get(0).(func(Runner) error); ok { + r0 = rf(r) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Use provides a mock function with given fields: id +func (_m *ManagerMock) Use(id EnvironmentId) (Runner, error) { + ret := _m.Called(id) + + var r0 Runner + if rf, ok := ret.Get(0).(func(EnvironmentId) Runner); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(Runner) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(EnvironmentId) error); ok { + r1 = rf(id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/runner/manager_test.go b/runner/manager_test.go new file mode 100644 index 0000000..412a43d --- /dev/null +++ b/runner/manager_test.go @@ -0,0 +1,172 @@ +package runner + +import ( + "errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "testing" + "time" +) + +const anotherRunnerId = "4n0th3r-runn3r-1d" +const defaultEnvironmentId = EnvironmentId(0) +const otherEnvironmentId = EnvironmentId(42) +const jobId = "4n0th3r-j0b-1d" +const waitTime = 100 * time.Millisecond + +func TestGetNextRunnerTestSuite(t *testing.T) { + suite.Run(t, new(ManagerTestSuite)) +} + +type ManagerTestSuite struct { + suite.Suite + apiMock *nomad.ExecutorApiMock + nomadRunnerManager *NomadRunnerManager + exerciseRunner Runner +} + +func (suite *ManagerTestSuite) setUp(returnedRunnerIds []string) { + suite.apiMock = &nomad.ExecutorApiMock{} + suite.nomadRunnerManager = NewNomadRunnerManager(suite.apiMock) + suite.exerciseRunner = CreateTestRunner() + suite.mockRunnerQueries(returnedRunnerIds) + suite.registerDefaultEnvironment() +} + +func (suite *ManagerTestSuite) mockRunnerQueries(returnedRunnerIds []string) { + suite.apiMock.On("LoadRunners", jobId).Return(returnedRunnerIds, nil) + suite.apiMock.On("JobScale", jobId).Return(len(returnedRunnerIds), nil) + suite.apiMock.On("SetJobScale", jobId, mock.AnythingOfType("int"), "Runner Requested").Return(nil) +} + +func (suite *ManagerTestSuite) registerDefaultEnvironment() { + suite.nomadRunnerManager.RegisterEnvironment(defaultEnvironmentId, jobId, 5) +} + +func (suite *ManagerTestSuite) TestRegisterEnvironmentAddsANewJob() { + suite.NotNil(suite.nomadRunnerManager.jobs[defaultEnvironmentId]) +} + +func (suite *ManagerTestSuite) TestUseReturnsNotFoundErrorIfEnvironmentNotFound() { + runner, err := suite.nomadRunnerManager.Use(EnvironmentId(42)) + suite.Nil(runner) + suite.Equal(ErrUnknownExecutionEnvironment, err) +} + +func (suite *ManagerTestSuite) TestUseReturnsRunnerIfAvailable() { + suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner) + receivedRunner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId) + suite.NoError(err) + suite.Equal(suite.exerciseRunner, receivedRunner) +} + +func (suite *ManagerTestSuite) TestUseReturnsErrorIfNoRunnerAvailable() { + suite.setUp([]string{}) + time.Sleep(waitTime) + runner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId) + suite.Nil(runner) + suite.Equal(ErrNoRunnersAvailable, err) +} + +func (suite *ManagerTestSuite) TestUseReturnsNoRunnerOfDifferentEnvironment() { + suite.setUp([]string{}) + suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner) + receivedRunner, err := suite.nomadRunnerManager.Use(otherEnvironmentId) + suite.Nil(receivedRunner) + suite.Error(err) +} + +func (suite *ManagerTestSuite) TestUseDoesNotReturnTheSameRunnerTwice() { + suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner) + suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(NewRunner(anotherRunnerId)) + + firstReceivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId) + secondReceivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId) + suite.NotEqual(firstReceivedRunner, secondReceivedRunner) +} + +func (suite *ManagerTestSuite) TestUseThrowsAnErrorIfNoRunnersAvailable() { + receivedRunner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId) + suite.Nil(receivedRunner) + suite.Error(err) +} + +func (suite *ManagerTestSuite) TestUseAddsRunnerToUsedRunners() { + suite.setUp([]string{RunnerId}) + time.Sleep(waitTime) + receivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId) + savedRunner, ok := suite.nomadRunnerManager.usedRunners.Get(receivedRunner.Id()) + suite.True(ok) + suite.Equal(savedRunner, receivedRunner) +} + +func (suite *ManagerTestSuite) TestGetReturnsRunnerIfRunnerIsUsed() { + suite.setUp([]string{}) + suite.nomadRunnerManager.usedRunners.Add(suite.exerciseRunner) + savedRunner, err := suite.nomadRunnerManager.Get(suite.exerciseRunner.Id()) + suite.NoError(err) + suite.Equal(savedRunner, suite.exerciseRunner) +} + +func (suite *ManagerTestSuite) TestGetReturnsErrorIfRunnerNotFound() { + suite.setUp([]string{}) + savedRunner, err := suite.nomadRunnerManager.Get(RunnerId) + suite.Nil(savedRunner) + suite.Error(err) +} + +func (suite *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() { + suite.setUp([]string{}) + suite.apiMock.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil) + suite.nomadRunnerManager.usedRunners.Add(suite.exerciseRunner) + err := suite.nomadRunnerManager.Return(suite.exerciseRunner) + suite.Nil(err) + _, ok := suite.nomadRunnerManager.usedRunners.Get(suite.exerciseRunner.Id()) + suite.False(ok) +} + +func (suite *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() { + suite.setUp([]string{}) + suite.apiMock.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil) + err := suite.nomadRunnerManager.Return(suite.exerciseRunner) + suite.Nil(err) + suite.apiMock.AssertCalled(suite.T(), "DeleteRunner", suite.exerciseRunner.Id()) +} + +func (suite *ManagerTestSuite) TestReturnThrowsErrorWhenApiCallFailed() { + suite.setUp([]string{}) + suite.apiMock.On("DeleteRunner", mock.AnythingOfType("string")).Return(errors.New("return failed")) + err := suite.nomadRunnerManager.Return(suite.exerciseRunner) + suite.Error(err) +} + +func (suite *ManagerTestSuite) TestRefreshFetchesRunners() { + suite.setUp([]string{RunnerId}) + time.Sleep(waitTime) + suite.apiMock.AssertCalled(suite.T(), "LoadRunners", jobId) +} + +func (suite *ManagerTestSuite) TestNewRunnersFoundInRefreshAreAddedToUnusedRunners() { + suite.setUp([]string{RunnerId}) + time.Sleep(waitTime) + availableRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId) + suite.Equal(availableRunner.Id(), RunnerId) +} + +func (suite *ManagerTestSuite) TestRefreshScalesJob() { + suite.setUp([]string{RunnerId}) + time.Sleep(waitTime) + // use one runner + _, _ = suite.nomadRunnerManager.Use(defaultEnvironmentId) + time.Sleep(waitTime) + suite.apiMock.AssertCalled(suite.T(), "SetJobScale", jobId, 6, "Runner Requested") +} + +func (suite *ManagerTestSuite) TestRefreshAddsRunnerToPool() { + suite.setUp([]string{RunnerId}) + time.Sleep(waitTime) + poolRunner, ok := suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Get(RunnerId) + suite.True(ok) + suite.Equal(RunnerId, poolRunner.Id()) +} diff --git a/environment/runner_pool.go b/runner/pool.go similarity index 59% rename from environment/runner_pool.go rename to runner/pool.go index 64b15cc..edded59 100644 --- a/environment/runner_pool.go +++ b/runner/pool.go @@ -1,35 +1,38 @@ -package environment +package runner import ( - "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/store" "sync" ) -// RunnerPool is a type of entity store that should store runner entities. -type RunnerPool interface { +// Pool is a type of entity store that should store runner entities. +type Pool interface { store.EntityStore + + // Sample returns and removes an arbitrary entity from the pool. + // ok is true iff a runner was returned. + Sample() (r Runner, ok bool) } // localRunnerPool stores runner objects in the local application memory. // ToDo: Create implementation that use some persistent storage like a database type localRunnerPool struct { sync.RWMutex - runners map[string]runner.Runner + runners map[string]Runner } -// NewLocalRunnerPool responds with a RunnerPool implementation +// NewLocalRunnerPool responds with a Pool implementation // This implementation stores the data thread-safe in the local application memory func NewLocalRunnerPool() *localRunnerPool { return &localRunnerPool{ - runners: make(map[string]runner.Runner), + runners: make(map[string]Runner), } } func (pool *localRunnerPool) Add(r store.Entity) { pool.Lock() defer pool.Unlock() - runnerEntity, ok := r.(runner.Runner) + runnerEntity, ok := r.(Runner) if !ok { log. WithField("pool", pool). @@ -51,3 +54,17 @@ func (pool *localRunnerPool) Delete(id string) { defer pool.Unlock() delete(pool.runners, id) } + +func (pool *localRunnerPool) Sample() (Runner, bool) { + pool.Lock() + defer pool.Unlock() + for _, runner := range pool.runners { + delete(pool.runners, runner.Id()) + return runner, true + } + return nil, false +} + +func (pool *localRunnerPool) Len() int { + return len(pool.runners) +} diff --git a/environment/runner_pool_test.go b/runner/pool_test.go similarity index 93% rename from environment/runner_pool_test.go rename to runner/pool_test.go index 8fcf254..9b2f2f2 100644 --- a/environment/runner_pool_test.go +++ b/runner/pool_test.go @@ -1,10 +1,9 @@ -package environment +package runner import ( "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/suite" - "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" "testing" ) @@ -21,7 +20,7 @@ func TestRunnerPoolTestSuite(t *testing.T) { type RunnerPoolTestSuite struct { suite.Suite runnerPool *localRunnerPool - runner runner.Runner + runner Runner } func (suite *RunnerPoolTestSuite) SetupTest() { @@ -60,7 +59,7 @@ func (suite *RunnerPoolTestSuite) TestAddedRunnerCanBeRetrieved() { } func (suite *RunnerPoolTestSuite) TestRunnerWithSameIdOverwritesOldOne() { - otherRunnerWithSameId := runner.NewExerciseRunner(suite.runner.Id()) + otherRunnerWithSameId := NewRunner(suite.runner.Id()) // assure runner is actually different suite.NotEqual(suite.runner, otherRunnerWithSameId) diff --git a/runner/runner.go b/runner/runner.go index a815a4c..7c33606 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -9,9 +9,6 @@ import ( "sync" ) -// Status is the type for the status of a Runner. -type Status string - // ContextKey is the type for keys in a request context. type ContextKey string @@ -19,11 +16,6 @@ type ContextKey string type ExecutionId string const ( - StatusReady Status = "ready" - StatusRunning Status = "running" - StatusTimeout Status = "timeout" - StatusFinished Status = "finished" - // runnerContextKey is the key used to store runners in context.Context runnerContextKey ContextKey = "runner" ) @@ -31,37 +23,33 @@ const ( type Runner interface { store.Entity - // SetStatus sets the status of the runner. - SetStatus(Status) - - // Status gets the status of the runner. - Status() Status - - // Execution looks up an ExecutionId for the runner and returns the associated RunnerRequest. - // If this request does not exit, ok is false, else true. - Execution(ExecutionId) (request dto.ExecutionRequest, ok bool) - // AddExecution saves the supplied ExecutionRequest for the runner and returns an ExecutionId to retrieve it again. AddExecution(dto.ExecutionRequest) (ExecutionId, error) + Execution(ExecutionId) (executionRequest dto.ExecutionRequest, ok bool) + // DeleteExecution deletes the execution of the runner with the specified id. DeleteExecution(ExecutionId) + + // Execute executes the execution with the given ID. + Execute(ExecutionId) + + // Copy copies the specified files into the runner. + Copy(dto.FileCreation) } -// ExerciseRunner is an abstraction to communicate with Nomad allocations. -type ExerciseRunner struct { +// NomadAllocation is an abstraction to communicate with Nomad allocations. +type NomadAllocation struct { sync.RWMutex id string - status Status ch chan bool executions map[ExecutionId]dto.ExecutionRequest } -// NewExerciseRunner creates a new exercise runner with the provided id. -func NewExerciseRunner(id string) *ExerciseRunner { - return &ExerciseRunner{ +// NewRunner creates a new runner with the provided id. +func NewRunner(id string) Runner { + return &NomadAllocation{ id: id, - status: StatusReady, ch: make(chan bool), executions: make(map[ExecutionId]dto.ExecutionRequest), } @@ -69,40 +57,26 @@ func NewExerciseRunner(id string) *ExerciseRunner { // MarshalJSON implements json.Marshaler interface. // This exports private attributes like the id too. -func (r *ExerciseRunner) MarshalJSON() ([]byte, error) { +func (r *NomadAllocation) MarshalJSON() ([]byte, error) { return json.Marshal(struct { - Id string `json:"runnerId"` - Status Status `json:"status"` + Id string `json:"runnerId"` }{ - Id: r.Id(), - Status: r.Status(), + Id: r.Id(), }) } -func (r *ExerciseRunner) SetStatus(status Status) { - r.Lock() - defer r.Unlock() - r.status = status -} - -func (r *ExerciseRunner) Status() Status { - r.RLock() - defer r.RUnlock() - return r.status -} - -func (r *ExerciseRunner) Id() string { +func (r *NomadAllocation) Id() string { return r.id } -func (r *ExerciseRunner) Execution(id ExecutionId) (executionRequest dto.ExecutionRequest, ok bool) { +func (r *NomadAllocation) Execution(id ExecutionId) (executionRequest dto.ExecutionRequest, ok bool) { r.RLock() defer r.RUnlock() executionRequest, ok = r.executions[id] return } -func (r *ExerciseRunner) AddExecution(request dto.ExecutionRequest) (ExecutionId, error) { +func (r *NomadAllocation) AddExecution(request dto.ExecutionRequest) (ExecutionId, error) { r.Lock() defer r.Unlock() idUuid, err := uuid.NewRandom() @@ -114,7 +88,15 @@ func (r *ExerciseRunner) AddExecution(request dto.ExecutionRequest) (ExecutionId return id, err } -func (r *ExerciseRunner) DeleteExecution(id ExecutionId) { +func (r *NomadAllocation) Execute(id ExecutionId) { + +} + +func (r *NomadAllocation) Copy(files dto.FileCreation) { + +} + +func (r *NomadAllocation) DeleteExecution(id ExecutionId) { r.Lock() defer r.Unlock() delete(r.executions, id) diff --git a/runner/runner_test.go b/runner/runner_test.go index 0af109f..ce7e024 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -9,32 +9,19 @@ import ( ) func TestIdIsStored(t *testing.T) { - runner := NewExerciseRunner("42") + runner := NewRunner("42") assert.Equal(t, "42", runner.Id()) } -func TestStatusIsStored(t *testing.T) { - runner := NewExerciseRunner("42") - for _, status := range []Status{StatusReady, StatusRunning, StatusTimeout, StatusFinished} { - runner.SetStatus(status) - assert.Equal(t, status, runner.Status(), "The status is returned as it is stored") - } -} - -func TestDefaultStatus(t *testing.T) { - runner := NewExerciseRunner("42") - assert.Equal(t, StatusReady, runner.status) -} - func TestMarshalRunner(t *testing.T) { - runner := NewExerciseRunner("42") + runner := NewRunner("42") marshal, err := json.Marshal(runner) assert.NoError(t, err) - assert.Equal(t, "{\"runnerId\":\"42\",\"status\":\"ready\"}", string(marshal)) + assert.Equal(t, "{\"runnerId\":\"42\"}", string(marshal)) } func TestExecutionRequestIsStored(t *testing.T) { - runner := NewExerciseRunner("42") + runner := NewRunner("42") executionRequest := dto.ExecutionRequest{ Command: "command", TimeLimit: 10, @@ -49,7 +36,7 @@ func TestExecutionRequestIsStored(t *testing.T) { } func TestNewContextReturnsNewContextWithRunner(t *testing.T) { - runner := NewExerciseRunner("testRunner") + runner := NewRunner("testRunner") ctx := context.Background() newCtx := NewContext(ctx, runner) storedRunner := newCtx.Value(runnerContextKey).(Runner) @@ -59,7 +46,7 @@ func TestNewContextReturnsNewContextWithRunner(t *testing.T) { } func TestFromContextReturnsRunner(t *testing.T) { - runner := NewExerciseRunner("testRunner") + runner := NewRunner("testRunner") ctx := NewContext(context.Background(), runner) storedRunner, ok := FromContext(ctx) diff --git a/runner/test_constants.go b/runner/test_constants.go new file mode 100644 index 0000000..48798ee --- /dev/null +++ b/runner/test_constants.go @@ -0,0 +1,7 @@ +package runner + +const RunnerId = "s0m3-r4nd0m-1d" + +func CreateTestRunner() Runner { + return NewRunner(RunnerId) +} diff --git a/store/entity_store.go b/store/entity_store.go index 7e63aaa..8cd6a20 100644 --- a/store/entity_store.go +++ b/store/entity_store.go @@ -13,6 +13,9 @@ type EntityStore interface { // Delete deletes the entity with the passed id from the store. Delete(id string) + + // Len returns the number of currently stored entities in the store. + Len() int } type Entity interface {