From 6123d205252c17a24e4819e773ea69933b32b487 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Thu, 20 Jan 2022 20:47:29 +0100 Subject: [PATCH] Implement core functionality of AWS integration --- api/swagger.yaml | 4 + cmd/poseidon/main.go | 36 ++++-- configuration.example.yaml | 8 ++ internal/api/runners_test.go | 8 +- internal/config/config.go | 13 ++ internal/environment/abstract_manager.go | 8 +- internal/environment/aws_environment.go | 42 ++++--- internal/environment/aws_manager.go | 64 +++++++--- internal/environment/nomad_environment.go | 2 +- internal/runner/abstract_manager.go | 55 +++++++-- internal/runner/aws_manager.go | 56 ++++----- internal/runner/aws_runner.go | 143 +++++++++++++++++++--- internal/runner/manager.go | 1 + internal/runner/manager_mock.go | 16 ++- internal/runner/nomad_manager.go | 35 +----- internal/runner/nomad_runner.go | 15 +-- internal/runner/runner.go | 11 ++ 17 files changed, 360 insertions(+), 157 deletions(-) diff --git a/api/swagger.yaml b/api/swagger.yaml index c08cbee..d1d1c09 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -188,6 +188,10 @@ paths: description: Specifies the execution environment of the runner type: integer example: 6 + useAWS: + description: Should Poseidon use AWS for the execution. + type: boolean + default: false required: - executionEnvironmentId additionalProperties: false diff --git a/cmd/poseidon/main.go b/cmd/poseidon/main.go index 9281c49..5fb20df 100644 --- a/cmd/poseidon/main.go +++ b/cmd/poseidon/main.go @@ -57,6 +57,22 @@ func runServer(server *http.Server) { } } +type managerCreator func() (runnerManager runner.Manager, environmentManager environment.ManagerHandler) + +// createManagerHandler adds the managers of the passed managerCreator to the chain of responsibility. +func createManagerHandler(handler managerCreator, enabled bool, + nextRunnerManager runner.Manager, nextEnvironmentManager environment.ManagerHandler) ( + runnerManager runner.Manager, environmentManager environment.ManagerHandler) { + if !enabled { + return nextRunnerManager, nextEnvironmentManager + } + + runnerManager, environmentManager = handler() + runnerManager.SetNextHandler(nextRunnerManager) + environmentManager.SetNextHandler(nextEnvironmentManager) + return runnerManager, environmentManager +} + func createNomadManager() (runnerManager runner.Manager, environmentManager environment.ManagerHandler) { // API initialization nomadAPIClient, err := nomad.NewExecutorAPI(&config.Config.Nomad) @@ -73,31 +89,27 @@ func createNomadManager() (runnerManager runner.Manager, environmentManager envi return runnerManager, environmentManager } -func createAWSManager(nextRunnerManager runner.Manager, nextEnvironmentManager environment.ManagerHandler) ( - runnerManager runner.Manager, environmentManager environment.ManagerHandler) { +func createAWSManager() (runnerManager runner.Manager, environmentManager environment.ManagerHandler) { runnerManager = runner.NewAWSRunnerManager() - runnerManager.SetNextHandler(nextRunnerManager) - - environmentManager = environment.NewAWSEnvironmentManager(runnerManager) - environmentManager.SetNextHandler(nextEnvironmentManager) - - return runnerManager, environmentManager + return runnerManager, environment.NewAWSEnvironmentManager(runnerManager) } // initServer builds the http server and configures it with the chain of responsibility for multiple managers. func initServer() *http.Server { - nomadRunnerManager, nomadEnvironmentManager := createNomadManager() - awsRunnerManager, awsEnvironmentManager := createAWSManager(nomadRunnerManager, nomadEnvironmentManager) + runnerManager, environmentManager := createManagerHandler(createNomadManager, config.Config.Nomad.Enabled, + runner.NewAbstractManager(), &environment.AbstractManager{}) + runnerManager, environmentManager = createManagerHandler(createAWSManager, config.Config.AWS.Enabled, + runnerManager, environmentManager) return &http.Server{ Addr: config.Config.Server.URL().Host, ReadTimeout: time.Second * 15, IdleTimeout: time.Second * 60, - Handler: api.NewRouter(awsRunnerManager, awsEnvironmentManager), + Handler: api.NewRouter(runnerManager, environmentManager), } } -// shutdownOnOSSignal listens for a signal from the operation system +// shutdownOnOSSignal listens for a signal from the operating system // When receiving a signal the server shuts down but waits up to 15 seconds to close remaining connections. func shutdownOnOSSignal(server *http.Server) { // wait for SIGINT diff --git a/configuration.example.yaml b/configuration.example.yaml index 435760c..42b04e0 100644 --- a/configuration.example.yaml +++ b/configuration.example.yaml @@ -22,6 +22,8 @@ server: # Configuration of the used Nomad cluster nomad: + # Specifies whether Nomad should be used as executor. + enabled: true # IP address / domain of the Nomad server address: 127.0.0.1 # Port of the Nomad server @@ -41,6 +43,12 @@ nomad: # Nomad namespace to use. If unset, 'default' is used namespace: poseidon +aws: + # Specifies whether AWS should be used as executor. + enabled: false + # The enpoint of the WebSocket API + endpoint: wss://abcdef1234.execute-api.eu-central-1.amazonaws.com/production + # Configuration of the logger logger: # Log level that is used after reading the config (INFO until then) diff --git a/internal/api/runners_test.go b/internal/api/runners_test.go index 5e06a26..c4b823e 100644 --- a/internal/api/runners_test.go +++ b/internal/api/runners_test.go @@ -139,8 +139,9 @@ func (s *ProvideRunnerTestSuite) SetupTest() { } func (s *ProvideRunnerTestSuite) TestValidRequestReturnsRunner() { - s.runnerManager.On("Claim", mock.AnythingOfType("dto.EnvironmentID"), - mock.AnythingOfType("int")).Return(s.runner, nil) + s.runnerManager. + On("Claim", mock.AnythingOfType("dto.EnvironmentID"), mock.AnythingOfType("int")). + Return(s.runner, nil) recorder := httptest.NewRecorder() s.router.ServeHTTP(recorder, s.defaultRequest) @@ -175,7 +176,8 @@ func (s *ProvideRunnerTestSuite) TestWhenExecutionEnvironmentDoesNotExistReturns } func (s *ProvideRunnerTestSuite) TestWhenNoRunnerAvailableReturnsNomadOverload() { - s.runnerManager.On("Claim", mock.AnythingOfType("dto.EnvironmentID"), mock.AnythingOfType("int")). + s.runnerManager. + On("Claim", mock.AnythingOfType("dto.EnvironmentID"), mock.AnythingOfType("int")). Return(nil, runner.ErrNoRunnersAvailable) recorder := httptest.NewRecorder() diff --git a/internal/config/config.go b/internal/config/config.go index 7820fd7..5ae2e76 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,6 +32,7 @@ var ( TemplateJobFile: "", }, Nomad: Nomad{ + Enabled: true, Address: "127.0.0.1", Port: 4646, Token: "", @@ -43,6 +44,10 @@ var ( }, Namespace: "default", }, + AWS: AWS{ + Enabled: false, + Endpoint: "", + }, Logger: logger{ Level: "INFO", }, @@ -76,6 +81,7 @@ func (s *server) URL() *url.URL { // Nomad configures the used Nomad cluster. type Nomad struct { + Enabled bool Address string Port int Token string @@ -88,6 +94,12 @@ func (n *Nomad) URL() *url.URL { return parseURL(n.Address, n.Port, n.TLS.Active) } +// AWS configures the AWS Lambda usage. +type AWS struct { + Enabled bool + Endpoint string +} + // TLS configures TLS on a connection. type TLS struct { Active bool @@ -105,6 +117,7 @@ type logger struct { type configuration struct { Server server Nomad Nomad + AWS AWS Logger logger Sentry sentry.ClientOptions } diff --git a/internal/environment/abstract_manager.go b/internal/environment/abstract_manager.go index 38788b6..e8cafd6 100644 --- a/internal/environment/abstract_manager.go +++ b/internal/environment/abstract_manager.go @@ -16,7 +16,11 @@ func (n *AbstractManager) SetNextHandler(next ManagerHandler) { } func (n *AbstractManager) NextHandler() ManagerHandler { - return n.nextHandler + if n.nextHandler != nil { + return n.nextHandler + } else { + return &AbstractManager{} + } } func (n *AbstractManager) List(_ bool) ([]runner.ExecutionEnvironment, error) { @@ -24,7 +28,7 @@ func (n *AbstractManager) List(_ bool) ([]runner.ExecutionEnvironment, error) { } func (n *AbstractManager) Get(_ dto.EnvironmentID, _ bool) (runner.ExecutionEnvironment, error) { - return nil, runner.ErrNullObject + return nil, runner.ErrRunnerNotFound } func (n *AbstractManager) CreateOrUpdate(_ dto.EnvironmentID, _ dto.ExecutionEnvironmentRequest) (bool, error) { diff --git a/internal/environment/aws_environment.go b/internal/environment/aws_environment.go index 4a5e94b..a927b92 100644 --- a/internal/environment/aws_environment.go +++ b/internal/environment/aws_environment.go @@ -1,12 +1,15 @@ package environment import ( + "encoding/json" + "fmt" "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" ) type AWSEnvironment struct { - id dto.EnvironmentID + id dto.EnvironmentID + awsEndpoint string } func NewAWSEnvironment() *AWSEnvironment { @@ -14,7 +17,14 @@ func NewAWSEnvironment() *AWSEnvironment { } func (a *AWSEnvironment) MarshalJSON() ([]byte, error) { - panic("implement me") + res, err := json.Marshal(dto.ExecutionEnvironmentData{ + ID: int(a.ID()), + ExecutionEnvironmentRequest: dto.ExecutionEnvironmentRequest{Image: a.Image()}, + }) + if err != nil { + return res, fmt.Errorf("couldn't marshal aws execution environment: %w", err) + } + return res, nil } func (a *AWSEnvironment) ID() dto.EnvironmentID { @@ -26,24 +36,21 @@ func (a *AWSEnvironment) SetID(id dto.EnvironmentID) { } func (a *AWSEnvironment) PrewarmingPoolSize() uint { - panic("implement me") + return 0 } -func (a *AWSEnvironment) SetPrewarmingPoolSize(_ uint) { - panic("implement me") -} +func (a *AWSEnvironment) SetPrewarmingPoolSize(_ uint) {} func (a *AWSEnvironment) ApplyPrewarmingPoolSize() error { - panic("implement me") + return nil } func (a *AWSEnvironment) CPULimit() uint { - panic("implement me") + return 0 } -func (a *AWSEnvironment) SetCPULimit(_ uint) { - panic("implement me") -} +// SetCPULimit is disabled as one can only set the memory limit with AWS Lambda. +func (a *AWSEnvironment) SetCPULimit(_ uint) {} func (a *AWSEnvironment) MemoryLimit() uint { panic("implement me") @@ -53,12 +60,13 @@ func (a *AWSEnvironment) SetMemoryLimit(_ uint) { panic("implement me") } +// Image is used to specify the AWS Endpoint Poseidon is connecting to. func (a *AWSEnvironment) Image() string { - panic("implement me") + return a.awsEndpoint } -func (a *AWSEnvironment) SetImage(_ string) { - panic("implement me") +func (a *AWSEnvironment) SetImage(awsEndpoint string) { + a.awsEndpoint = awsEndpoint } func (a *AWSEnvironment) NetworkAccess() (enabled bool, mappedPorts []uint16) { @@ -82,7 +90,11 @@ func (a *AWSEnvironment) Delete() error { } func (a *AWSEnvironment) Sample() (r runner.Runner, ok bool) { - panic("implement me") + workload, err := runner.NewAWSFunctionWorkload(a, nil) + if err != nil { + return nil, false + } + return workload, true } func (a *AWSEnvironment) AddRunner(_ runner.Runner) { diff --git a/internal/environment/aws_manager.go b/internal/environment/aws_manager.go index edb2a53..6793714 100644 --- a/internal/environment/aws_manager.go +++ b/internal/environment/aws_manager.go @@ -10,48 +10,80 @@ import ( // IMPROVE: Create Lambda functions dynamically. type AWSEnvironmentManager struct { *AbstractManager - runnerManager runner.Accessor + runnerManager runner.Manager } -func NewAWSEnvironmentManager(runnerManager runner.Accessor) *AWSEnvironmentManager { +func NewAWSEnvironmentManager(runnerManager runner.Manager) *AWSEnvironmentManager { m := &AWSEnvironmentManager{&AbstractManager{nil}, runnerManager} runnerManager.Load() + m.Load() return m } func (a *AWSEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) { list, err := a.NextHandler().List(fetch) if err != nil { - return nil, fmt.Errorf("aws wraped: %w", err) + return nil, fmt.Errorf("aws wrapped: %w", err) } - return list, nil + return append(list, a.runnerManager.ListEnvironments()...), nil } func (a *AWSEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (runner.ExecutionEnvironment, error) { - e, err := a.NextHandler().Get(id, fetch) - if err != nil { - return nil, fmt.Errorf("aws wraped: %w", err) + e, ok := a.runnerManager.GetEnvironment(id) + if ok { + return e, nil + } else { + e, err := a.NextHandler().Get(id, fetch) + if err != nil { + return nil, fmt.Errorf("aws wrapped: %w", err) + } + return e, nil } - return e, nil } func (a *AWSEnvironmentManager) CreateOrUpdate( id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (bool, error) { - isCreated, err := a.NextHandler().CreateOrUpdate(id, request) - if err != nil { - return false, fmt.Errorf("aws wraped: %w", err) + if id != runner.AwsJavaEnvironmentID { + isCreated, err := a.NextHandler().CreateOrUpdate(id, request) + if err != nil { + return false, fmt.Errorf("aws wrapped: %w", err) + } + return isCreated, nil } - return isCreated, nil + + _, ok := a.runnerManager.GetEnvironment(id) + e := NewAWSEnvironment() + e.SetID(id) + e.SetImage(request.Image) + a.runnerManager.StoreEnvironment(e) + return !ok, nil } func (a *AWSEnvironmentManager) Delete(id dto.EnvironmentID) (bool, error) { - isFound, err := a.NextHandler().Delete(id) - if err != nil { - return false, fmt.Errorf("aws wraped: %w", err) + e, ok := a.runnerManager.GetEnvironment(id) + if !ok { + isFound, err := a.NextHandler().Delete(id) + if err != nil { + return false, fmt.Errorf("aws wrapped: %w", err) + } + return isFound, nil } - return isFound, nil + + a.runnerManager.DeleteEnvironment(id) + if err := e.Delete(); err != nil { + return true, fmt.Errorf("could not delete environment: %w", err) + } + return true, nil } func (a *AWSEnvironmentManager) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData { return a.NextHandler().Statistics() } + +// Load fetches all remote environments in the local storage. ToDo: Fetch dynamically. +func (a *AWSEnvironmentManager) Load() { + _, err := a.CreateOrUpdate(runner.AwsJavaEnvironmentID, dto.ExecutionEnvironmentRequest{Image: "java11Exec"}) + if err != nil { + log.WithError(err).Warn("Could not load aws environment.") + } +} diff --git a/internal/environment/nomad_environment.go b/internal/environment/nomad_environment.go index f6a5b0e..f079d95 100644 --- a/internal/environment/nomad_environment.go +++ b/internal/environment/nomad_environment.go @@ -252,7 +252,7 @@ func (n *NomadEnvironment) IdleRunnerCount() int { } // MarshalJSON implements the json.Marshaler interface. -// This converts the NomadEnvironment into the expected schema for dto.ExecutionEnvironmentData. +// This converts the AWSEnvironment into the expected schema for dto.ExecutionEnvironmentData. func (n *NomadEnvironment) MarshalJSON() (res []byte, err error) { networkAccess, exposedPorts := n.NetworkAccess() diff --git a/internal/runner/abstract_manager.go b/internal/runner/abstract_manager.go index a409a4f..2cc701d 100644 --- a/internal/runner/abstract_manager.go +++ b/internal/runner/abstract_manager.go @@ -2,6 +2,7 @@ package runner import ( "errors" + "fmt" "github.com/openHPI/poseidon/pkg/dto" ) @@ -9,8 +10,19 @@ var ErrNullObject = errors.New("functionality not available for the null object" // AbstractManager is used to have a fallback runner manager in the chain of responsibility // following the null object pattern. +// Remember all functions that can call the NextHandler should call it (See AccessorHandler). type AbstractManager struct { - nextHandler AccessorHandler + nextHandler AccessorHandler + environments EnvironmentStorage + usedRunners Storage +} + +// NewAbstractManager creates a new abstract runner manager that keeps track of all runners of one kind. +func NewAbstractManager() *AbstractManager { + return &AbstractManager{ + environments: NewLocalEnvironmentStorage(), + usedRunners: NewLocalRunnerStorage(), + } } func (n *AbstractManager) SetNextHandler(next AccessorHandler) { @@ -18,20 +30,32 @@ func (n *AbstractManager) SetNextHandler(next AccessorHandler) { } func (n *AbstractManager) NextHandler() AccessorHandler { - return n.nextHandler + if n.nextHandler != nil { + return n.nextHandler + } else { + return NewAbstractManager() + } +} + +func (n *AbstractManager) HasNextHandler() bool { + return n.nextHandler != nil } func (n *AbstractManager) ListEnvironments() []ExecutionEnvironment { - return []ExecutionEnvironment{} + return n.environments.List() } -func (n *AbstractManager) GetEnvironment(_ dto.EnvironmentID) (ExecutionEnvironment, bool) { - return nil, false +func (n *AbstractManager) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironment, bool) { + return n.environments.Get(id) } -func (n *AbstractManager) StoreEnvironment(_ ExecutionEnvironment) {} +func (n *AbstractManager) StoreEnvironment(environment ExecutionEnvironment) { + n.environments.Add(environment) +} -func (n *AbstractManager) DeleteEnvironment(_ dto.EnvironmentID) {} +func (n *AbstractManager) DeleteEnvironment(id dto.EnvironmentID) { + n.environments.Delete(id) +} func (n *AbstractManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData { return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{} @@ -41,8 +65,21 @@ func (n *AbstractManager) Claim(_ dto.EnvironmentID, _ int) (Runner, error) { return nil, ErrNullObject } -func (n *AbstractManager) Get(_ string) (Runner, error) { - return nil, ErrNullObject +func (n *AbstractManager) Get(runnerID string) (Runner, error) { + runner, ok := n.usedRunners.Get(runnerID) + if ok { + return runner, nil + } + + if !n.HasNextHandler() { + return nil, ErrRunnerNotFound + } + + r, err := n.NextHandler().Get(runnerID) + if err != nil { + return r, fmt.Errorf("abstract manager wrapped: %w", err) + } + return r, nil } func (n *AbstractManager) Return(_ Runner) error { diff --git a/internal/runner/aws_manager.go b/internal/runner/aws_manager.go index 25e0322..9596d66 100644 --- a/internal/runner/aws_manager.go +++ b/internal/runner/aws_manager.go @@ -3,55 +3,47 @@ package runner import ( "fmt" "github.com/openHPI/poseidon/pkg/dto" + "time" ) type AWSRunnerManager struct { *AbstractManager } +const AwsJavaEnvironmentID = 2142 + // NewAWSRunnerManager creates a new runner manager that keeps track of all runners at AWS. func NewAWSRunnerManager() *AWSRunnerManager { - return &AWSRunnerManager{&AbstractManager{}} -} - -func (a AWSRunnerManager) ListEnvironments() []ExecutionEnvironment { - return []ExecutionEnvironment{} -} - -func (a AWSRunnerManager) GetEnvironment(_ dto.EnvironmentID) (ExecutionEnvironment, bool) { - return nil, false -} - -func (a AWSRunnerManager) StoreEnvironment(_ ExecutionEnvironment) {} - -func (a AWSRunnerManager) DeleteEnvironment(_ dto.EnvironmentID) {} - -func (a AWSRunnerManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData { - return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{} + return &AWSRunnerManager{NewAbstractManager()} } func (a AWSRunnerManager) Claim(id dto.EnvironmentID, duration int) (Runner, error) { - r, err := a.NextHandler().Claim(id, duration) - if err != nil { - return nil, fmt.Errorf("aws wraped: %w", err) + environment, ok := a.environments.Get(id) + if !ok { + r, err := a.NextHandler().Claim(id, duration) + if err != nil { + return nil, fmt.Errorf("aws wrapped: %w", err) + } + return r, nil } - return r, nil -} -func (a AWSRunnerManager) Get(runnerID string) (Runner, error) { - r, err := a.NextHandler().Get(runnerID) - if err != nil { - return nil, fmt.Errorf("aws wraped: %w", err) + runner, ok := environment.Sample() + if !ok { + log.Warn("no aws runner available") + return nil, ErrNoRunnersAvailable } - return r, nil + + a.usedRunners.Add(runner) + runner.SetupTimeout(time.Duration(duration) * time.Second) + return runner, nil } func (a AWSRunnerManager) Return(r Runner) error { - err := a.NextHandler().Return(r) - if err != nil { - return fmt.Errorf("aws wraped: %w", err) + _, isAWSRunner := r.(*AWSFunctionWorkload) + if isAWSRunner { + a.usedRunners.Delete(r.ID()) + } else if err := a.NextHandler().Return(r); err != nil { + return fmt.Errorf("aws wrapped: %w", err) } return nil } - -func (a AWSRunnerManager) Load() {} diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go index e66a03c..833dd47 100644 --- a/internal/runner/aws_runner.go +++ b/internal/runner/aws_runner.go @@ -2,34 +2,49 @@ package runner import ( "context" + "encoding/json" + "errors" "fmt" "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/openHPI/poseidon/internal/config" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/execution" "io" ) +var ErrWrongMessageType = errors.New("received message that is not a text messages") + +type awsFunctionRequest struct { + Action string `json:"action"` + Cmd []string `json:"cmd"` + Files map[dto.FilePath][]byte `json:"files"` +} + // AWSFunctionWorkload is an abstraction to build a request to an AWS Lambda Function. type AWSFunctionWorkload struct { InactivityTimer - id string - fs map[dto.FilePath][]byte - executions execution.Storer - onDestroy destroyRunnerHandler + id string + fs map[dto.FilePath][]byte + executions execution.Storer + onDestroy destroyRunnerHandler + environment ExecutionEnvironment } // NewAWSFunctionWorkload creates a new AWSFunctionWorkload with the provided id. -func NewAWSFunctionWorkload(onDestroy destroyRunnerHandler) (*AWSFunctionWorkload, error) { +func NewAWSFunctionWorkload( + environment ExecutionEnvironment, onDestroy destroyRunnerHandler) (*AWSFunctionWorkload, error) { newUUID, err := uuid.NewUUID() if err != nil { return nil, fmt.Errorf("failed generating runner id: %w", err) } workload := &AWSFunctionWorkload{ - id: newUUID.String(), - executions: execution.NewLocalStorage(), - onDestroy: onDestroy, - fs: make(map[dto.FilePath][]byte), + id: newUUID.String(), + fs: make(map[dto.FilePath][]byte), + executions: execution.NewLocalStorage(), + onDestroy: onDestroy, + environment: environment, } workload.InactivityTimer = NewInactivityTimer(workload, onDestroy) return workload, nil @@ -40,26 +55,114 @@ func (w *AWSFunctionWorkload) ID() string { } func (w *AWSFunctionWorkload) MappedPorts() []*dto.MappedPort { - panic("implement me") + return []*dto.MappedPort{} } -func (w *AWSFunctionWorkload) StoreExecution(_ string, _ *dto.ExecutionRequest) { - panic("implement me") +func (w *AWSFunctionWorkload) StoreExecution(id string, request *dto.ExecutionRequest) { + w.executions.Add(execution.ID(id), request) } -func (w *AWSFunctionWorkload) ExecutionExists(_ string) bool { - panic("implement me") +func (w *AWSFunctionWorkload) ExecutionExists(id string) bool { + return w.executions.Exists(execution.ID(id)) } -func (w *AWSFunctionWorkload) ExecuteInteractively(_ string, _ io.ReadWriter, _, _ io.Writer) ( - exit <-chan ExitInfo, cancel context.CancelFunc, err error) { - panic("implement me") +func (w *AWSFunctionWorkload) ExecuteInteractively(id string, _ io.ReadWriter, stdout, stderr io.Writer) ( + <-chan ExitInfo, context.CancelFunc, error) { + w.ResetTimeout() + request, ok := w.executions.Pop(execution.ID(id)) + if !ok { + return nil, nil, ErrorUnknownExecution + } + + command, ctx, cancel := prepareExecution(request) + exit := make(chan ExitInfo, 1) + go w.executeCommand(ctx, command, stdout, stderr, exit) + return exit, cancel, nil } -func (w *AWSFunctionWorkload) UpdateFileSystem(_ *dto.UpdateFileSystemRequest) error { - panic("implement me") +// UpdateFileSystem copies Files into the executor. +// ToDo: Currently, file deletion is not supported (but it could be). +func (w *AWSFunctionWorkload) UpdateFileSystem(request *dto.UpdateFileSystemRequest) error { + for _, file := range request.Copy { + w.fs[file.Path] = file.Content + } + return nil } func (w *AWSFunctionWorkload) Destroy() error { - panic("implement me") + if err := w.onDestroy(w); err != nil { + return fmt.Errorf("error while destroying aws runner: %w", err) + } + return nil +} + +func (w *AWSFunctionWorkload) executeCommand(ctx context.Context, command []string, + stdout, stderr io.Writer, exit chan<- ExitInfo, +) { + data := &awsFunctionRequest{ + Action: w.environment.Image(), + Cmd: command, + Files: w.fs, + } + rawData, err := json.Marshal(data) + if err != nil { + exit <- ExitInfo{uint8(1), fmt.Errorf("cannot stingify aws function request: %w", err)} + return + } + + wsConn, response, err := websocket.DefaultDialer.Dial(config.Config.AWS.Endpoint, nil) + if err != nil { + exit <- ExitInfo{uint8(1), fmt.Errorf("failed to establish aws connection: %w", err)} + return + } + _ = response.Body.Close() + defer wsConn.Close() + err = wsConn.WriteMessage(websocket.TextMessage, rawData) + if err != nil { + exit <- ExitInfo{uint8(1), fmt.Errorf("cannot send aws request: %w", err)} + return + } + + exitCode, err := w.receiveOutput(wsConn, stdout, stderr, ctx) + if w.TimeoutPassed() { + err = ErrorRunnerInactivityTimeout + } + exit <- ExitInfo{exitCode, err} + close(exit) +} + +func (w *AWSFunctionWorkload) receiveOutput( + conn *websocket.Conn, stdout, stderr io.Writer, ctx context.Context) (uint8, error) { + for ctx.Err() == nil { + messageType, reader, err := conn.NextReader() + if err != nil { + return 1, fmt.Errorf("cannot read from aws connection: %w", err) + } + if messageType != websocket.TextMessage { + return 1, ErrWrongMessageType + } + var wsMessage dto.WebSocketMessage + err = json.NewDecoder(reader).Decode(&wsMessage) + if err != nil { + return 1, fmt.Errorf("failed to decode message from aws: %w", err) + } + + log.WithField("msg", wsMessage).Info("New Message from AWS function") + + switch wsMessage.Type { + default: + log.WithField("data", wsMessage).Warn("unexpected message from aws function") + case dto.WebSocketExit: + return wsMessage.ExitCode, nil + case dto.WebSocketOutputStdout: + // We do not check the written bytes as the rawToCodeOceanWriter receives everything or nothing. + _, err = stdout.Write([]byte(wsMessage.Data)) + case dto.WebSocketOutputStderr: + _, err = stderr.Write([]byte(wsMessage.Data)) + } + if err != nil { + return 1, fmt.Errorf("failed to forward message: %w", err) + } + } + return 1, fmt.Errorf("receiveOutput stpped by context: %w", ctx.Err()) } diff --git a/internal/runner/manager.go b/internal/runner/manager.go index a94c25f..80afbb7 100644 --- a/internal/runner/manager.go +++ b/internal/runner/manager.go @@ -81,6 +81,7 @@ type AccessorHandler interface { Accessor SetNextHandler(m AccessorHandler) NextHandler() AccessorHandler + HasNextHandler() bool } // Accessor manages the lifecycle of Runner. diff --git a/internal/runner/manager_mock.go b/internal/runner/manager_mock.go index a9e2602..e31e61a 100644 --- a/internal/runner/manager_mock.go +++ b/internal/runner/manager_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package runner @@ -102,6 +102,20 @@ func (_m *ManagerMock) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironmen return r0, r1 } +// HasNextHandler provides a mock function with given fields: +func (_m *ManagerMock) HasNextHandler() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // ListEnvironments provides a mock function with given fields: func (_m *ManagerMock) ListEnvironments() []ExecutionEnvironment { ret := _m.Called() diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index 5b5b30b..5deae89 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -22,41 +22,18 @@ var ( type NomadRunnerManager struct { *AbstractManager - apiClient nomad.ExecutorAPI - environments EnvironmentStorage - usedRunners Storage + apiClient nomad.ExecutorAPI } // NewNomadRunnerManager creates a new runner manager that keeps track of all runners. // It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad. // If you cancel the context the background synchronization will be stopped. func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager { - m := &NomadRunnerManager{ - &AbstractManager{}, - apiClient, - NewLocalEnvironmentStorage(), - NewLocalRunnerStorage(), - } + m := &NomadRunnerManager{NewAbstractManager(), apiClient} go m.keepRunnersSynced(ctx) return m } -func (m *NomadRunnerManager) ListEnvironments() []ExecutionEnvironment { - return m.environments.List() -} - -func (m *NomadRunnerManager) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironment, bool) { - return m.environments.Get(id) -} - -func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) { - m.environments.Add(environment) -} - -func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) { - m.environments.Delete(id) -} - func (m *NomadRunnerManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData { environments := make(map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData) for _, e := range m.environments.List() { @@ -105,14 +82,6 @@ func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int } } -func (m *NomadRunnerManager) Get(runnerID string) (Runner, error) { - runner, ok := m.usedRunners.Get(runnerID) - if !ok { - return nil, ErrRunnerNotFound - } - return runner, nil -} - func (m *NomadRunnerManager) Return(r Runner) error { r.StopTimeout() err := m.apiClient.DeleteJob(r.ID()) diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 22c178a..320fc28 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -41,12 +41,12 @@ type NomadJob struct { id string portMappings []nomadApi.PortMapping api nomad.ExecutorAPI - onDestroy func(r Runner) error + onDestroy destroyRunnerHandler } // NewNomadJob creates a new NomadJob with the provided id. func NewNomadJob(id string, portMappings []nomadApi.PortMapping, - apiClient nomad.ExecutorAPI, onDestroy func(r Runner) error, + apiClient nomad.ExecutorAPI, onDestroy destroyRunnerHandler, ) *NomadJob { job := &NomadJob{ id: id, @@ -280,14 +280,3 @@ func (r *NomadJob) MarshalJSON() ([]byte, error) { } return res, nil } - -// NewContext creates a context containing a runner. -func NewContext(ctx context.Context, runner Runner) context.Context { - return context.WithValue(ctx, runnerContextKey, runner) -} - -// FromContext returns a runner from a context. -func FromContext(ctx context.Context) (Runner, bool) { - runner, ok := ctx.Value(runnerContextKey).(Runner) - return runner, ok -} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index c5675bb..9973a36 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -45,3 +45,14 @@ type Runner interface { // Destroy destroys the Runner in Nomad. Destroy() error } + +// NewContext creates a context containing a runner. +func NewContext(ctx context.Context, runner Runner) context.Context { + return context.WithValue(ctx, runnerContextKey, runner) +} + +// FromContext returns a runner from a context. +func FromContext(ctx context.Context) (Runner, bool) { + runner, ok := ctx.Value(runnerContextKey).(Runner) + return runner, ok +}