Add EntityStore interface
This commit is contained in:

committed by
Jan-Eric Hellenberg

parent
dba7160a41
commit
abb1ce1bf8
@ -2,7 +2,6 @@ package environment
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/environment/pool"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
@ -28,7 +27,7 @@ type NomadExecutionEnvironment struct {
|
||||
id int
|
||||
jobId string
|
||||
availableRunners chan runner.Runner
|
||||
allRunners pool.RunnerPool
|
||||
allRunners RunnerPool
|
||||
nomadApiClient nomad.ExecutorApi
|
||||
}
|
||||
|
||||
@ -36,7 +35,7 @@ var executionEnvironment ExecutionEnvironment
|
||||
|
||||
// DebugInit initializes one execution environment so that its runners can be provided.
|
||||
// ToDo: This should be replaced by a create Execution Environment route
|
||||
func DebugInit(runnersPool pool.RunnerPool, nomadApi nomad.ExecutorApi) {
|
||||
func DebugInit(runnersPool RunnerPool, nomadApi nomad.ExecutorApi) {
|
||||
executionEnvironment = &NomadExecutionEnvironment{
|
||||
id: 0,
|
||||
jobId: "python",
|
||||
@ -75,7 +74,12 @@ func (environment *NomadExecutionEnvironment) Refresh() {
|
||||
for _, r := range environment.unusedRunners(runners) {
|
||||
// ToDo: Listen on Nomad event stream
|
||||
log.Printf("Adding allocation %+v", r)
|
||||
environment.allRunners.AddRunner(r)
|
||||
if err := environment.allRunners.Add(r); err != nil {
|
||||
log.
|
||||
WithError(err).
|
||||
WithField("runner", r).
|
||||
Fatal("Invalid storage implementation used for object of type")
|
||||
}
|
||||
environment.availableRunners <- r
|
||||
}
|
||||
jobScale, err := environment.nomadApiClient.GetJobScale(environment.jobId)
|
||||
@ -98,7 +102,7 @@ func (environment *NomadExecutionEnvironment) Refresh() {
|
||||
func (environment *NomadExecutionEnvironment) unusedRunners(fetchedRunnerIds []string) (newRunners []runner.Runner) {
|
||||
newRunners = make([]runner.Runner, 0)
|
||||
for _, runnerId := range fetchedRunnerIds {
|
||||
_, ok := environment.allRunners.GetRunner(runnerId)
|
||||
_, ok := environment.allRunners.Get(runnerId)
|
||||
if !ok {
|
||||
newRunners = append(newRunners, runner.NewExerciseRunner(runnerId))
|
||||
}
|
||||
|
@ -4,14 +4,12 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/environment/pool"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/mocks"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const runnerId = "s0m3-r4nd0m-1d"
|
||||
const anotherRunnerId = "4n0th3r-1d"
|
||||
const jobId = "4n0th3r-1d"
|
||||
|
||||
@ -28,9 +26,9 @@ type GetNextRunnerTestSuite struct {
|
||||
func (suite *GetNextRunnerTestSuite) SetupTest() {
|
||||
suite.nomadExecutionEnvironment = &NomadExecutionEnvironment{
|
||||
availableRunners: make(chan runner.Runner, 50),
|
||||
allRunners: pool.NewLocalRunnerPool(),
|
||||
allRunners: NewLocalRunnerPool(),
|
||||
}
|
||||
suite.exerciseRunner = runner.NewExerciseRunner(runnerId)
|
||||
suite.exerciseRunner = CreateTestRunner()
|
||||
}
|
||||
|
||||
func (suite *GetNextRunnerTestSuite) TestGetNextRunnerReturnsRunnerIfAvailable() {
|
||||
@ -64,7 +62,7 @@ func (suite *GetNextRunnerTestSuite) TestGetNextRunnerThrowsAnErrorIfNoRunnersAv
|
||||
}
|
||||
|
||||
func TestRefreshFetchRunners(t *testing.T) {
|
||||
apiMock, environment := newRefreshMock([]string{runnerId}, pool.NewLocalRunnerPool())
|
||||
apiMock, environment := newRefreshMock([]string{RunnerId}, NewLocalRunnerPool())
|
||||
// ToDo: Terminate Refresh when test finished (also in other tests)
|
||||
go environment.Refresh()
|
||||
_, _ = environment.NextRunner()
|
||||
@ -72,14 +70,14 @@ func TestRefreshFetchRunners(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRefreshFetchesRunnersIntoChannel(t *testing.T) {
|
||||
_, environment := newRefreshMock([]string{runnerId}, pool.NewLocalRunnerPool())
|
||||
_, environment := newRefreshMock([]string{RunnerId}, NewLocalRunnerPool())
|
||||
go environment.Refresh()
|
||||
availableRunner, _ := environment.NextRunner()
|
||||
assert.Equal(t, availableRunner.Id(), runnerId)
|
||||
assert.Equal(t, availableRunner.Id(), RunnerId)
|
||||
}
|
||||
|
||||
func TestRefreshScalesJob(t *testing.T) {
|
||||
apiMock, environment := newRefreshMock([]string{runnerId}, pool.NewLocalRunnerPool())
|
||||
apiMock, environment := newRefreshMock([]string{RunnerId}, NewLocalRunnerPool())
|
||||
go environment.Refresh()
|
||||
_, _ = environment.NextRunner()
|
||||
time.Sleep(100 * time.Millisecond) // ToDo: Be safe this test is not flaky
|
||||
@ -87,16 +85,16 @@ func TestRefreshScalesJob(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRefreshAddsRunnerToPool(t *testing.T) {
|
||||
runnersInUse := pool.NewLocalRunnerPool()
|
||||
_, environment := newRefreshMock([]string{runnerId}, runnersInUse)
|
||||
runnersInUse := NewLocalRunnerPool()
|
||||
_, environment := newRefreshMock([]string{RunnerId}, runnersInUse)
|
||||
go environment.Refresh()
|
||||
availableRunner, _ := environment.NextRunner()
|
||||
poolRunner, ok := runnersInUse.GetRunner(availableRunner.Id())
|
||||
assert.True(t, ok, "The requested runner is added to the pool")
|
||||
assert.Equal(t, availableRunner, poolRunner, "The requested runner equals the runner added to the pool")
|
||||
poolRunner, ok := runnersInUse.Get(availableRunner.Id())
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, availableRunner, poolRunner)
|
||||
}
|
||||
|
||||
func newRefreshMock(returnedRunnerIds []string, allRunners pool.RunnerPool) (apiClient *mocks.ExecutorApi, environment *NomadExecutionEnvironment) {
|
||||
func newRefreshMock(returnedRunnerIds []string, allRunners RunnerPool) (apiClient *mocks.ExecutorApi, environment *NomadExecutionEnvironment) {
|
||||
apiClient = &mocks.ExecutorApi{}
|
||||
apiClient.On("LoadRunners", jobId).Return(returnedRunnerIds, nil)
|
||||
apiClient.On("GetJobScale", jobId).Return(len(returnedRunnerIds), nil)
|
||||
|
@ -1,51 +0,0 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// RunnerPool is the interface for storing runners
|
||||
// ToDo: Create interface implementation that use some persistent storage like a database
|
||||
type RunnerPool interface {
|
||||
// AddRunner adds a provided runner to the pool storage
|
||||
AddRunner(runner.Runner)
|
||||
// GetRunner returns a runner from the pool storage
|
||||
// If the requested runner is not stored 'ok' will be false
|
||||
GetRunner(id string) (r runner.Runner, ok bool)
|
||||
// DeleteRunner deletes the runner with the passed id from the pool storage
|
||||
DeleteRunner(id string)
|
||||
}
|
||||
|
||||
// localRunnerPool stores runner objects in the local application memory
|
||||
type localRunnerPool struct {
|
||||
sync.RWMutex
|
||||
runners map[string]runner.Runner
|
||||
}
|
||||
|
||||
// NewLocalRunnerPool responds with a RunnerPool implementation
|
||||
// This implementation stores the data thread-safe in the local application memory
|
||||
func NewLocalRunnerPool() *localRunnerPool {
|
||||
return &localRunnerPool{
|
||||
runners: make(map[string]runner.Runner),
|
||||
}
|
||||
}
|
||||
|
||||
func (pool *localRunnerPool) AddRunner(r runner.Runner) {
|
||||
pool.Lock()
|
||||
defer pool.Unlock()
|
||||
pool.runners[r.Id()] = r
|
||||
}
|
||||
|
||||
func (pool *localRunnerPool) GetRunner(id string) (r runner.Runner, ok bool) {
|
||||
pool.RLock()
|
||||
defer pool.RUnlock()
|
||||
r, ok = pool.runners[id]
|
||||
return
|
||||
}
|
||||
|
||||
func (pool *localRunnerPool) DeleteRunner(id string) {
|
||||
pool.Lock()
|
||||
defer pool.Unlock()
|
||||
delete(pool.runners, id)
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
"testing"
|
||||
)
|
||||
|
||||
const runnerId = "s0m3-r4nd0m-1d"
|
||||
|
||||
func TestAddedRunnerCanBeRetrieved(t *testing.T) {
|
||||
runnerPool := NewLocalRunnerPool()
|
||||
runnerPool.AddRunner(runner.NewExerciseRunner(runnerId))
|
||||
_, ok := runnerPool.GetRunner(runnerId)
|
||||
assert.True(t, ok, "A saved runner should be retrievable")
|
||||
}
|
||||
|
||||
func TestDeletedRunnersAreNotAccessible(t *testing.T) {
|
||||
pool := NewLocalRunnerPool()
|
||||
pool.AddRunner(runner.NewExerciseRunner(runnerId))
|
||||
pool.DeleteRunner(runnerId)
|
||||
_, ok := pool.GetRunner(runnerId)
|
||||
assert.False(t, ok, "A deleted runner should not be accessible")
|
||||
}
|
52
environment/runner_pool.go
Normal file
52
environment/runner_pool.go
Normal file
@ -0,0 +1,52 @@
|
||||
package environment
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/store"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// RunnerPool is a type of entity store that should store runner entities.
|
||||
type RunnerPool interface {
|
||||
store.EntityStore
|
||||
}
|
||||
|
||||
// localRunnerPool stores runner objects in the local application memory.
|
||||
// ToDo: Create implementation that use some persistent storage like a database
|
||||
type localRunnerPool struct {
|
||||
sync.RWMutex
|
||||
runners map[string]runner.Runner
|
||||
}
|
||||
|
||||
// NewLocalRunnerPool responds with a RunnerPool implementation
|
||||
// This implementation stores the data thread-safe in the local application memory
|
||||
func NewLocalRunnerPool() *localRunnerPool {
|
||||
return &localRunnerPool{
|
||||
runners: make(map[string]runner.Runner),
|
||||
}
|
||||
}
|
||||
|
||||
func (pool *localRunnerPool) Add(r store.Entity) (err error) {
|
||||
pool.Lock()
|
||||
defer pool.Unlock()
|
||||
runnerEntity, ok := r.(runner.Runner)
|
||||
if !ok {
|
||||
return errors.New("Entity of type runner.Runner was expected, but wasn't given.")
|
||||
}
|
||||
pool.runners[r.Id()] = runnerEntity
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pool *localRunnerPool) Get(id string) (r store.Entity, ok bool) {
|
||||
pool.RLock()
|
||||
defer pool.RUnlock()
|
||||
r, ok = pool.runners[id]
|
||||
return
|
||||
}
|
||||
|
||||
func (pool *localRunnerPool) Delete(id string) {
|
||||
pool.Lock()
|
||||
defer pool.Unlock()
|
||||
delete(pool.runners, id)
|
||||
}
|
66
environment/runner_pool_test.go
Normal file
66
environment/runner_pool_test.go
Normal file
@ -0,0 +1,66 @@
|
||||
package environment
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/suite"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type DummyEntity struct{}
|
||||
|
||||
func (DummyEntity) Id() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func TestRunnerPoolTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(RunnerPoolTestSuite))
|
||||
}
|
||||
|
||||
type RunnerPoolTestSuite struct {
|
||||
suite.Suite
|
||||
runnerPool *localRunnerPool
|
||||
runner runner.Runner
|
||||
}
|
||||
|
||||
func (suite *RunnerPoolTestSuite) SetupTest() {
|
||||
suite.runnerPool = NewLocalRunnerPool()
|
||||
suite.runner = CreateTestRunner()
|
||||
}
|
||||
|
||||
func (suite *RunnerPoolTestSuite) TestAddInvalidEntityTypeReturnsError() {
|
||||
dummyEntity := DummyEntity{}
|
||||
err := suite.runnerPool.Add(dummyEntity)
|
||||
suite.Error(err)
|
||||
}
|
||||
|
||||
func (suite *RunnerPoolTestSuite) TestAddValidEntityReturnsNoError() {
|
||||
err := suite.runnerPool.Add(suite.runner)
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *RunnerPoolTestSuite) TestAddedRunnerCanBeRetrieved() {
|
||||
_ = suite.runnerPool.Add(suite.runner)
|
||||
retrievedRunner, ok := suite.runnerPool.Get(suite.runner.Id())
|
||||
suite.True(ok, "A saved runner should be retrievable")
|
||||
suite.Equal(suite.runner, retrievedRunner)
|
||||
}
|
||||
|
||||
func (suite *RunnerPoolTestSuite) TestRunnerWithSameIdOverwritesOldOne() {
|
||||
otherRunnerWithSameId := runner.NewExerciseRunner(suite.runner.Id())
|
||||
// assure runner is actually different
|
||||
suite.NotEqual(suite.runner, otherRunnerWithSameId)
|
||||
|
||||
_ = suite.runnerPool.Add(suite.runner)
|
||||
_ = suite.runnerPool.Add(otherRunnerWithSameId)
|
||||
retrievedRunner, _ := suite.runnerPool.Get(suite.runner.Id())
|
||||
suite.NotEqual(suite.runner, retrievedRunner)
|
||||
suite.Equal(otherRunnerWithSameId, retrievedRunner)
|
||||
}
|
||||
|
||||
func (suite *RunnerPoolTestSuite) TestDeletedRunnersAreNotAccessible() {
|
||||
_ = suite.runnerPool.Add(suite.runner)
|
||||
suite.runnerPool.Delete(suite.runner.Id())
|
||||
retrievedRunner, ok := suite.runnerPool.Get(suite.runner.Id())
|
||||
suite.Nil(retrievedRunner)
|
||||
suite.False(ok, "A deleted runner should not be accessible")
|
||||
}
|
9
environment/test_constants.go
Normal file
9
environment/test_constants.go
Normal file
@ -0,0 +1,9 @@
|
||||
package environment
|
||||
|
||||
import "gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||
|
||||
const RunnerId = "s0m3-r4nd0m-1d"
|
||||
|
||||
func CreateTestRunner() runner.Runner {
|
||||
return runner.NewExerciseRunner(RunnerId)
|
||||
}
|
1
main.go
1
main.go
@ -5,7 +5,6 @@ import (
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/api"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/config"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/environment"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/environment/pool"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
||||
"net/http"
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"github.com/google/uuid"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/store"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@ -28,15 +29,14 @@ const (
|
||||
)
|
||||
|
||||
type Runner interface {
|
||||
store.Entity
|
||||
|
||||
// SetStatus sets the status of the runner.
|
||||
SetStatus(Status)
|
||||
|
||||
// Status gets the status of the runner.
|
||||
Status() Status
|
||||
|
||||
// Id returns the id of the runner.
|
||||
Id() string
|
||||
|
||||
// Execution looks up an ExecutionId for the runner and returns the associated RunnerRequest.
|
||||
// If this request does not exit, ok is false, else true.
|
||||
Execution(ExecutionId) (request dto.ExecutionRequest, ok bool)
|
||||
|
21
store/entity_store.go
Normal file
21
store/entity_store.go
Normal file
@ -0,0 +1,21 @@
|
||||
package store
|
||||
|
||||
// EntityStore is the general interface for storing different entity types.
|
||||
type EntityStore interface {
|
||||
// Add adds an entity to the store.
|
||||
// It overwrites the old entity if one with the same id was already stored.
|
||||
// Returns an error if the entity is of invalid type for the concrete implementation.
|
||||
Add(entity Entity) (err error)
|
||||
|
||||
// Get returns a entity from the store.
|
||||
// If the entity does not exist in the store, ok will be false.
|
||||
Get(id string) (entity Entity, ok bool)
|
||||
|
||||
// Delete deletes the entity with the passed id from the store.
|
||||
Delete(id string)
|
||||
}
|
||||
|
||||
type Entity interface {
|
||||
// Id returns the id of the given entity.
|
||||
Id() string
|
||||
}
|
Reference in New Issue
Block a user