diff --git a/api/api.go b/api/api.go index 72ba33c..8791ef0 100644 --- a/api/api.go +++ b/api/api.go @@ -31,7 +31,7 @@ func NewRouter(runnerManager runner.Manager, environmentManager environment.Mana return router } -// configureV1Router configures a given router with the routes of version 1 of our API. +// configureV1Router configures a given router with the routes of version 1 of the Poseidon API. func configureV1Router(router *mux.Router, runnerManager runner.Manager, environmentManager environment.Manager) { v1 := router.PathPrefix(RouteBase).Subrouter() v1.HandleFunc(RouteHealth, Health).Methods(http.MethodGet) diff --git a/api/api_test.go b/api/api_test.go index 58ac881..37a9e87 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -56,6 +56,7 @@ func TestNewRouterV1WithAuthenticationEnabled(t *testing.T) { }) t.Run("protected route is not accessible", func(t *testing.T) { + // request an available API route that should be guarded by authentication (which one, in particular, does not matter here) request, err := http.NewRequest(http.MethodPost, "/api/v1/runners", nil) if err != nil { t.Fatal(err) diff --git a/api/dto/dto.go b/api/dto/dto.go index 2f1eab2..3049606 100644 --- a/api/dto/dto.go +++ b/api/dto/dto.go @@ -13,12 +13,24 @@ type ExecutionRequest struct { Environment map[string]string } +// ExecutionEnvironmentRequest is the expected json structure of the request body for the create execution environment function. +// nolint:unused,structcheck +type ExecutionEnvironmentRequest struct { + prewarmingPoolSize uint + cpuLimit uint + memoryLimit uint + image string + networkAccess bool + exposedPorts []uint16 +} + // RunnerResponse is the expected response when providing a runner. type RunnerResponse struct { Id string `json:"runnerId"` } // FileCreation is the expected json structure of the request body for the copy files route. 
+// TODO: specify content of the struct type FileCreation struct{} // WebsocketResponse is the expected response when creating an execution for a runner. diff --git a/api/health.go b/api/health.go index 70cd402..f3cd795 100644 --- a/api/health.go +++ b/api/health.go @@ -5,7 +5,7 @@ import ( ) // Health handles the health route. -// It tries to respond that the server is alive. +// It responds that the server is alive. // If it is not, the response won't reach the client. func Health(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusNoContent) diff --git a/api/runners.go b/api/runners.go index 3048cf7..e616ba4 100644 --- a/api/runners.go +++ b/api/runners.go @@ -30,7 +30,7 @@ func (r *RunnerController) ConfigureRoutes(router *mux.Router) { r.runnerRouter = runnersRouter.PathPrefix(fmt.Sprintf("/{%s}", RunnerIdKey)).Subrouter() r.runnerRouter.Use(r.findRunnerMiddleware) r.runnerRouter.HandleFunc(ExecutePath, r.execute).Methods(http.MethodPost).Name(ExecutePath) - r.runnerRouter.HandleFunc(WebsocketPath, connectToRunner).Methods(http.MethodGet).Name(WebsocketPath) + r.runnerRouter.HandleFunc(WebsocketPath, r.connectToRunner).Methods(http.MethodGet).Name(WebsocketPath) r.runnerRouter.HandleFunc("", r.delete).Methods(http.MethodDelete).Name(DeleteRoute) } @@ -43,7 +43,7 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req return } environmentId := runner.EnvironmentId(runnerRequest.ExecutionEnvironmentId) - nextRunner, err := r.manager.Use(environmentId) + nextRunner, err := r.manager.Claim(environmentId) if err != nil { if err == runner.ErrUnknownExecutionEnvironment { writeNotFound(writer, err) @@ -101,14 +101,13 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req // and adds the runner to the context of the request. 
func (r *RunnerController) findRunnerMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - // Find runner runnerId := mux.Vars(request)[RunnerIdKey] - r, err := r.manager.Get(runnerId) + targetRunner, err := r.manager.Get(runnerId) if err != nil { writeNotFound(writer, err) return } - ctx := runner.NewContext(request.Context(), r.(runner.Runner)) + ctx := runner.NewContext(request.Context(), targetRunner.(runner.Runner)) requestWithRunner := request.WithContext(ctx) next.ServeHTTP(writer, requestWithRunner) }) @@ -123,9 +122,9 @@ func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Requ if err != nil { if err == runner.ErrUnknownExecutionEnvironment { writeNotFound(writer, err) + } else { + writeInternalServerError(writer, err, dto.ErrorNomadInternalServerError) } - - writeInternalServerError(writer, err, dto.ErrorNomadInternalServerError) return } diff --git a/api/websocket.go b/api/websocket.go index 38f2e03..b3085a2 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -9,16 +9,16 @@ import ( ) // connectToRunner is the endpoint for websocket connections. -func connectToRunner(writer http.ResponseWriter, request *http.Request) { - r, _ := runner.FromContext(request.Context()) +func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) { + targetRunner, _ := runner.FromContext(request.Context()) executionId := runner.ExecutionId(request.URL.Query().Get(ExecutionIdKey)) - _, ok := r.Execution(executionId) + _, ok := targetRunner.Execution(executionId) if !ok { writeNotFound(writer, errors.New("executionId does not exist")) return } log. - WithField("runnerId", r.Id()). + WithField("runnerId", targetRunner.Id()). WithField("executionId", executionId). 
Info("Running execution") connUpgrader := websocket.Upgrader{} diff --git a/environment/manager.go b/environment/manager.go index 688c1dc..418b64a 100644 --- a/environment/manager.go +++ b/environment/manager.go @@ -1,6 +1,7 @@ package environment import ( + "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" ) @@ -14,15 +15,10 @@ type Manager interface { // Create creates a new execution environment on the executor. Create( id string, - prewarmingPoolSize uint, - cpuLimit uint, - memoryLimit uint, - image string, - networkAccess bool, - exposedPorts []uint16, + request dto.ExecutionEnvironmentRequest, ) - // Delete remove the execution environment with the given id from the executor. + // Delete removes the execution environment with the given id from the executor. Delete(id string) } @@ -39,12 +35,7 @@ type NomadEnvironmentManager struct { func (m *NomadEnvironmentManager) Create( id string, - prewarmingPoolSize uint, - cpuLimit uint, - memoryLimit uint, - image string, - networkAccess bool, - exposedPorts []uint16, + request dto.ExecutionEnvironmentRequest, ) { } diff --git a/runner/job_store.go b/runner/job_store.go new file mode 100644 index 0000000..d67d5a5 --- /dev/null +++ b/runner/job_store.go @@ -0,0 +1,57 @@ +package runner + +import ( + "gitlab.hpi.de/codeocean/codemoon/poseidon/store" + "sync" +) + +// JobStore is a type of entity store that should store job entities. +type JobStore interface { + store.EntityStore +} + +// nomadJobStore stores NomadJob objects in the local application memory. +type nomadJobStore struct { + sync.RWMutex + jobs map[string]*NomadJob +} + +// NewNomadJobStore responds with a JobStore implementation. +// This implementation stores the data thread-safe in the local application memory. 
+func NewNomadJobStore() *nomadJobStore { + return &nomadJobStore{ + jobs: make(map[string]*NomadJob), + } +} + +func (pool *nomadJobStore) Add(nomadJob store.Entity) { + pool.Lock() + defer pool.Unlock() + jobEntity, ok := nomadJob.(*NomadJob) + if !ok { + log. + WithField("pool", pool). + WithField("entity", nomadJob). + Fatal("Entity of type NomadJob was expected, but wasn't given.") + } + pool.jobs[nomadJob.Id()] = jobEntity +} + +func (pool *nomadJobStore) Get(id string) (nomadJob store.Entity, ok bool) { + pool.RLock() + defer pool.RUnlock() + nomadJob, ok = pool.jobs[id] + return +} + +func (pool *nomadJobStore) Delete(id string) { + pool.Lock() + defer pool.Unlock() + delete(pool.jobs, id) +} + +func (pool *nomadJobStore) Len() int { + pool.RLock() + defer pool.RUnlock() + return len(pool.jobs) +} diff --git a/runner/manager.go b/runner/manager.go index 042e79c..2e5add9 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -14,62 +14,72 @@ var ( ErrRunnerNotFound = errors.New("no runner found with this id") ) +type EnvironmentId int +func (e EnvironmentId) toString() string { + return string(rune(e)) +} + +type NomadJobId string + // Manager keeps track of the used and unused runners of all execution environments in order to provide unused runners to new clients and ensure no runner is used twice. type Manager interface { - // RegisterEnvironment adds a new environment for being managed. + // RegisterEnvironment adds a new environment that should be managed. RegisterEnvironment(environmentId EnvironmentId, nomadJobId NomadJobId, desiredIdleRunnersCount int) - // Use returns a new runner. - // It makes sure that runner is not in use yet and returns an error if no runner could be provided. - Use(id EnvironmentId) (Runner, error) + // Claim returns a new runner. + // It makes sure that the runner is not in use yet and returns an error if no runner could be provided. 
+ Claim(id EnvironmentId) (Runner, error) // Get returns the used runner with the given runnerId. // If no runner with the given runnerId is currently used, it returns an error. Get(runnerId string) (Runner, error) - // Return hands back the runner. + // Return signals that the runner is no longer used by the caller and can be claimed by someone else. // The runner is deleted or cleaned up for reuse depending on the used executor. Return(r Runner) error } +type NomadRunnerManager struct { + apiClient nomad.ExecutorApi + jobs JobStore + usedRunners Pool +} + func NewNomadRunnerManager(apiClient nomad.ExecutorApi) *NomadRunnerManager { return &NomadRunnerManager{ apiClient, - make(map[EnvironmentId]*NomadJob), + NewNomadJobStore(), NewLocalRunnerPool(), } } -type EnvironmentId int -type NomadJobId string - type NomadJob struct { + environmentId EnvironmentId jobId NomadJobId idleRunners Pool desiredIdleRunnersCount int } -type NomadRunnerManager struct { - apiClient nomad.ExecutorApi - jobs map[EnvironmentId]*NomadJob - usedRunners Pool +func (j *NomadJob) Id() string { + return j.environmentId.toString() } func (m *NomadRunnerManager) RegisterEnvironment(environmentId EnvironmentId, nomadJobId NomadJobId, desiredIdleRunnersCount int) { - m.jobs[environmentId] = &NomadJob{ + m.jobs.Add(&NomadJob{ + environmentId, nomadJobId, NewLocalRunnerPool(), desiredIdleRunnersCount, - } + }) go m.refreshEnvironment(environmentId) } -func (m *NomadRunnerManager) Use(environmentId EnvironmentId) (Runner, error) { - job, ok := m.jobs[environmentId] +func (m *NomadRunnerManager) Claim(environmentId EnvironmentId) (Runner, error) { + job, ok := m.jobs.Get(environmentId.toString()) if !ok { return nil, ErrUnknownExecutionEnvironment } - runner, ok := job.idleRunners.Sample() + runner, ok := job.(*NomadJob).idleRunners.Sample() if !ok { return nil, ErrNoRunnersAvailable } @@ -77,7 +87,7 @@ func (m *NomadRunnerManager) Use(environmentId EnvironmentId) (Runner, error) { return runner, 
nil } -func (m *NomadRunnerManager) Get(runnerId string) (r Runner, err error) { +func (m *NomadRunnerManager) Get(runnerId string) (Runner, error) { runner, ok := m.usedRunners.Get(runnerId) if !ok { return nil, ErrRunnerNotFound @@ -88,7 +98,7 @@ func (m *NomadRunnerManager) Get(runnerId string) (r Runner, err error) { func (m *NomadRunnerManager) Return(r Runner) (err error) { err = m.apiClient.DeleteRunner(r.Id()) if err != nil { - return err + return } m.usedRunners.Delete(r.Id()) return @@ -96,7 +106,12 @@ func (m *NomadRunnerManager) Return(r Runner) (err error) { // Refresh Big ToDo: Improve this function!! State out that it also rescales the job; Provide context to be terminable... func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentId) { - job := m.jobs[id] + jobEntity, ok := m.jobs.Get(id.toString()) + if !ok { + // this environment does not exist + return + } + job := jobEntity.(*NomadJob) lastJobScaling := -1 for { runners, err := m.apiClient.LoadRunners(string(job.jobId)) @@ -132,10 +147,15 @@ func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentId) { func (m *NomadRunnerManager) unusedRunners(environmentId EnvironmentId, fetchedRunnerIds []string) (newRunners []Runner) { newRunners = make([]Runner, 0) + jobEntity, ok := m.jobs.Get(environmentId.toString()) + if !ok { + // the environment does not exist, so it won't have any unused runners + return + } for _, runnerId := range fetchedRunnerIds { _, ok := m.usedRunners.Get(runnerId) if !ok { - _, ok = m.jobs[environmentId].idleRunners.Get(runnerId) + _, ok = jobEntity.(*NomadJob).idleRunners.Get(runnerId) if !ok { newRunners = append(newRunners, NewRunner(runnerId)) } diff --git a/runner/manager_mock.go b/runner/manager_mock.go index 76a9156..af5bdba 100644 --- a/runner/manager_mock.go +++ b/runner/manager_mock.go @@ -52,7 +52,7 @@ func (_m *ManagerMock) Return(r Runner) error { } // Use provides a mock function with given fields: id -func (_m *ManagerMock) Use(id EnvironmentId) 
(Runner, error) { +func (_m *ManagerMock) Claim(id EnvironmentId) (Runner, error) { ret := _m.Called(id) var r0 Runner diff --git a/runner/manager_test.go b/runner/manager_test.go index 8374370..69160b3 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -9,11 +9,14 @@ import ( "time" ) -const anotherRunnerId = "4n0th3r-runn3r-1d" -const defaultEnvironmentId = EnvironmentId(0) -const otherEnvironmentId = EnvironmentId(42) -const jobId = "4n0th3r-j0b-1d" -const waitTime = 100 * time.Millisecond +const ( + anotherRunnerId = "4n0th3r-runn3r-1d" + defaultEnvironmentId = EnvironmentId(0) + otherEnvironmentId = EnvironmentId(42) + defaultDesiredRunnersCount = 5 + jobId = "4n0th3r-j0b-1d" + waitTime = 100 * time.Millisecond +) func TestGetNextRunnerTestSuite(t *testing.T) { suite.Run(t, new(ManagerTestSuite)) @@ -43,63 +46,71 @@ func (suite *ManagerTestSuite) mockRunnerQueries(returnedRunnerIds []string) { } func (suite *ManagerTestSuite) registerDefaultEnvironment() { - suite.nomadRunnerManager.RegisterEnvironment(defaultEnvironmentId, jobId, 5) + suite.nomadRunnerManager.RegisterEnvironment(defaultEnvironmentId, jobId, defaultDesiredRunnersCount) +} + +func (suite *ManagerTestSuite) AddIdleRunnerForDefaultEnvironment(r Runner) { + jobEntity, _ := suite.nomadRunnerManager.jobs.Get(defaultEnvironmentId.toString()) + jobEntity.(*NomadJob).idleRunners.Add(r) } func (suite *ManagerTestSuite) waitForRunnerRefresh() { time.Sleep(waitTime) } -func (suite *ManagerTestSuite) TestRegisterEnvironmentAddsANewJob() { - suite.NotNil(suite.nomadRunnerManager.jobs[defaultEnvironmentId]) +func (suite *ManagerTestSuite) TestRegisterEnvironmentAddsNewJob() { + suite.nomadRunnerManager.RegisterEnvironment(otherEnvironmentId, jobId, defaultDesiredRunnersCount) + jobEntity, ok := suite.nomadRunnerManager.jobs.Get(defaultEnvironmentId.toString()) + suite.True(ok) + suite.NotNil(jobEntity) } -func (suite *ManagerTestSuite) TestUseReturnsNotFoundErrorIfEnvironmentNotFound() { - 
runner, err := suite.nomadRunnerManager.Use(EnvironmentId(42)) +func (suite *ManagerTestSuite) TestClaimReturnsNotFoundErrorIfEnvironmentNotFound() { + runner, err := suite.nomadRunnerManager.Claim(EnvironmentId(42)) suite.Nil(runner) suite.Equal(ErrUnknownExecutionEnvironment, err) } -func (suite *ManagerTestSuite) TestUseReturnsRunnerIfAvailable() { - suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner) - receivedRunner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId) +func (suite *ManagerTestSuite) TestClaimReturnsRunnerIfAvailable() { + suite.AddIdleRunnerForDefaultEnvironment(suite.exerciseRunner) + receivedRunner, err := suite.nomadRunnerManager.Claim(defaultEnvironmentId) suite.NoError(err) suite.Equal(suite.exerciseRunner, receivedRunner) } -func (suite *ManagerTestSuite) TestUseReturnsErrorIfNoRunnerAvailable() { +func (suite *ManagerTestSuite) TestClaimReturnsErrorIfNoRunnerAvailable() { suite.waitForRunnerRefresh() - runner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId) + runner, err := suite.nomadRunnerManager.Claim(defaultEnvironmentId) suite.Nil(runner) suite.Equal(ErrNoRunnersAvailable, err) } -func (suite *ManagerTestSuite) TestUseReturnsNoRunnerOfDifferentEnvironment() { - suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner) - receivedRunner, err := suite.nomadRunnerManager.Use(otherEnvironmentId) +func (suite *ManagerTestSuite) TestClaimReturnsNoRunnerOfDifferentEnvironment() { + suite.AddIdleRunnerForDefaultEnvironment(suite.exerciseRunner) + receivedRunner, err := suite.nomadRunnerManager.Claim(otherEnvironmentId) suite.Nil(receivedRunner) suite.Error(err) } -func (suite *ManagerTestSuite) TestUseDoesNotReturnTheSameRunnerTwice() { - suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner) - suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(NewRunner(anotherRunnerId)) +func (suite *ManagerTestSuite) 
TestClaimDoesNotReturnTheSameRunnerTwice() { + suite.AddIdleRunnerForDefaultEnvironment(suite.exerciseRunner) + suite.AddIdleRunnerForDefaultEnvironment(NewRunner(anotherRunnerId)) - firstReceivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId) - secondReceivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId) + firstReceivedRunner, _ := suite.nomadRunnerManager.Claim(defaultEnvironmentId) + secondReceivedRunner, _ := suite.nomadRunnerManager.Claim(defaultEnvironmentId) suite.NotEqual(firstReceivedRunner, secondReceivedRunner) } -func (suite *ManagerTestSuite) TestUseThrowsAnErrorIfNoRunnersAvailable() { - receivedRunner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId) +func (suite *ManagerTestSuite) TestClaimThrowsAnErrorIfNoRunnersAvailable() { + receivedRunner, err := suite.nomadRunnerManager.Claim(defaultEnvironmentId) suite.Nil(receivedRunner) suite.Error(err) } -func (suite *ManagerTestSuite) TestUseAddsRunnerToUsedRunners() { +func (suite *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() { suite.mockRunnerQueries([]string{RunnerId}) suite.waitForRunnerRefresh() - receivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId) + receivedRunner, _ := suite.nomadRunnerManager.Claim(defaultEnvironmentId) savedRunner, ok := suite.nomadRunnerManager.usedRunners.Get(receivedRunner.Id()) suite.True(ok) suite.Equal(savedRunner, receivedRunner) @@ -134,7 +145,7 @@ func (suite *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() { suite.apiMock.AssertCalled(suite.T(), "DeleteRunner", suite.exerciseRunner.Id()) } -func (suite *ManagerTestSuite) TestReturnThrowsErrorWhenApiCallFailed() { +func (suite *ManagerTestSuite) TestReturnReturnsErrorWhenApiCallFailed() { suite.apiMock.On("DeleteRunner", mock.AnythingOfType("string")).Return(errors.New("return failed")) err := suite.nomadRunnerManager.Return(suite.exerciseRunner) suite.Error(err) @@ -149,23 +160,24 @@ func (suite *ManagerTestSuite) TestRefreshFetchesRunners() 
{ func (suite *ManagerTestSuite) TestNewRunnersFoundInRefreshAreAddedToUnusedRunners() { suite.mockRunnerQueries([]string{RunnerId}) suite.waitForRunnerRefresh() - availableRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId) + availableRunner, _ := suite.nomadRunnerManager.Claim(defaultEnvironmentId) suite.Equal(availableRunner.Id(), RunnerId) } func (suite *ManagerTestSuite) TestRefreshScalesJob() { suite.mockRunnerQueries([]string{RunnerId}) suite.waitForRunnerRefresh() - // use one runner - _, _ = suite.nomadRunnerManager.Use(defaultEnvironmentId) + // use one runner to necessitate rescaling + _, _ = suite.nomadRunnerManager.Claim(defaultEnvironmentId) suite.waitForRunnerRefresh() - suite.apiMock.AssertCalled(suite.T(), "SetJobScale", jobId, 6, "Runner Requested") + suite.apiMock.AssertCalled(suite.T(), "SetJobScale", jobId, defaultDesiredRunnersCount+1, "Runner Requested") } func (suite *ManagerTestSuite) TestRefreshAddsRunnerToPool() { suite.mockRunnerQueries([]string{RunnerId}) suite.waitForRunnerRefresh() - poolRunner, ok := suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Get(RunnerId) + jobEntity, _ := suite.nomadRunnerManager.jobs.Get(defaultEnvironmentId.toString()) + poolRunner, ok := jobEntity.(*NomadJob).idleRunners.Get(RunnerId) suite.True(ok) suite.Equal(RunnerId, poolRunner.Id()) } diff --git a/runner/pool.go b/runner/pool.go index edded59..d1c8050 100644 --- a/runner/pool.go +++ b/runner/pool.go @@ -21,7 +21,7 @@ type localRunnerPool struct { runners map[string]Runner } -// NewLocalRunnerPool responds with a Pool implementation +// NewLocalRunnerPool responds with a Pool implementation. 
// This implementation stores the data thread-safe in the local application memory func NewLocalRunnerPool() *localRunnerPool { return &localRunnerPool{ diff --git a/runner/runner.go b/runner/runner.go index 7c33606..4fd5b13 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -26,6 +26,8 @@ type Runner interface { // AddExecution saves the supplied ExecutionRequest for the runner and returns an ExecutionId to retrieve it again. AddExecution(dto.ExecutionRequest) (ExecutionId, error) + // Execution looks up an ExecutionId for the runner and returns the associated ExecutionRequest. + // If this request does not exist, ok is false, else true. Execution(ExecutionId) (executionRequest dto.ExecutionRequest, ok bool) // DeleteExecution deletes the execution of the runner with the specified id.