Incorporate review comments

This commit is contained in:
Jan-Eric Hellenberg
2021-05-18 15:03:52 +02:00
parent 0590f31200
commit fe2ec4df35
13 changed files with 177 additions and 83 deletions

View File

@ -31,7 +31,7 @@ func NewRouter(runnerManager runner.Manager, environmentManager environment.Mana
return router
}
// configureV1Router configures a given router with the routes of version 1 of our API.
// configureV1Router configures a given router with the routes of version 1 of the Poseidon API.
func configureV1Router(router *mux.Router, runnerManager runner.Manager, environmentManager environment.Manager) {
v1 := router.PathPrefix(RouteBase).Subrouter()
v1.HandleFunc(RouteHealth, Health).Methods(http.MethodGet)

View File

@ -56,6 +56,7 @@ func TestNewRouterV1WithAuthenticationEnabled(t *testing.T) {
})
t.Run("protected route is not accessible", func(t *testing.T) {
// request an available API route that should be guarded by authentication (which one, in particular, does not matter here)
request, err := http.NewRequest(http.MethodPost, "/api/v1/runners", nil)
if err != nil {
t.Fatal(err)

View File

@ -13,12 +13,24 @@ type ExecutionRequest struct {
Environment map[string]string
}
// ExecutionEnvironmentRequest is the expected json structure of the request body for the create execution environment function.
// NOTE(review): every field is unexported and carries no json tag. encoding/json only
// reads and writes exported fields, so decoding a request body into this struct would
// leave all fields at their zero values. Confirm whether the fields should be exported
// and tagged before this type is used for (un)marshalling.
// nolint:unused,structcheck
type ExecutionEnvironmentRequest struct {
prewarmingPoolSize uint
cpuLimit uint
memoryLimit uint
image string
networkAccess bool
exposedPorts []uint16
}
// RunnerResponse is the expected response when providing a runner.
type RunnerResponse struct {
// Id is serialized under the JSON key "runnerId".
Id string `json:"runnerId"`
}
// FileCreation is the expected json structure of the request body for the copy files route.
// TODO: specify content of the struct
type FileCreation struct{}
// WebsocketResponse is the expected response when creating an execution for a runner.

View File

@ -5,7 +5,7 @@ import (
)
// Health handles the health route.
// It tries to respond that the server is alive.
// It responds that the server is alive.
// If it is not, the response won't reach the client.
func Health(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusNoContent)

View File

@ -30,7 +30,7 @@ func (r *RunnerController) ConfigureRoutes(router *mux.Router) {
r.runnerRouter = runnersRouter.PathPrefix(fmt.Sprintf("/{%s}", RunnerIdKey)).Subrouter()
r.runnerRouter.Use(r.findRunnerMiddleware)
r.runnerRouter.HandleFunc(ExecutePath, r.execute).Methods(http.MethodPost).Name(ExecutePath)
r.runnerRouter.HandleFunc(WebsocketPath, connectToRunner).Methods(http.MethodGet).Name(WebsocketPath)
r.runnerRouter.HandleFunc(WebsocketPath, r.connectToRunner).Methods(http.MethodGet).Name(WebsocketPath)
r.runnerRouter.HandleFunc("", r.delete).Methods(http.MethodDelete).Name(DeleteRoute)
}
@ -43,7 +43,7 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req
return
}
environmentId := runner.EnvironmentId(runnerRequest.ExecutionEnvironmentId)
nextRunner, err := r.manager.Use(environmentId)
nextRunner, err := r.manager.Claim(environmentId)
if err != nil {
if err == runner.ErrUnknownExecutionEnvironment {
writeNotFound(writer, err)
@ -101,14 +101,13 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req
// and adds the runner to the context of the request.
func (r *RunnerController) findRunnerMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
// Find runner
runnerId := mux.Vars(request)[RunnerIdKey]
r, err := r.manager.Get(runnerId)
targetRunner, err := r.manager.Get(runnerId)
if err != nil {
writeNotFound(writer, err)
return
}
ctx := runner.NewContext(request.Context(), r.(runner.Runner))
ctx := runner.NewContext(request.Context(), targetRunner.(runner.Runner))
requestWithRunner := request.WithContext(ctx)
next.ServeHTTP(writer, requestWithRunner)
})
@ -123,9 +122,9 @@ func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Requ
if err != nil {
if err == runner.ErrUnknownExecutionEnvironment {
writeNotFound(writer, err)
} else {
writeInternalServerError(writer, err, dto.ErrorNomadInternalServerError)
}
writeInternalServerError(writer, err, dto.ErrorNomadInternalServerError)
return
}

View File

@ -9,16 +9,16 @@ import (
)
// connectToRunner is the endpoint for websocket connections.
func connectToRunner(writer http.ResponseWriter, request *http.Request) {
r, _ := runner.FromContext(request.Context())
func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) {
targetRunner, _ := runner.FromContext(request.Context())
executionId := runner.ExecutionId(request.URL.Query().Get(ExecutionIdKey))
_, ok := r.Execution(executionId)
_, ok := targetRunner.Execution(executionId)
if !ok {
writeNotFound(writer, errors.New("executionId does not exist"))
return
}
log.
WithField("runnerId", r.Id()).
WithField("runnerId", targetRunner.Id()).
WithField("executionId", executionId).
Info("Running execution")
connUpgrader := websocket.Upgrader{}

View File

@ -1,6 +1,7 @@
package environment
import (
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
)
@ -14,15 +15,10 @@ type Manager interface {
// Create creates a new execution environment on the executor.
Create(
id string,
prewarmingPoolSize uint,
cpuLimit uint,
memoryLimit uint,
image string,
networkAccess bool,
exposedPorts []uint16,
request dto.ExecutionEnvironmentRequest,
)
// Delete remove the execution environment with the given id from the executor.
// Delete removes the execution environment with the given id from the executor.
Delete(id string)
}
@ -39,12 +35,7 @@ type NomadEnvironmentManager struct {
func (m *NomadEnvironmentManager) Create(
id string,
prewarmingPoolSize uint,
cpuLimit uint,
memoryLimit uint,
image string,
networkAccess bool,
exposedPorts []uint16,
request dto.ExecutionEnvironmentRequest,
) {
}

57
runner/job_store.go Normal file
View File

@ -0,0 +1,57 @@
package runner
import (
"gitlab.hpi.de/codeocean/codemoon/poseidon/store"
"sync"
)
// JobStore is a type of entity store that should store job entities.
// It declares no methods of its own; its method set is that of the embedded store.EntityStore.
type JobStore interface {
store.EntityStore
}
// nomadJobStore stores NomadJob objects in the local application memory.
type nomadJobStore struct {
// The embedded RWMutex guards jobs; the store's methods acquire it before touching the map.
sync.RWMutex
// jobs holds the stored jobs, keyed by the value each job's Id method returned when it was added.
jobs map[string]*NomadJob
}
// NewNomadJobStore creates an empty job store that is ready for use.
// The store keeps its jobs in the local application memory and guards them with a read/write mutex.
func NewNomadJobStore() *nomadJobStore {
	jobStore := new(nomadJobStore)
	jobStore.jobs = make(map[string]*NomadJob)
	return jobStore
}
// Add stores the given entity under the key returned by its Id method,
// replacing any job that was previously stored under the same key.
// Only *NomadJob entities are accepted; anything else is reported at fatal level
// (NOTE(review): with a logrus-style logger this ends the process — confirm).
func (pool *nomadJobStore) Add(nomadJob store.Entity) {
	pool.Lock()
	defer pool.Unlock()

	job, isNomadJob := nomadJob.(*NomadJob)
	if !isNomadJob {
		entry := log.WithField("pool", pool).WithField("entity", nomadJob)
		entry.Fatal("Entity of type NomadJob was expected, but wasn't given.")
	}

	key := nomadJob.Id()
	pool.jobs[key] = job
}
// Get looks up the job stored under id.
// It returns the job and true if one exists, and a nil entity and false otherwise.
func (pool *nomadJobStore) Get(id string) (nomadJob store.Entity, ok bool) {
	pool.RLock()
	defer pool.RUnlock()

	job, ok := pool.jobs[id]
	if !ok {
		// Return a literal nil: assigning the map's zero value (a nil *NomadJob) to the
		// interface result would produce a non-nil interface wrapping a nil pointer,
		// which callers comparing the entity against nil would mistake for a hit.
		return nil, false
	}
	return job, true
}
// Delete removes the job stored under id. Deleting an unknown id is a no-op.
func (pool *nomadJobStore) Delete(id string) {
	pool.Lock()
	delete(pool.jobs, id)
	pool.Unlock()
}
// Len reports how many jobs are currently stored.
func (pool *nomadJobStore) Len() int {
	pool.RLock()
	count := len(pool.jobs)
	pool.RUnlock()
	return count
}

View File

@ -14,62 +14,72 @@ var (
ErrRunnerNotFound = errors.New("no runner found with this id")
)
// EnvironmentId identifies an execution environment.
type EnvironmentId int

// toString returns the decimal representation of the id, e.g. "42" for EnvironmentId(42).
// A string(rune(e)) conversion would instead produce the Unicode character with that code
// point and turn every value that is not a valid code point (negative ids, surrogates,
// values above U+10FFFF) into the same replacement character, so distinct ids could end
// up with identical strings.
func (e EnvironmentId) toString() string {
	return strconv.Itoa(int(e))
}
type NomadJobId string
// Manager keeps track of the used and unused runners of all execution environments in order to provide unused runners to new clients and ensure no runner is used twice.
type Manager interface {
// RegisterEnvironment adds a new environment for being managed.
// RegisterEnvironment adds a new environment that should be managed.
RegisterEnvironment(environmentId EnvironmentId, nomadJobId NomadJobId, desiredIdleRunnersCount int)
// Use returns a new runner.
// It makes sure that runner is not in use yet and returns an error if no runner could be provided.
Use(id EnvironmentId) (Runner, error)
// Claim returns a new runner.
// It makes sure that the runner is not in use yet and returns an error if no runner could be provided.
Claim(id EnvironmentId) (Runner, error)
// Get returns the used runner with the given runnerId.
// If no runner with the given runnerId is currently used, it returns an error.
Get(runnerId string) (Runner, error)
// Return hands back the runner.
// Return signals that the runner is no longer used by the caller and can be claimed by someone else.
// The runner is deleted or cleaned up for reuse depending on the used executor.
Return(r Runner) error
}
// NomadRunnerManager is a runner manager backed by a nomad.ExecutorApi client.
type NomadRunnerManager struct {
// apiClient is the executor API used to load and delete runners.
apiClient nomad.ExecutorApi
// jobs stores one job per registered environment, looked up by the environment id's string form.
jobs JobStore
// usedRunners is the pool consulted when a runner is looked up by its id and
// from which a runner is removed when it is returned.
usedRunners Pool
}
func NewNomadRunnerManager(apiClient nomad.ExecutorApi) *NomadRunnerManager {
return &NomadRunnerManager{
apiClient,
make(map[EnvironmentId]*NomadJob),
NewNomadJobStore(),
NewLocalRunnerPool(),
}
}
type EnvironmentId int
type NomadJobId string
// NomadJob bundles what the runner manager tracks for one execution environment.
type NomadJob struct {
// environmentId is the environment this job belongs to.
environmentId EnvironmentId
// jobId is the identifier handed to the executor API when the job's runners are loaded.
jobId NomadJobId
// idleRunners is the pool from which a runner is sampled when one is claimed.
idleRunners Pool
// desiredIdleRunnersCount is the value passed to RegisterEnvironment for this environment.
desiredIdleRunnersCount int
}
type NomadRunnerManager struct {
apiClient nomad.ExecutorApi
jobs map[EnvironmentId]*NomadJob
usedRunners Pool
func (j *NomadJob) Id() string {
return j.environmentId.toString()
}
func (m *NomadRunnerManager) RegisterEnvironment(environmentId EnvironmentId, nomadJobId NomadJobId, desiredIdleRunnersCount int) {
m.jobs[environmentId] = &NomadJob{
m.jobs.Add(&NomadJob{
environmentId,
nomadJobId,
NewLocalRunnerPool(),
desiredIdleRunnersCount,
}
})
go m.refreshEnvironment(environmentId)
}
func (m *NomadRunnerManager) Use(environmentId EnvironmentId) (Runner, error) {
job, ok := m.jobs[environmentId]
func (m *NomadRunnerManager) Claim(environmentId EnvironmentId) (Runner, error) {
job, ok := m.jobs.Get(environmentId.toString())
if !ok {
return nil, ErrUnknownExecutionEnvironment
}
runner, ok := job.idleRunners.Sample()
runner, ok := job.(*NomadJob).idleRunners.Sample()
if !ok {
return nil, ErrNoRunnersAvailable
}
@ -77,7 +87,7 @@ func (m *NomadRunnerManager) Use(environmentId EnvironmentId) (Runner, error) {
return runner, nil
}
func (m *NomadRunnerManager) Get(runnerId string) (r Runner, err error) {
func (m *NomadRunnerManager) Get(runnerId string) (Runner, error) {
runner, ok := m.usedRunners.Get(runnerId)
if !ok {
return nil, ErrRunnerNotFound
@ -88,7 +98,7 @@ func (m *NomadRunnerManager) Get(runnerId string) (r Runner, err error) {
func (m *NomadRunnerManager) Return(r Runner) (err error) {
err = m.apiClient.DeleteRunner(r.Id())
if err != nil {
return err
return
}
m.usedRunners.Delete(r.Id())
return
@ -96,7 +106,12 @@ func (m *NomadRunnerManager) Return(r Runner) (err error) {
// Refresh Big ToDo: Improve this function!! State out that it also rescales the job; Provide context to be terminable...
func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentId) {
job := m.jobs[id]
jobEntity, ok := m.jobs.Get(id.toString())
if !ok {
// this environment does not exist
return
}
job := jobEntity.(*NomadJob)
lastJobScaling := -1
for {
runners, err := m.apiClient.LoadRunners(string(job.jobId))
@ -132,10 +147,15 @@ func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentId) {
func (m *NomadRunnerManager) unusedRunners(environmentId EnvironmentId, fetchedRunnerIds []string) (newRunners []Runner) {
newRunners = make([]Runner, 0)
jobEntity, ok := m.jobs.Get(environmentId.toString())
if !ok {
// the environment does not exist, so it won't have any unused runners
return
}
for _, runnerId := range fetchedRunnerIds {
_, ok := m.usedRunners.Get(runnerId)
if !ok {
_, ok = m.jobs[environmentId].idleRunners.Get(runnerId)
_, ok = jobEntity.(*NomadJob).idleRunners.Get(runnerId)
if !ok {
newRunners = append(newRunners, NewRunner(runnerId))
}

View File

@ -52,7 +52,7 @@ func (_m *ManagerMock) Return(r Runner) error {
}
// Claim provides a mock function with given fields: id
func (_m *ManagerMock) Use(id EnvironmentId) (Runner, error) {
func (_m *ManagerMock) Claim(id EnvironmentId) (Runner, error) {
ret := _m.Called(id)
var r0 Runner

View File

@ -9,11 +9,14 @@ import (
"time"
)
const anotherRunnerId = "4n0th3r-runn3r-1d"
const defaultEnvironmentId = EnvironmentId(0)
const otherEnvironmentId = EnvironmentId(42)
const jobId = "4n0th3r-j0b-1d"
const waitTime = 100 * time.Millisecond
const (
anotherRunnerId = "4n0th3r-runn3r-1d"
defaultEnvironmentId = EnvironmentId(0)
otherEnvironmentId = EnvironmentId(42)
defaultDesiredRunnersCount = 5
jobId = "4n0th3r-j0b-1d"
waitTime = 100 * time.Millisecond
)
func TestGetNextRunnerTestSuite(t *testing.T) {
suite.Run(t, new(ManagerTestSuite))
@ -43,63 +46,71 @@ func (suite *ManagerTestSuite) mockRunnerQueries(returnedRunnerIds []string) {
}
func (suite *ManagerTestSuite) registerDefaultEnvironment() {
suite.nomadRunnerManager.RegisterEnvironment(defaultEnvironmentId, jobId, 5)
suite.nomadRunnerManager.RegisterEnvironment(defaultEnvironmentId, jobId, defaultDesiredRunnersCount)
}
// AddIdleRunnerForDefaultEnvironment puts r into the idle runner pool of the default environment.
// The lookup's ok result is discarded, so the helper assumes the default environment
// has already been registered with the manager under test.
func (suite *ManagerTestSuite) AddIdleRunnerForDefaultEnvironment(r Runner) {
	key := defaultEnvironmentId.toString()
	entity, _ := suite.nomadRunnerManager.jobs.Get(key)
	defaultJob := entity.(*NomadJob)
	defaultJob.idleRunners.Add(r)
}
func (suite *ManagerTestSuite) waitForRunnerRefresh() {
time.Sleep(waitTime)
}
func (suite *ManagerTestSuite) TestRegisterEnvironmentAddsANewJob() {
suite.NotNil(suite.nomadRunnerManager.jobs[defaultEnvironmentId])
// TestRegisterEnvironmentAddsNewJob checks that registering a further environment
// stores a job that can be looked up under that environment's id.
func (suite *ManagerTestSuite) TestRegisterEnvironmentAddsNewJob() {
	suite.nomadRunnerManager.RegisterEnvironment(otherEnvironmentId, jobId, defaultDesiredRunnersCount)

	// Query the environment registered above. Looking up defaultEnvironmentId instead
	// would not show whether this RegisterEnvironment call had any effect.
	jobEntity, ok := suite.nomadRunnerManager.jobs.Get(otherEnvironmentId.toString())
	suite.True(ok)
	suite.NotNil(jobEntity)
}
func (suite *ManagerTestSuite) TestUseReturnsNotFoundErrorIfEnvironmentNotFound() {
runner, err := suite.nomadRunnerManager.Use(EnvironmentId(42))
func (suite *ManagerTestSuite) TestClaimReturnsNotFoundErrorIfEnvironmentNotFound() {
runner, err := suite.nomadRunnerManager.Claim(EnvironmentId(42))
suite.Nil(runner)
suite.Equal(ErrUnknownExecutionEnvironment, err)
}
func (suite *ManagerTestSuite) TestUseReturnsRunnerIfAvailable() {
suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner)
receivedRunner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId)
func (suite *ManagerTestSuite) TestClaimReturnsRunnerIfAvailable() {
suite.AddIdleRunnerForDefaultEnvironment(suite.exerciseRunner)
receivedRunner, err := suite.nomadRunnerManager.Claim(defaultEnvironmentId)
suite.NoError(err)
suite.Equal(suite.exerciseRunner, receivedRunner)
}
func (suite *ManagerTestSuite) TestUseReturnsErrorIfNoRunnerAvailable() {
func (suite *ManagerTestSuite) TestClaimReturnsErrorIfNoRunnerAvailable() {
suite.waitForRunnerRefresh()
runner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId)
runner, err := suite.nomadRunnerManager.Claim(defaultEnvironmentId)
suite.Nil(runner)
suite.Equal(ErrNoRunnersAvailable, err)
}
func (suite *ManagerTestSuite) TestUseReturnsNoRunnerOfDifferentEnvironment() {
suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner)
receivedRunner, err := suite.nomadRunnerManager.Use(otherEnvironmentId)
func (suite *ManagerTestSuite) TestClaimReturnsNoRunnerOfDifferentEnvironment() {
suite.AddIdleRunnerForDefaultEnvironment(suite.exerciseRunner)
receivedRunner, err := suite.nomadRunnerManager.Claim(otherEnvironmentId)
suite.Nil(receivedRunner)
suite.Error(err)
}
func (suite *ManagerTestSuite) TestUseDoesNotReturnTheSameRunnerTwice() {
suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(suite.exerciseRunner)
suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Add(NewRunner(anotherRunnerId))
func (suite *ManagerTestSuite) TestClaimDoesNotReturnTheSameRunnerTwice() {
suite.AddIdleRunnerForDefaultEnvironment(suite.exerciseRunner)
suite.AddIdleRunnerForDefaultEnvironment(NewRunner(anotherRunnerId))
firstReceivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId)
secondReceivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId)
firstReceivedRunner, _ := suite.nomadRunnerManager.Claim(defaultEnvironmentId)
secondReceivedRunner, _ := suite.nomadRunnerManager.Claim(defaultEnvironmentId)
suite.NotEqual(firstReceivedRunner, secondReceivedRunner)
}
func (suite *ManagerTestSuite) TestUseThrowsAnErrorIfNoRunnersAvailable() {
receivedRunner, err := suite.nomadRunnerManager.Use(defaultEnvironmentId)
func (suite *ManagerTestSuite) TestClaimThrowsAnErrorIfNoRunnersAvailable() {
receivedRunner, err := suite.nomadRunnerManager.Claim(defaultEnvironmentId)
suite.Nil(receivedRunner)
suite.Error(err)
}
func (suite *ManagerTestSuite) TestUseAddsRunnerToUsedRunners() {
func (suite *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() {
suite.mockRunnerQueries([]string{RunnerId})
suite.waitForRunnerRefresh()
receivedRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId)
receivedRunner, _ := suite.nomadRunnerManager.Claim(defaultEnvironmentId)
savedRunner, ok := suite.nomadRunnerManager.usedRunners.Get(receivedRunner.Id())
suite.True(ok)
suite.Equal(savedRunner, receivedRunner)
@ -134,7 +145,7 @@ func (suite *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() {
suite.apiMock.AssertCalled(suite.T(), "DeleteRunner", suite.exerciseRunner.Id())
}
func (suite *ManagerTestSuite) TestReturnThrowsErrorWhenApiCallFailed() {
func (suite *ManagerTestSuite) TestReturnReturnsErrorWhenApiCallFailed() {
suite.apiMock.On("DeleteRunner", mock.AnythingOfType("string")).Return(errors.New("return failed"))
err := suite.nomadRunnerManager.Return(suite.exerciseRunner)
suite.Error(err)
@ -149,23 +160,24 @@ func (suite *ManagerTestSuite) TestRefreshFetchesRunners() {
func (suite *ManagerTestSuite) TestNewRunnersFoundInRefreshAreAddedToUnusedRunners() {
suite.mockRunnerQueries([]string{RunnerId})
suite.waitForRunnerRefresh()
availableRunner, _ := suite.nomadRunnerManager.Use(defaultEnvironmentId)
availableRunner, _ := suite.nomadRunnerManager.Claim(defaultEnvironmentId)
suite.Equal(availableRunner.Id(), RunnerId)
}
func (suite *ManagerTestSuite) TestRefreshScalesJob() {
suite.mockRunnerQueries([]string{RunnerId})
suite.waitForRunnerRefresh()
// use one runner
_, _ = suite.nomadRunnerManager.Use(defaultEnvironmentId)
// use one runner to necessitate rescaling
_, _ = suite.nomadRunnerManager.Claim(defaultEnvironmentId)
suite.waitForRunnerRefresh()
suite.apiMock.AssertCalled(suite.T(), "SetJobScale", jobId, 6, "Runner Requested")
suite.apiMock.AssertCalled(suite.T(), "SetJobScale", jobId, defaultDesiredRunnersCount+1, "Runner Requested")
}
func (suite *ManagerTestSuite) TestRefreshAddsRunnerToPool() {
suite.mockRunnerQueries([]string{RunnerId})
suite.waitForRunnerRefresh()
poolRunner, ok := suite.nomadRunnerManager.jobs[defaultEnvironmentId].idleRunners.Get(RunnerId)
jobEntity, _ := suite.nomadRunnerManager.jobs.Get(defaultEnvironmentId.toString())
poolRunner, ok := jobEntity.(*NomadJob).idleRunners.Get(RunnerId)
suite.True(ok)
suite.Equal(RunnerId, poolRunner.Id())
}

View File

@ -21,7 +21,7 @@ type localRunnerPool struct {
runners map[string]Runner
}
// NewLocalRunnerPool responds with a Pool implementation
// NewLocalRunnerPool responds with a Pool implementation.
// This implementation stores the data thread-safe in the local application memory.
func NewLocalRunnerPool() *localRunnerPool {
return &localRunnerPool{

View File

@ -26,6 +26,8 @@ type Runner interface {
// AddExecution saves the supplied ExecutionRequest for the runner and returns an ExecutionId to retrieve it again.
AddExecution(dto.ExecutionRequest) (ExecutionId, error)
// Execution looks up an ExecutionId for the runner and returns the associated ExecutionRequest.
// If this request does not exist, ok is false, else true.
Execution(ExecutionId) (executionRequest dto.ExecutionRequest, ok bool)
// DeleteExecution deletes the execution of the runner with the specified id.