diff --git a/internal/api/runners.go b/internal/api/runners.go index a1ddcf4..c82bd3f 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -8,6 +8,7 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/internal/config" "gitlab.hpi.de/codeocean/codemoon/poseidon/internal/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/execution" "net/http" "net/url" ) @@ -112,7 +113,7 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req writeInternalServerError(writer, err, dto.ErrorUnknown) return } - id := runner.ExecutionID(newUUID.String()) + id := execution.ID(newUUID.String()) targetRunner.Add(id, executionRequest) webSocketURL := url.URL{ Scheme: scheme, diff --git a/internal/api/runners_test.go b/internal/api/runners_test.go index 8ad2b7b..8cf761d 100644 --- a/internal/api/runners_test.go +++ b/internal/api/runners_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/suite" "gitlab.hpi.de/codeocean/codemoon/poseidon/internal/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/execution" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "net/http" "net/http/httptest" @@ -85,7 +86,7 @@ type RunnerRouteTestSuite struct { runnerManager *runner.ManagerMock router *mux.Router runner runner.Runner - executionID runner.ExecutionID + executionID execution.ID } func (s *RunnerRouteTestSuite) SetupTest() { @@ -200,7 +201,7 @@ func (s *RunnerRouteTestSuite) TestExecuteRoute() { webSocketURL, err := url.Parse(webSocketResponse.WebSocketURL) s.Require().NoError(err) executionID := webSocketURL.Query().Get(ExecutionIDKey) - storedExecutionRequest, ok := s.runner.Pop(runner.ExecutionID(executionID)) + storedExecutionRequest, ok := s.runner.Pop(execution.ID(executionID)) s.True(ok, "No execution request with this id: ", executionID) s.Equal(&executionRequest, storedExecutionRequest) diff --git a/internal/api/websocket.go b/internal/api/websocket.go index 9f201ba..db98595 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -8,6 +8,7 @@ import ( "github.com/gorilla/websocket" "gitlab.hpi.de/codeocean/codemoon/poseidon/internal/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/execution" "io" "net/http" "sync" @@ -298,7 +299,7 @@ func (wp *webSocketProxy) writeMessage(messageType int, data []byte) error { // connectToRunner is the endpoint for websocket connections. func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) { targetRunner, _ := runner.FromContext(request.Context()) - executionID := runner.ExecutionID(request.URL.Query().Get(ExecutionIDKey)) + executionID := execution.ID(request.URL.Query().Get(ExecutionIDKey)) executionRequest, ok := targetRunner.Pop(executionID) if !ok { writeNotFound(writer, ErrUnknownExecutionID) diff --git a/internal/api/websocket_test.go b/internal/api/websocket_test.go index 7111d39..4ec024e 100644 --- a/internal/api/websocket_test.go +++ b/internal/api/websocket_test.go @@ -15,6 +15,7 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/internal/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/internal/runner" "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/execution" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests/helpers" "io" @@ -33,7 +34,7 @@ func TestWebSocketTestSuite(t *testing.T) { type WebSocketTestSuite struct { suite.Suite router *mux.Router - executionID runner.ExecutionID + executionID execution.ID runner runner.Runner apiMock *nomad.ExecutorAPIMock server *httptest.Server @@ -124,7 +125,7 @@ func (s *WebSocketTestSuite) TestWebsocketConnection() { } func (s *WebSocketTestSuite) TestCancelWebSocketConnection() { - executionID := runner.ExecutionID("sleeping-execution") + executionID := execution.ID("sleeping-execution") s.runner.Add(executionID, &executionRequestSleep) canceled := mockAPIExecuteSleep(s.apiMock) @@ -155,7 +156,7 @@ func (s *WebSocketTestSuite) TestCancelWebSocketConnection() { } func (s *WebSocketTestSuite) TestWebSocketConnectionTimeout() { - executionID := runner.ExecutionID("time-out-execution") + executionID := execution.ID("time-out-execution") limitExecution := executionRequestSleep limitExecution.TimeLimit = 2 s.runner.Add(executionID, &limitExecution) @@ -189,7 +190,7 @@ func (s *WebSocketTestSuite) TestWebSocketConnectionTimeout() { } func (s *WebSocketTestSuite) TestWebsocketStdoutAndStderr() { - executionID := runner.ExecutionID("ls-execution") + executionID := execution.ID("ls-execution") s.runner.Add(executionID, &executionRequestLs) mockAPIExecuteLs(s.apiMock) @@ -209,7 +210,7 @@ func (s *WebSocketTestSuite) TestWebsocketStdoutAndStderr() { } func (s *WebSocketTestSuite) TestWebsocketError() { - executionID := runner.ExecutionID("error-execution") + executionID := execution.ID("error-execution") s.runner.Add(executionID, &executionRequestError) mockAPIExecuteError(s.apiMock) @@ -228,7 +229,7 @@ func (s *WebSocketTestSuite) TestWebsocketError() { } func (s *WebSocketTestSuite) TestWebsocketNonZeroExit() { - executionID := runner.ExecutionID("exit-execution") + executionID := execution.ID("exit-execution") s.runner.Add(executionID, &executionRequestExitNonZero) mockAPIExecuteExitNonZero(s.apiMock) @@ -250,7 +251,7 @@ func TestWebsocketTLS(t *testing.T) { runnerID := "runner-id" r, apiMock := newNomadAllocationWithMockedAPIClient(runnerID) - executionID := runner.ExecutionID("execution-id") + executionID := execution.ID("execution-id") r.Add(executionID, &executionRequestLs) mockAPIExecuteLs(apiMock) @@ -379,7 +380,7 @@ func newNomadAllocationWithMockedAPIClient(runnerID string) (runner.Runner, *nom } func webSocketURL(scheme string, server *httptest.Server, router *mux.Router, - runnerID string, executionID runner.ExecutionID, + runnerID string, executionID execution.ID, ) (*url.URL, error) { websocketURL, err := url.Parse(server.URL) if err != nil { @@ -395,7 +396,7 @@ func webSocketURL(scheme string, server *httptest.Server, router *mux.Router, return websocketURL, nil } -func (s *WebSocketTestSuite) webSocketURL(scheme, runnerID string, executionID runner.ExecutionID) (*url.URL, error) { +func (s *WebSocketTestSuite) webSocketURL(scheme, runnerID string, executionID execution.ID) (*url.URL, error) { return webSocketURL(scheme, s.server, s.router, runnerID, executionID) } diff --git a/internal/runner/execution_storage.go b/internal/runner/execution_storage.go deleted file mode 100644 index 6d548ab..0000000 --- a/internal/runner/execution_storage.go +++ /dev/null @@ -1,46 +0,0 @@ -package runner - -import ( - "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" - "sync" -) - -// ExecutionStorage stores executions. -type ExecutionStorage interface { - // Add adds a runner to the storage. - // It overwrites the existing execution if an execution with the same id already exists. - Add(id ExecutionID, executionRequest *dto.ExecutionRequest) - - // Pop deletes the execution with the given id from the storage and returns it. - // If no such execution exists, ok is false and true otherwise. - Pop(id ExecutionID) (request *dto.ExecutionRequest, ok bool) -} - -// localExecutionStorage stores execution objects in the local application memory. -// ToDo: Create implementation that use some persistent storage like a database. -type localExecutionStorage struct { - sync.RWMutex - executions map[ExecutionID]*dto.ExecutionRequest -} - -// NewLocalExecutionStorage responds with an ExecutionStorage implementation. -// This implementation stores the data thread-safe in the local application memory. -func NewLocalExecutionStorage() *localExecutionStorage { - return &localExecutionStorage{ - executions: make(map[ExecutionID]*dto.ExecutionRequest), - } -} - -func (s *localExecutionStorage) Add(id ExecutionID, executionRequest *dto.ExecutionRequest) { - s.Lock() - defer s.Unlock() - s.executions[id] = executionRequest -} - -func (s *localExecutionStorage) Pop(id ExecutionID) (*dto.ExecutionRequest, bool) { - s.Lock() - defer s.Unlock() - request, ok := s.executions[id] - delete(s.executions, id) - return request, ok -} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index b4072a9..2eb5b5c 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -10,6 +10,7 @@ import ( nomadApi "github.com/hashicorp/nomad/api" "gitlab.hpi.de/codeocean/codemoon/poseidon/internal/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/execution" "io" "strings" "time" @@ -18,9 +19,6 @@ import ( // ContextKey is the type for keys in a request context. type ContextKey string -// ExecutionID is an id for an execution in a Runner. -type ExecutionID string - const ( // runnerContextKey is the key used to store runners in context.Context. runnerContextKey ContextKey = "runner" @@ -39,7 +37,7 @@ type Runner interface { // MappedPorts returns the mapped ports of the runner. MappedPorts() []*dto.MappedPort - ExecutionStorage + execution.Storage InactivityTimer // ExecuteInteractively runs the given execution request and forwards from and to the given reader and writers. @@ -62,7 +60,7 @@ type Runner interface { // NomadJob is an abstraction to communicate with Nomad environments. type NomadJob struct { - ExecutionStorage + execution.Storage InactivityTimer id string portMappings []nomadApi.PortMapping @@ -75,11 +73,11 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping, apiClient nomad.ExecutorAPI, manager Manager, ) *NomadJob { job := &NomadJob{ - id: id, - portMappings: portMappings, - api: apiClient, - ExecutionStorage: NewLocalExecutionStorage(), - manager: manager, + id: id, + portMappings: portMappings, + api: apiClient, + Storage: execution.NewLocalStorage(), + manager: manager, } job.InactivityTimer = NewInactivityTimer(job, manager) return job diff --git a/internal/runner/runner_mock.go b/internal/runner/runner_mock.go index 6ba68d0..a558189 100644 --- a/internal/runner/runner_mock.go +++ b/internal/runner/runner_mock.go @@ -4,6 +4,7 @@ package runner import ( context "context" + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/execution" io "io" dto "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" @@ -19,7 +20,7 @@ type RunnerMock struct { } // Add provides a mock function with given fields: id, executionRequest -func (_m *RunnerMock) Add(id ExecutionID, executionRequest *dto.ExecutionRequest) { +func (_m *RunnerMock) Add(id execution.ID, executionRequest *dto.ExecutionRequest) { _m.Called(id, executionRequest) } @@ -93,11 +94,11 @@ func (_m *RunnerMock) MappedPorts() []*dto.MappedPort { } // Pop provides a mock function with given fields: id -func (_m *RunnerMock) Pop(id ExecutionID) (*dto.ExecutionRequest, bool) { +func (_m *RunnerMock) Pop(id execution.ID) (*dto.ExecutionRequest, bool) { ret := _m.Called(id) var r0 *dto.ExecutionRequest - if rf, ok := ret.Get(0).(func(ExecutionID) *dto.ExecutionRequest); ok { + if rf, ok := ret.Get(0).(func(execution.ID) *dto.ExecutionRequest); ok { r0 = rf(id) } else { if ret.Get(0) != nil { @@ -106,7 +107,7 @@ func (_m *RunnerMock) Pop(id ExecutionID) (*dto.ExecutionRequest, bool) { } var r1 bool - if rf, ok := ret.Get(1).(func(ExecutionID) bool); ok { + if rf, ok := ret.Get(1).(func(execution.ID) bool); ok { r1 = rf(id) } else { r1 = ret.Get(1).(bool) diff --git a/internal/runner/runner_test.go b/internal/runner/runner_test.go index 071d457..e1a007b 100644 --- a/internal/runner/runner_test.go +++ b/internal/runner/runner_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/suite" "gitlab.hpi.de/codeocean/codemoon/poseidon/internal/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/execution" "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/nullio" "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "io" @@ -48,7 +49,7 @@ func TestExecutionRequestIsStored(t *testing.T) { TimeLimit: 10, Environment: nil, } - id := ExecutionID("test-execution") + id := execution.ID("test-execution") runner.Add(id, executionRequest) storedExecutionRunner, ok := runner.Pop(id) @@ -118,11 +119,11 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() { s.manager.On("Return", mock.Anything).Return(nil) s.runner = &NomadJob{ - ExecutionStorage: NewLocalExecutionStorage(), - InactivityTimer: s.timer, - id: tests.DefaultRunnerID, - api: s.apiMock, - manager: s.manager, + Storage: execution.NewLocalStorage(), + InactivityTimer: s.timer, + id: tests.DefaultRunnerID, + api: s.apiMock, + manager: s.manager, } } @@ -141,8 +142,8 @@ func (s *ExecuteInteractivelyTestSuite) TestReturnsAfterTimeout() { }).Return(0, nil) timeLimit := 1 - execution := &dto.ExecutionRequest{TimeLimit: timeLimit} - exit, _ := s.runner.ExecuteInteractively(execution, &nullio.ReadWriter{}, nil, nil) + executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit} + exit, _ := s.runner.ExecuteInteractively(executionRequest, &nullio.ReadWriter{}, nil, nil) select { case <-exit: @@ -170,8 +171,8 @@ func (s *ExecuteInteractivelyTestSuite) TestSendsSignalAfterTimeout() { close(quit) }).Return(0, nil) timeLimit := 1 - execution := &dto.ExecutionRequest{TimeLimit: timeLimit} - _, _ = s.runner.ExecuteInteractively(execution, bytes.NewBuffer(make([]byte, 1)), nil, nil) + executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit} + _, _ = s.runner.ExecuteInteractively(executionRequest, bytes.NewBuffer(make([]byte, 1)), nil, nil) select { case <-time.After(2 * (time.Duration(timeLimit) * time.Second)): s.FailNow("The execution should receive a SIGQUIT after the timeout") @@ -184,22 +185,22 @@ func (s *ExecuteInteractivelyTestSuite) TestDestroysRunnerAfterTimeoutAndSignal( select {} }) timeLimit := 1 - execution := &dto.ExecutionRequest{TimeLimit: timeLimit} - _, _ = s.runner.ExecuteInteractively(execution, bytes.NewBuffer(make([]byte, 1)), nil, nil) + executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit} + _, _ = s.runner.ExecuteInteractively(executionRequest, bytes.NewBuffer(make([]byte, 1)), nil, nil) <-time.After(executionTimeoutGracePeriod + time.Duration(timeLimit)*time.Second + tests.ShortTimeout) s.manager.AssertCalled(s.T(), "Return", s.runner) } func (s *ExecuteInteractivelyTestSuite) TestResetTimerGetsCalled() { - execution := &dto.ExecutionRequest{} - s.runner.ExecuteInteractively(execution, nil, nil, nil) + executionRequest := &dto.ExecutionRequest{} + s.runner.ExecuteInteractively(executionRequest, nil, nil, nil) s.timer.AssertCalled(s.T(), "ResetTimeout") } func (s *ExecuteInteractivelyTestSuite) TestExitHasTimeoutErrorIfRunnerTimesOut() { s.mockedTimeoutPassedCall.Return(true) - execution := &dto.ExecutionRequest{} - exitChannel, _ := s.runner.ExecuteInteractively(execution, &nullio.ReadWriter{}, nil, nil) + executionRequest := &dto.ExecutionRequest{} + exitChannel, _ := s.runner.ExecuteInteractively(executionRequest, &nullio.ReadWriter{}, nil, nil) exit := <-exitChannel s.Equal(ErrorRunnerInactivityTimeout, exit.Err) } @@ -224,10 +225,10 @@ func (s *UpdateFileSystemTestSuite) SetupTest() { s.timer.On("ResetTimeout").Return() s.timer.On("TimeoutPassed").Return(false) s.runner = &NomadJob{ - ExecutionStorage: NewLocalExecutionStorage(), - InactivityTimer: s.timer, - id: tests.DefaultRunnerID, - api: s.apiMock, + Storage: execution.NewLocalStorage(), + InactivityTimer: s.timer, + id: tests.DefaultRunnerID, + api: s.apiMock, } s.mockedExecuteCommandCall = s.apiMock.On("ExecuteCommand", tests.DefaultRunnerID, mock.Anything, mock.Anything, false, mock.Anything, mock.Anything, mock.Anything). diff --git a/pkg/execution/execution.go b/pkg/execution/execution.go new file mode 100644 index 0000000..33a1315 --- /dev/null +++ b/pkg/execution/execution.go @@ -0,0 +1,19 @@ +package execution + +import ( + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" +) + +// ID is an identifier for an execution. +type ID string + +// Storage stores executions. +type Storage interface { + // Add adds a runner to the storage. + // It overwrites the existing execution if an execution with the same id already exists. + Add(id ID, executionRequest *dto.ExecutionRequest) + + // Pop deletes the execution with the given id from the storage and returns it. + // If no such execution exists, ok is false and true otherwise. + Pop(id ID) (request *dto.ExecutionRequest, ok bool) +} diff --git a/pkg/execution/local_storage.go b/pkg/execution/local_storage.go new file mode 100644 index 0000000..df2a7b1 --- /dev/null +++ b/pkg/execution/local_storage.go @@ -0,0 +1,35 @@ +package execution + +import ( + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" + "sync" +) + +// localStorage stores execution objects in the local application memory. +// ToDo: Create implementation that use some persistent storage like a database. +type localStorage struct { + sync.RWMutex + executions map[ID]*dto.ExecutionRequest +} + +// NewLocalStorage responds with an Storage implementation. +// This implementation stores the data thread-safe in the local application memory. +func NewLocalStorage() *localStorage { + return &localStorage{ + executions: make(map[ID]*dto.ExecutionRequest), + } +} + +func (s *localStorage) Add(id ID, executionRequest *dto.ExecutionRequest) { + s.Lock() + defer s.Unlock() + s.executions[id] = executionRequest +} + +func (s *localStorage) Pop(id ID) (*dto.ExecutionRequest, bool) { + s.Lock() + defer s.Unlock() + request, ok := s.executions[id] + delete(s.executions, id) + return request, ok +} diff --git a/pkg/execution/local_storage_test.go b/pkg/execution/local_storage_test.go new file mode 100644 index 0000000..7929d22 --- /dev/null +++ b/pkg/execution/local_storage_test.go @@ -0,0 +1,70 @@ +package execution + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.hpi.de/codeocean/codemoon/poseidon/pkg/dto" + "testing" +) + +var ( + testExecutionRequest = dto.ExecutionRequest{ + Command: "echo 'Hello Poseidon'", + TimeLimit: 42, + Environment: map[string]string{ + "PATH": "/bin", + }, + } + anotherTestExecutionRequest = dto.ExecutionRequest{ + Command: "echo 'Bye Poseidon'", + TimeLimit: 1337, + Environment: nil, + } + testID = ID("test") +) + +func TestNewLocalExecutionStorage(t *testing.T) { + storage := NewLocalStorage() + assert.Zero(t, len(storage.executions)) +} + +func TestLocalStorage(t *testing.T) { + storage := NewLocalStorage() + + t.Run("cannot pop when id does not exist", func(t *testing.T) { + request, ok := storage.Pop(testID) + assert.False(t, ok) + assert.Nil(t, request) + }) + + t.Run("adds execution request", func(t *testing.T) { + storage.Add(testID, &testExecutionRequest) + + request, ok := storage.executions[testID] + assert.Equal(t, len(storage.executions), 1) + assert.True(t, ok) + assert.Equal(t, testExecutionRequest, *request) + }) + + t.Run("overwrites execution request", func(t *testing.T) { + storage.Add(testID, &anotherTestExecutionRequest) + + request, ok := storage.executions[testID] + assert.Equal(t, len(storage.executions), 1) + assert.True(t, ok) + require.NotNil(t, request) + assert.Equal(t, anotherTestExecutionRequest, *request) + }) + + t.Run("removes execution request", func(t *testing.T) { + request, ok := storage.Pop(testID) + + assert.True(t, ok) + require.NotNil(t, request) + assert.Equal(t, anotherTestExecutionRequest, *request) + + request, ok = storage.executions[testID] + assert.Nil(t, request) + assert.False(t, ok) + }) +}