From f2b25566dd7e263e289e0465f239e18b7109e70a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Sat, 6 Aug 2022 04:52:48 +0200 Subject: [PATCH] #136 Copy files back from Nomad runner. --- internal/api/runners.go | 22 ++++++++++++++ internal/api/runners_test.go | 34 +++++++++++++++++++++ internal/runner/aws_runner.go | 7 +++++ internal/runner/nomad_runner.go | 19 ++++++++++++ internal/runner/nomad_runner_test.go | 7 +++++ internal/runner/runner.go | 3 ++ internal/runner/runner_mock.go | 31 +++++++++++++++++++- pkg/dto/dto.go | 1 + pkg/nullio/nullio.go | 2 +- pkg/nullio/nullio_test.go | 44 +++++++++++++++++----------- tests/e2e/runners_test.go | 42 ++++++++++++++++++++++++-- 11 files changed, 191 insertions(+), 21 deletions(-) diff --git a/internal/api/runners.go b/internal/api/runners.go index 9e8a4c8..a619c3b 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -20,9 +20,11 @@ const ( ExecutePath = "/execute" WebsocketPath = "/websocket" UpdateFileSystemPath = "/files" + FileContentRawPath = UpdateFileSystemPath + "/raw" DeleteRoute = "deleteRunner" RunnerIDKey = "runnerId" ExecutionIDKey = "executionID" + PathKey = "path" ProvideRoute = "provideRunner" ) @@ -39,6 +41,7 @@ func (r *RunnerController) ConfigureRoutes(router *mux.Router) { r.runnerRouter.Use(r.findRunnerMiddleware) r.runnerRouter.HandleFunc(UpdateFileSystemPath, r.updateFileSystem).Methods(http.MethodPatch). Name(UpdateFileSystemPath) + r.runnerRouter.HandleFunc(FileContentRawPath, r.fileContent).Methods(http.MethodGet).Name(FileContentRawPath) r.runnerRouter.HandleFunc(ExecutePath, r.execute).Methods(http.MethodPost).Name(ExecutePath) r.runnerRouter.HandleFunc(WebsocketPath, r.connectToRunner).Methods(http.MethodGet).Name(WebsocketPath) r.runnerRouter.HandleFunc("", r.delete).Methods(http.MethodDelete).Name(DeleteRoute) @@ -92,6 +95,25 @@ func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request writer.WriteHeader(http.StatusNoContent) } +func (r *RunnerController) fileContent(writer http.ResponseWriter, request *http.Request) { + monitoring.AddRequestSize(request) + targetRunner, _ := runner.FromContext(request.Context()) + path := request.URL.Query().Get(PathKey) + + err := targetRunner.GetFileContent(path, writer, request.Context()) + if errors.Is(err, runner.ErrFileNotFound) { + writeNotFound(writer, err) + return + } else if err != nil { + log.WithError(err).Error("Could not retrieve the requested file.") + writeInternalServerError(writer, err, dto.ErrorUnknown) + return + } + + writer.Header().Set("Content-Type", "application/octet-stream") + writer.WriteHeader(http.StatusOK) +} + // execute handles the execute API route. // It takes an ExecutionRequest and stores it for a runner. // It returns a url to connect to for a websocket connection to this execution in the corresponding runner. diff --git a/internal/api/runners_test.go b/internal/api/runners_test.go index 005f82a..75d6c54 100644 --- a/internal/api/runners_test.go +++ b/internal/api/runners_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "github.com/gorilla/mux" + "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/tests" @@ -310,6 +311,39 @@ func (s *UpdateFileSystemRouteTestSuite) TestUpdateFileSystemReturnsInternalServ s.Equal(http.StatusInternalServerError, s.recorder.Code) } +func (s *UpdateFileSystemRouteTestSuite) TestFileContent() { + routeURL, err := s.router.Get(FileContentRawPath).URL(RunnerIDKey, tests.DefaultMockID) + s.Require().NoError(err) + mockCall := s.runnerMock. + On("GetFileContent", mock.AnythingOfType("string"), mock.Anything, mock.Anything) + + s.Run("Not Found", func() { + mockCall.Return(runner.ErrFileNotFound) + request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader("")) + s.Require().NoError(err) + s.router.ServeHTTP(s.recorder, request) + s.Equal(http.StatusNotFound, s.recorder.Code) + }) + + s.recorder = httptest.NewRecorder() + s.Run("Unknown Error", func() { + mockCall.Return(nomad.ErrorExecutorCommunicationFailed) + request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader("")) + s.Require().NoError(err) + s.router.ServeHTTP(s.recorder, request) + s.Equal(http.StatusInternalServerError, s.recorder.Code) + }) + + s.recorder = httptest.NewRecorder() + s.Run("No Error", func() { + mockCall.Return(nil) + request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader("")) + s.Require().NoError(err) + s.router.ServeHTTP(s.recorder, request) + s.Equal(http.StatusOK, s.recorder.Code) + }) +} + func TestDeleteRunnerRouteTestSuite(t *testing.T) { suite.Run(t, new(DeleteRunnerRouteTestSuite)) } diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go index 888ac7a..703d2e7 100644 --- a/internal/runner/aws_runner.go +++ b/internal/runner/aws_runner.go @@ -115,6 +115,13 @@ func (w *AWSFunctionWorkload) UpdateFileSystem(request *dto.UpdateFileSystemRequ return nil } +// GetFileContent is currently not supported with this aws serverless function. +// This is because the function execution ends with the termination of the workload code. +// So an on-demand file streaming after the termination is not possible. Also, we do not want to copy all files. +func (w *AWSFunctionWorkload) GetFileContent(_ string, _ io.Writer, _ context.Context) error { + return dto.ErrNotSupported +} + func (w *AWSFunctionWorkload) Destroy() error { for _, cancel := range w.runningExecutions { cancel() diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index fdcc754..50d4caa 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -12,6 +12,7 @@ import ( "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/monitoring" + "github.com/openHPI/poseidon/pkg/nullio" "github.com/openHPI/poseidon/pkg/storage" "io" "strings" @@ -31,6 +32,7 @@ const ( var ( ErrorUnknownExecution = errors.New("unknown execution") ErrorFileCopyFailed = errors.New("file copy failed") + ErrFileNotFound = errors.New("file not found") ) // NomadJob is an abstraction to communicate with Nomad environments. @@ -149,6 +151,23 @@ func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) er return nil } +func (r *NomadJob) GetFileContent(path string, content io.Writer, ctx context.Context) error { + r.ResetTimeout() + + retrieveCommand := (&dto.ExecutionRequest{Command: fmt.Sprintf("cat %s", path)}).FullCommand() + // Improve: Instead of using io.Discard use a **fixed-sized** buffer. With that we could improve the error message. + exitCode, err := r.api.ExecuteCommand(r.id, ctx, retrieveCommand, false, &nullio.Reader{}, content, io.Discard) + + if err != nil { + return fmt.Errorf("%w: nomad error during retrieve file content copy: %v", + nomad.ErrorExecutorCommunicationFailed, err) + } + if exitCode != 0 { + return ErrFileNotFound + } + return nil +} + func (r *NomadJob) Destroy() error { if err := r.onDestroy(r); err != nil { return fmt.Errorf("error while destroying runner: %w", err) diff --git a/internal/runner/nomad_runner_test.go b/internal/runner/nomad_runner_test.go index a059989..5fa2d5f 100644 --- a/internal/runner/nomad_runner_test.go +++ b/internal/runner/nomad_runner_test.go @@ -399,3 +399,10 @@ func NewRunner(id string, manager Accessor) Runner { } return NewNomadJob(id, nil, nil, handler) } + +func (s *UpdateFileSystemTestSuite) TestGetFileContentReturnsErrorIfExitCodeIsNotZero() { + s.mockedExecuteCommandCall.RunFn = nil + s.mockedExecuteCommandCall.Return(1, nil) + err := s.runner.GetFileContent("", &bytes.Buffer{}, context.Background()) + s.ErrorIs(err, ErrFileNotFound) +} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index b7a3d53..9752210 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -48,6 +48,9 @@ type Runner interface { // and then copying each given dto.File to the runner. UpdateFileSystem(request *dto.UpdateFileSystemRequest) error + // GetFileContent streams the file content at the requested path into the Writer provided at content. + GetFileContent(path string, content io.Writer, ctx context.Context) error + // Destroy destroys the Runner in Nomad. Destroy() error } diff --git a/internal/runner/runner_mock.go b/internal/runner/runner_mock.go index 5879ceb..5b9c8a8 100644 --- a/internal/runner/runner_mock.go +++ b/internal/runner/runner_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.10.4. DO NOT EDIT. +// Code generated by mockery v2.14.0. DO NOT EDIT. package runner @@ -92,6 +92,20 @@ func (_m *RunnerMock) ExecutionExists(id string) bool { return r0 } +// GetFileContent provides a mock function with given fields: path, content, ctx +func (_m *RunnerMock) GetFileContent(path string, content io.Writer, ctx context.Context) error { + ret := _m.Called(path, content, ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(string, io.Writer, context.Context) error); ok { + r0 = rf(path, content, ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *RunnerMock) ID() string { ret := _m.Called() @@ -169,3 +183,18 @@ func (_m *RunnerMock) UpdateFileSystem(request *dto.UpdateFileSystemRequest) err return r0 } + +type mockConstructorTestingTNewRunnerMock interface { + mock.TestingT + Cleanup(func()) +} + +// NewRunnerMock creates a new instance of RunnerMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewRunnerMock(t mockConstructorTestingTNewRunnerMock) *RunnerMock { + mock := &RunnerMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/dto/dto.go b/pkg/dto/dto.go index 9b17078..65bcbfa 100644 --- a/pkg/dto/dto.go +++ b/pkg/dto/dto.go @@ -159,6 +159,7 @@ var ( ErrMissingType = errors.New("type is missing") ErrMissingData = errors.New("data is missing") ErrInvalidType = errors.New("invalid type") + ErrNotSupported = errors.New("not supported") ) // WebSocketMessage is the type for all messages send in the WebSocket to the client. diff --git a/pkg/nullio/nullio.go b/pkg/nullio/nullio.go index 1c39c6a..ad170df 100644 --- a/pkg/nullio/nullio.go +++ b/pkg/nullio/nullio.go @@ -20,7 +20,7 @@ type Reader struct { } func (r Reader) Read(_ []byte) (int, error) { - if r.Ctx.Err() != nil { + if r.Ctx == nil || r.Ctx.Err() != nil { return 0, io.EOF } diff --git a/pkg/nullio/nullio_test.go b/pkg/nullio/nullio_test.go index e31166f..1b3b34b 100644 --- a/pkg/nullio/nullio_test.go +++ b/pkg/nullio/nullio_test.go @@ -10,28 +10,38 @@ import ( const shortTimeout = 100 * time.Millisecond -func TestReaderDoesNotReturnImmediately(t *testing.T) { - readingContext, cancel := context.WithCancel(context.Background()) - defer cancel() - - reader := &Reader{readingContext} - readerReturned := make(chan bool) - go func() { +func TestReader_Read(t *testing.T) { + read := func(reader io.Reader, ret chan<- bool) { p := make([]byte, 0, 5) _, err := reader.Read(p) assert.ErrorIs(t, io.EOF, err) - close(readerReturned) - }() - - var received bool - select { - case <-readerReturned: - received = true - case <-time.After(shortTimeout): - received = false + close(ret) } - assert.False(t, received) + t.Run("WithContext_DoesNotReturnImmediately", func(t *testing.T) { + readingContext, cancel := context.WithCancel(context.Background()) + defer cancel() + + readerReturned := make(chan bool) + go read(&Reader{readingContext}, readerReturned) + + select { + case <-readerReturned: + assert.Fail(t, "The reader returned before the timeout was reached") + case <-time.After(shortTimeout): + } + }) + + t.Run("WithoutContext_DoesReturnImmediately", func(t *testing.T) { + readerReturned := make(chan bool) + go read(&Reader{}, readerReturned) + + select { + case <-readerReturned: + case <-time.After(shortTimeout): + assert.Fail(t, "The reader returned before the timeout was reached") + } + }) } func TestReadWriterWritesEverything(t *testing.T) { diff --git a/tests/e2e/runners_test.go b/tests/e2e/runners_test.go index 8b235b8..c609f1a 100644 --- a/tests/e2e/runners_test.go +++ b/tests/e2e/runners_test.go @@ -9,7 +9,10 @@ import ( "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/tests" "github.com/openHPI/poseidon/tests/helpers" + "github.com/stretchr/testify/require" + "io" "net/http" + "net/url" "strings" "time" ) @@ -55,13 +58,13 @@ func (s *E2ETestSuite) TestProvideRunnerRoute() { // ProvideRunner creates a runner with the given RunnerRequest via an external request. // It needs a running Poseidon instance to work. func ProvideRunner(request *dto.RunnerRequest) (string, error) { - url := helpers.BuildURL(api.BasePath, api.RunnersPath) + runnerURL := helpers.BuildURL(api.BasePath, api.RunnersPath) runnerRequestByteString, err := json.Marshal(request) if err != nil { return "", err } reader := strings.NewReader(string(runnerRequestByteString)) - resp, err := http.Post(url, "application/json", reader) //nolint:gosec // url is not influenced by a user + resp, err := http.Post(runnerURL, "application/json", reader) //nolint:gosec // runnerURL is not influenced by a user if err != nil { return "", err } @@ -277,6 +280,41 @@ func (s *E2ETestSuite) TestCopyFilesRoute_PermissionDenied() { }) } +func (s *E2ETestSuite) TestGetFileContent_Nomad() { + runnerID, err := ProvideRunner(&dto.RunnerRequest{ + ExecutionEnvironmentID: tests.DefaultEnvironmentIDAsInteger, + }) + require.NoError(s.T(), err) + + s.Run("Not Found", func() { + getFileURL, err := url.Parse(helpers.BuildURL(api.BasePath, api.RunnersPath, runnerID, api.FileContentRawPath)) + s.Require().NoError(err) + getFileURL.RawQuery = fmt.Sprintf("%s=%s", api.PathKey, tests.DefaultFileName) + response, err := http.Get(getFileURL.String()) + s.Require().NoError(err) + s.Equal(http.StatusNotFound, response.StatusCode) + }) + + s.Run("Ok", func() { + newFileContent := []byte("New content") + resp, err := CopyFiles(runnerID, &dto.UpdateFileSystemRequest{ + Copy: []dto.File{{Path: tests.DefaultFileName, Content: newFileContent}}, + }) + s.Require().NoError(err) + s.Equal(http.StatusNoContent, resp.StatusCode) + + getFileURL, err := url.Parse(helpers.BuildURL(api.BasePath, api.RunnersPath, runnerID, api.FileContentRawPath)) + s.Require().NoError(err) + getFileURL.RawQuery = fmt.Sprintf("%s=%s", api.PathKey, tests.DefaultFileName) + response, err := http.Get(getFileURL.String()) + s.Require().NoError(err) + s.Equal(http.StatusOK, response.StatusCode) + content, err := io.ReadAll(response.Body) + s.Require().NoError(err) + s.Equal(newFileContent, content) + }) +} + func (s *E2ETestSuite) TestRunnerGetsDestroyedAfterInactivityTimeout() { for _, environmentID := range environmentIDs { s.Run(environmentID.ToString(), func() {