#89 Generalise the three Storage interfaces and structs into one generic storage manager.

This commit is contained in:
Maximilian Paß
2022-06-21 19:06:55 +02:00
parent 542be96c66
commit 34040162c2
22 changed files with 292 additions and 554 deletions

View File

@ -60,7 +60,7 @@ func (a *AWSEnvironment) Sample() (r runner.Runner, ok bool) {
// The following methods are not supported at this moment.
// IdleRunnerCount is not supported as we have no information about the AWS managed prewarming pool.
func (a *AWSEnvironment) IdleRunnerCount() int {
func (a *AWSEnvironment) IdleRunnerCount() uint {
return 0
}

View File

@ -11,6 +11,7 @@ import (
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/storage"
"strconv"
"sync"
)
@ -25,7 +26,7 @@ type NomadEnvironment struct {
apiClient nomad.ExecutorAPI
jobHCL string
job *nomadApi.Job
idleRunners runner.Storage
idleRunners storage.Storage[runner.Runner]
}
func NewNomadEnvironment(apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvironment, error) {
@ -34,7 +35,7 @@ func NewNomadEnvironment(apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvi
return nil, fmt.Errorf("error parsing Nomad job: %w", err)
}
return &NomadEnvironment{apiClient, jobHCL, job, runner.NewLocalRunnerStorage()}, nil
return &NomadEnvironment{apiClient, jobHCL, job, storage.NewLocalStorage[runner.Runner]()}, nil
}
func NewNomadEnvironmentFromRequest(
@ -223,7 +224,7 @@ func (n *NomadEnvironment) Delete() error {
}
func (n *NomadEnvironment) ApplyPrewarmingPoolSize() error {
required := int(n.PrewarmingPoolSize()) - n.idleRunners.Length()
required := int(n.PrewarmingPoolSize()) - int(n.idleRunners.Length())
if required < 0 {
return fmt.Errorf("%w. Runners to remove: %d", ErrScaleDown, -required)
@ -245,14 +246,14 @@ func (n *NomadEnvironment) Sample() (runner.Runner, bool) {
}
func (n *NomadEnvironment) AddRunner(r runner.Runner) {
n.idleRunners.Add(r)
n.idleRunners.Add(r.ID(), r)
}
func (n *NomadEnvironment) DeleteRunner(id string) {
n.idleRunners.Delete(id)
}
func (n *NomadEnvironment) IdleRunnerCount() int {
func (n *NomadEnvironment) IdleRunnerCount() uint {
return n.idleRunners.Length()
}

View File

@ -5,6 +5,7 @@ import (
nomadApi "github.com/hashicorp/nomad/api"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/storage"
"github.com/openHPI/poseidon/tests"
"github.com/openHPI/poseidon/tests/helpers"
"github.com/stretchr/testify/assert"
@ -113,7 +114,7 @@ func TestRegisterFailsWhenNomadJobRegistrationFails(t *testing.T) {
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, storage.NewLocalStorage[runner.Runner]()}
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register()
@ -130,7 +131,7 @@ func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, storage.NewLocalStorage[runner.Runner]()}
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register()
@ -146,7 +147,7 @@ func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, storage.NewLocalStorage[runner.Runner]()}
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register()
@ -172,7 +173,7 @@ func TestTwoSampleAddExactlyTwoRunners(t *testing.T) {
apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil)
_, job := helpers.CreateTemplateJob()
environment := &NomadEnvironment{apiMock, templateEnvironmentJobHCL, job, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{apiMock, templateEnvironmentJobHCL, job, storage.NewLocalStorage[runner.Runner]()}
runner1 := &runner.RunnerMock{}
runner1.On("ID").Return(tests.DefaultRunnerID)
runner2 := &runner.RunnerMock{}
@ -205,7 +206,7 @@ func TestSampleDoesNotSetForcePullFlag(t *testing.T) {
})
_, job := helpers.CreateTemplateJob()
environment := &NomadEnvironment{apiMock, templateEnvironmentJobHCL, job, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{apiMock, templateEnvironmentJobHCL, job, storage.NewLocalStorage[runner.Runner]()}
runner1 := &runner.RunnerMock{}
runner1.On("ID").Return(tests.DefaultRunnerID)
environment.AddRunner(runner1)

View File

@ -8,6 +8,7 @@ import (
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/openHPI/poseidon/pkg/storage"
"os"
)
@ -141,7 +142,7 @@ func (m *NomadEnvironmentManager) Load() error {
apiClient: m.api,
jobHCL: templateEnvironmentJobHCL,
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
idleRunners: storage.NewLocalStorage[runner.Runner](),
}
m.runnerManager.StoreEnvironment(environment)
jobLogger.Info("Successfully recovered environment")
@ -180,7 +181,7 @@ func fetchEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI) (runner
apiClient: apiClient,
jobHCL: templateEnvironmentJobHCL,
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
idleRunners: storage.NewLocalStorage[runner.Runner](),
}
}
}

View File

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/storage"
)
var ErrNullObject = errors.New("functionality not available for the null object")
@ -13,15 +14,15 @@ var ErrNullObject = errors.New("functionality not available for the null object"
// Remember all functions that can call the NextHandler should call it (See AccessorHandler).
type AbstractManager struct {
nextHandler AccessorHandler
environments EnvironmentStorage
usedRunners Storage
environments storage.Storage[ExecutionEnvironment]
usedRunners storage.Storage[Runner]
}
// NewAbstractManager creates a new abstract runner manager that keeps track of all runners of one kind.
func NewAbstractManager() *AbstractManager {
return &AbstractManager{
environments: NewLocalEnvironmentStorage(),
usedRunners: NewLocalRunnerStorage(),
environments: storage.NewLocalStorage[ExecutionEnvironment](),
usedRunners: storage.NewLocalStorage[Runner](),
}
}
@ -46,15 +47,15 @@ func (n *AbstractManager) ListEnvironments() []ExecutionEnvironment {
}
func (n *AbstractManager) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironment, bool) {
return n.environments.Get(id)
return n.environments.Get(id.ToString())
}
func (n *AbstractManager) StoreEnvironment(environment ExecutionEnvironment) {
n.environments.Add(environment)
n.environments.Add(environment.ID().ToString(), environment)
}
func (n *AbstractManager) DeleteEnvironment(id dto.EnvironmentID) {
n.environments.Delete(id)
n.environments.Delete(id.ToString())
}
func (n *AbstractManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
@ -63,7 +64,7 @@ func (n *AbstractManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.Sta
environments[e.ID()] = &dto.StatisticalExecutionEnvironmentData{
ID: int(e.ID()),
PrewarmingPoolSize: e.PrewarmingPoolSize(),
IdleRunners: uint(e.IdleRunnerCount()),
IdleRunners: e.IdleRunnerCount(),
UsedRunners: 0, // Increased below.
}
}

View File

@ -16,7 +16,7 @@ func NewAWSRunnerManager() *AWSRunnerManager {
}
func (a AWSRunnerManager) Claim(id dto.EnvironmentID, duration int) (Runner, error) {
environment, ok := a.environments.Get(id)
environment, ok := a.environments.Get(id.ToString())
if !ok {
r, err := a.NextHandler().Claim(id, duration)
if err != nil {
@ -31,7 +31,7 @@ func (a AWSRunnerManager) Claim(id dto.EnvironmentID, duration int) (Runner, err
return nil, ErrNoRunnersAvailable
}
a.usedRunners.Add(runner)
a.usedRunners.Add(runner.ID(), runner)
runner.SetupTimeout(time.Duration(duration) * time.Second)
return runner, nil
}

View File

@ -66,7 +66,7 @@ func TestAWSRunnerManager_Return(t *testing.T) {
assert.NoError(t, err)
t.Run("removes usedRunner", func(t *testing.T) {
m.usedRunners.Add(r)
m.usedRunners.Add(r.ID(), r)
assert.Contains(t, m.usedRunners.List(), r)
err := m.Return(r)

View File

@ -10,6 +10,7 @@ import (
"github.com/openHPI/poseidon/internal/config"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/execution"
"github.com/openHPI/poseidon/pkg/storage"
"io"
)
@ -29,7 +30,7 @@ type AWSFunctionWorkload struct {
InactivityTimer
id string
fs map[dto.FilePath][]byte
executions execution.Storer
executions storage.Storage[*dto.ExecutionRequest]
runningExecutions map[execution.ID]context.CancelFunc
onDestroy DestroyRunnerHandler
environment ExecutionEnvironment
@ -46,7 +47,7 @@ func NewAWSFunctionWorkload(
workload := &AWSFunctionWorkload{
id: newUUID.String(),
fs: make(map[dto.FilePath][]byte),
executions: execution.NewLocalStorage(),
executions: storage.NewLocalStorage[*dto.ExecutionRequest](),
runningExecutions: make(map[execution.ID]context.CancelFunc),
onDestroy: onDestroy,
environment: environment,
@ -70,17 +71,18 @@ func (w *AWSFunctionWorkload) MappedPorts() []*dto.MappedPort {
}
func (w *AWSFunctionWorkload) StoreExecution(id string, request *dto.ExecutionRequest) {
w.executions.Add(execution.ID(id), request)
w.executions.Add(id, request)
}
func (w *AWSFunctionWorkload) ExecutionExists(id string) bool {
return w.executions.Exists(execution.ID(id))
_, ok := w.executions.Get(id)
return ok
}
func (w *AWSFunctionWorkload) ExecuteInteractively(id string, _ io.ReadWriter, stdout, stderr io.Writer) (
<-chan ExitInfo, context.CancelFunc, error) {
w.ResetTimeout()
request, ok := w.executions.Pop(execution.ID(id))
request, ok := w.executions.Pop(id)
if !ok {
return nil, nil, ErrorUnknownExecution
}

View File

@ -45,5 +45,5 @@ type ExecutionEnvironment interface {
// DeleteRunner removes an idle runner from the environment.
DeleteRunner(id string)
// IdleRunnerCount returns the number of idle runners of the environment.
IdleRunnerCount() int
IdleRunnerCount() uint
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.9.4. DO NOT EDIT.
// Code generated by mockery v2.13.1. DO NOT EDIT.
package runner
@ -79,14 +79,14 @@ func (_m *ExecutionEnvironmentMock) ID() dto.EnvironmentID {
}
// IdleRunnerCount provides a mock function with given fields:
func (_m *ExecutionEnvironmentMock) IdleRunnerCount() int {
func (_m *ExecutionEnvironmentMock) IdleRunnerCount() uint {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
var r0 uint
if rf, ok := ret.Get(0).(func() uint); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
r0 = ret.Get(0).(uint)
}
return r0
@ -251,3 +251,18 @@ func (_m *ExecutionEnvironmentMock) SetNetworkAccess(allow bool, ports []uint16)
func (_m *ExecutionEnvironmentMock) SetPrewarmingPoolSize(count uint) {
_m.Called(count)
}
type mockConstructorTestingTNewExecutionEnvironmentMock interface {
mock.TestingT
Cleanup(func())
}
// NewExecutionEnvironmentMock creates a new instance of ExecutionEnvironmentMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewExecutionEnvironmentMock(t mockConstructorTestingTNewExecutionEnvironmentMock) *ExecutionEnvironmentMock {
mock := &ExecutionEnvironmentMock{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -1,76 +0,0 @@
package runner
import (
"github.com/openHPI/poseidon/pkg/dto"
"sync"
)
// EnvironmentStorage is an interface for storing environments.
type EnvironmentStorage interface {
// List returns all environments stored in this storage.
List() []ExecutionEnvironment
// Add adds an environment to the storage.
// It overwrites the old environment if one with the same id was already stored.
Add(environment ExecutionEnvironment)
// Get returns an environment from the storage.
// Iff the environment does not exist in the store, ok will be false.
Get(id dto.EnvironmentID) (environment ExecutionEnvironment, ok bool)
// Delete deletes the environment with the passed id from the storage. It does nothing if no environment with the id
// is present in the storage.
Delete(id dto.EnvironmentID)
// Length returns the number of currently stored environments in the storage.
Length() int
}
// localEnvironmentStorage stores ExecutionEnvironment objects in the local application memory.
type localEnvironmentStorage struct {
sync.RWMutex
environments map[dto.EnvironmentID]ExecutionEnvironment
}
// NewLocalEnvironmentStorage responds with an empty localEnvironmentStorage.
// This implementation stores the data thread-safe in the local application memory.
func NewLocalEnvironmentStorage() *localEnvironmentStorage {
return &localEnvironmentStorage{
environments: make(map[dto.EnvironmentID]ExecutionEnvironment),
}
}
func (s *localEnvironmentStorage) List() []ExecutionEnvironment {
s.RLock()
defer s.RUnlock()
values := make([]ExecutionEnvironment, 0, len(s.environments))
for _, v := range s.environments {
values = append(values, v)
}
return values
}
func (s *localEnvironmentStorage) Add(environment ExecutionEnvironment) {
s.Lock()
defer s.Unlock()
s.environments[environment.ID()] = environment
}
func (s *localEnvironmentStorage) Get(id dto.EnvironmentID) (environment ExecutionEnvironment, ok bool) {
s.RLock()
defer s.RUnlock()
environment, ok = s.environments[id]
return
}
func (s *localEnvironmentStorage) Delete(id dto.EnvironmentID) {
s.Lock()
defer s.Unlock()
delete(s.environments, id)
}
func (s *localEnvironmentStorage) Length() int {
s.RLock()
defer s.RUnlock()
return len(s.environments)
}

View File

@ -1,103 +0,0 @@
package runner
import (
"github.com/stretchr/testify/suite"
"testing"
)
func TestEnvironmentStoreTestSuite(t *testing.T) {
suite.Run(t, new(EnvironmentStoreTestSuite))
}
type EnvironmentStoreTestSuite struct {
suite.Suite
environmentStorage *localEnvironmentStorage
environment *ExecutionEnvironmentMock
}
func (s *EnvironmentStoreTestSuite) SetupTest() {
s.environmentStorage = NewLocalEnvironmentStorage()
environmentMock := &ExecutionEnvironmentMock{}
environmentMock.On("ID").Return(defaultEnvironmentID)
s.environment = environmentMock
}
func (s *EnvironmentStoreTestSuite) TestAddedEnvironmentCanBeRetrieved() {
s.environmentStorage.Add(s.environment)
retrievedEnvironment, ok := s.environmentStorage.Get(s.environment.ID())
s.True(ok, "A saved runner should be retrievable")
s.Equal(s.environment, retrievedEnvironment)
}
func (s *EnvironmentStoreTestSuite) TestEnvironmentWithSameIdOverwritesOldOne() {
otherEnvironmentWithSameID := &ExecutionEnvironmentMock{}
otherEnvironmentWithSameID.On("ID").Return(defaultEnvironmentID)
s.NotEqual(s.environment, otherEnvironmentWithSameID)
s.environmentStorage.Add(s.environment)
s.environmentStorage.Add(otherEnvironmentWithSameID)
retrievedEnvironment, _ := s.environmentStorage.Get(s.environment.ID())
s.NotEqual(s.environment, retrievedEnvironment)
s.Equal(otherEnvironmentWithSameID, retrievedEnvironment)
}
func (s *EnvironmentStoreTestSuite) TestDeletedEnvironmentIsNotAccessible() {
s.environmentStorage.Add(s.environment)
s.environmentStorage.Delete(s.environment.ID())
retrievedRunner, ok := s.environmentStorage.Get(s.environment.ID())
s.Nil(retrievedRunner)
s.False(ok, "A deleted runner should not be accessible")
}
func (s *EnvironmentStoreTestSuite) TestLenOfEmptyPoolIsZero() {
s.Equal(0, s.environmentStorage.Length())
}
func (s *EnvironmentStoreTestSuite) TestLenChangesOnStoreContentChange() {
s.Run("len increases when environment is added", func() {
s.environmentStorage.Add(s.environment)
s.Equal(1, s.environmentStorage.Length())
})
s.Run("len does not increase when environment with same id is added", func() {
s.environmentStorage.Add(s.environment)
s.Equal(1, s.environmentStorage.Length())
})
s.Run("len increases again when different environment is added", func() {
anotherEnvironment := &ExecutionEnvironmentMock{}
anotherEnvironment.On("ID").Return(anotherEnvironmentID)
s.environmentStorage.Add(anotherEnvironment)
s.Equal(2, s.environmentStorage.Length())
})
s.Run("len decreases when environment is deleted", func() {
s.environmentStorage.Delete(s.environment.ID())
s.Equal(1, s.environmentStorage.Length())
})
}
func (s *EnvironmentStoreTestSuite) TestListEnvironments() {
s.Run("list returns empty array", func() {
environments := s.environmentStorage.List()
s.Empty(environments)
})
s.Run("list returns one environment", func() {
s.environmentStorage.Add(s.environment)
environments := s.environmentStorage.List()
s.Equal(1, len(environments))
s.Equal(defaultEnvironmentID, environments[0].ID())
})
s.Run("list returns multiple environments", func() {
anotherEnvironment := &ExecutionEnvironmentMock{}
anotherEnvironment.On("ID").Return(anotherEnvironmentID)
s.environmentStorage.Add(s.environment)
s.environmentStorage.Add(anotherEnvironment)
environments := s.environmentStorage.List()
s.Equal(2, len(environments))
})
}

View File

@ -35,7 +35,7 @@ func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *No
}
func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) {
environment, ok := m.environments.Get(environmentID)
environment, ok := m.environments.Get(environmentID.ToString())
if !ok {
return nil, ErrUnknownExecutionEnvironment
}
@ -44,7 +44,7 @@ func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int
return nil, ErrNoRunnersAvailable
}
m.usedRunners.Add(runner)
m.usedRunners.Add(runner.ID(), runner)
go m.markRunnerAsUsed(runner, duration)
runner.SetupTimeout(time.Duration(duration) * time.Second)
@ -103,7 +103,7 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger
}
newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.Return)
if isUsed {
m.usedRunners.Add(newJob)
m.usedRunners.Add(newJob.ID(), newJob)
timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
if err != nil {
environmentLogger.WithError(err).Warn("Error loading timeout from meta values")
@ -138,7 +138,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
return
}
environment, ok := m.environments.Get(environmentID)
environment, ok := m.environments.Get(environmentID.ToString())
if ok {
var mappedPorts []nomadApi.PortMapping
if alloc.AllocatedResources != nil {
@ -162,7 +162,7 @@ func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) {
}
m.usedRunners.Delete(alloc.JobID)
environment, ok := m.environments.Get(environmentID)
environment, ok := m.environments.Get(environmentID.ToString())
if ok {
environment.DeleteRunner(alloc.JobID)
}

View File

@ -5,6 +5,7 @@ import (
nomadApi "github.com/hashicorp/nomad/api"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/storage"
"github.com/openHPI/poseidon/tests"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
@ -57,13 +58,13 @@ func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []strin
}
func mockIdleRunners(environmentMock *ExecutionEnvironmentMock) {
idleRunner := NewLocalRunnerStorage()
idleRunner := storage.NewLocalStorage[Runner]()
environmentMock.On("AddRunner", mock.Anything).Run(func(args mock.Arguments) {
r, ok := args.Get(0).(Runner)
if !ok {
return
}
idleRunner.Add(r)
idleRunner.Add(r.ID(), r)
})
sampleCall := environmentMock.On("Sample", mock.Anything)
sampleCall.Run(func(args mock.Arguments) {
@ -94,7 +95,7 @@ func (s *ManagerTestSuite) TestSetEnvironmentAddsNewEnvironment() {
anotherEnvironment.On("ID").Return(anotherEnvironmentID)
s.nomadRunnerManager.StoreEnvironment(anotherEnvironment)
job, ok := s.nomadRunnerManager.environments.Get(anotherEnvironmentID)
job, ok := s.nomadRunnerManager.environments.Get(anotherEnvironmentID.ToString())
s.True(ok)
s.NotNil(job)
}
@ -166,7 +167,7 @@ func (s *ManagerTestSuite) TestClaimRemovesRunnerWhenMarkAsUsedFails() {
}
func (s *ManagerTestSuite) TestGetReturnsRunnerIfRunnerIsUsed() {
s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner)
s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner.ID(), s.exerciseRunner)
savedRunner, err := s.nomadRunnerManager.Get(s.exerciseRunner.ID())
s.NoError(err)
s.Equal(savedRunner, s.exerciseRunner)
@ -180,7 +181,7 @@ func (s *ManagerTestSuite) TestGetReturnsErrorIfRunnerNotFound() {
func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() {
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner)
s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner.ID(), s.exerciseRunner)
err := s.nomadRunnerManager.Return(s.exerciseRunner)
s.Nil(err)
_, ok := s.nomadRunnerManager.usedRunners.Get(s.exerciseRunner.ID())
@ -222,7 +223,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() {
func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() {
allocation := &nomadApi.Allocation{ID: tests.DefaultRunnerID}
environment, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID)
environment, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID.ToString())
s.Require().True(ok)
allocation.JobID = environment.ID().ToString()
mockIdleRunners(environment.(*ExecutionEnvironmentMock))
@ -250,13 +251,13 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() {
func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() {
allocation := &nomadApi.Allocation{JobID: tests.DefaultRunnerID}
environment, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID)
environment, ok := s.nomadRunnerManager.environments.Get(defaultEnvironmentID.ToString())
s.Require().True(ok)
mockIdleRunners(environment.(*ExecutionEnvironmentMock))
testRunner := NewRunner(allocation.JobID, s.nomadRunnerManager)
environment.AddRunner(testRunner)
s.nomadRunnerManager.usedRunners.Add(testRunner)
s.nomadRunnerManager.usedRunners.Add(testRunner.ID(), testRunner)
modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) {
call.Run(func(args mock.Arguments) {
@ -288,7 +289,7 @@ func modifyMockedCall(apiMock *nomad.ExecutorAPIMock, method string, modifier fu
func (s *ManagerTestSuite) TestOnAllocationAdded() {
s.Run("does not add environment template id job", func() {
environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsInteger)
environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsString)
s.True(ok)
mockIdleRunners(environment.(*ExecutionEnvironmentMock))
@ -306,7 +307,7 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() {
})
s.Run("does not panic when environment does not exist", func() {
nonExistentEnvironment := dto.EnvironmentID(1234)
_, ok := s.nomadRunnerManager.environments.Get(nonExistentEnvironment)
_, ok := s.nomadRunnerManager.environments.Get(nonExistentEnvironment.ToString())
s.Require().False(ok)
alloc := &nomadApi.Allocation{JobID: nomad.RunnerJobID(nonExistentEnvironment, "1-1-1-1")}
@ -316,7 +317,7 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() {
})
s.Run("adds correct job", func() {
s.Run("without allocated resources", func() {
environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsInteger)
environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsString)
s.True(ok)
mockIdleRunners(environment.(*ExecutionEnvironmentMock))
@ -334,7 +335,7 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() {
s.Empty(nomadJob.portMappings)
})
s.Run("with mapped ports", func() {
environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsInteger)
environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsString)
s.True(ok)
mockIdleRunners(environment.(*ExecutionEnvironmentMock))

View File

@ -11,7 +11,7 @@ import (
nomadApi "github.com/hashicorp/nomad/api"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/execution"
"github.com/openHPI/poseidon/pkg/storage"
"io"
"strings"
"time"
@ -38,7 +38,7 @@ var (
// NomadJob is an abstraction to communicate with Nomad environments.
type NomadJob struct {
InactivityTimer
executions execution.Storer
executions storage.Storage[*dto.ExecutionRequest]
id string
portMappings []nomadApi.PortMapping
api nomad.ExecutorAPI
@ -55,7 +55,7 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping,
id: id,
portMappings: portMappings,
api: apiClient,
executions: execution.NewLocalStorage(),
executions: storage.NewLocalStorage[*dto.ExecutionRequest](),
onDestroy: onDestroy,
}
job.InactivityTimer = NewInactivityTimer(job, onDestroy)
@ -86,11 +86,12 @@ func (r *NomadJob) MappedPorts() []*dto.MappedPort {
}
func (r *NomadJob) StoreExecution(id string, request *dto.ExecutionRequest) {
r.executions.Add(execution.ID(id), request)
r.executions.Add(id, request)
}
func (r *NomadJob) ExecutionExists(id string) bool {
return r.executions.Exists(execution.ID(id))
_, ok := r.executions.Get(id)
return ok
}
func (r *NomadJob) ExecuteInteractively(
@ -98,7 +99,7 @@ func (r *NomadJob) ExecuteInteractively(
stdin io.ReadWriter,
stdout, stderr io.Writer,
) (<-chan ExitInfo, context.CancelFunc, error) {
request, ok := r.executions.Pop(execution.ID(id))
request, ok := r.executions.Pop(id)
if !ok {
return nil, nil, ErrorUnknownExecution
}

View File

@ -8,8 +8,8 @@ import (
"fmt"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/execution"
"github.com/openHPI/poseidon/pkg/nullio"
"github.com/openHPI/poseidon/pkg/storage"
"github.com/openHPI/poseidon/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@ -53,7 +53,7 @@ func TestExecutionRequestIsStored(t *testing.T) {
}
id := "test-execution"
runner.StoreExecution(id, executionRequest)
storedExecutionRunner, ok := runner.executions.Pop(execution.ID(id))
storedExecutionRunner, ok := runner.executions.Pop(id)
assert.True(t, ok, "Getting an execution should not return ok false")
assert.Equal(t, executionRequest, storedExecutionRunner)
@ -121,7 +121,7 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() {
s.manager.On("Return", mock.Anything).Return(nil)
s.runner = &NomadJob{
executions: execution.NewLocalStorage(),
executions: storage.NewLocalStorage[*dto.ExecutionRequest](),
InactivityTimer: s.timer,
id: tests.DefaultRunnerID,
api: s.apiMock,
@ -251,7 +251,7 @@ func (s *UpdateFileSystemTestSuite) SetupTest() {
s.timer.On("ResetTimeout").Return()
s.timer.On("TimeoutPassed").Return(false)
s.runner = &NomadJob{
executions: execution.NewLocalStorage(),
executions: storage.NewLocalStorage[*dto.ExecutionRequest](),
InactivityTimer: s.timer,
id: tests.DefaultRunnerID,
api: s.apiMock,

View File

@ -1,98 +0,0 @@
package runner
import (
"sync"
)
// Storage is an interface for storing runners.
type Storage interface {
// List returns all runners from the storage.
List() []Runner
// Add adds a runner to the storage.
// It overwrites the old runner if one with the same id was already stored.
Add(Runner)
// Get returns a runner from the storage.
// Iff the runner does not exist in the storage, ok will be false.
Get(id string) (r Runner, ok bool)
// Delete deletes the runner with the passed id from the storage.
// It does nothing if no runner with the id is present in the store.
Delete(id string)
// Purge removes all runners from the storage.
Purge()
// Length returns the number of currently stored runners in the storage.
Length() int
// Sample returns and removes an arbitrary runner from the storage.
// ok is true iff a runner was returned.
Sample() (r Runner, ok bool)
}
// localRunnerStorage stores runner objects in the local application memory.
// ToDo: Create implementation that use some persistent storage like a database.
type localRunnerStorage struct {
sync.RWMutex
runners map[string]Runner
}
// NewLocalRunnerStorage responds with a Storage implementation.
// This implementation stores the data thread-safe in the local application memory.
func NewLocalRunnerStorage() *localRunnerStorage {
return &localRunnerStorage{
runners: make(map[string]Runner),
}
}
func (s *localRunnerStorage) List() (r []Runner) {
s.RLock()
defer s.RUnlock()
for _, value := range s.runners {
r = append(r, value)
}
return r
}
func (s *localRunnerStorage) Add(r Runner) {
s.Lock()
defer s.Unlock()
s.runners[r.ID()] = r
}
func (s *localRunnerStorage) Get(id string) (r Runner, ok bool) {
s.RLock()
defer s.RUnlock()
r, ok = s.runners[id]
return
}
func (s *localRunnerStorage) Delete(id string) {
s.Lock()
defer s.Unlock()
delete(s.runners, id)
}
func (s *localRunnerStorage) Purge() {
s.Lock()
defer s.Unlock()
s.runners = make(map[string]Runner)
}
func (s *localRunnerStorage) Sample() (Runner, bool) {
s.Lock()
defer s.Unlock()
for _, runner := range s.runners {
delete(s.runners, runner.ID())
return runner, true
}
return nil, false
}
func (s *localRunnerStorage) Length() int {
s.RLock()
defer s.RUnlock()
return len(s.runners)
}

View File

@ -1,103 +0,0 @@
package runner
import (
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/tests"
"github.com/stretchr/testify/suite"
"testing"
)
func TestRunnerPoolTestSuite(t *testing.T) {
suite.Run(t, new(RunnerPoolTestSuite))
}
type RunnerPoolTestSuite struct {
suite.Suite
runnerStorage *localRunnerStorage
runner Runner
}
func (s *RunnerPoolTestSuite) SetupTest() {
s.runnerStorage = NewLocalRunnerStorage()
s.runner = NewRunner(tests.DefaultRunnerID, nil)
s.runner.StoreExecution(tests.DefaultExecutionID, &dto.ExecutionRequest{Command: "true"})
}
func (s *RunnerPoolTestSuite) TestAddedRunnerCanBeRetrieved() {
s.runnerStorage.Add(s.runner)
retrievedRunner, ok := s.runnerStorage.Get(s.runner.ID())
s.True(ok, "A saved runner should be retrievable")
s.Equal(s.runner, retrievedRunner)
}
func (s *RunnerPoolTestSuite) TestRunnerWithSameIdOverwritesOldOne() {
otherRunnerWithSameID := NewRunner(s.runner.ID(), nil)
// assure runner is actually different
s.NotEqual(s.runner, otherRunnerWithSameID)
s.runnerStorage.Add(s.runner)
s.runnerStorage.Add(otherRunnerWithSameID)
retrievedRunner, _ := s.runnerStorage.Get(s.runner.ID())
s.NotEqual(s.runner, retrievedRunner)
s.Equal(otherRunnerWithSameID, retrievedRunner)
}
func (s *RunnerPoolTestSuite) TestDeletedRunnersAreNotAccessible() {
s.runnerStorage.Add(s.runner)
s.runnerStorage.Delete(s.runner.ID())
retrievedRunner, ok := s.runnerStorage.Get(s.runner.ID())
s.Nil(retrievedRunner)
s.False(ok, "A deleted runner should not be accessible")
}
func (s *RunnerPoolTestSuite) TestSampleReturnsRunnerWhenOneIsAvailable() {
s.runnerStorage.Add(s.runner)
sampledRunner, ok := s.runnerStorage.Sample()
s.NotNil(sampledRunner)
s.True(ok)
}
func (s *RunnerPoolTestSuite) TestSampleReturnsFalseWhenNoneIsAvailable() {
sampledRunner, ok := s.runnerStorage.Sample()
s.Nil(sampledRunner)
s.False(ok)
}
func (s *RunnerPoolTestSuite) TestSampleRemovesRunnerFromPool() {
s.runnerStorage.Add(s.runner)
sampledRunner, _ := s.runnerStorage.Sample()
_, ok := s.runnerStorage.Get(sampledRunner.ID())
s.False(ok)
}
func (s *RunnerPoolTestSuite) TestLenOfEmptyPoolIsZero() {
s.Equal(0, s.runnerStorage.Length())
}
func (s *RunnerPoolTestSuite) TestLenChangesOnStoreContentChange() {
s.Run("len increases when runner is added", func() {
s.runnerStorage.Add(s.runner)
s.Equal(1, s.runnerStorage.Length())
})
s.Run("len does not increase when runner with same id is added", func() {
s.runnerStorage.Add(s.runner)
s.Equal(1, s.runnerStorage.Length())
})
s.Run("len increases again when different runner is added", func() {
anotherRunner := NewRunner(tests.AnotherRunnerID, nil)
s.runnerStorage.Add(anotherRunner)
s.Equal(2, s.runnerStorage.Length())
})
s.Run("len decreases when runner is deleted", func() {
s.runnerStorage.Delete(s.runner.ID())
s.Equal(1, s.runnerStorage.Length())
})
s.Run("len decreases when runner is sampled", func() {
_, _ = s.runnerStorage.Sample()
s.Equal(0, s.runnerStorage.Length())
})
}