Parametrize e2e tests to also check AWS environments.

- Fix destroy runner after timeout.
- Add file deletion
This commit is contained in:
Maximilian Paß
2022-02-03 14:32:05 +01:00
parent 13eaa61f3b
commit 4ffbb712ed
13 changed files with 505 additions and 342 deletions

View File

@ -8,12 +8,13 @@ import (
)
type AWSEnvironment struct {
id dto.EnvironmentID
awsEndpoint string
id dto.EnvironmentID
awsEndpoint string
onDestroyRunner runner.DestroyRunnerHandler
}
func NewAWSEnvironment() *AWSEnvironment {
return &AWSEnvironment{}
func NewAWSEnvironment(onDestroyRunner runner.DestroyRunnerHandler) *AWSEnvironment {
return &AWSEnvironment{onDestroyRunner: onDestroyRunner}
}
func (a *AWSEnvironment) MarshalJSON() ([]byte, error) {
@ -86,11 +87,11 @@ func (a *AWSEnvironment) Register() error {
}
func (a *AWSEnvironment) Delete() error {
panic("implement me")
return nil
}
func (a *AWSEnvironment) Sample() (r runner.Runner, ok bool) {
workload, err := runner.NewAWSFunctionWorkload(a, nil)
workload, err := runner.NewAWSFunctionWorkload(a, a.onDestroyRunner)
if err != nil {
return nil, false
}

View File

@ -53,7 +53,7 @@ func (a *AWSEnvironmentManager) CreateOrUpdate(
}
_, ok := a.runnerManager.GetEnvironment(id)
e := NewAWSEnvironment()
e := NewAWSEnvironment(a.runnerManager.Return)
e.SetID(id)
e.SetImage(request.Image)
a.runnerManager.StoreEnvironment(e)

View File

@ -66,7 +66,7 @@ func TestAWSEnvironmentManager_Get(t *testing.T) {
})
t.Run("Returns environment when it was added before", func(t *testing.T) {
expectedEnvironment := NewAWSEnvironment()
expectedEnvironment := NewAWSEnvironment(nil)
expectedEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
runnerManager.StoreEnvironment(expectedEnvironment)
@ -82,7 +82,7 @@ func TestAWSEnvironmentManager_List(t *testing.T) {
t.Run("returs also environments of the rest of the manager chain", func(t *testing.T) {
nextHandler := &ManagerHandlerMock{}
existingEnvironment := NewAWSEnvironment()
existingEnvironment := NewAWSEnvironment(nil)
nextHandler.On("List", mock.AnythingOfType("bool")).
Return([]runner.ExecutionEnvironment{existingEnvironment}, nil)
m.SetNextHandler(nextHandler)
@ -95,7 +95,7 @@ func TestAWSEnvironmentManager_List(t *testing.T) {
m.SetNextHandler(nil)
t.Run("Returns added environment", func(t *testing.T) {
localEnvironment := NewAWSEnvironment()
localEnvironment := NewAWSEnvironment(nil)
localEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
runnerManager.StoreEnvironment(localEnvironment)

View File

@ -24,29 +24,33 @@ type awsFunctionRequest struct {
// AWSFunctionWorkload is an abstraction to build a request to an AWS Lambda Function.
type AWSFunctionWorkload struct {
InactivityTimer
id string
fs map[dto.FilePath][]byte
executions execution.Storer
onDestroy destroyRunnerHandler
environment ExecutionEnvironment
id string
fs map[dto.FilePath][]byte
executions execution.Storer
runningExecutions map[execution.ID]context.CancelFunc
onDestroy DestroyRunnerHandler
environment ExecutionEnvironment
}
// NewAWSFunctionWorkload creates a new AWSFunctionWorkload with the provided id.
func NewAWSFunctionWorkload(
environment ExecutionEnvironment, onDestroy destroyRunnerHandler) (*AWSFunctionWorkload, error) {
environment ExecutionEnvironment, onDestroy DestroyRunnerHandler) (*AWSFunctionWorkload, error) {
newUUID, err := uuid.NewUUID()
if err != nil {
return nil, fmt.Errorf("failed generating runner id: %w", err)
}
workload := &AWSFunctionWorkload{
id: newUUID.String(),
fs: make(map[dto.FilePath][]byte),
executions: execution.NewLocalStorage(),
onDestroy: onDestroy,
environment: environment,
id: newUUID.String(),
fs: make(map[dto.FilePath][]byte),
executions: execution.NewLocalStorage(),
runningExecutions: make(map[execution.ID]context.CancelFunc),
onDestroy: onDestroy,
environment: environment,
}
workload.InactivityTimer = NewInactivityTimer(workload, onDestroy)
workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error {
return workload.Destroy()
})
return workload, nil
}
@ -73,16 +77,21 @@ func (w *AWSFunctionWorkload) ExecuteInteractively(id string, _ io.ReadWriter, s
if !ok {
return nil, nil, ErrorUnknownExecution
}
command, ctx, cancel := prepareExecution(request)
exitInternal := make(chan ExitInfo)
exit := make(chan ExitInfo, 1)
go w.executeCommand(ctx, command, stdout, stderr, exit)
go w.executeCommand(ctx, command, stdout, stderr, exitInternal)
go w.handleRunnerTimeout(ctx, exitInternal, exit, execution.ID(id))
return exit, cancel, nil
}
// UpdateFileSystem copies Files into the executor.
// ToDo: Currently, file deletion is not supported (but it could be).
func (w *AWSFunctionWorkload) UpdateFileSystem(request *dto.UpdateFileSystemRequest) error {
for _, path := range request.Delete {
delete(w.fs, path)
}
for _, file := range request.Copy {
w.fs[file.Path] = file.Content
}
@ -90,6 +99,9 @@ func (w *AWSFunctionWorkload) UpdateFileSystem(request *dto.UpdateFileSystemRequ
}
func (w *AWSFunctionWorkload) Destroy() error {
for _, cancel := range w.runningExecutions {
cancel()
}
if err := w.onDestroy(w); err != nil {
return fmt.Errorf("error while destroying aws runner: %w", err)
}
@ -99,6 +111,7 @@ func (w *AWSFunctionWorkload) Destroy() error {
func (w *AWSFunctionWorkload) executeCommand(ctx context.Context, command []string,
stdout, stderr io.Writer, exit chan<- ExitInfo,
) {
defer close(exit)
data := &awsFunctionRequest{
Action: w.environment.Image(),
Cmd: command,
@ -128,7 +141,6 @@ func (w *AWSFunctionWorkload) executeCommand(ctx context.Context, command []stri
err = ErrorRunnerInactivityTimeout
}
exit <- ExitInfo{exitCode, err}
close(exit)
}
func (w *AWSFunctionWorkload) receiveOutput(
@ -157,7 +169,7 @@ func (w *AWSFunctionWorkload) receiveOutput(
case dto.WebSocketOutputStdout:
// We do not check the written bytes as the rawToCodeOceanWriter receives everything or nothing.
_, err = stdout.Write([]byte(wsMessage.Data))
case dto.WebSocketOutputStderr:
case dto.WebSocketOutputStderr, dto.WebSocketOutputError:
_, err = stderr.Write([]byte(wsMessage.Data))
}
if err != nil {
@ -166,3 +178,20 @@ func (w *AWSFunctionWorkload) receiveOutput(
}
return 1, fmt.Errorf("receiveOutput stpped by context: %w", ctx.Err())
}
// handleRunnerTimeout listens for a runner timeout and aborts the execution in that case.
// It listens via a context in runningExecutions that is canceled on the timeout event.
func (w *AWSFunctionWorkload) handleRunnerTimeout(ctx context.Context,
exitInternal <-chan ExitInfo, exit chan<- ExitInfo, id execution.ID) {
executionCtx, cancelExecution := context.WithCancel(ctx)
w.runningExecutions[id] = cancelExecution
defer delete(w.runningExecutions, id)
defer close(exit)
select {
case exitInfo := <-exitInternal:
exit <- exitInfo
case <-executionCtx.Done():
exit <- ExitInfo{255, ErrorRunnerInactivityTimeout}
}
}

View File

@ -37,11 +37,11 @@ type InactivityTimerImplementation struct {
duration time.Duration
state TimerState
runner Runner
onDestroy destroyRunnerHandler
onDestroy DestroyRunnerHandler
mu sync.Mutex
}
func NewInactivityTimer(runner Runner, onDestroy destroyRunnerHandler) InactivityTimer {
func NewInactivityTimer(runner Runner, onDestroy DestroyRunnerHandler) InactivityTimer {
return &InactivityTimerImplementation{
state: TimerInactive,
runner: runner,

View File

@ -41,12 +41,12 @@ type NomadJob struct {
id string
portMappings []nomadApi.PortMapping
api nomad.ExecutorAPI
onDestroy destroyRunnerHandler
onDestroy DestroyRunnerHandler
}
// NewNomadJob creates a new NomadJob with the provided id.
func NewNomadJob(id string, portMappings []nomadApi.PortMapping,
apiClient nomad.ExecutorAPI, onDestroy destroyRunnerHandler,
apiClient nomad.ExecutorAPI, onDestroy DestroyRunnerHandler,
) *NomadJob {
job := &NomadJob{
id: id,

View File

@ -391,7 +391,7 @@ func (s *UpdateFileSystemTestSuite) readFilesFromTarArchive(tarArchive io.Reader
// NewRunner creates a new runner with the provided id and manager.
func NewRunner(id string, manager Accessor) Runner {
var handler destroyRunnerHandler
var handler DestroyRunnerHandler
if manager != nil {
handler = manager.Return
} else {

View File

@ -11,7 +11,7 @@ type ExitInfo struct {
Err error
}
type destroyRunnerHandler = func(r Runner) error
type DestroyRunnerHandler = func(r Runner) error
type Runner interface {
InactivityTimer