Add listing of runners file system.

This commit is contained in:
Maximilian Paß
2022-08-17 22:40:31 +02:00
parent c7ee7c1e25
commit 152b77afe5
11 changed files with 408 additions and 13 deletions

View File

@ -51,13 +51,20 @@ components:
- networkAccess - networkAccess
- exposedPorts - exposedPorts
additionalProperties: false additionalProperties: false
File: FileHeader:
type: object type: object
properties: properties:
name: name:
description: The path of the file. description: The path of the file.
type: string type: string
example: ./logs/last.log example: ./logs/last.log
objectType:
description: The type of the object (file).
type: string
minLength: 1
maxLength: 1
enum: ["d", "l", "-"]
default: "-"
size: size:
description: The size of the file in bytes. description: The size of the file in bytes.
type: integer type: integer
@ -310,11 +317,11 @@ paths:
schema: schema:
type: object type: object
properties: properties:
Files: files:
description: A list of all Files description: A list of all Files
type: array type: array
items: items:
$ref: "#/components/schemas/File" $ref: "#/components/schemas/FileHeader"
"401": "401":
$ref: "#/components/responses/Unauthorized" $ref: "#/components/responses/Unauthorized"
patch: patch:

View File

@ -20,12 +20,14 @@ const (
ExecutePath = "/execute" ExecutePath = "/execute"
WebsocketPath = "/websocket" WebsocketPath = "/websocket"
UpdateFileSystemPath = "/files" UpdateFileSystemPath = "/files"
ListFileSystemRouteName = UpdateFileSystemPath + "_list"
FileContentRawPath = UpdateFileSystemPath + "/raw" FileContentRawPath = UpdateFileSystemPath + "/raw"
ProvideRoute = "provideRunner"
DeleteRoute = "deleteRunner" DeleteRoute = "deleteRunner"
RunnerIDKey = "runnerId" RunnerIDKey = "runnerId"
ExecutionIDKey = "executionID" ExecutionIDKey = "executionID"
PathKey = "path" PathKey = "path"
ProvideRoute = "provideRunner" RecursiveKey = "recursive"
) )
type RunnerController struct { type RunnerController struct {
@ -39,6 +41,8 @@ func (r *RunnerController) ConfigureRoutes(router *mux.Router) {
runnersRouter.HandleFunc("", r.provide).Methods(http.MethodPost).Name(ProvideRoute) runnersRouter.HandleFunc("", r.provide).Methods(http.MethodPost).Name(ProvideRoute)
r.runnerRouter = runnersRouter.PathPrefix(fmt.Sprintf("/{%s}", RunnerIDKey)).Subrouter() r.runnerRouter = runnersRouter.PathPrefix(fmt.Sprintf("/{%s}", RunnerIDKey)).Subrouter()
r.runnerRouter.Use(r.findRunnerMiddleware) r.runnerRouter.Use(r.findRunnerMiddleware)
r.runnerRouter.HandleFunc(UpdateFileSystemPath, r.listFileSystem).Methods(http.MethodGet).
Name(ListFileSystemRouteName)
r.runnerRouter.HandleFunc(UpdateFileSystemPath, r.updateFileSystem).Methods(http.MethodPatch). r.runnerRouter.HandleFunc(UpdateFileSystemPath, r.updateFileSystem).Methods(http.MethodPatch).
Name(UpdateFileSystemPath) Name(UpdateFileSystemPath)
r.runnerRouter.HandleFunc(FileContentRawPath, r.fileContent).Methods(http.MethodGet).Name(FileContentRawPath) r.runnerRouter.HandleFunc(FileContentRawPath, r.fileContent).Methods(http.MethodGet).Name(FileContentRawPath)
@ -75,6 +79,29 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req
sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK) sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK)
} }
// listFileSystem handles the files API route with the method GET.
// It returns a listing of the file system of the provided runner.
func (r *RunnerController) listFileSystem(writer http.ResponseWriter, request *http.Request) {
targetRunner, _ := runner.FromContext(request.Context())
monitoring.AddRunnerMonitoringData(request, targetRunner.ID(), targetRunner.Environment())
recursiveRaw := request.URL.Query().Get(RecursiveKey)
recursive, err := strconv.ParseBool(recursiveRaw)
recursive = err != nil || recursive
path := request.URL.Query().Get(PathKey)
if path == "" {
path = "./"
}
writer.Header().Set("Content-Type", "application/json")
if err := targetRunner.ListFileSystem(path, recursive, writer, request.Context()); err != nil {
log.WithError(err).Error("Could not perform the requested listFileSystem.")
writeInternalServerError(writer, err, dto.ErrorUnknown)
return
}
}
// updateFileSystem handles the files API route. // updateFileSystem handles the files API route.
// It takes an dto.UpdateFileSystemRequest and sends it to the runner for processing. // It takes an dto.UpdateFileSystemRequest and sends it to the runner for processing.
func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request *http.Request) { func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request *http.Request) {
@ -96,7 +123,6 @@ func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request
} }
func (r *RunnerController) fileContent(writer http.ResponseWriter, request *http.Request) { func (r *RunnerController) fileContent(writer http.ResponseWriter, request *http.Request) {
monitoring.AddRequestSize(request)
targetRunner, _ := runner.FromContext(request.Context()) targetRunner, _ := runner.FromContext(request.Context())
path := request.URL.Query().Get(PathKey) path := request.URL.Query().Get(PathKey)

View File

@ -14,6 +14,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"strconv"
"strings" "strings"
"testing" "testing"
) )
@ -311,6 +312,66 @@ func (s *UpdateFileSystemRouteTestSuite) TestUpdateFileSystemReturnsInternalServ
s.Equal(http.StatusInternalServerError, s.recorder.Code) s.Equal(http.StatusInternalServerError, s.recorder.Code)
} }
func (s *UpdateFileSystemRouteTestSuite) TestListFileSystem() {
routeURL, err := s.router.Get(UpdateFileSystemPath).URL(RunnerIDKey, tests.DefaultMockID)
s.Require().NoError(err)
mockCall := s.runnerMock.On("ListFileSystem",
mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.Anything, mock.Anything)
s.Run("default parameters", func() {
mockCall.Run(func(args mock.Arguments) {
path, ok := args.Get(0).(string)
s.True(ok)
s.Equal("./", path)
recursive, ok := args.Get(1).(bool)
s.True(ok)
s.True(recursive)
mockCall.ReturnArguments = mock.Arguments{nil}
})
request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader(""))
s.Require().NoError(err)
s.router.ServeHTTP(s.recorder, request)
s.Equal(http.StatusOK, s.recorder.Code)
})
s.recorder = httptest.NewRecorder()
s.Run("passed parameters", func() {
expectedPath := "/flag"
mockCall.Run(func(args mock.Arguments) {
path, ok := args.Get(0).(string)
s.True(ok)
s.Equal(expectedPath, path)
recursive, ok := args.Get(1).(bool)
s.True(ok)
s.False(recursive)
mockCall.ReturnArguments = mock.Arguments{nil}
})
query := routeURL.Query()
query.Set(PathKey, expectedPath)
query.Set(RecursiveKey, strconv.FormatBool(false))
routeURL.RawQuery = query.Encode()
request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader(""))
s.Require().NoError(err)
s.router.ServeHTTP(s.recorder, request)
s.Equal(http.StatusOK, s.recorder.Code)
})
s.recorder = httptest.NewRecorder()
s.Run("Internal Server Error on failure", func() {
mockCall.Run(func(args mock.Arguments) {
mockCall.ReturnArguments = mock.Arguments{runner.ErrRunnerNotFound}
})
request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader(""))
s.Require().NoError(err)
s.router.ServeHTTP(s.recorder, request)
s.Equal(http.StatusInternalServerError, s.recorder.Code)
})
}
func (s *UpdateFileSystemRouteTestSuite) TestFileContent() { func (s *UpdateFileSystemRouteTestSuite) TestFileContent() {
routeURL, err := s.router.Get(FileContentRawPath).URL(RunnerIDKey, tests.DefaultMockID) routeURL, err := s.router.Get(FileContentRawPath).URL(RunnerIDKey, tests.DefaultMockID)
s.Require().NoError(err) s.Require().NoError(err)

View File

@ -101,6 +101,13 @@ func (w *AWSFunctionWorkload) ExecuteInteractively(id string, _ io.ReadWriter, s
return exit, cancel, nil return exit, cancel, nil
} }
// ListFileSystem is currently not supported with this aws serverless function.
// This is because the function execution ends with the termination of the workload code.
// So an on-demand file system listing after the termination is not possible. Also, we do not want to copy all files.
func (w *AWSFunctionWorkload) ListFileSystem(_ string, _ bool, _ io.Writer, _ context.Context) error {
return dto.ErrNotSupported
}
// UpdateFileSystem copies Files into the executor. // UpdateFileSystem copies Files into the executor.
// Current limitation: No files can be deleted apart from the previously added files. // Current limitation: No files can be deleted apart from the previously added files.
// Future Work: Deduplication of the file systems, as the largest workload is likely to be used by additional // Future Work: Deduplication of the file systems, as the largest workload is likely to be used by additional

View File

@ -118,6 +118,27 @@ func (r *NomadJob) ExecuteInteractively(
return exit, cancel, nil return exit, cancel, nil
} }
func (r *NomadJob) ListFileSystem(path string, recursive bool, content io.Writer, ctx context.Context) error {
r.ResetTimeout()
command := "ls -l --time-style=+%s -1 --literal"
if recursive {
command += " --recursive"
}
ls2json := &nullio.Ls2JsonWriter{Target: content}
defer ls2json.Close()
retrieveCommand := (&dto.ExecutionRequest{Command: fmt.Sprintf("%s %s", command, path)}).FullCommand()
exitCode, err := r.api.ExecuteCommand(r.id, ctx, retrieveCommand, false, &nullio.Reader{}, ls2json, io.Discard)
if err != nil {
return fmt.Errorf("%w: nomad error during retrieve file headers: %v",
nomad.ErrorExecutorCommunicationFailed, err)
}
if exitCode != 0 {
return ErrFileNotFound
}
return nil
}
func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error { func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error {
r.ResetTimeout() r.ResetTimeout()

View File

@ -44,11 +44,16 @@ type Runner interface {
stderr io.Writer, stderr io.Writer,
) (exit <-chan ExitInfo, cancel context.CancelFunc, err error) ) (exit <-chan ExitInfo, cancel context.CancelFunc, err error)
// ListFileSystem streams the listing of the file system of the requested directory into the Writer provided.
// The result is streamed via the io.Writer in order to not overload the memory with user input.
ListFileSystem(path string, recursive bool, result io.Writer, ctx context.Context) error
// UpdateFileSystem processes a dto.UpdateFileSystemRequest by first deleting each given dto.FilePath recursively // UpdateFileSystem processes a dto.UpdateFileSystemRequest by first deleting each given dto.FilePath recursively
// and then copying each given dto.File to the runner. // and then copying each given dto.File to the runner.
UpdateFileSystem(request *dto.UpdateFileSystemRequest) error UpdateFileSystem(request *dto.UpdateFileSystemRequest) error
// GetFileContent streams the file content at the requested path into the Writer provided at content. // GetFileContent streams the file content at the requested path into the Writer provided at content.
// The result is streamed via the io.Writer in order to not overload the memory with user input.
GetFileContent(path string, content io.Writer, ctx context.Context) error GetFileContent(path string, content io.Writer, ctx context.Context) error
// Destroy destroys the Runner in Nomad. // Destroy destroys the Runner in Nomad.

View File

@ -120,6 +120,20 @@ func (_m *RunnerMock) ID() string {
return r0 return r0
} }
// ListFileSystem provides a mock function with given fields: path, recursive, result, ctx
func (_m *RunnerMock) ListFileSystem(path string, recursive bool, result io.Writer, ctx context.Context) error {
ret := _m.Called(path, recursive, result, ctx)
var r0 error
if rf, ok := ret.Get(0).(func(string, bool, io.Writer, context.Context) error); ok {
r0 = rf(path, recursive, result, ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// MappedPorts provides a mock function with given fields: // MappedPorts provides a mock function with given fields:
func (_m *RunnerMock) MappedPorts() []*dto.MappedPort { func (_m *RunnerMock) MappedPorts() []*dto.MappedPort {
ret := _m.Called() ret := _m.Called()

View File

@ -100,6 +100,11 @@ type ExecutionResponse struct {
WebSocketURL string `json:"websocketUrl"` WebSocketURL string `json:"websocketUrl"`
} }
// ListFileSystemResponse is the expected response when listing the file system.
type ListFileSystemResponse struct {
Files []FileHeader `json:"files"`
}
// UpdateFileSystemRequest is the expected json structure of the request body for the update file system route. // UpdateFileSystemRequest is the expected json structure of the request body for the update file system route.
type UpdateFileSystemRequest struct { type UpdateFileSystemRequest struct {
Delete []FilePath `json:"delete"` Delete []FilePath `json:"delete"`
@ -109,6 +114,14 @@ type UpdateFileSystemRequest struct {
// FilePath specifies the path of a file and is part of the UpdateFileSystemRequest. // FilePath specifies the path of a file and is part of the UpdateFileSystemRequest.
type FilePath string type FilePath string
// FileHeader specifies the information provided for listing a File.
type FileHeader struct {
Name FilePath `json:"name"`
ObjectType string `json:"objectType"`
Size int `json:"size"`
ModificationTime int `json:"modificationTime"`
}
// File is a DTO for transmitting file contents. It is part of the UpdateFileSystemRequest. // File is a DTO for transmitting file contents. It is part of the UpdateFileSystemRequest.
type File struct { type File struct {
Path FilePath `json:"path"` Path FilePath `json:"path"`

126
pkg/nullio/ls2json.go Normal file
View File

@ -0,0 +1,126 @@
package nullio
import (
"bytes"
"encoding/json"
"fmt"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"io"
"regexp"
"strconv"
)
var (
log = logging.GetLogger("nullio")
pathLineRegex = regexp.MustCompile(`(.*):$`)
headerLineRegex = regexp.MustCompile(`([dl-])[-rwxXsS]{9} \d* .*? .*? +(\d+) (\d+) (.*)$`)
)
// Ls2JsonWriter implements io.Writer.
// It streams the passed data to the Target and transforms the data into the json format.
type Ls2JsonWriter struct {
Target io.Writer
jsonStartSend bool
setCommaPrefix bool
remaining []byte
latestPath []byte
}
func (w *Ls2JsonWriter) Write(p []byte) (int, error) {
i, err := w.initializeJSONObject()
if err != nil {
return i, err
}
start := 0
for i, char := range p {
if char != '\n' {
continue
}
line := p[start:i]
if len(w.remaining) > 0 {
line = append(w.remaining, line...)
w.remaining = []byte("")
}
if len(line) != 0 {
count, err := w.writeLine(line)
if err != nil {
log.WithError(err).Warn("Could not write line to Target")
return count, err
}
}
start = i + 1
}
if start < len(p) {
w.remaining = p[start:]
}
return len(p), nil
}
func (w *Ls2JsonWriter) initializeJSONObject() (count int, err error) {
if !w.jsonStartSend {
count, err = w.Target.Write([]byte("{\"files\": ["))
if count == 0 || err != nil {
log.WithError(err).Warn("Could not write to target")
err = fmt.Errorf("could not write to target: %w", err)
} else {
w.jsonStartSend = true
}
}
return count, err
}
func (w *Ls2JsonWriter) Close() {
count, err := w.Target.Write([]byte("]}"))
if count == 0 || err != nil {
log.WithError(err).Warn("Could not Close ls2json writer")
}
}
func (w *Ls2JsonWriter) writeLine(line []byte) (count int, err error) {
matches := pathLineRegex.FindSubmatch(line)
if matches != nil {
w.latestPath = append(bytes.TrimSuffix(matches[1], []byte("/")), '/')
return 0, nil
}
matches = headerLineRegex.FindSubmatch(line)
if matches != nil {
size, err1 := strconv.Atoi(string(matches[2]))
timestamp, err2 := strconv.Atoi(string(matches[3]))
if err1 != nil || err2 != nil {
return 0, fmt.Errorf("could not parse file details: %w %+v", err1, err2)
}
response, err1 := json.Marshal(dto.FileHeader{
Name: dto.FilePath(append(w.latestPath, matches[4]...)),
ObjectType: string(matches[1][0]),
Size: size,
ModificationTime: timestamp,
})
if err1 != nil {
return 0, fmt.Errorf("could not marshal file header: %w", err)
}
// Skip the first leading comma
if w.setCommaPrefix {
response = append([]byte{','}, response...)
} else {
w.setCommaPrefix = true
}
count, err1 = w.Target.Write(response)
if err1 != nil {
err = fmt.Errorf("could not write to target: %w", err)
} else if count == len(response) {
count = len(line)
}
}
return count, err
}

View File

@ -0,0 +1,74 @@
package nullio
import (
"bytes"
"github.com/stretchr/testify/suite"
"testing"
)
func TestLs2JsonTestSuite(t *testing.T) {
suite.Run(t, new(Ls2JsonTestSuite))
}
type Ls2JsonTestSuite struct {
suite.Suite
buf *bytes.Buffer
writer *Ls2JsonWriter
}
func (s *Ls2JsonTestSuite) SetupTest() {
s.buf = &bytes.Buffer{}
s.writer = &Ls2JsonWriter{Target: s.buf}
}
func (s *Ls2JsonTestSuite) TestLs2JsonWriter_WriteCreationAndClose() {
count, err := s.writer.Write([]byte(""))
s.Zero(count)
s.NoError(err)
s.Equal("{\"files\": [", s.buf.String())
s.writer.Close()
s.Equal("{\"files\": []}", s.buf.String())
}
func (s *Ls2JsonTestSuite) TestLs2JsonWriter_WriteFile() {
input := "total 0\n-rw-rw-r-- 1 kali kali 0 1660763446 flag\n"
count, err := s.writer.Write([]byte(input))
s.Equal(len(input), count)
s.NoError(err)
s.writer.Close()
s.Equal("{\"files\": [{\"name\":\"flag\",\"objectType\":\"-\",\"size\":0,\"modificationTime\":1660763446}]}",
s.buf.String())
}
func (s *Ls2JsonTestSuite) TestLs2JsonWriter_WriteRecursive() {
input := ".:\ntotal 4\ndrwxrwxr-x 2 kali kali 4096 1660764411 dir\n" +
"-rw-rw-r-- 1 kali kali 0 1660763446 flag\n" +
"\n./dir:\ntotal 4\n-rw-rw-r-- 1 kali kali 3 1660764366 another.txt\n"
count, err := s.writer.Write([]byte(input))
s.Equal(len(input), count)
s.NoError(err)
s.writer.Close()
s.Equal("{\"files\": [{\"name\":\"./dir\",\"objectType\":\"d\",\"size\":4096,\"modificationTime\":1660764411},"+
"{\"name\":\"./flag\",\"objectType\":\"-\",\"size\":0,\"modificationTime\":1660763446},"+
"{\"name\":\"./dir/another.txt\",\"objectType\":\"-\",\"size\":3,\"modificationTime\":1660764366}]}",
s.buf.String())
}
func (s *Ls2JsonTestSuite) TestLs2JsonWriter_WriteRemaining() {
input1 := "total 4\n-rw-rw-r-- 1 kali kali 3 1660764366 another.txt\n-rw-rw-r-- 1 kal"
_, err := s.writer.Write([]byte(input1))
s.NoError(err)
s.Equal("{\"files\": [{\"name\":\"another.txt\",\"objectType\":\"-\",\"size\":3,\"modificationTime\":1660764366}",
s.buf.String())
input2 := "i kali 0 1660763446 flag\n"
_, err = s.writer.Write([]byte(input2))
s.NoError(err)
s.writer.Close()
s.Equal("{\"files\": [{\"name\":\"another.txt\",\"objectType\":\"-\",\"size\":3,\"modificationTime\":1660764366},"+
"{\"name\":\"flag\",\"objectType\":\"-\",\"size\":0,\"modificationTime\":1660763446}]}", s.buf.String())
}

View File

@ -119,6 +119,47 @@ func (s *E2ETestSuite) TestDeleteRunnerRoute() {
} }
} }
func (s *E2ETestSuite) TestListFileSystem_Nomad() {
runnerID, err := ProvideRunner(&dto.RunnerRequest{
ExecutionEnvironmentID: tests.DefaultEnvironmentIDAsInteger,
})
require.NoError(s.T(), err)
s.Run("No files", func() {
getFileURL, err := url.Parse(helpers.BuildURL(api.BasePath, api.RunnersPath, runnerID, api.UpdateFileSystemPath))
s.Require().NoError(err)
response, err := http.Get(getFileURL.String())
s.Require().NoError(err)
s.Equal(http.StatusOK, response.StatusCode)
data, err := io.ReadAll(response.Body)
s.NoError(err)
s.Equal("{\"files\": []}", string(data))
})
s.Run("With file", func() {
resp, err := CopyFiles(runnerID, &dto.UpdateFileSystemRequest{
Copy: []dto.File{{Path: tests.DefaultFileName, Content: []byte{}}},
})
s.Require().NoError(err)
s.Equal(http.StatusNoContent, resp.StatusCode)
getFileURL, err := url.Parse(helpers.BuildURL(api.BasePath, api.RunnersPath, runnerID, api.UpdateFileSystemPath))
s.Require().NoError(err)
response, err := http.Get(getFileURL.String())
s.Require().NoError(err)
s.Equal(http.StatusOK, response.StatusCode)
listFilesResponse := new(dto.ListFileSystemResponse)
err = json.NewDecoder(response.Body).Decode(listFilesResponse)
s.Require().NoError(err)
s.Require().Equal(len(listFilesResponse.Files), 1)
fileHeader := listFilesResponse.Files[0]
s.Equal(dto.FilePath("./"+tests.DefaultFileName), fileHeader.Name)
s.Equal("-", fileHeader.ObjectType)
s.Equal(0, fileHeader.Size)
})
}
//nolint:funlen // there are a lot of tests for the files route, this function can be a little longer than 100 lines ;) //nolint:funlen // there are a lot of tests for the files route, this function can be a little longer than 100 lines ;)
func (s *E2ETestSuite) TestCopyFilesRoute() { func (s *E2ETestSuite) TestCopyFilesRoute() {
for _, environmentID := range environmentIDs { for _, environmentID := range environmentIDs {