#136 Copy files back from Nomad runner.

This commit is contained in:
Maximilian Paß
2022-08-06 04:52:48 +02:00
parent ae7b3ccd58
commit f2b25566dd
11 changed files with 191 additions and 21 deletions

View File

@ -20,9 +20,11 @@ const (
ExecutePath = "/execute" ExecutePath = "/execute"
WebsocketPath = "/websocket" WebsocketPath = "/websocket"
UpdateFileSystemPath = "/files" UpdateFileSystemPath = "/files"
FileContentRawPath = UpdateFileSystemPath + "/raw"
DeleteRoute = "deleteRunner" DeleteRoute = "deleteRunner"
RunnerIDKey = "runnerId" RunnerIDKey = "runnerId"
ExecutionIDKey = "executionID" ExecutionIDKey = "executionID"
PathKey = "path"
ProvideRoute = "provideRunner" ProvideRoute = "provideRunner"
) )
@ -39,6 +41,7 @@ func (r *RunnerController) ConfigureRoutes(router *mux.Router) {
r.runnerRouter.Use(r.findRunnerMiddleware) r.runnerRouter.Use(r.findRunnerMiddleware)
r.runnerRouter.HandleFunc(UpdateFileSystemPath, r.updateFileSystem).Methods(http.MethodPatch). r.runnerRouter.HandleFunc(UpdateFileSystemPath, r.updateFileSystem).Methods(http.MethodPatch).
Name(UpdateFileSystemPath) Name(UpdateFileSystemPath)
r.runnerRouter.HandleFunc(FileContentRawPath, r.fileContent).Methods(http.MethodGet).Name(FileContentRawPath)
r.runnerRouter.HandleFunc(ExecutePath, r.execute).Methods(http.MethodPost).Name(ExecutePath) r.runnerRouter.HandleFunc(ExecutePath, r.execute).Methods(http.MethodPost).Name(ExecutePath)
r.runnerRouter.HandleFunc(WebsocketPath, r.connectToRunner).Methods(http.MethodGet).Name(WebsocketPath) r.runnerRouter.HandleFunc(WebsocketPath, r.connectToRunner).Methods(http.MethodGet).Name(WebsocketPath)
r.runnerRouter.HandleFunc("", r.delete).Methods(http.MethodDelete).Name(DeleteRoute) r.runnerRouter.HandleFunc("", r.delete).Methods(http.MethodDelete).Name(DeleteRoute)
@ -92,6 +95,25 @@ func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request
writer.WriteHeader(http.StatusNoContent) writer.WriteHeader(http.StatusNoContent)
} }
func (r *RunnerController) fileContent(writer http.ResponseWriter, request *http.Request) {
monitoring.AddRequestSize(request)
targetRunner, _ := runner.FromContext(request.Context())
path := request.URL.Query().Get(PathKey)
err := targetRunner.GetFileContent(path, writer, request.Context())
if errors.Is(err, runner.ErrFileNotFound) {
writeNotFound(writer, err)
return
} else if err != nil {
log.WithError(err).Error("Could not retrieve the requested file.")
writeInternalServerError(writer, err, dto.ErrorUnknown)
return
}
writer.Header().Set("Content-Type", "application/octet-stream")
writer.WriteHeader(http.StatusOK)
}
// execute handles the execute API route. // execute handles the execute API route.
// It takes an ExecutionRequest and stores it for a runner. // It takes an ExecutionRequest and stores it for a runner.
// It returns a url to connect to for a websocket connection to this execution in the corresponding runner. // It returns a url to connect to for a websocket connection to this execution in the corresponding runner.

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/tests" "github.com/openHPI/poseidon/tests"
@ -310,6 +311,39 @@ func (s *UpdateFileSystemRouteTestSuite) TestUpdateFileSystemReturnsInternalServ
s.Equal(http.StatusInternalServerError, s.recorder.Code) s.Equal(http.StatusInternalServerError, s.recorder.Code)
} }
func (s *UpdateFileSystemRouteTestSuite) TestFileContent() {
routeURL, err := s.router.Get(FileContentRawPath).URL(RunnerIDKey, tests.DefaultMockID)
s.Require().NoError(err)
mockCall := s.runnerMock.
On("GetFileContent", mock.AnythingOfType("string"), mock.Anything, mock.Anything)
s.Run("Not Found", func() {
mockCall.Return(runner.ErrFileNotFound)
request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader(""))
s.Require().NoError(err)
s.router.ServeHTTP(s.recorder, request)
s.Equal(http.StatusNotFound, s.recorder.Code)
})
s.recorder = httptest.NewRecorder()
s.Run("Unknown Error", func() {
mockCall.Return(nomad.ErrorExecutorCommunicationFailed)
request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader(""))
s.Require().NoError(err)
s.router.ServeHTTP(s.recorder, request)
s.Equal(http.StatusInternalServerError, s.recorder.Code)
})
s.recorder = httptest.NewRecorder()
s.Run("No Error", func() {
mockCall.Return(nil)
request, err := http.NewRequest(http.MethodGet, routeURL.String(), strings.NewReader(""))
s.Require().NoError(err)
s.router.ServeHTTP(s.recorder, request)
s.Equal(http.StatusOK, s.recorder.Code)
})
}
func TestDeleteRunnerRouteTestSuite(t *testing.T) { func TestDeleteRunnerRouteTestSuite(t *testing.T) {
suite.Run(t, new(DeleteRunnerRouteTestSuite)) suite.Run(t, new(DeleteRunnerRouteTestSuite))
} }

