Add architecture for multiple managers

using the chain of responsibility pattern.
Maximilian Paß
2022-01-19 21:23:04 +01:00
parent dd1d27e393
commit ba43f667c2
20 changed files with 732 additions and 442 deletions
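For orientation, here is the pattern stripped of Poseidon specifics: each handler in the chain either serves a request itself or delegates it to its successor. The following is a minimal, self-contained Go sketch; every name in it is illustrative only and does not appear in this commit.

package main

import "fmt"

// Handler is one link in the chain: it serves a request or passes it on.
type Handler interface {
	Handle(request string) (string, error)
	SetNext(next Handler)
}

// BaseHandler provides the delegation logic that concrete handlers embed.
type BaseHandler struct{ next Handler }

func (b *BaseHandler) SetNext(next Handler) { b.next = next }

func (b *BaseHandler) Handle(request string) (string, error) {
	if b.next == nil {
		return "", fmt.Errorf("no handler left for %q", request)
	}
	return b.next.Handle(request)
}

// AWSHandler answers AWS requests and delegates everything else.
type AWSHandler struct{ BaseHandler }

func (h *AWSHandler) Handle(request string) (string, error) {
	if request == "aws" {
		return "handled by AWS", nil
	}
	return h.BaseHandler.Handle(request)
}

func main() {
	aws := &AWSHandler{}
	aws.SetNext(&BaseHandler{}) // in this commit, a Nomad manager takes this place
	result, err := aws.Handle("aws")
	fmt.Println(result, err) // handled by AWS <nil>
}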

View File

@@ -57,25 +57,43 @@ func runServer(server *http.Server) {
}
}
func initServer() *http.Server {
func createNomadManager() (runnerManager runner.Manager, environmentManager environment.ManagerHandler) {
// API initialization
nomadAPIClient, err := nomad.NewExecutorAPI(&config.Config.Nomad)
if err != nil {
log.WithError(err).WithField("nomad config", config.Config.Nomad).Fatal("Error creating Nomad API client")
}
runnerManager := runner.NewNomadRunnerManager(nomadAPIClient, context.Background())
environmentManager, err := environment.
runnerManager = runner.NewNomadRunnerManager(nomadAPIClient, context.Background())
environmentManager, err = environment.
NewNomadEnvironmentManager(runnerManager, nomadAPIClient, config.Config.Server.TemplateJobFile)
if err != nil {
log.WithError(err).Fatal("Error initializing environment manager")
}
return runnerManager, environmentManager
}
func createAWSManager(nextRunnerManager runner.Manager, nextEnvironmentManager environment.ManagerHandler) (
runnerManager runner.Manager, environmentManager environment.ManagerHandler) {
runnerManager = runner.NewAWSRunnerManager()
runnerManager.SetNextHandler(nextRunnerManager)
environmentManager = environment.NewAWSEnvironmentManager(runnerManager)
environmentManager.SetNextHandler(nextEnvironmentManager)
return runnerManager, environmentManager
}
// initServer builds the http server and configures it with the chain of responsibility for multiple managers.
func initServer() *http.Server {
nomadRunnerManager, nomadEnvironmentManager := createNomadManager()
awsRunnerManager, awsEnvironmentManager := createAWSManager(nomadRunnerManager, nomadEnvironmentManager)
return &http.Server{
Addr: config.Config.Server.URL().Host,
ReadTimeout: time.Second * 15,
IdleTimeout: time.Second * 60,
Handler: api.NewRouter(runnerManager, environmentManager),
Handler: api.NewRouter(awsRunnerManager, awsEnvironmentManager),
}
}
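Note the wiring order established here: the AWS managers form the head of each chain, receive the Nomad managers as their next handlers, and the router is handed the chain heads. Prepending a further provider would repeat the same two-step pattern; a hypothetical sketch (createDockerManager does not exist in this commit and only stands in for any additional manager pair):

func initServerWithDocker() *http.Server {
	nomadRunnerManager, nomadEnvironmentManager := createNomadManager()
	awsRunnerManager, awsEnvironmentManager := createAWSManager(nomadRunnerManager, nomadEnvironmentManager)
	dockerRunnerManager, dockerEnvironmentManager := createDockerManager(awsRunnerManager, awsEnvironmentManager)
	return &http.Server{
		Addr:        config.Config.Server.URL().Host,
		ReadTimeout: time.Second * 15,
		IdleTimeout: time.Second * 60,
		Handler:     api.NewRouter(dockerRunnerManager, dockerEnvironmentManager),
	}
}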

View File

@@ -27,7 +27,7 @@ const (
// always returns a router for the newest version of our API. We
// use gorilla/mux because it is more convenient than net/http, e.g.
// when extracting path parameters.
func NewRouter(runnerManager runner.Manager, environmentManager environment.Manager) *mux.Router {
func NewRouter(runnerManager runner.Manager, environmentManager environment.ManagerHandler) *mux.Router {
router := mux.NewRouter()
// this can later be restricted to a specific host with
// `router.Host(...)` and to HTTPS with `router.Schemes("https")`
@@ -37,7 +37,8 @@ func NewRouter(runnerManager runner.Manager, environmentManager environment.Mana
}
// configureV1Router configures a given router with the routes of version 1 of the Poseidon API.
func configureV1Router(router *mux.Router, runnerManager runner.Manager, environmentManager environment.Manager) {
func configureV1Router(router *mux.Router,
runnerManager runner.Manager, environmentManager environment.ManagerHandler) {
router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.WithField("request", r).Debug("Not Found Handler")
w.WriteHeader(http.StatusNotFound)

View File

@@ -24,7 +24,7 @@ const (
var ErrMissingURLParameter = errors.New("url parameter missing")
type EnvironmentController struct {
manager environment.Manager
manager environment.ManagerHandler
}
type ExecutionEnvironmentsResponse struct {

View File

@@ -21,7 +21,7 @@ import (
type EnvironmentControllerTestSuite struct {
suite.Suite
manager *environment.ManagerMock
manager *environment.ManagerHandlerMock
router *mux.Router
}
@@ -30,7 +30,7 @@ func TestEnvironmentControllerTestSuite(t *testing.T) {
}
func (s *EnvironmentControllerTestSuite) SetupTest() {
s.manager = &environment.ManagerMock{}
s.manager = &environment.ManagerHandlerMock{}
s.router = NewRouter(nil, s.manager)
}

View File

@@ -24,7 +24,7 @@ const (
)
type RunnerController struct {
manager runner.Manager
manager runner.Accessor
runnerRouter *mux.Router
}
@@ -52,10 +52,10 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req
environmentID := dto.EnvironmentID(runnerRequest.ExecutionEnvironmentID)
nextRunner, err := r.manager.Claim(environmentID, runnerRequest.InactivityTimeout)
if err != nil {
switch err {
case runner.ErrUnknownExecutionEnvironment:
switch {
case errors.Is(err, runner.ErrUnknownExecutionEnvironment):
writeNotFound(writer, err)
case runner.ErrNoRunnersAvailable:
case errors.Is(err, runner.ErrNoRunnersAvailable):
log.WithField("environment", environmentID).Warn("No runners available")
writeInternalServerError(writer, err, dto.ErrorNomadOverload)
default:
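Two details of this controller change belong together: the controller now depends on the narrower runner.Accessor interface rather than the full Manager, and the error switch moves from direct comparison to errors.Is. The latter matters because errors returned through the manager chain may be wrapped (the AWS managers wrap with fmt.Errorf and the %w verb), and errors.Is still matches the underlying sentinel errors.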

View File

@@ -0,0 +1,40 @@
package environment
import (
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
)
// AbstractManager serves as a fallback environment manager in the chain of responsibility,
// following the null object pattern.
type AbstractManager struct {
nextHandler ManagerHandler
}
func (n *AbstractManager) SetNextHandler(next ManagerHandler) {
n.nextHandler = next
}
func (n *AbstractManager) NextHandler() ManagerHandler {
return n.nextHandler
}
func (n *AbstractManager) List(_ bool) ([]runner.ExecutionEnvironment, error) {
return []runner.ExecutionEnvironment{}, nil
}
func (n *AbstractManager) Get(_ dto.EnvironmentID, _ bool) (runner.ExecutionEnvironment, error) {
return nil, runner.ErrNullObject
}
func (n *AbstractManager) CreateOrUpdate(_ dto.EnvironmentID, _ dto.ExecutionEnvironmentRequest) (bool, error) {
return false, nil
}
func (n *AbstractManager) Delete(_ dto.EnvironmentID) (bool, error) {
return false, nil
}
func (n *AbstractManager) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{}
}
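Taken on its own, an AbstractManager terminates a chain gracefully instead of panicking on a missing handler: listing yields an empty slice and lookups fail with runner.ErrNullObject. A minimal sketch of that behavior, assuming the import paths used elsewhere in this commit:

package main

import (
	"errors"
	"fmt"

	"github.com/openHPI/poseidon/internal/environment"
	"github.com/openHPI/poseidon/internal/runner"
)

func main() {
	var m environment.ManagerHandler = &environment.AbstractManager{}
	envs, _ := m.List(false) // null object: empty slice, nil error
	fmt.Println(len(envs))   // 0
	_, err := m.Get(0, false)
	fmt.Println(errors.Is(err, runner.ErrNullObject)) // true
}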

View File

@@ -0,0 +1,57 @@
package environment
import (
"fmt"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
)
// AWSEnvironmentManager contains no functionality at the moment.
// IMPROVE: Create Lambda functions dynamically.
type AWSEnvironmentManager struct {
*AbstractManager
runnerManager runner.Accessor
}
func NewAWSEnvironmentManager(runnerManager runner.Accessor) *AWSEnvironmentManager {
m := &AWSEnvironmentManager{&AbstractManager{nil}, runnerManager}
runnerManager.Load()
return m
}
func (a *AWSEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
list, err := a.NextHandler().List(fetch)
if err != nil {
return nil, fmt.Errorf("aws wraped: %w", err)
}
return list, nil
}
func (a *AWSEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (runner.ExecutionEnvironment, error) {
e, err := a.NextHandler().Get(id, fetch)
if err != nil {
return nil, fmt.Errorf("aws wraped: %w", err)
}
return e, nil
}
func (a *AWSEnvironmentManager) CreateOrUpdate(
id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (bool, error) {
isCreated, err := a.NextHandler().CreateOrUpdate(id, request)
if err != nil {
return false, fmt.Errorf("aws wraped: %w", err)
}
return isCreated, nil
}
func (a *AWSEnvironmentManager) Delete(id dto.EnvironmentID) (bool, error) {
isFound, err := a.NextHandler().Delete(id)
if err != nil {
return false, fmt.Errorf("aws wraped: %w", err)
}
return isFound, nil
}
func (a *AWSEnvironmentManager) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
return a.NextHandler().Statistics()
}
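Because every method simply forwards to NextHandler() and wraps failures via %w, callers can still test for the package's sentinel errors after the chain has added context. A short sketch, using the managers as wired in this commit (runnerManager and nomadEnv stand in for the values created in main):

awsEnv := environment.NewAWSEnvironmentManager(runnerManager)
awsEnv.SetNextHandler(nomadEnv)
_, err := awsEnv.Get(42, false)
// err reads e.g. "aws wrapped: execution environment not found";
// the sentinel still matches through the wrapping:
fmt.Println(errors.Is(err, runner.ErrUnknownExecutionEnvironment)) // true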

View File

@@ -1,30 +1,20 @@
package environment
import (
_ "embed"
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"os"
)
// templateEnvironmentJobHCL holds our default job in HCL format.
// The default job is used when creating a new job and provides
// common settings that all the jobs share.
//go:embed template-environment-job.hcl
var templateEnvironmentJobHCL string
var log = logging.GetLogger("environment")
// ManagerHandler is one handler in the chain of responsibility of environment managers.
// Each manager can handle different requests.
type ManagerHandler interface {
Manager
SetNextHandler(next ManagerHandler)
NextHandler() ManagerHandler
}
// Manager encapsulates API calls to the executor API for creation and deletion of execution environments.
type Manager interface {
// Load fetches all already created execution environments from the executor and registers them with the runner manager.
// It should be called during the startup process (e.g. on creation of the Manager).
Load() error
// List returns all environments known by Poseidon.
// When `fetch` is set, the environments are fetched from the executor before returning.
List(fetch bool) ([]runner.ExecutionEnvironment, error)
@@ -48,185 +38,3 @@ type Manager interface {
// Statistics returns statistical data for each execution environment.
Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData
}
type NomadEnvironmentManager struct {
runnerManager runner.Manager
api nomad.ExecutorAPI
templateEnvironmentHCL string
}
func NewNomadEnvironmentManager(
runnerManager runner.Manager,
apiClient nomad.ExecutorAPI,
templateJobFile string,
) (*NomadEnvironmentManager, error) {
if err := loadTemplateEnvironmentJobHCL(templateJobFile); err != nil {
return nil, err
}
m := &NomadEnvironmentManager{runnerManager, apiClient, templateEnvironmentJobHCL}
if err := m.Load(); err != nil {
log.WithError(err).Error("Error recovering the execution environments")
}
runnerManager.Load()
return m, nil
}
func (m *NomadEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (
executionEnvironment runner.ExecutionEnvironment, err error) {
executionEnvironment, ok := m.runnerManager.GetEnvironment(id)
if fetch {
fetchedEnvironment, err := fetchEnvironment(id, m.api)
switch {
case err != nil:
return nil, err
case fetchedEnvironment == nil:
_, err = m.Delete(id)
if err != nil {
return nil, err
}
ok = false
case !ok:
m.runnerManager.StoreEnvironment(fetchedEnvironment)
executionEnvironment = fetchedEnvironment
ok = true
default:
executionEnvironment.SetConfigFrom(fetchedEnvironment)
}
}
if !ok {
err = runner.ErrUnknownExecutionEnvironment
}
return executionEnvironment, err
}
func (m *NomadEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
if fetch {
err := m.Load()
if err != nil {
return nil, err
}
}
return m.runnerManager.ListEnvironments(), nil
}
func (m *NomadEnvironmentManager) CreateOrUpdate(id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (
created bool, err error) {
// Check whether the execution environment already exists (in local memory).
environment, isExistingEnvironment := m.runnerManager.GetEnvironment(id)
if isExistingEnvironment {
// Remove existing environment to force downloading the newest Docker image.
// See https://github.com/openHPI/poseidon/issues/69
err = environment.Delete(m.api)
if err != nil {
return false, fmt.Errorf("failed to remove the environment: %w", err)
}
}
// Create a new environment with the given request options.
environment, err = NewNomadEnvironmentFromRequest(m.templateEnvironmentHCL, id, request)
if err != nil {
return false, fmt.Errorf("error creating Nomad environment: %w", err)
}
// Keep a copy of environment specification in memory.
m.runnerManager.StoreEnvironment(environment)
// Register template Job with Nomad.
err = environment.Register(m.api)
if err != nil {
return false, fmt.Errorf("error registering template job in API: %w", err)
}
// Launch idle runners based on the template job.
err = environment.ApplyPrewarmingPoolSize(m.api)
if err != nil {
return false, fmt.Errorf("error scaling template job in API: %w", err)
}
return !isExistingEnvironment, nil
}
func (m *NomadEnvironmentManager) Delete(id dto.EnvironmentID) (bool, error) {
executionEnvironment, ok := m.runnerManager.GetEnvironment(id)
if !ok {
return false, nil
}
m.runnerManager.DeleteEnvironment(id)
err := executionEnvironment.Delete(m.api)
if err != nil {
return true, fmt.Errorf("could not delete environment: %w", err)
}
return true, nil
}
func (m *NomadEnvironmentManager) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
return m.runnerManager.EnvironmentStatistics()
}
func (m *NomadEnvironmentManager) Load() error {
templateJobs, err := m.api.LoadEnvironmentJobs()
if err != nil {
return fmt.Errorf("couldn't load template jobs: %w", err)
}
for _, job := range templateJobs {
jobLogger := log.WithField("jobID", *job.ID)
if *job.Status != structs.JobStatusRunning {
jobLogger.Info("Job not running, skipping ...")
continue
}
configTaskGroup := nomad.FindAndValidateConfigTaskGroup(job)
if configTaskGroup == nil {
jobLogger.Info("Couldn't find config task group in job, skipping ...")
continue
}
environment := &NomadEnvironment{
jobHCL: templateEnvironmentJobHCL,
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
}
m.runnerManager.StoreEnvironment(environment)
jobLogger.Info("Successfully recovered environment")
}
return nil
}
// loadTemplateEnvironmentJobHCL loads the template environment job HCL from the given path.
// If the path is empty, the embedded default file is used.
func loadTemplateEnvironmentJobHCL(path string) error {
if path == "" {
return nil
}
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("error loading template environment job: %w", err)
}
templateEnvironmentJobHCL = string(data)
return nil
}
func fetchEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI) (runner.ExecutionEnvironment, error) {
environments, err := apiClient.LoadEnvironmentJobs()
if err != nil {
return nil, fmt.Errorf("error fetching the environment jobs: %w", err)
}
var fetchedEnvironment runner.ExecutionEnvironment
for _, job := range environments {
environmentID, err := nomad.EnvironmentIDFromTemplateJobID(*job.ID)
if err != nil {
log.WithError(err).Warn("Cannot parse environment id of loaded environment")
continue
}
if id == environmentID {
fetchedEnvironment = &NomadEnvironment{
jobHCL: templateEnvironmentJobHCL,
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
}
}
}
return fetchedEnvironment, nil
}

View File

@@ -9,13 +9,13 @@ import (
runner "github.com/openHPI/poseidon/internal/runner"
)
// ManagerMock is an autogenerated mock type for the Manager type
type ManagerMock struct {
// ManagerHandlerMock is an autogenerated mock type for the ManagerHandler type
type ManagerHandlerMock struct {
mock.Mock
}
// CreateOrUpdate provides a mock function with given fields: id, request
func (_m *ManagerMock) CreateOrUpdate(id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (bool, error) {
func (_m *ManagerHandlerMock) CreateOrUpdate(id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (bool, error) {
ret := _m.Called(id, request)
var r0 bool
@@ -36,7 +36,7 @@ func (_m *ManagerMock) CreateOrUpdate(id dto.EnvironmentID, request dto.Executio
}
// Delete provides a mock function with given fields: id
func (_m *ManagerMock) Delete(id dto.EnvironmentID) (bool, error) {
func (_m *ManagerHandlerMock) Delete(id dto.EnvironmentID) (bool, error) {
ret := _m.Called(id)
var r0 bool
@@ -57,7 +57,7 @@ func (_m *ManagerMock) Delete(id dto.EnvironmentID) (bool, error) {
}
// Get provides a mock function with given fields: id, fetch
func (_m *ManagerMock) Get(id dto.EnvironmentID, fetch bool) (runner.ExecutionEnvironment, error) {
func (_m *ManagerHandlerMock) Get(id dto.EnvironmentID, fetch bool) (runner.ExecutionEnvironment, error) {
ret := _m.Called(id, fetch)
var r0 runner.ExecutionEnvironment
@@ -80,7 +80,7 @@ func (_m *ManagerMock) Get(id dto.EnvironmentID, fetch bool) (runner.ExecutionEn
}
// List provides a mock function with given fields: fetch
func (_m *ManagerMock) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
func (_m *ManagerHandlerMock) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
ret := _m.Called(fetch)
var r0 []runner.ExecutionEnvironment
@@ -102,22 +102,29 @@ func (_m *ManagerMock) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
return r0, r1
}
// Load provides a mock function with given fields:
func (_m *ManagerMock) Load() error {
// NextHandler provides a mock function with given fields:
func (_m *ManagerHandlerMock) NextHandler() ManagerHandler {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
var r0 ManagerHandler
if rf, ok := ret.Get(0).(func() ManagerHandler); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
if ret.Get(0) != nil {
r0 = ret.Get(0).(ManagerHandler)
}
}
return r0
}
// SetNextHandler provides a mock function with given fields: m
func (_m *ManagerHandlerMock) SetNextHandler(m ManagerHandler) {
_m.Called(m)
}
// Statistics provides a mock function with given fields:
func (_m *ManagerMock) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
func (_m *ManagerHandlerMock) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
ret := _m.Called()
var r0 map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData
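In tests, the regenerated mock is used like any mockery-generated testify mock: expectations are registered with On and verified afterwards. An illustrative sketch:

handler := &environment.ManagerHandlerMock{}
handler.On("List", true).Return([]runner.ExecutionEnvironment{}, nil)
envs, err := handler.List(true) // returns the stubbed values: empty slice, nil
// handler.AssertExpectations(t) then verifies the declared call happened.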

View File

@@ -0,0 +1,204 @@
package environment
import (
_ "embed"
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"os"
)
// templateEnvironmentJobHCL holds our default job in HCL format.
// The default job is used when creating a new job and provides
// common settings that all the jobs share.
//go:embed template-environment-job.hcl
var templateEnvironmentJobHCL string
var log = logging.GetLogger("environment")
type NomadEnvironmentManager struct {
*AbstractManager
runnerManager runner.Manager
api nomad.ExecutorAPI
templateEnvironmentHCL string
}
func NewNomadEnvironmentManager(
runnerManager runner.Manager,
apiClient nomad.ExecutorAPI,
templateJobFile string,
) (*NomadEnvironmentManager, error) {
if err := loadTemplateEnvironmentJobHCL(templateJobFile); err != nil {
return nil, err
}
m := &NomadEnvironmentManager{&AbstractManager{nil}, runnerManager,
apiClient, templateEnvironmentJobHCL}
if err := m.Load(); err != nil {
log.WithError(err).Error("Error recovering the execution environments")
}
runnerManager.Load()
return m, nil
}
func (m *NomadEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (
executionEnvironment runner.ExecutionEnvironment, err error) {
executionEnvironment, ok := m.runnerManager.GetEnvironment(id)
if fetch {
fetchedEnvironment, err := fetchEnvironment(id, m.api)
switch {
case err != nil:
return nil, err
case fetchedEnvironment == nil:
_, err = m.Delete(id)
if err != nil {
return nil, err
}
ok = false
case !ok:
m.runnerManager.StoreEnvironment(fetchedEnvironment)
executionEnvironment = fetchedEnvironment
ok = true
default:
executionEnvironment.SetConfigFrom(fetchedEnvironment)
}
}
if !ok {
err = runner.ErrUnknownExecutionEnvironment
}
return executionEnvironment, err
}
func (m *NomadEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
if fetch {
err := m.Load()
if err != nil {
return nil, err
}
}
return m.runnerManager.ListEnvironments(), nil
}
func (m *NomadEnvironmentManager) CreateOrUpdate(id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (
created bool, err error) {
// Check whether the execution environment already exists (in local memory).
environment, isExistingEnvironment := m.runnerManager.GetEnvironment(id)
if isExistingEnvironment {
// Remove existing environment to force downloading the newest Docker image.
// See https://github.com/openHPI/poseidon/issues/69
err = environment.Delete(m.api)
if err != nil {
return false, fmt.Errorf("failed to remove the environment: %w", err)
}
}
// Create a new environment with the given request options.
environment, err = NewNomadEnvironmentFromRequest(m.templateEnvironmentHCL, id, request)
if err != nil {
return false, fmt.Errorf("error creating Nomad environment: %w", err)
}
// Keep a copy of environment specification in memory.
m.runnerManager.StoreEnvironment(environment)
// Register template Job with Nomad.
err = environment.Register(m.api)
if err != nil {
return false, fmt.Errorf("error registering template job in API: %w", err)
}
// Launch idle runners based on the template job.
err = environment.ApplyPrewarmingPoolSize(m.api)
if err != nil {
return false, fmt.Errorf("error scaling template job in API: %w", err)
}
return !isExistingEnvironment, nil
}
func (m *NomadEnvironmentManager) Delete(id dto.EnvironmentID) (bool, error) {
executionEnvironment, ok := m.runnerManager.GetEnvironment(id)
if !ok {
return false, nil
}
m.runnerManager.DeleteEnvironment(id)
err := executionEnvironment.Delete(m.api)
if err != nil {
return true, fmt.Errorf("could not delete environment: %w", err)
}
return true, nil
}
func (m *NomadEnvironmentManager) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
return m.runnerManager.EnvironmentStatistics()
}
func (m *NomadEnvironmentManager) Load() error {
templateJobs, err := m.api.LoadEnvironmentJobs()
if err != nil {
return fmt.Errorf("couldn't load template jobs: %w", err)
}
for _, job := range templateJobs {
jobLogger := log.WithField("jobID", *job.ID)
if *job.Status != structs.JobStatusRunning {
jobLogger.Info("Job not running, skipping ...")
continue
}
configTaskGroup := nomad.FindAndValidateConfigTaskGroup(job)
if configTaskGroup == nil {
jobLogger.Info("Couldn't find config task group in job, skipping ...")
continue
}
environment := &NomadEnvironment{
jobHCL: templateEnvironmentJobHCL,
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
}
m.runnerManager.StoreEnvironment(environment)
jobLogger.Info("Successfully recovered environment")
}
return nil
}
// loadTemplateEnvironmentJobHCL loads the template environment job HCL from the given path.
// If the path is empty, the embedded default file is used.
func loadTemplateEnvironmentJobHCL(path string) error {
if path == "" {
return nil
}
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("error loading template environment job: %w", err)
}
templateEnvironmentJobHCL = string(data)
return nil
}
func fetchEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI) (runner.ExecutionEnvironment, error) {
environments, err := apiClient.LoadEnvironmentJobs()
if err != nil {
return nil, fmt.Errorf("error fetching the environment jobs: %w", err)
}
var fetchedEnvironment runner.ExecutionEnvironment
for _, job := range environments {
environmentID, err := nomad.EnvironmentIDFromTemplateJobID(*job.ID)
if err != nil {
log.WithError(err).Warn("Cannot parse environment id of loaded environment")
continue
}
if id == environmentID {
fetchedEnvironment = &NomadEnvironment{
jobHCL: templateEnvironmentJobHCL,
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
}
}
}
return fetchedEnvironment, nil
}

View File

@@ -0,0 +1,52 @@
package runner
import (
"errors"
"github.com/openHPI/poseidon/pkg/dto"
)
var ErrNullObject = errors.New("functionality not available for the null object")
// AbstractManager serves as a fallback runner manager in the chain of responsibility,
// following the null object pattern.
type AbstractManager struct {
nextHandler AccessorHandler
}
func (n *AbstractManager) SetNextHandler(next AccessorHandler) {
n.nextHandler = next
}
func (n *AbstractManager) NextHandler() AccessorHandler {
return n.nextHandler
}
func (n *AbstractManager) ListEnvironments() []ExecutionEnvironment {
return []ExecutionEnvironment{}
}
func (n *AbstractManager) GetEnvironment(_ dto.EnvironmentID) (ExecutionEnvironment, bool) {
return nil, false
}
func (n *AbstractManager) StoreEnvironment(_ ExecutionEnvironment) {}
func (n *AbstractManager) DeleteEnvironment(_ dto.EnvironmentID) {}
func (n *AbstractManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{}
}
func (n *AbstractManager) Claim(_ dto.EnvironmentID, _ int) (Runner, error) {
return nil, ErrNullObject
}
func (n *AbstractManager) Get(_ string) (Runner, error) {
return nil, ErrNullObject
}
func (n *AbstractManager) Return(_ Runner) error {
return nil
}
func (n *AbstractManager) Load() {}
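Note the deliberate asymmetry in this null object: Claim and Get fail loudly with ErrNullObject, while Return succeeds and Load does nothing. A runner handed back at the end of the chain is therefore dropped silently, whereas requesting one from a chain that cannot serve it surfaces an error.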

View File

@@ -0,0 +1,57 @@
package runner
import (
"fmt"
"github.com/openHPI/poseidon/pkg/dto"
)
type AWSRunnerManager struct {
*AbstractManager
}
// NewAWSRunnerManager creates a new runner manager that keeps track of all runners at AWS.
func NewAWSRunnerManager() *AWSRunnerManager {
return &AWSRunnerManager{&AbstractManager{}}
}
func (a AWSRunnerManager) ListEnvironments() []ExecutionEnvironment {
return []ExecutionEnvironment{}
}
func (a AWSRunnerManager) GetEnvironment(_ dto.EnvironmentID) (ExecutionEnvironment, bool) {
return nil, false
}
func (a AWSRunnerManager) StoreEnvironment(_ ExecutionEnvironment) {}
func (a AWSRunnerManager) DeleteEnvironment(_ dto.EnvironmentID) {}
func (a AWSRunnerManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{}
}
func (a AWSRunnerManager) Claim(id dto.EnvironmentID, duration int) (Runner, error) {
r, err := a.NextHandler().Claim(id, duration)
if err != nil {
return nil, fmt.Errorf("aws wraped: %w", err)
}
return r, nil
}
func (a AWSRunnerManager) Get(runnerID string) (Runner, error) {
r, err := a.NextHandler().Get(runnerID)
if err != nil {
return nil, fmt.Errorf("aws wraped: %w", err)
}
return r, nil
}
func (a AWSRunnerManager) Return(r Runner) error {
err := a.NextHandler().Return(r)
if err != nil {
return fmt.Errorf("aws wraped: %w", err)
}
return nil
}
func (a AWSRunnerManager) Load() {}
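Wired as in main above, the AWS runner manager currently delegates the whole runner lifecycle to its next handler; a compact usage sketch (nomadManager and environmentID stand in for values from the surrounding code):

awsManager := runner.NewAWSRunnerManager()
awsManager.SetNextHandler(nomadManager)
r, err := awsManager.Claim(environmentID, 60)
// The Nomad manager answers the claim; on failure err wraps the
// original error, e.g. "aws wrapped: no runners available for this
// execution environment", so errors.Is still matches the sentinel.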

View File

@@ -37,11 +37,11 @@ type InactivityTimerImplementation struct {
duration time.Duration
state TimerState
runner Runner
manager Manager
manager Accessor
mu sync.Mutex
}
func NewInactivityTimer(runner Runner, manager Manager) InactivityTimer {
func NewInactivityTimer(runner Runner, manager Accessor) InactivityTimer {
return &InactivityTimerImplementation{
state: TimerInactive,
runner: runner,

View File

@@ -1,24 +1,9 @@
package runner
import (
"context"
"encoding/json"
"errors"
"fmt"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/sirupsen/logrus"
"strconv"
"time"
)
var (
log = logging.GetLogger("runner")
ErrUnknownExecutionEnvironment = errors.New("execution environment not found")
ErrNoRunnersAvailable = errors.New("no runners available for this execution environment")
ErrRunnerNotFound = errors.New("no runner found with this id")
)
// ExecutionEnvironment are groups of runner that share the configuration stored in the environment.
@@ -67,6 +52,12 @@ type ExecutionEnvironment interface {
// Manager keeps track of the used and unused runners of all execution environments in order to provide unused
// runners to new clients and ensure no runner is used twice.
type Manager interface {
EnvironmentAccessor
AccessorHandler
}
// EnvironmentAccessor provides access to the stored environments.
type EnvironmentAccessor interface {
// ListEnvironments returns all execution environments known by Poseidon.
ListEnvironments() []ExecutionEnvironment
@@ -83,7 +74,18 @@ type Manager interface {
// EnvironmentStatistics returns statistical data for each execution environment.
EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData
}
// AccessorHandler is one handler in the chain of responsibility of runner accessors.
// Each runner accessor can handle different requests.
type AccessorHandler interface {
Accessor
SetNextHandler(m AccessorHandler)
NextHandler() AccessorHandler
}
// Accessor manages the lifecycle of runners.
type Accessor interface {
// Claim returns a new runner. The runner is deleted after duration seconds if duration is not 0.
// It makes sure that the runner is not in use yet and returns an error if no runner could be provided.
Claim(id dto.EnvironmentID, duration int) (Runner, error)
@@ -100,201 +102,3 @@ type Manager interface {
// It should be called during the startup process (e.g. on creation of the Manager).
Load()
}
type NomadRunnerManager struct {
apiClient nomad.ExecutorAPI
environments EnvironmentStorage
usedRunners Storage
}
// NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
// It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad.
// If you cancel the context, the background synchronization is stopped.
func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
m := &NomadRunnerManager{
apiClient,
NewLocalEnvironmentStorage(),
NewLocalRunnerStorage(),
}
go m.keepRunnersSynced(ctx)
return m
}
func (m *NomadRunnerManager) ListEnvironments() []ExecutionEnvironment {
return m.environments.List()
}
func (m *NomadRunnerManager) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironment, bool) {
return m.environments.Get(id)
}
func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) {
m.environments.Add(environment)
}
func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) {
m.environments.Delete(id)
}
func (m *NomadRunnerManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
environments := make(map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData)
for _, e := range m.environments.List() {
environments[e.ID()] = &dto.StatisticalExecutionEnvironmentData{
ID: int(e.ID()),
PrewarmingPoolSize: e.PrewarmingPoolSize(),
IdleRunners: uint(e.IdleRunnerCount()),
UsedRunners: 0,
}
}
for _, r := range m.usedRunners.List() {
id, err := nomad.EnvironmentIDFromRunnerID(r.ID())
if err != nil {
log.WithError(err).Error("Stored runners must have correct IDs")
}
environments[id].UsedRunners++
}
return environments
}
func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) {
environment, ok := m.environments.Get(environmentID)
if !ok {
return nil, ErrUnknownExecutionEnvironment
}
runner, ok := environment.Sample(m.apiClient)
if !ok {
return nil, ErrNoRunnersAvailable
}
m.usedRunners.Add(runner)
go m.markRunnerAsUsed(runner, duration)
runner.SetupTimeout(time.Duration(duration) * time.Second)
return runner, nil
}
func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) {
err := m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration)
if err != nil {
err = m.Return(runner)
if err != nil {
log.WithError(err).WithField("runnerID", runner.ID()).Error("can't mark runner as used and can't return runner")
}
}
}
func (m *NomadRunnerManager) Get(runnerID string) (Runner, error) {
runner, ok := m.usedRunners.Get(runnerID)
if !ok {
return nil, ErrRunnerNotFound
}
return runner, nil
}
func (m *NomadRunnerManager) Return(r Runner) error {
r.StopTimeout()
err := m.apiClient.DeleteJob(r.ID())
if err != nil {
return fmt.Errorf("error deleting runner in Nomad: %w", err)
}
m.usedRunners.Delete(r.ID())
return nil
}
func (m *NomadRunnerManager) Load() {
for _, environment := range m.environments.List() {
environmentLogger := log.WithField("environmentID", environment.ID())
runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
if err != nil {
environmentLogger.WithError(err).Warn("Error fetching the runner jobs")
}
for _, job := range runnerJobs {
m.loadSingleJob(job, environmentLogger, environment)
}
err = environment.ApplyPrewarmingPoolSize(m.apiClient)
if err != nil {
environmentLogger.WithError(err).Error("Couldn't scale environment")
}
}
}
func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger *logrus.Entry,
environment ExecutionEnvironment) {
configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
if configTaskGroup == nil {
environmentLogger.Infof("Couldn't find config task group in job %s, skipping ...", *job.ID)
return
}
isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID)
if err != nil {
environmentLogger.WithError(err).Warn("Error loading runner portMappings")
return
}
newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m)
if isUsed {
m.usedRunners.Add(newJob)
timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
if err != nil {
environmentLogger.WithError(err).Warn("Error loading timeout from meta values")
} else {
newJob.SetupTimeout(time.Duration(timeout) * time.Second)
}
} else {
environment.AddRunner(newJob)
}
}
func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
retries := 0
for ctx.Err() == nil {
err := m.apiClient.WatchEventStream(ctx, m.onAllocationAdded, m.onAllocationStopped)
retries += 1
log.WithError(err).Errorf("Stopped updating the runners! Retry %v", retries)
<-time.After(time.Second)
}
}
func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
log.WithField("id", alloc.JobID).Debug("Runner started")
if nomad.IsEnvironmentTemplateID(alloc.JobID) {
return
}
environmentID, err := nomad.EnvironmentIDFromRunnerID(alloc.JobID)
if err != nil {
log.WithError(err).Warn("Allocation could not be added")
return
}
environment, ok := m.environments.Get(environmentID)
if ok {
var mappedPorts []nomadApi.PortMapping
if alloc.AllocatedResources != nil {
mappedPorts = alloc.AllocatedResources.Shared.Ports
}
environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m))
}
}
func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) {
log.WithField("id", alloc.JobID).Debug("Runner stopped")
if nomad.IsEnvironmentTemplateID(alloc.JobID) {
return
}
environmentID, err := nomad.EnvironmentIDFromRunnerID(alloc.JobID)
if err != nil {
log.WithError(err).Warn("Stopped allocation can not be handled")
return
}
m.usedRunners.Delete(alloc.JobID)
environment, ok := m.environments.Get(environmentID)
if ok {
environment.DeleteRunner(alloc.JobID)
}
}

View File

@@ -123,6 +123,22 @@ func (_m *ManagerMock) Load() {
_m.Called()
}
// NextHandler provides a mock function with given fields:
func (_m *ManagerMock) NextHandler() AccessorHandler {
ret := _m.Called()
var r0 AccessorHandler
if rf, ok := ret.Get(0).(func() AccessorHandler); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(AccessorHandler)
}
}
return r0
}
// Return provides a mock function with given fields: r
func (_m *ManagerMock) Return(r Runner) error {
ret := _m.Called(r)
@@ -137,6 +153,11 @@ func (_m *ManagerMock) Return(r Runner) error {
return r0
}
// SetNextHandler provides a mock function with given fields: m
func (_m *ManagerMock) SetNextHandler(m AccessorHandler) {
_m.Called(m)
}
// StoreEnvironment provides a mock function with given fields: environment
func (_m *ManagerMock) StoreEnvironment(environment ExecutionEnvironment) {
_m.Called(environment)

View File

@@ -0,0 +1,221 @@
package runner
import (
"context"
"errors"
"fmt"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/sirupsen/logrus"
"strconv"
"time"
)
var (
log = logging.GetLogger("runner")
ErrUnknownExecutionEnvironment = errors.New("execution environment not found")
ErrNoRunnersAvailable = errors.New("no runners available for this execution environment")
ErrRunnerNotFound = errors.New("no runner found with this id")
)
type NomadRunnerManager struct {
*AbstractManager
apiClient nomad.ExecutorAPI
environments EnvironmentStorage
usedRunners Storage
}
// NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
// It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad.
// If you cancel the context, the background synchronization is stopped.
func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
m := &NomadRunnerManager{
&AbstractManager{},
apiClient,
NewLocalEnvironmentStorage(),
NewLocalRunnerStorage(),
}
go m.keepRunnersSynced(ctx)
return m
}
func (m *NomadRunnerManager) ListEnvironments() []ExecutionEnvironment {
return m.environments.List()
}
func (m *NomadRunnerManager) GetEnvironment(id dto.EnvironmentID) (ExecutionEnvironment, bool) {
return m.environments.Get(id)
}
func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) {
m.environments.Add(environment)
}
func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) {
m.environments.Delete(id)
}
func (m *NomadRunnerManager) EnvironmentStatistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
environments := make(map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData)
for _, e := range m.environments.List() {
environments[e.ID()] = &dto.StatisticalExecutionEnvironmentData{
ID: int(e.ID()),
PrewarmingPoolSize: e.PrewarmingPoolSize(),
IdleRunners: uint(e.IdleRunnerCount()),
UsedRunners: 0,
}
}
for _, r := range m.usedRunners.List() {
id, err := nomad.EnvironmentIDFromRunnerID(r.ID())
if err != nil {
log.WithError(err).Error("Stored runners must have correct IDs")
}
environments[id].UsedRunners++
}
return environments
}
func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) {
environment, ok := m.environments.Get(environmentID)
if !ok {
return nil, ErrUnknownExecutionEnvironment
}
runner, ok := environment.Sample(m.apiClient)
if !ok {
return nil, ErrNoRunnersAvailable
}
m.usedRunners.Add(runner)
go m.markRunnerAsUsed(runner, duration)
runner.SetupTimeout(time.Duration(duration) * time.Second)
return runner, nil
}
func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) {
err := m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration)
if err != nil {
err = m.Return(runner)
if err != nil {
log.WithError(err).WithField("runnerID", runner.ID()).Error("can't mark runner as used and can't return runner")
}
}
}
func (m *NomadRunnerManager) Get(runnerID string) (Runner, error) {
runner, ok := m.usedRunners.Get(runnerID)
if !ok {
return nil, ErrRunnerNotFound
}
return runner, nil
}
func (m *NomadRunnerManager) Return(r Runner) error {
r.StopTimeout()
err := m.apiClient.DeleteJob(r.ID())
if err != nil {
return fmt.Errorf("error deleting runner in Nomad: %w", err)
}
m.usedRunners.Delete(r.ID())
return nil
}
func (m *NomadRunnerManager) Load() {
for _, environment := range m.environments.List() {
environmentLogger := log.WithField("environmentID", environment.ID())
runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
if err != nil {
environmentLogger.WithError(err).Warn("Error fetching the runner jobs")
}
for _, job := range runnerJobs {
m.loadSingleJob(job, environmentLogger, environment)
}
err = environment.ApplyPrewarmingPoolSize(m.apiClient)
if err != nil {
environmentLogger.WithError(err).Error("Couldn't scale environment")
}
}
}
func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger *logrus.Entry,
environment ExecutionEnvironment) {
configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
if configTaskGroup == nil {
environmentLogger.Infof("Couldn't find config task group in job %s, skipping ...", *job.ID)
return
}
isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID)
if err != nil {
environmentLogger.WithError(err).Warn("Error loading runner portMappings")
return
}
newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m)
if isUsed {
m.usedRunners.Add(newJob)
timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
if err != nil {
environmentLogger.WithError(err).Warn("Error loading timeout from meta values")
} else {
newJob.SetupTimeout(time.Duration(timeout) * time.Second)
}
} else {
environment.AddRunner(newJob)
}
}
func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
retries := 0
for ctx.Err() == nil {
err := m.apiClient.WatchEventStream(ctx, m.onAllocationAdded, m.onAllocationStopped)
retries += 1
log.WithError(err).Errorf("Stopped updating the runners! Retry %v", retries)
<-time.After(time.Second)
}
}
func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
log.WithField("id", alloc.JobID).Debug("Runner started")
if nomad.IsEnvironmentTemplateID(alloc.JobID) {
return
}
environmentID, err := nomad.EnvironmentIDFromRunnerID(alloc.JobID)
if err != nil {
log.WithError(err).Warn("Allocation could not be added")
return
}
environment, ok := m.environments.Get(environmentID)
if ok {
var mappedPorts []nomadApi.PortMapping
if alloc.AllocatedResources != nil {
mappedPorts = alloc.AllocatedResources.Shared.Ports
}
environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m))
}
}
func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) {
log.WithField("id", alloc.JobID).Debug("Runner stopped")
if nomad.IsEnvironmentTemplateID(alloc.JobID) {
return
}
environmentID, err := nomad.EnvironmentIDFromRunnerID(alloc.JobID)
if err != nil {
log.WithError(err).Warn("Stopped allocation can not be handled")
return
}
m.usedRunners.Delete(alloc.JobID)
environment, ok := m.environments.Get(environmentID)
if ok {
environment.DeleteRunner(alloc.JobID)
}
}
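The background synchronization is tied to the context passed at construction, so shutting it down is a plain cancellation; a brief sketch (nomadAPIClient as created in main above):

ctx, cancel := context.WithCancel(context.Background())
manager := runner.NewNomadRunnerManager(nomadAPIClient, ctx)
// ... serve requests ...
cancel() // keepRunnersSynced observes ctx.Err() != nil and stops retrying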

View File

@@ -79,12 +79,12 @@ type NomadJob struct {
id string
portMappings []nomadApi.PortMapping
api nomad.ExecutorAPI
manager Manager
manager Accessor
}
// NewNomadJob creates a new NomadJob with the provided id.
func NewNomadJob(id string, portMappings []nomadApi.PortMapping,
apiClient nomad.ExecutorAPI, manager Manager,
apiClient nomad.ExecutorAPI, manager Accessor,
) *NomadJob {
job := &NomadJob{
id: id,

View File

@@ -390,6 +390,6 @@ func (s *UpdateFileSystemTestSuite) readFilesFromTarArchive(tarArchive io.Reader
}
// NewRunner creates a new runner with the provided id and manager.
func NewRunner(id string, manager Manager) Runner {
func NewRunner(id string, manager Accessor) Runner {
return NewNomadJob(id, nil, nil, manager)
}