From 152b77afe524ab869dcb7f80d1e30e14b272877c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Wed, 17 Aug 2022 22:40:31 +0200 Subject: [PATCH] Add listing of runners file system. --- api/swagger.yaml | 13 +++- internal/api/runners.go | 46 +++++++++--- internal/api/runners_test.go | 61 ++++++++++++++++ internal/runner/aws_runner.go | 7 ++ internal/runner/nomad_runner.go | 21 ++++++ internal/runner/runner.go | 5 ++ internal/runner/runner_mock.go | 14 ++++ pkg/dto/dto.go | 13 ++++ pkg/nullio/ls2json.go | 126 ++++++++++++++++++++++++++++++++ pkg/nullio/ls2json_test.go | 74 +++++++++++++++++++ tests/e2e/runners_test.go | 41 +++++++++++ 11 files changed, 408 insertions(+), 13 deletions(-) create mode 100644 pkg/nullio/ls2json.go create mode 100644 pkg/nullio/ls2json_test.go diff --git a/api/swagger.yaml b/api/swagger.yaml index 273def3..f1e9552 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -51,13 +51,20 @@ components: - networkAccess - exposedPorts additionalProperties: false - File: + FileHeader: type: object properties: name: description: The path of the file. type: string example: ./logs/last.log + objectType: + description: The type of the object (file). + type: string + minLength: 1 + maxLength: 1 + enum: ["d", "l", "-"] + default: "-" size: description: The size of the file in bytes. type: integer @@ -310,11 +317,11 @@ paths: schema: type: object properties: - Files: + files: description: A list of all Files type: array items: - $ref: "#/components/schemas/File" + $ref: "#/components/schemas/FileHeader" "401": $ref: "#/components/responses/Unauthorized" patch: diff --git a/internal/api/runners.go b/internal/api/runners.go index 3f5387e..d1b954b 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -17,15 +17,17 @@ import ( ) const ( - ExecutePath = "/execute" - WebsocketPath = "/websocket" - UpdateFileSystemPath = "/files" - FileContentRawPath = UpdateFileSystemPath + "/raw" - DeleteRoute = "deleteRunner" - RunnerIDKey = "runnerId" - ExecutionIDKey = "executionID" - PathKey = "path" - ProvideRoute = "provideRunner" + ExecutePath = "/execute" + WebsocketPath = "/websocket" + UpdateFileSystemPath = "/files" + ListFileSystemRouteName = UpdateFileSystemPath + "_list" + FileContentRawPath = UpdateFileSystemPath + "/raw" + ProvideRoute = "provideRunner" + DeleteRoute = "deleteRunner" + RunnerIDKey = "runnerId" + ExecutionIDKey = "executionID" + PathKey = "path" + RecursiveKey = "recursive" ) type RunnerController struct { @@ -39,6 +41,8 @@ func (r *RunnerController) ConfigureRoutes(router *mux.Router) { runnersRouter.HandleFunc("", r.provide).Methods(http.MethodPost).Name(ProvideRoute) r.runnerRouter = runnersRouter.PathPrefix(fmt.Sprintf("/{%s}", RunnerIDKey)).Subrouter() r.runnerRouter.Use(r.findRunnerMiddleware) + r.runnerRouter.HandleFunc(UpdateFileSystemPath, r.listFileSystem).Methods(http.MethodGet). + Name(ListFileSystemRouteName) r.runnerRouter.HandleFunc(UpdateFileSystemPath, r.updateFileSystem).Methods(http.MethodPatch). Name(UpdateFileSystemPath) r.runnerRouter.HandleFunc(FileContentRawPath, r.fileContent).Methods(http.MethodGet).Name(FileContentRawPath) @@ -75,6 +79,29 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK) } +// listFileSystem handles the files API route with the method GET. +// It returns a listing of the file system of the provided runner. +func (r *RunnerController) listFileSystem(writer http.ResponseWriter, request *http.Request) { + targetRunner, _ := runner.FromContext(request.Context()) + monitoring.AddRunnerMonitoringData(request, targetRunner.ID(), targetRunner.Environment()) + + recursiveRaw := request.URL.Query().Get(RecursiveKey) + recursive, err := strconv.ParseBool(recursiveRaw) + recursive = err != nil || recursive + + path := request.URL.Query().Get(PathKey) + if path == "" { + path = "./" + } + + writer.Header().Set("Content-Type", "application/json") + if err := targetRunner.ListFileSystem(path, recursive, writer, request.Context()); err != nil { + log.WithError(err).Error("Could not perform the requested listFileSystem.") + writeInternalServerError(writer, err, dto.ErrorUnknown) + return + } +} + // updateFileSystem handles the files API route. // It takes an dto.UpdateFileSystemRequest and sends it to the runner for processing. func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request *http.Request) { @@ -96,7 +123,6 @@ func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request } func (r *RunnerController) fileContent(writer http.ResponseWriter, request *http.Request) { - monitoring.AddRequestSize(request) targetRunner, _ := runner.FromContext(request.Context()) path := request.URL.Query().Get(PathKey) diff --git a/internal/api/runners_test.go b/internal/api/runners_test.go index 75d6c54..9aec028 100644 --- a/internal/api/runners_test.go +++ b/internal/api/runners_test.go @@ -14,6 +14,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "strconv" "strings" "testing" ) @@ -311,6 +312,66 @@ func (s *UpdateFileSystemRouteTestSuite) TestUpdateFileSystemReturnsInternalServ s.Equal(http.StatusInternalServerError, s.recorder.Code) } +func (s *UpdateFileSystemRouteTestSuite) TestListFileSystem() { + routeURL, err := s.router.Get(UpdateFileSystemPath).URL(RunnerIDKey, tests.DefaultMockID) + s.Require().NoError(err) + mockCall := s.runnerMock.On("ListFileSystem", + mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.Anything, mock.Anything) + + s.Run("default parameters", func() { + mockCall.Run(func(args mock.Arguments) { + path, ok := args.Get(0).(string) + s.True(ok) + s.Equal("./", path) + recursive, ok := args.Get(1).(bool) + s.True(ok) + s.True(recursive) + mockCall.ReturnArguments = mock.Arguments{nil} + }) + request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader("")) + s.Require().NoError(err) + s.router.ServeHTTP(s.recorder, request) + s.Equal(http.StatusOK, s.recorder.Code) + }) + + s.recorder = httptest.NewRecorder() + s.Run("passed parameters", func() { + expectedPath := "/flag" + + mockCall.Run(func(args mock.Arguments) { + path, ok := args.Get(0).(string) + s.True(ok) + s.Equal(expectedPath, path) + recursive, ok := args.Get(1).(bool) + s.True(ok) + s.False(recursive) + mockCall.ReturnArguments = mock.Arguments{nil} + }) + + query := routeURL.Query() + query.Set(PathKey, expectedPath) + query.Set(RecursiveKey, strconv.FormatBool(false)) + routeURL.RawQuery = query.Encode() + + request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader("")) + s.Require().NoError(err) + s.router.ServeHTTP(s.recorder, request) + s.Equal(http.StatusOK, s.recorder.Code) + }) + + s.recorder = httptest.NewRecorder() + s.Run("Internal Server Error on failure", func() { + mockCall.Run(func(args mock.Arguments) { + mockCall.ReturnArguments = mock.Arguments{runner.ErrRunnerNotFound} + }) + + request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader("")) + s.Require().NoError(err) + s.router.ServeHTTP(s.recorder, request) + s.Equal(http.StatusInternalServerError, s.recorder.Code) + }) +} + func (s *UpdateFileSystemRouteTestSuite) TestFileContent() { routeURL, err := s.router.Get(FileContentRawPath).URL(RunnerIDKey, tests.DefaultMockID) s.Require().NoError(err) diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go index 703d2e7..e62aa8a 100644 --- a/internal/runner/aws_runner.go +++ b/internal/runner/aws_runner.go @@ -101,6 +101,13 @@ func (w *AWSFunctionWorkload) ExecuteInteractively(id string, _ io.ReadWriter, s return exit, cancel, nil } +// ListFileSystem is currently not supported with this aws serverless function. +// This is because the function execution ends with the termination of the workload code. +// So an on-demand file system listing after the termination is not possible. Also, we do not want to copy all files. +func (w *AWSFunctionWorkload) ListFileSystem(_ string, _ bool, _ io.Writer, _ context.Context) error { + return dto.ErrNotSupported +} + // UpdateFileSystem copies Files into the executor. // Current limitation: No files can be deleted apart from the previously added files. // Future Work: Deduplication of the file systems, as the largest workload is likely to be used by additional diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 50d4caa..aae1407 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -118,6 +118,27 @@ func (r *NomadJob) ExecuteInteractively( return exit, cancel, nil } +func (r *NomadJob) ListFileSystem(path string, recursive bool, content io.Writer, ctx context.Context) error { + r.ResetTimeout() + command := "ls -l --time-style=+%s -1 --literal" + if recursive { + command += " --recursive" + } + + ls2json := &nullio.Ls2JsonWriter{Target: content} + defer ls2json.Close() + retrieveCommand := (&dto.ExecutionRequest{Command: fmt.Sprintf("%s %s", command, path)}).FullCommand() + exitCode, err := r.api.ExecuteCommand(r.id, ctx, retrieveCommand, false, &nullio.Reader{}, ls2json, io.Discard) + if err != nil { + return fmt.Errorf("%w: nomad error during retrieve file headers: %v", + nomad.ErrorExecutorCommunicationFailed, err) + } + if exitCode != 0 { + return ErrFileNotFound + } + return nil +} + func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error { r.ResetTimeout() diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 9752210..14750f5 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -44,11 +44,16 @@ type Runner interface { stderr io.Writer, ) (exit <-chan ExitInfo, cancel context.CancelFunc, err error) + // ListFileSystem streams the listing of the file system of the requested directory into the Writer provided. + // The result is streamed via the io.Writer in order to not overload the memory with user input. + ListFileSystem(path string, recursive bool, result io.Writer, ctx context.Context) error + // UpdateFileSystem processes a dto.UpdateFileSystemRequest by first deleting each given dto.FilePath recursively // and then copying each given dto.File to the runner. UpdateFileSystem(request *dto.UpdateFileSystemRequest) error // GetFileContent streams the file content at the requested path into the Writer provided at content. + // The result is streamed via the io.Writer in order to not overload the memory with user input. GetFileContent(path string, content io.Writer, ctx context.Context) error // Destroy destroys the Runner in Nomad. diff --git a/internal/runner/runner_mock.go b/internal/runner/runner_mock.go index 5b9c8a8..e912c84 100644 --- a/internal/runner/runner_mock.go +++ b/internal/runner/runner_mock.go @@ -120,6 +120,20 @@ func (_m *RunnerMock) ID() string { return r0 } +// ListFileSystem provides a mock function with given fields: path, recursive, result, ctx +func (_m *RunnerMock) ListFileSystem(path string, recursive bool, result io.Writer, ctx context.Context) error { + ret := _m.Called(path, recursive, result, ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(string, bool, io.Writer, context.Context) error); ok { + r0 = rf(path, recursive, result, ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // MappedPorts provides a mock function with given fields: func (_m *RunnerMock) MappedPorts() []*dto.MappedPort { ret := _m.Called() diff --git a/pkg/dto/dto.go b/pkg/dto/dto.go index 65bcbfa..8ad29dd 100644 --- a/pkg/dto/dto.go +++ b/pkg/dto/dto.go @@ -100,6 +100,11 @@ type ExecutionResponse struct { WebSocketURL string `json:"websocketUrl"` } +// ListFileSystemResponse is the expected response when listing the file system. +type ListFileSystemResponse struct { + Files []FileHeader `json:"files"` +} + // UpdateFileSystemRequest is the expected json structure of the request body for the update file system route. type UpdateFileSystemRequest struct { Delete []FilePath `json:"delete"` @@ -109,6 +114,14 @@ type UpdateFileSystemRequest struct { // FilePath specifies the path of a file and is part of the UpdateFileSystemRequest. type FilePath string +// FileHeader specifies the information provided for listing a File. +type FileHeader struct { + Name FilePath `json:"name"` + ObjectType string `json:"objectType"` + Size int `json:"size"` + ModificationTime int `json:"modificationTime"` +} + // File is a DTO for transmitting file contents. It is part of the UpdateFileSystemRequest. type File struct { Path FilePath `json:"path"` diff --git a/pkg/nullio/ls2json.go b/pkg/nullio/ls2json.go new file mode 100644 index 0000000..5d31700 --- /dev/null +++ b/pkg/nullio/ls2json.go @@ -0,0 +1,126 @@ +package nullio + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/logging" + "io" + "regexp" + "strconv" +) + +var ( + log = logging.GetLogger("nullio") + pathLineRegex = regexp.MustCompile(`(.*):$`) + headerLineRegex = regexp.MustCompile(`([dl-])[-rwxXsS]{9} \d* .*? .*? +(\d+) (\d+) (.*)$`) +) + +// Ls2JsonWriter implements io.Writer. +// It streams the passed data to the Target and transforms the data into the json format. +type Ls2JsonWriter struct { + Target io.Writer + jsonStartSend bool + setCommaPrefix bool + remaining []byte + latestPath []byte +} + +func (w *Ls2JsonWriter) Write(p []byte) (int, error) { + i, err := w.initializeJSONObject() + if err != nil { + return i, err + } + + start := 0 + for i, char := range p { + if char != '\n' { + continue + } + + line := p[start:i] + if len(w.remaining) > 0 { + line = append(w.remaining, line...) + w.remaining = []byte("") + } + + if len(line) != 0 { + count, err := w.writeLine(line) + if err != nil { + log.WithError(err).Warn("Could not write line to Target") + return count, err + } + } + start = i + 1 + } + + if start < len(p) { + w.remaining = p[start:] + } + + return len(p), nil +} + +func (w *Ls2JsonWriter) initializeJSONObject() (count int, err error) { + if !w.jsonStartSend { + count, err = w.Target.Write([]byte("{\"files\": [")) + if count == 0 || err != nil { + log.WithError(err).Warn("Could not write to target") + err = fmt.Errorf("could not write to target: %w", err) + } else { + w.jsonStartSend = true + } + } + return count, err +} + +func (w *Ls2JsonWriter) Close() { + count, err := w.Target.Write([]byte("]}")) + if count == 0 || err != nil { + log.WithError(err).Warn("Could not Close ls2json writer") + } +} + +func (w *Ls2JsonWriter) writeLine(line []byte) (count int, err error) { + matches := pathLineRegex.FindSubmatch(line) + if matches != nil { + w.latestPath = append(bytes.TrimSuffix(matches[1], []byte("/")), '/') + return 0, nil + } + + matches = headerLineRegex.FindSubmatch(line) + if matches != nil { + size, err1 := strconv.Atoi(string(matches[2])) + timestamp, err2 := strconv.Atoi(string(matches[3])) + if err1 != nil || err2 != nil { + return 0, fmt.Errorf("could not parse file details: %w %+v", err1, err2) + } + + response, err1 := json.Marshal(dto.FileHeader{ + Name: dto.FilePath(append(w.latestPath, matches[4]...)), + ObjectType: string(matches[1][0]), + Size: size, + ModificationTime: timestamp, + }) + if err1 != nil { + return 0, fmt.Errorf("could not marshal file header: %w", err) + } + + // Skip the first leading comma + if w.setCommaPrefix { + response = append([]byte{','}, response...) + } else { + w.setCommaPrefix = true + } + + count, err1 = w.Target.Write(response) + if err1 != nil { + err = fmt.Errorf("could not write to target: %w", err) + } else if count == len(response) { + count = len(line) + } + } + + return count, err +} diff --git a/pkg/nullio/ls2json_test.go b/pkg/nullio/ls2json_test.go new file mode 100644 index 0000000..8287e8c --- /dev/null +++ b/pkg/nullio/ls2json_test.go @@ -0,0 +1,74 @@ +package nullio + +import ( + "bytes" + "github.com/stretchr/testify/suite" + "testing" +) + +func TestLs2JsonTestSuite(t *testing.T) { + suite.Run(t, new(Ls2JsonTestSuite)) +} + +type Ls2JsonTestSuite struct { + suite.Suite + buf *bytes.Buffer + writer *Ls2JsonWriter +} + +func (s *Ls2JsonTestSuite) SetupTest() { + s.buf = &bytes.Buffer{} + s.writer = &Ls2JsonWriter{Target: s.buf} +} + +func (s *Ls2JsonTestSuite) TestLs2JsonWriter_WriteCreationAndClose() { + count, err := s.writer.Write([]byte("")) + s.Zero(count) + s.NoError(err) + + s.Equal("{\"files\": [", s.buf.String()) + + s.writer.Close() + s.Equal("{\"files\": []}", s.buf.String()) +} + +func (s *Ls2JsonTestSuite) TestLs2JsonWriter_WriteFile() { + input := "total 0\n-rw-rw-r-- 1 kali kali 0 1660763446 flag\n" + count, err := s.writer.Write([]byte(input)) + s.Equal(len(input), count) + s.NoError(err) + s.writer.Close() + + s.Equal("{\"files\": [{\"name\":\"flag\",\"objectType\":\"-\",\"size\":0,\"modificationTime\":1660763446}]}", + s.buf.String()) +} + +func (s *Ls2JsonTestSuite) TestLs2JsonWriter_WriteRecursive() { + input := ".:\ntotal 4\ndrwxrwxr-x 2 kali kali 4096 1660764411 dir\n" + + "-rw-rw-r-- 1 kali kali 0 1660763446 flag\n" + + "\n./dir:\ntotal 4\n-rw-rw-r-- 1 kali kali 3 1660764366 another.txt\n" + count, err := s.writer.Write([]byte(input)) + s.Equal(len(input), count) + s.NoError(err) + s.writer.Close() + + s.Equal("{\"files\": [{\"name\":\"./dir\",\"objectType\":\"d\",\"size\":4096,\"modificationTime\":1660764411},"+ + "{\"name\":\"./flag\",\"objectType\":\"-\",\"size\":0,\"modificationTime\":1660763446},"+ + "{\"name\":\"./dir/another.txt\",\"objectType\":\"-\",\"size\":3,\"modificationTime\":1660764366}]}", + s.buf.String()) +} + +func (s *Ls2JsonTestSuite) TestLs2JsonWriter_WriteRemaining() { + input1 := "total 4\n-rw-rw-r-- 1 kali kali 3 1660764366 another.txt\n-rw-rw-r-- 1 kal" + _, err := s.writer.Write([]byte(input1)) + s.NoError(err) + s.Equal("{\"files\": [{\"name\":\"another.txt\",\"objectType\":\"-\",\"size\":3,\"modificationTime\":1660764366}", + s.buf.String()) + + input2 := "i kali 0 1660763446 flag\n" + _, err = s.writer.Write([]byte(input2)) + s.NoError(err) + s.writer.Close() + s.Equal("{\"files\": [{\"name\":\"another.txt\",\"objectType\":\"-\",\"size\":3,\"modificationTime\":1660764366},"+ + "{\"name\":\"flag\",\"objectType\":\"-\",\"size\":0,\"modificationTime\":1660763446}]}", s.buf.String()) +} diff --git a/tests/e2e/runners_test.go b/tests/e2e/runners_test.go index c609f1a..9dde5e4 100644 --- a/tests/e2e/runners_test.go +++ b/tests/e2e/runners_test.go @@ -119,6 +119,47 @@ func (s *E2ETestSuite) TestDeleteRunnerRoute() { } } +func (s *E2ETestSuite) TestListFileSystem_Nomad() { + runnerID, err := ProvideRunner(&dto.RunnerRequest{ + ExecutionEnvironmentID: tests.DefaultEnvironmentIDAsInteger, + }) + require.NoError(s.T(), err) + + s.Run("No files", func() { + getFileURL, err := url.Parse(helpers.BuildURL(api.BasePath, api.RunnersPath, runnerID, api.UpdateFileSystemPath)) + s.Require().NoError(err) + response, err := http.Get(getFileURL.String()) + s.Require().NoError(err) + s.Equal(http.StatusOK, response.StatusCode) + data, err := io.ReadAll(response.Body) + s.NoError(err) + s.Equal("{\"files\": []}", string(data)) + }) + + s.Run("With file", func() { + resp, err := CopyFiles(runnerID, &dto.UpdateFileSystemRequest{ + Copy: []dto.File{{Path: tests.DefaultFileName, Content: []byte{}}}, + }) + s.Require().NoError(err) + s.Equal(http.StatusNoContent, resp.StatusCode) + + getFileURL, err := url.Parse(helpers.BuildURL(api.BasePath, api.RunnersPath, runnerID, api.UpdateFileSystemPath)) + s.Require().NoError(err) + response, err := http.Get(getFileURL.String()) + s.Require().NoError(err) + s.Equal(http.StatusOK, response.StatusCode) + + listFilesResponse := new(dto.ListFileSystemResponse) + err = json.NewDecoder(response.Body).Decode(listFilesResponse) + s.Require().NoError(err) + s.Require().Equal(len(listFilesResponse.Files), 1) + fileHeader := listFilesResponse.Files[0] + s.Equal(dto.FilePath("./"+tests.DefaultFileName), fileHeader.Name) + s.Equal("-", fileHeader.ObjectType) + s.Equal(0, fileHeader.Size) + }) +} + //nolint:funlen // there are a lot of tests for the files route, this function can be a little longer than 100 lines ;) func (s *E2ETestSuite) TestCopyFilesRoute() { for _, environmentID := range environmentIDs {