View File

@ -115,6 +115,13 @@ func (w *AWSFunctionWorkload) UpdateFileSystem(request *dto.UpdateFileSystemRequ
return nil return nil
} }
// GetFileContent is currently not supported with this aws serverless function.
// This is because the function execution ends with the termination of the workload code.
// So an on-demand file streaming after the termination is not possible. Also, we do not want to copy all files.
func (w *AWSFunctionWorkload) GetFileContent(_ string, _ io.Writer, _ context.Context) error {
return dto.ErrNotSupported
}
func (w *AWSFunctionWorkload) Destroy() error { func (w *AWSFunctionWorkload) Destroy() error {
for _, cancel := range w.runningExecutions { for _, cancel := range w.runningExecutions {
cancel() cancel()

View File

@ -12,6 +12,7 @@ import (
"github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/nullio"
"github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/storage"
"io" "io"
"strings" "strings"
@ -31,6 +32,7 @@ const (
var ( var (
ErrorUnknownExecution = errors.New("unknown execution") ErrorUnknownExecution = errors.New("unknown execution")
ErrorFileCopyFailed = errors.New("file copy failed") ErrorFileCopyFailed = errors.New("file copy failed")
ErrFileNotFound = errors.New("file not found")
) )
// NomadJob is an abstraction to communicate with Nomad environments. // NomadJob is an abstraction to communicate with Nomad environments.
@ -149,6 +151,23 @@ func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) er
return nil return nil
} }
func (r *NomadJob) GetFileContent(path string, content io.Writer, ctx context.Context) error {
r.ResetTimeout()
retrieveCommand := (&dto.ExecutionRequest{Command: fmt.Sprintf("cat %s", path)}).FullCommand()
// Improve: Instead of using io.Discard use a **fixed-sized** buffer. With that we could improve the error message.
exitCode, err := r.api.ExecuteCommand(r.id, ctx, retrieveCommand, false, &nullio.Reader{}, content, io.Discard)
if err != nil {
return fmt.Errorf("%w: nomad error during retrieve file content copy: %v",
nomad.ErrorExecutorCommunicationFailed, err)
}
if exitCode != 0 {
return ErrFileNotFound
}
return nil
}
func (r *NomadJob) Destroy() error { func (r *NomadJob) Destroy() error {
if err := r.onDestroy(r); err != nil { if err := r.onDestroy(r); err != nil {
return fmt.Errorf("error while destroying runner: %w", err) return fmt.Errorf("error while destroying runner: %w", err)

View File

@ -399,3 +399,10 @@ func NewRunner(id string, manager Accessor) Runner {
} }
return NewNomadJob(id, nil, nil, handler) return NewNomadJob(id, nil, nil, handler)
} }
func (s *UpdateFileSystemTestSuite) TestGetFileContentReturnsErrorIfExitCodeIsNotZero() {
s.mockedExecuteCommandCall.RunFn = nil
s.mockedExecuteCommandCall.Return(1, nil)
err := s.runner.GetFileContent("", &bytes.Buffer{}, context.Background())
s.ErrorIs(err, ErrFileNotFound)
}

View File

@ -48,6 +48,9 @@ type Runner interface {
// and then copying each given dto.File to the runner. // and then copying each given dto.File to the runner.
UpdateFileSystem(request *dto.UpdateFileSystemRequest) error UpdateFileSystem(request *dto.UpdateFileSystemRequest) error
// GetFileContent streams the file content at the requested path into the Writer provided at content.
GetFileContent(path string, content io.Writer, ctx context.Context) error
// Destroy destroys the Runner in Nomad. // Destroy destroys the Runner in Nomad.
Destroy() error Destroy() error
} }

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.10.4. DO NOT EDIT. // Code generated by mockery v2.14.0. DO NOT EDIT.
package runner package runner
@ -92,6 +92,20 @@ func (_m *RunnerMock) ExecutionExists(id string) bool {
return r0 return r0
} }
// GetFileContent provides a mock function with given fields: path, content, ctx
func (_m *RunnerMock) GetFileContent(path string, content io.Writer, ctx context.Context) error {
ret := _m.Called(path, content, ctx)
var r0 error
if rf, ok := ret.Get(0).(func(string, io.Writer, context.Context) error); ok {
r0 = rf(path, content, ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// ID provides a mock function with given fields: // ID provides a mock function with given fields:
func (_m *RunnerMock) ID() string { func (_m *RunnerMock) ID() string {
ret := _m.Called() ret := _m.Called()
@ -169,3 +183,18 @@ func (_m *RunnerMock) UpdateFileSystem(request *dto.UpdateFileSystemRequest) err
return r0 return r0
} }
type mockConstructorTestingTNewRunnerMock interface {
mock.TestingT
Cleanup(func())
}
// NewRunnerMock creates a new instance of RunnerMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewRunnerMock(t mockConstructorTestingTNewRunnerMock) *RunnerMock {
mock := &RunnerMock{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -159,6 +159,7 @@ var (
ErrMissingType = errors.New("type is missing") ErrMissingType = errors.New("type is missing")
ErrMissingData = errors.New("data is missing") ErrMissingData = errors.New("data is missing")
ErrInvalidType = errors.New("invalid type") ErrInvalidType = errors.New("invalid type")
ErrNotSupported = errors.New("not supported")
) )
// WebSocketMessage is the type for all messages send in the WebSocket to the client. // WebSocketMessage is the type for all messages send in the WebSocket to the client.

View File

@ -20,7 +20,7 @@ type Reader struct {
} }
func (r Reader) Read(_ []byte) (int, error) { func (r Reader) Read(_ []byte) (int, error) {
if r.Ctx.Err() != nil { if r.Ctx == nil || r.Ctx.Err() != nil {
return 0, io.EOF return 0, io.EOF
} }

View File

@ -10,28 +10,38 @@ import (
const shortTimeout = 100 * time.Millisecond const shortTimeout = 100 * time.Millisecond
func TestReaderDoesNotReturnImmediately(t *testing.T) { func TestReader_Read(t *testing.T) {
readingContext, cancel := context.WithCancel(context.Background()) read := func(reader io.Reader, ret chan<- bool) {
defer cancel()
reader := &Reader{readingContext}
readerReturned := make(chan bool)
go func() {
p := make([]byte, 0, 5) p := make([]byte, 0, 5)
_, err := reader.Read(p) _, err := reader.Read(p)
assert.ErrorIs(t, io.EOF, err) assert.ErrorIs(t, io.EOF, err)
close(readerReturned) close(ret)
}()
var received bool
select {
case <-readerReturned:
received = true
case <-time.After(shortTimeout):
received = false
} }
assert.False(t, received) t.Run("WithContext_DoesNotReturnImmediately", func(t *testing.T) {
readingContext, cancel := context.WithCancel(context.Background())
defer cancel()
readerReturned := make(chan bool)
go read(&Reader{readingContext}, readerReturned)
select {
case <-readerReturned:
assert.Fail(t, "The reader returned before the timeout was reached")
case <-time.After(shortTimeout):
}
})
t.Run("WithoutContext_DoesReturnImmediately", func(t *testing.T) {
readerReturned := make(chan bool)
go read(&Reader{}, readerReturned)
select {
case <-readerReturned:
case <-time.After(shortTimeout):
assert.Fail(t, "The reader returned before the timeout was reached")
}
})
} }
func TestReadWriterWritesEverything(t *testing.T) { func TestReadWriterWritesEverything(t *testing.T) {

View File

@ -9,7 +9,10 @@ import (
"github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/tests" "github.com/openHPI/poseidon/tests"
"github.com/openHPI/poseidon/tests/helpers" "github.com/openHPI/poseidon/tests/helpers"
"github.com/stretchr/testify/require"
"io"
"net/http" "net/http"
"net/url"
"strings" "strings"
"time" "time"
) )
@ -55,13 +58,13 @@ func (s *E2ETestSuite) TestProvideRunnerRoute() {
// ProvideRunner creates a runner with the given RunnerRequest via an external request. // ProvideRunner creates a runner with the given RunnerRequest via an external request.
// It needs a running Poseidon instance to work. // It needs a running Poseidon instance to work.
func ProvideRunner(request *dto.RunnerRequest) (string, error) { func ProvideRunner(request *dto.RunnerRequest) (string, error) {
url := helpers.BuildURL(api.BasePath, api.RunnersPath) runnerURL := helpers.BuildURL(api.BasePath, api.RunnersPath)
runnerRequestByteString, err := json.Marshal(request) runnerRequestByteString, err := json.Marshal(request)
if err != nil { if err != nil {
return "", err return "", err
} }
reader := strings.NewReader(string(runnerRequestByteString)) reader := strings.NewReader(string(runnerRequestByteString))
resp, err := http.Post(url, "application/json", reader) //nolint:gosec // url is not influenced by a user resp, err := http.Post(runnerURL, "application/json", reader) //nolint:gosec // runnerURL is not influenced by a user
if err != nil { if err != nil {
return "", err return "", err
} }
@ -277,6 +280,41 @@ func (s *E2ETestSuite) TestCopyFilesRoute_PermissionDenied() {
}) })
} }
func (s *E2ETestSuite) TestGetFileContent_Nomad() {
runnerID, err := ProvideRunner(&dto.RunnerRequest{
ExecutionEnvironmentID: tests.DefaultEnvironmentIDAsInteger,
})
require.NoError(s.T(), err)
s.Run("Not Found", func() {
getFileURL, err := url.Parse(helpers.BuildURL(api.BasePath, api.RunnersPath, runnerID, api.FileContentRawPath))
s.Require().NoError(err)
getFileURL.RawQuery = fmt.Sprintf("%s=%s", api.PathKey, tests.DefaultFileName)
response, err := http.Get(getFileURL.String())
s.Require().NoError(err)
s.Equal(http.StatusNotFound, response.StatusCode)
})
s.Run("Ok", func() {
newFileContent := []byte("New content")
resp, err := CopyFiles(runnerID, &dto.UpdateFileSystemRequest{
Copy: []dto.File{{Path: tests.DefaultFileName, Content: newFileContent}},
})
s.Require().NoError(err)
s.Equal(http.StatusNoContent, resp.StatusCode)
getFileURL, err := url.Parse(helpers.BuildURL(api.BasePath, api.RunnersPath, runnerID, api.FileContentRawPath))
s.Require().NoError(err)
getFileURL.RawQuery = fmt.Sprintf("%s=%s", api.PathKey, tests.DefaultFileName)
response, err := http.Get(getFileURL.String())
s.Require().NoError(err)
s.Equal(http.StatusOK, response.StatusCode)
content, err := io.ReadAll(response.Body)
s.Require().NoError(err)
s.Equal(newFileContent, content)
})
}
func (s *E2ETestSuite) TestRunnerGetsDestroyedAfterInactivityTimeout() { func (s *E2ETestSuite) TestRunnerGetsDestroyedAfterInactivityTimeout() {
for _, environmentID := range environmentIDs { for _, environmentID := range environmentIDs {
s.Run(environmentID.ToString(), func() { s.Run(environmentID.ToString(), func() {