Implement core functionality of AWS integration

This commit is contained in:
Maximilian Paß
2022-01-20 20:47:29 +01:00
parent dd41e0d5c4
commit 6123d20525
17 changed files with 360 additions and 157 deletions

View File

@@ -2,6 +2,7 @@ package runner
import (
"errors"
"fmt"
"github.com/openHPI/poseidon/pkg/dto"
)
@@ -9,8 +10,19 @@ var ErrNullObject = errors.New("functionality not available for the null object"
// AbstractManager is used to have a fallback runner manager in the chain of responsibility
// following the null object pattern.
// Remember all functions that can call the NextHandler should call it (See AccessorHandler).
type AbstractManager struct {
nextHandler AccessorHandler
nextHandler AccessorHandler
environments EnvironmentStorage
usedRunners Storage
}
// NewAbstractManager creates a new abstract runner manager that keeps track of all runners of one kind.
func NewAbstractManager() *AbstractManager {
return &AbstractManager{
environments: NewLocalEnvironmentStorage(),
usedRunners: NewLocalRunnerStorage(),
}
}
func (n *AbstractManager) SetNextHandler(next AccessorHandler) {
@@ -18,20 +30,32 @@ func (n *AbstractManager) SetNextHandler(next AccessorHandler) {
}
func (n *AbstractManager) NextHandler() AccessorHandler {
return n.nextHandler
if n.nextHandler != nil {
return n.nextHandler
} else {
return NewAbstractManager()
}
}
func (n *AbstractManager) HasNextHandler() bool {
return n.nextHandler != nil
}
func (n *AbstractManager) ListEnvironments() []ExecutionEnvironment {
return []ExecutionEnvironment{}
return n.environments.List()
}
func (n *AbstractManager) GetEnvironment(_ dto.EnvironmentID) (ExecutionEnvironment, bool) {
return nil, false
func (n *AbstractManager) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironment, bool) {
return n.environments.Get(id)
}
func (n *AbstractManager) StoreEnvironment(_ ExecutionEnvironment) {}
func (n *AbstractManager) StoreEnvironment(environment ExecutionEnvironment) {
n.environments.Add(environment)
}
func (n *AbstractManager) DeleteEnvironment(_ dto.EnvironmentID) {}
func (n *AbstractManager) DeleteEnvironment(id dto.EnvironmentID) {
n.environments.Delete(id)
}
func (n *AbstractManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{}
@@ -41,8 +65,21 @@ func (n *AbstractManager) Claim(_ dto.EnvironmentID, _ int) (Runner, error) {
return nil, ErrNullObject
}
func (n *AbstractManager) Get(_ string) (Runner, error) {
return nil, ErrNullObject
func (n *AbstractManager) Get(runnerID string) (Runner, error) {
runner, ok := n.usedRunners.Get(runnerID)
if ok {
return runner, nil
}
if !n.HasNextHandler() {
return nil, ErrRunnerNotFound
}
r, err := n.NextHandler().Get(runnerID)
if err != nil {
return r, fmt.Errorf("abstract manager wrapped: %w", err)
}
return r, nil
}
func (n *AbstractManager) Return(_ Runner) error {

View File

@@ -3,55 +3,47 @@ package runner
import (
"fmt"
"github.com/openHPI/poseidon/pkg/dto"
"time"
)
type AWSRunnerManager struct {
*AbstractManager
}
const AwsJavaEnvironmentID = 2142
// NewAWSRunnerManager creates a new runner manager that keeps track of all runners at AWS.
func NewAWSRunnerManager() *AWSRunnerManager {
return &AWSRunnerManager{&AbstractManager{}}
}
func (a AWSRunnerManager) ListEnvironments() []ExecutionEnvironment {
return []ExecutionEnvironment{}
}
func (a AWSRunnerManager) GetEnvironment(_ dto.EnvironmentID) (ExecutionEnvironment, bool) {
return nil, false
}
func (a AWSRunnerManager) StoreEnvironment(_ ExecutionEnvironment) {}
func (a AWSRunnerManager) DeleteEnvironment(_ dto.EnvironmentID) {}
func (a AWSRunnerManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{}
return &AWSRunnerManager{NewAbstractManager()}
}
func (a AWSRunnerManager) Claim(id dto.EnvironmentID, duration int) (Runner, error) {
r, err := a.NextHandler().Claim(id, duration)
if err != nil {
return nil, fmt.Errorf("aws wraped: %w", err)
environment, ok := a.environments.Get(id)
if !ok {
r, err := a.NextHandler().Claim(id, duration)
if err != nil {
return nil, fmt.Errorf("aws wrapped: %w", err)
}
return r, nil
}
return r, nil
}
func (a AWSRunnerManager) Get(runnerID string) (Runner, error) {
r, err := a.NextHandler().Get(runnerID)
if err != nil {
return nil, fmt.Errorf("aws wraped: %w", err)
runner, ok := environment.Sample()
if !ok {
log.Warn("no aws runner available")
return nil, ErrNoRunnersAvailable
}
return r, nil
a.usedRunners.Add(runner)
runner.SetupTimeout(time.Duration(duration) * time.Second)
return runner, nil
}
func (a AWSRunnerManager) Return(r Runner) error {
err := a.NextHandler().Return(r)
if err != nil {
return fmt.Errorf("aws wraped: %w", err)
_, isAWSRunner := r.(*AWSFunctionWorkload)
if isAWSRunner {
a.usedRunners.Delete(r.ID())
} else if err := a.NextHandler().Return(r); err != nil {
return fmt.Errorf("aws wrapped: %w", err)
}
return nil
}
func (a AWSRunnerManager) Load() {}

View File

@@ -2,34 +2,49 @@ package runner
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/openHPI/poseidon/internal/config"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/execution"
"io"
)
var ErrWrongMessageType = errors.New("received message that is not a text messages")
type awsFunctionRequest struct {
Action string `json:"action"`
Cmd []string `json:"cmd"`
Files map[dto.FilePath][]byte `json:"files"`
}
// AWSFunctionWorkload is an abstraction to build a request to an AWS Lambda Function.
type AWSFunctionWorkload struct {
InactivityTimer
id string
fs map[dto.FilePath][]byte
executions execution.Storer
onDestroy destroyRunnerHandler
id string
fs map[dto.FilePath][]byte
executions execution.Storer
onDestroy destroyRunnerHandler
environment ExecutionEnvironment
}
// NewAWSFunctionWorkload creates a new AWSFunctionWorkload with the provided id.
func NewAWSFunctionWorkload(onDestroy destroyRunnerHandler) (*AWSFunctionWorkload, error) {
func NewAWSFunctionWorkload(
environment ExecutionEnvironment, onDestroy destroyRunnerHandler) (*AWSFunctionWorkload, error) {
newUUID, err := uuid.NewUUID()
if err != nil {
return nil, fmt.Errorf("failed generating runner id: %w", err)
}
workload := &AWSFunctionWorkload{
id: newUUID.String(),
executions: execution.NewLocalStorage(),
onDestroy: onDestroy,
fs: make(map[dto.FilePath][]byte),
id: newUUID.String(),
fs: make(map[dto.FilePath][]byte),
executions: execution.NewLocalStorage(),
onDestroy: onDestroy,
environment: environment,
}
workload.InactivityTimer = NewInactivityTimer(workload, onDestroy)
return workload, nil
@@ -40,26 +55,114 @@ func (w *AWSFunctionWorkload) ID() string {
}
func (w *AWSFunctionWorkload) MappedPorts() []*dto.MappedPort {
panic("implement me")
return []*dto.MappedPort{}
}
func (w *AWSFunctionWorkload) StoreExecution(_ string, _ *dto.ExecutionRequest) {
panic("implement me")
func (w *AWSFunctionWorkload) StoreExecution(id string, request *dto.ExecutionRequest) {
w.executions.Add(execution.ID(id), request)
}
func (w *AWSFunctionWorkload) ExecutionExists(_ string) bool {
panic("implement me")
func (w *AWSFunctionWorkload) ExecutionExists(id string) bool {
return w.executions.Exists(execution.ID(id))
}
func (w *AWSFunctionWorkload) ExecuteInteractively(_ string, _ io.ReadWriter, _, _ io.Writer) (
exit <-chan ExitInfo, cancel context.CancelFunc, err error) {
panic("implement me")
func (w *AWSFunctionWorkload) ExecuteInteractively(id string, _ io.ReadWriter, stdout, stderr io.Writer) (
<-chan ExitInfo, context.CancelFunc, error) {
w.ResetTimeout()
request, ok := w.executions.Pop(execution.ID(id))
if !ok {
return nil, nil, ErrorUnknownExecution
}
command, ctx, cancel := prepareExecution(request)
exit := make(chan ExitInfo, 1)
go w.executeCommand(ctx, command, stdout, stderr, exit)
return exit, cancel, nil
}
func (w *AWSFunctionWorkload) UpdateFileSystem(_ *dto.UpdateFileSystemRequest) error {
panic("implement me")
// UpdateFileSystem copies Files into the executor.
// ToDo: Currently, file deletion is not supported (but it could be).
func (w *AWSFunctionWorkload) UpdateFileSystem(request *dto.UpdateFileSystemRequest) error {
for _, file := range request.Copy {
w.fs[file.Path] = file.Content
}
return nil
}
func (w *AWSFunctionWorkload) Destroy() error {
panic("implement me")
if err := w.onDestroy(w); err != nil {
return fmt.Errorf("error while destroying aws runner: %w", err)
}
return nil
}
func (w *AWSFunctionWorkload) executeCommand(ctx context.Context, command []string,
stdout, stderr io.Writer, exit chan<- ExitInfo,
) {
data := &awsFunctionRequest{
Action: w.environment.Image(),
Cmd: command,
Files: w.fs,
}
rawData, err := json.Marshal(data)
if err != nil {
exit <- ExitInfo{uint8(1), fmt.Errorf("cannot stingify aws function request: %w", err)}
return
}
wsConn, response, err := websocket.DefaultDialer.Dial(config.Config.AWS.Endpoint, nil)
if err != nil {
exit <- ExitInfo{uint8(1), fmt.Errorf("failed to establish aws connection: %w", err)}
return
}
_ = response.Body.Close()
defer wsConn.Close()
err = wsConn.WriteMessage(websocket.TextMessage, rawData)
if err != nil {
exit <- ExitInfo{uint8(1), fmt.Errorf("cannot send aws request: %w", err)}
return
}
exitCode, err := w.receiveOutput(wsConn, stdout, stderr, ctx)
if w.TimeoutPassed() {
err = ErrorRunnerInactivityTimeout
}
exit <- ExitInfo{exitCode, err}
close(exit)
}
func (w *AWSFunctionWorkload) receiveOutput(
conn *websocket.Conn, stdout, stderr io.Writer, ctx context.Context) (uint8, error) {
for ctx.Err() == nil {
messageType, reader, err := conn.NextReader()
if err != nil {
return 1, fmt.Errorf("cannot read from aws connection: %w", err)
}
if messageType != websocket.TextMessage {
return 1, ErrWrongMessageType
}
var wsMessage dto.WebSocketMessage
err = json.NewDecoder(reader).Decode(&wsMessage)
if err != nil {
return 1, fmt.Errorf("failed to decode message from aws: %w", err)
}
log.WithField("msg", wsMessage).Info("New Message from AWS function")
switch wsMessage.Type {
default:
log.WithField("data", wsMessage).Warn("unexpected message from aws function")
case dto.WebSocketExit:
return wsMessage.ExitCode, nil
case dto.WebSocketOutputStdout:
// We do not check the written bytes as the rawToCodeOceanWriter receives everything or nothing.
_, err = stdout.Write([]byte(wsMessage.Data))
case dto.WebSocketOutputStderr:
_, err = stderr.Write([]byte(wsMessage.Data))
}
if err != nil {
return 1, fmt.Errorf("failed to forward message: %w", err)
}
}
return 1, fmt.Errorf("receiveOutput stpped by context: %w", ctx.Err())
}

View File

@@ -81,6 +81,7 @@ type AccessorHandler interface {
Accessor
SetNextHandler(m AccessorHandler)
NextHandler() AccessorHandler
HasNextHandler() bool
}
// Accessor manages the lifecycle of Runner.

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v2.9.4. DO NOT EDIT.
// Code generated by mockery v2.10.0. DO NOT EDIT.
package runner
@@ -102,6 +102,20 @@ func (_m *ManagerMock) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironmen
return r0, r1
}
// HasNextHandler provides a mock function with given fields:
func (_m *ManagerMock) HasNextHandler() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// ListEnvironments provides a mock function with given fields:
func (_m *ManagerMock) ListEnvironments() []ExecutionEnvironment {
ret := _m.Called()

View File

@@ -22,41 +22,18 @@ var (
type NomadRunnerManager struct {
*AbstractManager
apiClient nomad.ExecutorAPI
environments EnvironmentStorage
usedRunners Storage
apiClient nomad.ExecutorAPI
}
// NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
// It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad.
// If you cancel the context the background synchronization will be stopped.
func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
m := &NomadRunnerManager{
&AbstractManager{},
apiClient,
NewLocalEnvironmentStorage(),
NewLocalRunnerStorage(),
}
m := &NomadRunnerManager{NewAbstractManager(), apiClient}
go m.keepRunnersSynced(ctx)
return m
}
func (m *NomadRunnerManager) ListEnvironments() []ExecutionEnvironment {
return m.environments.List()
}
func (m *NomadRunnerManager) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironment, bool) {
return m.environments.Get(id)
}
func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) {
m.environments.Add(environment)
}
func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) {
m.environments.Delete(id)
}
func (m *NomadRunnerManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
environments := make(map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData)
for _, e := range m.environments.List() {
@@ -105,14 +82,6 @@ func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int
}
}
func (m *NomadRunnerManager) Get(runnerID string) (Runner, error) {
runner, ok := m.usedRunners.Get(runnerID)
if !ok {
return nil, ErrRunnerNotFound
}
return runner, nil
}
func (m *NomadRunnerManager) Return(r Runner) error {
r.StopTimeout()
err := m.apiClient.DeleteJob(r.ID())

View File

@@ -41,12 +41,12 @@ type NomadJob struct {
id string
portMappings []nomadApi.PortMapping
api nomad.ExecutorAPI
onDestroy func(r Runner) error
onDestroy destroyRunnerHandler
}
// NewNomadJob creates a new NomadJob with the provided id.
func NewNomadJob(id string, portMappings []nomadApi.PortMapping,
apiClient nomad.ExecutorAPI, onDestroy func(r Runner) error,
apiClient nomad.ExecutorAPI, onDestroy destroyRunnerHandler,
) *NomadJob {
job := &NomadJob{
id: id,
@@ -280,14 +280,3 @@ func (r *NomadJob) MarshalJSON() ([]byte, error) {
}
return res, nil
}
// NewContext creates a context containing a runner.
func NewContext(ctx context.Context, runner Runner) context.Context {
return context.WithValue(ctx, runnerContextKey, runner)
}
// FromContext returns a runner from a context.
func FromContext(ctx context.Context) (Runner, bool) {
runner, ok := ctx.Value(runnerContextKey).(Runner)
return runner, ok
}

View File

@@ -45,3 +45,14 @@ type Runner interface {
// Destroy destroys the Runner in Nomad.
Destroy() error
}
// NewContext creates a context containing a runner.
func NewContext(ctx context.Context, runner Runner) context.Context {
return context.WithValue(ctx, runnerContextKey, runner)
}
// FromContext returns a runner from a context.
func FromContext(ctx context.Context) (Runner, bool) {
runner, ok := ctx.Value(runnerContextKey).(Runner)
return runner, ok
}