From dd41e0d5c4060ed6f40eed0ce36f99b28e469838 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Thu, 20 Jan 2022 13:56:41 +0100 Subject: [PATCH] Generate structures for an AWS environment and runner --- internal/api/websocket_test.go | 4 +- internal/environment/aws_environment.go | 98 ++++++ .../{environment.go => nomad_environment.go} | 0 ...ment_test.go => nomad_environment_test.go} | 0 internal/runner/aws_runner.go | 65 ++++ internal/runner/inactivity_timer.go | 22 +- internal/runner/nomad_manager.go | 4 +- internal/runner/nomad_runner.go | 293 ++++++++++++++++++ .../{runner_test.go => nomad_runner_test.go} | 10 +- internal/runner/runner.go | 288 +---------------- 10 files changed, 481 insertions(+), 303 deletions(-) create mode 100644 internal/environment/aws_environment.go rename internal/environment/{environment.go => nomad_environment.go} (100%) rename internal/environment/{environment_test.go => nomad_environment_test.go} (100%) create mode 100644 internal/runner/aws_runner.go create mode 100644 internal/runner/nomad_runner.go rename internal/runner/{runner_test.go => nomad_runner_test.go} (98%) diff --git a/internal/api/websocket_test.go b/internal/api/websocket_test.go index cc1b3c5..d6db468 100644 --- a/internal/api/websocket_test.go +++ b/internal/api/websocket_test.go @@ -410,7 +410,7 @@ func newNomadAllocationWithMockedAPIClient(runnerID string) (runner.Runner, *nom executorAPIMock := &nomad.ExecutorAPIMock{} manager := &runner.ManagerMock{} manager.On("Return", mock.Anything).Return(nil) - r := runner.NewNomadJob(runnerID, nil, executorAPIMock, manager) + r := runner.NewNomadJob(runnerID, nil, executorAPIMock, manager.Return) return r, executorAPIMock } @@ -430,7 +430,7 @@ func newRunnerWithNotMockedRunnerManager(t *testing.T, apiMock *nomad.ExecutorAP server := httptest.NewServer(router) runnerID := tests.DefaultRunnerID - runnerJob := runner.NewNomadJob(runnerID, nil, apiMock, runnerManager) + runnerJob := runner.NewNomadJob(runnerID, nil, apiMock, runnerManager.Return) e, err := environment.NewNomadEnvironment(apiMock, "job \"template-0\" {}") require.NoError(t, err) eID, err := nomad.EnvironmentIDFromRunnerID(runnerID) diff --git a/internal/environment/aws_environment.go b/internal/environment/aws_environment.go new file mode 100644 index 0000000..4a5e94b --- /dev/null +++ b/internal/environment/aws_environment.go @@ -0,0 +1,98 @@ +package environment + +import ( + "github.com/openHPI/poseidon/internal/runner" + "github.com/openHPI/poseidon/pkg/dto" +) + +type AWSEnvironment struct { + id dto.EnvironmentID +} + +func NewAWSEnvironment() *AWSEnvironment { + return &AWSEnvironment{} +} + +func (a *AWSEnvironment) MarshalJSON() ([]byte, error) { + panic("implement me") +} + +func (a *AWSEnvironment) ID() dto.EnvironmentID { + return a.id +} + +func (a *AWSEnvironment) SetID(id dto.EnvironmentID) { + a.id = id +} + +func (a *AWSEnvironment) PrewarmingPoolSize() uint { + panic("implement me") +} + +func (a *AWSEnvironment) SetPrewarmingPoolSize(_ uint) { + panic("implement me") +} + +func (a *AWSEnvironment) ApplyPrewarmingPoolSize() error { + panic("implement me") +} + +func (a *AWSEnvironment) CPULimit() uint { + panic("implement me") +} + +func (a *AWSEnvironment) SetCPULimit(_ uint) { + panic("implement me") +} + +func (a *AWSEnvironment) MemoryLimit() uint { + panic("implement me") +} + +func (a *AWSEnvironment) SetMemoryLimit(_ uint) { + panic("implement me") +} + +func (a *AWSEnvironment) Image() string { + panic("implement me") +} + +func (a *AWSEnvironment) SetImage(_ string) { + panic("implement me") +} + +func (a *AWSEnvironment) NetworkAccess() (enabled bool, mappedPorts []uint16) { + panic("implement me") +} + +func (a *AWSEnvironment) SetNetworkAccess(_ bool, _ []uint16) { + panic("implement me") +} + +func (a *AWSEnvironment) SetConfigFrom(_ runner.ExecutionEnvironment) { + panic("implement me") +} + +func (a *AWSEnvironment) Register() error { + panic("implement me") +} + +func (a *AWSEnvironment) Delete() error { + panic("implement me") +} + +func (a *AWSEnvironment) Sample() (r runner.Runner, ok bool) { + panic("implement me") +} + +func (a *AWSEnvironment) AddRunner(_ runner.Runner) { + panic("implement me") +} + +func (a *AWSEnvironment) DeleteRunner(_ string) { + panic("implement me") +} + +func (a *AWSEnvironment) IdleRunnerCount() int { + panic("implement me") +} diff --git a/internal/environment/environment.go b/internal/environment/nomad_environment.go similarity index 100% rename from internal/environment/environment.go rename to internal/environment/nomad_environment.go diff --git a/internal/environment/environment_test.go b/internal/environment/nomad_environment_test.go similarity index 100% rename from internal/environment/environment_test.go rename to internal/environment/nomad_environment_test.go diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go new file mode 100644 index 0000000..e66a03c --- /dev/null +++ b/internal/runner/aws_runner.go @@ -0,0 +1,65 @@ +package runner + +import ( + "context" + "fmt" + "github.com/google/uuid" + "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/execution" + "io" +) + +// AWSFunctionWorkload is an abstraction to build a request to an AWS Lambda Function. +type AWSFunctionWorkload struct { + InactivityTimer + id string + fs map[dto.FilePath][]byte + executions execution.Storer + onDestroy destroyRunnerHandler +} + +// NewAWSFunctionWorkload creates a new AWSFunctionWorkload with the provided id. +func NewAWSFunctionWorkload(onDestroy destroyRunnerHandler) (*AWSFunctionWorkload, error) { + newUUID, err := uuid.NewUUID() + if err != nil { + return nil, fmt.Errorf("failed generating runner id: %w", err) + } + + workload := &AWSFunctionWorkload{ + id: newUUID.String(), + executions: execution.NewLocalStorage(), + onDestroy: onDestroy, + fs: make(map[dto.FilePath][]byte), + } + workload.InactivityTimer = NewInactivityTimer(workload, onDestroy) + return workload, nil +} + +func (w *AWSFunctionWorkload) ID() string { + return w.id +} + +func (w *AWSFunctionWorkload) MappedPorts() []*dto.MappedPort { + panic("implement me") +} + +func (w *AWSFunctionWorkload) StoreExecution(_ string, _ *dto.ExecutionRequest) { + panic("implement me") +} + +func (w *AWSFunctionWorkload) ExecutionExists(_ string) bool { + panic("implement me") +} + +func (w *AWSFunctionWorkload) ExecuteInteractively(_ string, _ io.ReadWriter, _, _ io.Writer) ( + exit <-chan ExitInfo, cancel context.CancelFunc, err error) { + panic("implement me") +} + +func (w *AWSFunctionWorkload) UpdateFileSystem(_ *dto.UpdateFileSystemRequest) error { + panic("implement me") +} + +func (w *AWSFunctionWorkload) Destroy() error { + panic("implement me") +} diff --git a/internal/runner/inactivity_timer.go b/internal/runner/inactivity_timer.go index eafee28..3bda980 100644 --- a/internal/runner/inactivity_timer.go +++ b/internal/runner/inactivity_timer.go @@ -33,19 +33,19 @@ const ( var ErrorRunnerInactivityTimeout = errors.New("runner inactivity timeout exceeded") type InactivityTimerImplementation struct { - timer *time.Timer - duration time.Duration - state TimerState - runner Runner - manager Accessor - mu sync.Mutex + timer *time.Timer + duration time.Duration + state TimerState + runner Runner + onDestroy destroyRunnerHandler + mu sync.Mutex } -func NewInactivityTimer(runner Runner, manager Accessor) InactivityTimer { +func NewInactivityTimer(runner Runner, onDestroy destroyRunnerHandler) InactivityTimer { return &InactivityTimerImplementation{ - state: TimerInactive, - runner: runner, - manager: manager, + state: TimerInactive, + runner: runner, + onDestroy: onDestroy, } } @@ -68,7 +68,7 @@ func (t *InactivityTimerImplementation) SetupTimeout(duration time.Duration) { t.state = TimerExpired // The timer must be unlocked here already in order to avoid a deadlock with the call to StopTimout in Manager.Return. t.mu.Unlock() - err := t.manager.Return(t.runner) + err := t.onDestroy(t.runner) if err != nil { log.WithError(err).WithField("id", t.runner.ID()).Warn("Returning runner after inactivity caused an error") } else { diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index 7da2743..5b5b30b 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -153,7 +153,7 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger environmentLogger.WithError(err).Warn("Error loading runner portMappings") return } - newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m) + newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.Return) if isUsed { m.usedRunners.Add(newJob) timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey]) @@ -196,7 +196,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { if alloc.AllocatedResources != nil { mappedPorts = alloc.AllocatedResources.Shared.Ports } - environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m)) + environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m.Return)) } } diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go new file mode 100644 index 0000000..22c178a --- /dev/null +++ b/internal/runner/nomad_runner.go @@ -0,0 +1,293 @@ +package runner + +import ( + "archive/tar" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + nomadApi "github.com/hashicorp/nomad/api" + "github.com/openHPI/poseidon/internal/nomad" + "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/execution" + "io" + "strings" + "time" +) + +// ContextKey is the type for keys in a request context. +type ContextKey string + +const ( + // runnerContextKey is the key used to store runners in context.Context. + runnerContextKey ContextKey = "runner" + // SIGQUIT is the character that causes a tty to send the SIGQUIT signal to the controlled process. + SIGQUIT = 0x1c + // executionTimeoutGracePeriod is the time to wait after sending a SIGQUIT signal to a timed out execution. + // If the execution does not return after this grace period, the runner is destroyed. + executionTimeoutGracePeriod = 3 * time.Second +) + +var ( + ErrorUnknownExecution = errors.New("unknown execution") + ErrorFileCopyFailed = errors.New("file copy failed") +) + +// NomadJob is an abstraction to communicate with Nomad environments. +type NomadJob struct { + InactivityTimer + executions execution.Storer + id string + portMappings []nomadApi.PortMapping + api nomad.ExecutorAPI + onDestroy func(r Runner) error +} + +// NewNomadJob creates a new NomadJob with the provided id. +func NewNomadJob(id string, portMappings []nomadApi.PortMapping, + apiClient nomad.ExecutorAPI, onDestroy func(r Runner) error, +) *NomadJob { + job := &NomadJob{ + id: id, + portMappings: portMappings, + api: apiClient, + executions: execution.NewLocalStorage(), + onDestroy: onDestroy, + } + job.InactivityTimer = NewInactivityTimer(job, onDestroy) + return job +} + +func (r *NomadJob) ID() string { + return r.id +} + +func (r *NomadJob) MappedPorts() []*dto.MappedPort { + ports := make([]*dto.MappedPort, 0, len(r.portMappings)) + for _, portMapping := range r.portMappings { + ports = append(ports, &dto.MappedPort{ + ExposedPort: uint(portMapping.To), + HostAddress: fmt.Sprintf("%s:%d", portMapping.HostIP, portMapping.Value), + }) + } + return ports +} + +func (r *NomadJob) StoreExecution(id string, request *dto.ExecutionRequest) { + r.executions.Add(execution.ID(id), request) +} + +func (r *NomadJob) ExecutionExists(id string) bool { + return r.executions.Exists(execution.ID(id)) +} + +func (r *NomadJob) ExecuteInteractively( + id string, + stdin io.ReadWriter, + stdout, stderr io.Writer, +) (<-chan ExitInfo, context.CancelFunc, error) { + request, ok := r.executions.Pop(execution.ID(id)) + if !ok { + return nil, nil, ErrorUnknownExecution + } + + r.ResetTimeout() + + command, ctx, cancel := prepareExecution(request) + exitInternal := make(chan ExitInfo) + exit := make(chan ExitInfo, 1) + ctxExecute, cancelExecute := context.WithCancel(context.Background()) + + go r.executeCommand(ctxExecute, command, stdin, stdout, stderr, exitInternal) + go r.handleExitOrContextDone(ctx, cancelExecute, exitInternal, exit, stdin) + + return exit, cancel, nil +} + +func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error { + r.ResetTimeout() + + var tarBuffer bytes.Buffer + if err := createTarArchiveForFiles(copyRequest.Copy, &tarBuffer); err != nil { + return err + } + + fileDeletionCommand := fileDeletionCommand(copyRequest.Delete) + copyCommand := "tar --extract --absolute-names --verbose --file=/dev/stdin;" + updateFileCommand := (&dto.ExecutionRequest{Command: fileDeletionCommand + copyCommand}).FullCommand() + stdOut := bytes.Buffer{} + stdErr := bytes.Buffer{} + exitCode, err := r.api.ExecuteCommand(r.id, context.Background(), updateFileCommand, false, + &tarBuffer, &stdOut, &stdErr) + + if err != nil { + return fmt.Errorf( + "%w: nomad error during file copy: %v", + nomad.ErrorExecutorCommunicationFailed, + err) + } + if exitCode != 0 { + return fmt.Errorf( + "%w: stderr output '%s' and stdout output '%s'", + ErrorFileCopyFailed, + stdErr.String(), + stdOut.String()) + } + return nil +} + +func (r *NomadJob) Destroy() error { + if err := r.onDestroy(r); err != nil { + return fmt.Errorf("error while destroying runner: %w", err) + } + return nil +} + +func prepareExecution(request *dto.ExecutionRequest) ( + command []string, ctx context.Context, cancel context.CancelFunc, +) { + command = request.FullCommand() + if request.TimeLimit == 0 { + ctx, cancel = context.WithCancel(context.Background()) + } else { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(request.TimeLimit)*time.Second) + } + return command, ctx, cancel +} + +func (r *NomadJob) executeCommand(ctx context.Context, command []string, + stdin io.ReadWriter, stdout, stderr io.Writer, exit chan<- ExitInfo, +) { + exitCode, err := r.api.ExecuteCommand(r.id, ctx, command, true, stdin, stdout, stderr) + if err == nil && r.TimeoutPassed() { + err = ErrorRunnerInactivityTimeout + } + exit <- ExitInfo{uint8(exitCode), err} +} + +func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute context.CancelFunc, + exitInternal <-chan ExitInfo, exit chan<- ExitInfo, stdin io.ReadWriter, +) { + defer cancelExecute() + defer close(exit) // When this function has finished the connection to the executor is closed. + + select { + case exitInfo := <-exitInternal: + exit <- exitInfo + return + case <-ctx.Done(): + } + + // From this time on the WebSocket connection to the client is closed in /internal/api/websocket.go + // waitForExit. Input can still be sent to the executor. + exit <- ExitInfo{255, ctx.Err()} + + // This injects the SIGQUIT character into the stdin. This character is parsed by the tty line discipline + // (tty has to be true) and converted to a SIGQUIT signal sent to the foreground process attached to the tty. + // By default, SIGQUIT causes the process to terminate and produces a core dump. Processes can catch this signal + // and ignore it, which is why we destroy the runner if the process does not terminate after a grace period. + _, err := stdin.Write([]byte{SIGQUIT}) + // if n != 1 { + // The SIGQUIT is sent and correctly processed by the allocation. However, for an unknown + // reason, the number of bytes written is always zero even though the error is nil. + // Hence, we disabled this sanity check here. See the MR for more details: + // https://github.com/openHPI/poseidon/pull/45#discussion_r757029024 + // log.WithField("runner", r.id).Warn("Could not send SIGQUIT because nothing was written") + // } + if err != nil { + log.WithField("runner", r.id).WithError(err).Warn("Could not send SIGQUIT due to error") + } + + select { + case <-exitInternal: + log.WithField("runner", r.id).Debug("Execution terminated after SIGQUIT") + case <-time.After(executionTimeoutGracePeriod): + log.WithField("runner", r.id).Info("Execution did not quit after SIGQUIT") + if err := r.Destroy(); err != nil { + log.WithField("runner", r.id).Error("Error when destroying runner") + } + } +} + +func createTarArchiveForFiles(filesToCopy []dto.File, w io.Writer) error { + tarWriter := tar.NewWriter(w) + for _, file := range filesToCopy { + if err := tarWriter.WriteHeader(tarHeader(file)); err != nil { + err := fmt.Errorf("error writing tar file header: %w", err) + log. + WithField("file", file). + Error(err) + return err + } + if _, err := tarWriter.Write(file.ByteContent()); err != nil { + err := fmt.Errorf("error writing tar file content: %w", err) + log. + WithField("file", file). + Error(err) + return err + } + } + if err := tarWriter.Close(); err != nil { + return fmt.Errorf("error closing tar writer: %w", err) + } + return nil +} + +func fileDeletionCommand(pathsToDelete []dto.FilePath) string { + if len(pathsToDelete) == 0 { + return "" + } + command := "rm --recursive --force " + for _, filePath := range pathsToDelete { + // To avoid command injection, filenames need to be quoted. + // See https://unix.stackexchange.com/questions/347332/what-characters-need-to-be-escaped-in-files-without-quotes + // for details. + singleQuoteEscapedFileName := strings.ReplaceAll(filePath.Cleaned(), "'", "'\\''") + command += fmt.Sprintf("'%s' ", singleQuoteEscapedFileName) + } + command += ";" + return command +} + +func tarHeader(file dto.File) *tar.Header { + if file.IsDirectory() { + return &tar.Header{ + Typeflag: tar.TypeDir, + Name: file.CleanedPath(), + Mode: 0o755, + } + } else { + return &tar.Header{ + Typeflag: tar.TypeReg, + Name: file.CleanedPath(), + Mode: 0o744, + Size: int64(len(file.Content)), + } + } +} + +// MarshalJSON implements json.Marshaler interface. +// This exports private attributes like the id too. +func (r *NomadJob) MarshalJSON() ([]byte, error) { + res, err := json.Marshal(struct { + ID string `json:"runnerId"` + }{ + ID: r.ID(), + }) + if err != nil { + return nil, fmt.Errorf("error marshaling Nomad job: %w", err) + } + return res, nil +} + +// NewContext creates a context containing a runner. +func NewContext(ctx context.Context, runner Runner) context.Context { + return context.WithValue(ctx, runnerContextKey, runner) +} + +// FromContext returns a runner from a context. +func FromContext(ctx context.Context) (Runner, bool) { + runner, ok := ctx.Value(runnerContextKey).(Runner) + return runner, ok +} diff --git a/internal/runner/runner_test.go b/internal/runner/nomad_runner_test.go similarity index 98% rename from internal/runner/runner_test.go rename to internal/runner/nomad_runner_test.go index 3d065e1..a97ebc7 100644 --- a/internal/runner/runner_test.go +++ b/internal/runner/nomad_runner_test.go @@ -125,7 +125,7 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() { InactivityTimer: s.timer, id: tests.DefaultRunnerID, api: s.apiMock, - manager: s.manager, + onDestroy: s.manager.Return, } } @@ -391,5 +391,11 @@ func (s *UpdateFileSystemTestSuite) readFilesFromTarArchive(tarArchive io.Reader // NewRunner creates a new runner with the provided id and manager. func NewRunner(id string, manager Accessor) Runner { - return NewNomadJob(id, nil, nil, manager) + var handler destroyRunnerHandler + if manager != nil { + handler = manager.Return + } else { + handler = func(_ Runner) error { return nil } + } + return NewNomadJob(id, nil, nil, handler) } diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 1c91104..c5675bb 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -1,37 +1,9 @@ package runner import ( - "archive/tar" - "bytes" "context" - "encoding/json" - "errors" - "fmt" - nomadApi "github.com/hashicorp/nomad/api" - "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/pkg/dto" - "github.com/openHPI/poseidon/pkg/execution" "io" - "strings" - "time" -) - -// ContextKey is the type for keys in a request context. -type ContextKey string - -const ( - // runnerContextKey is the key used to store runners in context.Context. - runnerContextKey ContextKey = "runner" - // SIGQUIT is the character that causes a tty to send the SIGQUIT signal to the controlled process. - SIGQUIT = 0x1c - // executionTimeoutGracePeriod is the time to wait after sending a SIGQUIT signal to a timed out execution. - // If the execution does not return after this grace period, the runner is destroyed. - executionTimeoutGracePeriod = 3 * time.Second -) - -var ( - ErrorUnknownExecution = errors.New("unknown execution") - ErrorFileCopyFailed = errors.New("file copy failed") ) type ExitInfo struct { @@ -39,6 +11,8 @@ type ExitInfo struct { Err error } +type destroyRunnerHandler = func(r Runner) error + type Runner interface { InactivityTimer @@ -71,261 +45,3 @@ type Runner interface { // Destroy destroys the Runner in Nomad. Destroy() error } - -// NomadJob is an abstraction to communicate with Nomad environments. -type NomadJob struct { - InactivityTimer - executions execution.Storer - id string - portMappings []nomadApi.PortMapping - api nomad.ExecutorAPI - manager Accessor -} - -// NewNomadJob creates a new NomadJob with the provided id. -func NewNomadJob(id string, portMappings []nomadApi.PortMapping, - apiClient nomad.ExecutorAPI, manager Accessor, -) *NomadJob { - job := &NomadJob{ - id: id, - portMappings: portMappings, - api: apiClient, - executions: execution.NewLocalStorage(), - manager: manager, - } - job.InactivityTimer = NewInactivityTimer(job, manager) - return job -} - -func (r *NomadJob) ID() string { - return r.id -} - -func (r *NomadJob) MappedPorts() []*dto.MappedPort { - ports := make([]*dto.MappedPort, 0, len(r.portMappings)) - for _, portMapping := range r.portMappings { - ports = append(ports, &dto.MappedPort{ - ExposedPort: uint(portMapping.To), - HostAddress: fmt.Sprintf("%s:%d", portMapping.HostIP, portMapping.Value), - }) - } - return ports -} - -func (r *NomadJob) StoreExecution(id string, request *dto.ExecutionRequest) { - r.executions.Add(execution.ID(id), request) -} - -func (r *NomadJob) ExecutionExists(id string) bool { - return r.executions.Exists(execution.ID(id)) -} - -func (r *NomadJob) ExecuteInteractively( - id string, - stdin io.ReadWriter, - stdout, stderr io.Writer, -) (<-chan ExitInfo, context.CancelFunc, error) { - request, ok := r.executions.Pop(execution.ID(id)) - if !ok { - return nil, nil, ErrorUnknownExecution - } - - r.ResetTimeout() - - command, ctx, cancel := prepareExecution(request) - exitInternal := make(chan ExitInfo) - exit := make(chan ExitInfo, 1) - ctxExecute, cancelExecute := context.WithCancel(context.Background()) - - go r.executeCommand(ctxExecute, command, stdin, stdout, stderr, exitInternal) - go r.handleExitOrContextDone(ctx, cancelExecute, exitInternal, exit, stdin) - - return exit, cancel, nil -} - -func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest) error { - r.ResetTimeout() - - var tarBuffer bytes.Buffer - if err := createTarArchiveForFiles(copyRequest.Copy, &tarBuffer); err != nil { - return err - } - - fileDeletionCommand := fileDeletionCommand(copyRequest.Delete) - copyCommand := "tar --extract --absolute-names --verbose --file=/dev/stdin;" - updateFileCommand := (&dto.ExecutionRequest{Command: fileDeletionCommand + copyCommand}).FullCommand() - stdOut := bytes.Buffer{} - stdErr := bytes.Buffer{} - exitCode, err := r.api.ExecuteCommand(r.id, context.Background(), updateFileCommand, false, - &tarBuffer, &stdOut, &stdErr) - - if err != nil { - return fmt.Errorf( - "%w: nomad error during file copy: %v", - nomad.ErrorExecutorCommunicationFailed, - err) - } - if exitCode != 0 { - return fmt.Errorf( - "%w: stderr output '%s' and stdout output '%s'", - ErrorFileCopyFailed, - stdErr.String(), - stdOut.String()) - } - return nil -} - -func (r *NomadJob) Destroy() error { - if err := r.manager.Return(r); err != nil { - return fmt.Errorf("error while destroying runner: %w", err) - } - return nil -} - -func prepareExecution(request *dto.ExecutionRequest) ( - command []string, ctx context.Context, cancel context.CancelFunc, -) { - command = request.FullCommand() - if request.TimeLimit == 0 { - ctx, cancel = context.WithCancel(context.Background()) - } else { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(request.TimeLimit)*time.Second) - } - return command, ctx, cancel -} - -func (r *NomadJob) executeCommand(ctx context.Context, command []string, - stdin io.ReadWriter, stdout, stderr io.Writer, exit chan<- ExitInfo, -) { - exitCode, err := r.api.ExecuteCommand(r.id, ctx, command, true, stdin, stdout, stderr) - if err == nil && r.TimeoutPassed() { - err = ErrorRunnerInactivityTimeout - } - exit <- ExitInfo{uint8(exitCode), err} -} - -func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute context.CancelFunc, - exitInternal <-chan ExitInfo, exit chan<- ExitInfo, stdin io.ReadWriter, -) { - defer cancelExecute() - defer close(exit) // When this function has finished the connection to the executor is closed. - - select { - case exitInfo := <-exitInternal: - exit <- exitInfo - return - case <-ctx.Done(): - } - - // From this time on the WebSocket connection to the client is closed in /internal/api/websocket.go - // waitForExit. Input can still be sent to the executor. - exit <- ExitInfo{255, ctx.Err()} - - // This injects the SIGQUIT character into the stdin. This character is parsed by the tty line discipline - // (tty has to be true) and converted to a SIGQUIT signal sent to the foreground process attached to the tty. - // By default, SIGQUIT causes the process to terminate and produces a core dump. Processes can catch this signal - // and ignore it, which is why we destroy the runner if the process does not terminate after a grace period. - _, err := stdin.Write([]byte{SIGQUIT}) - // if n != 1 { - // The SIGQUIT is sent and correctly processed by the allocation. However, for an unknown - // reason, the number of bytes written is always zero even though the error is nil. - // Hence, we disabled this sanity check here. See the MR for more details: - // https://github.com/openHPI/poseidon/pull/45#discussion_r757029024 - // log.WithField("runner", r.id).Warn("Could not send SIGQUIT because nothing was written") - // } - if err != nil { - log.WithField("runner", r.id).WithError(err).Warn("Could not send SIGQUIT due to error") - } - - select { - case <-exitInternal: - log.WithField("runner", r.id).Debug("Execution terminated after SIGQUIT") - case <-time.After(executionTimeoutGracePeriod): - log.WithField("runner", r.id).Info("Execution did not quit after SIGQUIT") - if err := r.Destroy(); err != nil { - log.WithField("runner", r.id).Error("Error when destroying runner") - } - } -} - -func createTarArchiveForFiles(filesToCopy []dto.File, w io.Writer) error { - tarWriter := tar.NewWriter(w) - for _, file := range filesToCopy { - if err := tarWriter.WriteHeader(tarHeader(file)); err != nil { - err := fmt.Errorf("error writing tar file header: %w", err) - log. - WithField("file", file). - Error(err) - return err - } - if _, err := tarWriter.Write(file.ByteContent()); err != nil { - err := fmt.Errorf("error writing tar file content: %w", err) - log. - WithField("file", file). - Error(err) - return err - } - } - if err := tarWriter.Close(); err != nil { - return fmt.Errorf("error closing tar writer: %w", err) - } - return nil -} - -func fileDeletionCommand(pathsToDelete []dto.FilePath) string { - if len(pathsToDelete) == 0 { - return "" - } - command := "rm --recursive --force " - for _, filePath := range pathsToDelete { - // To avoid command injection, filenames need to be quoted. - // See https://unix.stackexchange.com/questions/347332/what-characters-need-to-be-escaped-in-files-without-quotes - // for details. - singleQuoteEscapedFileName := strings.ReplaceAll(filePath.Cleaned(), "'", "'\\''") - command += fmt.Sprintf("'%s' ", singleQuoteEscapedFileName) - } - command += ";" - return command -} - -func tarHeader(file dto.File) *tar.Header { - if file.IsDirectory() { - return &tar.Header{ - Typeflag: tar.TypeDir, - Name: file.CleanedPath(), - Mode: 0o755, - } - } else { - return &tar.Header{ - Typeflag: tar.TypeReg, - Name: file.CleanedPath(), - Mode: 0o744, - Size: int64(len(file.Content)), - } - } -} - -// MarshalJSON implements json.Marshaler interface. -// This exports private attributes like the id too. -func (r *NomadJob) MarshalJSON() ([]byte, error) { - res, err := json.Marshal(struct { - ID string `json:"runnerId"` - }{ - ID: r.ID(), - }) - if err != nil { - return nil, fmt.Errorf("error marshaling Nomad job: %w", err) - } - return res, nil -} - -// NewContext creates a context containing a runner. -func NewContext(ctx context.Context, runner Runner) context.Context { - return context.WithValue(ctx, runnerContextKey, runner) -} - -// FromContext returns a runner from a context. -func FromContext(ctx context.Context) (Runner, bool) { - runner, ok := ctx.Value(runnerContextKey).(Runner) - return runner, ok -}