Make Execution Environment interface Nomad independent

This commit is contained in:
Maximilian Paß
2022-01-19 20:41:05 +01:00
parent ba43f667c2
commit 0ef5a4e39f
10 changed files with 96 additions and 95 deletions

View File

@ -25,20 +25,22 @@ type NomadEnvironment struct {
jobHCL string
job *nomadApi.Job
idleRunners runner.Storage
apiClient nomad.ExecutorAPI
}
func NewNomadEnvironment(jobHCL string) (*NomadEnvironment, error) {
func NewNomadEnvironment(apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvironment, error) {
job, err := parseJob(jobHCL)
if err != nil {
return nil, fmt.Errorf("error parsing Nomad job: %w", err)
}
return &NomadEnvironment{jobHCL, job, runner.NewLocalRunnerStorage()}, nil
return &NomadEnvironment{jobHCL, job, runner.NewLocalRunnerStorage(), apiClient}, nil
}
func NewNomadEnvironmentFromRequest(jobHCL string, id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (
func NewNomadEnvironmentFromRequest(
apiClient nomad.ExecutorAPI, jobHCL string, id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (
*NomadEnvironment, error) {
environment, err := NewNomadEnvironment(jobHCL)
environment, err := NewNomadEnvironment(apiClient, jobHCL)
if err != nil {
return nil, err
}
@ -188,47 +190,47 @@ func (n *NomadEnvironment) SetNetworkAccess(allow bool, exposedPorts []uint16) {
// Register creates a Nomad job based on the default job configuration and the given parameters.
// It registers the job with Nomad and waits until the registration completes.
func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error {
func (n *NomadEnvironment) Register() error {
nomad.SetForcePullFlag(n.job, true) // This must be the default as otherwise new runners could have different images.
evalID, err := apiClient.RegisterNomadJob(n.job)
evalID, err := n.apiClient.RegisterNomadJob(n.job)
if err != nil {
return fmt.Errorf("couldn't register job: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), nomad.RegisterTimeout)
defer cancel()
err = apiClient.MonitorEvaluation(evalID, ctx)
err = n.apiClient.MonitorEvaluation(evalID, ctx)
if err != nil {
return fmt.Errorf("error during the monitoring of the environment job: %w", err)
}
return nil
}
func (n *NomadEnvironment) Delete(apiClient nomad.ExecutorAPI) error {
err := n.removeRunners(apiClient)
func (n *NomadEnvironment) Delete() error {
err := n.removeRunners()
if err != nil {
return err
}
err = apiClient.DeleteJob(*n.job.ID)
err = n.apiClient.DeleteJob(*n.job.ID)
if err != nil {
return fmt.Errorf("couldn't delete environment job: %w", err)
}
return nil
}
func (n *NomadEnvironment) ApplyPrewarmingPoolSize(apiClient nomad.ExecutorAPI) error {
func (n *NomadEnvironment) ApplyPrewarmingPoolSize() error {
required := int(n.PrewarmingPoolSize()) - n.idleRunners.Length()
if required < 0 {
return fmt.Errorf("%w. Runners to remove: %d", ErrScaleDown, -required)
}
return n.createRunners(apiClient, uint(required), true)
return n.createRunners(uint(required), true)
}
func (n *NomadEnvironment) Sample(apiClient nomad.ExecutorAPI) (runner.Runner, bool) {
func (n *NomadEnvironment) Sample() (runner.Runner, bool) {
r, ok := n.idleRunners.Sample()
if ok {
go func() {
err := n.createRunner(apiClient, false)
err := n.createRunner(false)
if err != nil {
log.WithError(err).WithField("environmentID", n.ID()).Error("Couldn't create new runner for claimed one")
}
@ -309,10 +311,10 @@ func parseJob(jobHCL string) (*nomadApi.Job, error) {
return job, nil
}
func (n *NomadEnvironment) createRunners(apiClient nomad.ExecutorAPI, count uint, forcePull bool) error {
func (n *NomadEnvironment) createRunners(count uint, forcePull bool) error {
log.WithField("runnersRequired", count).WithField("id", n.ID()).Debug("Creating new runners")
for i := 0; i < int(count); i++ {
err := n.createRunner(apiClient, forcePull)
err := n.createRunner(forcePull)
if err != nil {
return fmt.Errorf("couldn't create new runner: %w", err)
}
@ -320,7 +322,7 @@ func (n *NomadEnvironment) createRunners(apiClient nomad.ExecutorAPI, count uint
return nil
}
func (n *NomadEnvironment) createRunner(apiClient nomad.ExecutorAPI, forcePull bool) error {
func (n *NomadEnvironment) createRunner(forcePull bool) error {
newUUID, err := uuid.NewUUID()
if err != nil {
return fmt.Errorf("failed generating runner id: %w", err)
@ -332,7 +334,7 @@ func (n *NomadEnvironment) createRunner(apiClient nomad.ExecutorAPI, forcePull b
template.Name = &newRunnerID
nomad.SetForcePullFlag(template, forcePull)
err = apiClient.RegisterRunnerJob(template)
err = n.apiClient.RegisterRunnerJob(template)
if err != nil {
return fmt.Errorf("error registering new runner job: %w", err)
}
@ -340,12 +342,12 @@ func (n *NomadEnvironment) createRunner(apiClient nomad.ExecutorAPI, forcePull b
}
// removeRunners removes all (idle and used) runners for the given environment n.
func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI) error {
func (n *NomadEnvironment) removeRunners() error {
// This prevents a race condition where the number of required runners is miscalculated in the up-scaling process
// based on the number of allocation that has been stopped at the moment of the scaling.
n.idleRunners.Purge()
ids, err := apiClient.LoadRunnerIDs(nomad.RunnerJobID(n.ID(), ""))
ids, err := n.apiClient.LoadRunnerIDs(nomad.RunnerJobID(n.ID(), ""))
if err != nil {
return fmt.Errorf("failed to load runner ids: %w", err)
}
@ -356,7 +358,7 @@ func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI) error {
wg.Add(1)
go func(jobID string) {
defer wg.Done()
deleteErr := apiClient.DeleteJob(jobID)
deleteErr := n.apiClient.DeleteJob(jobID)
if deleteErr != nil {
err = deleteErr
}

View File

@ -17,7 +17,7 @@ import (
func TestConfigureNetworkCreatesNewNetworkWhenNoNetworkExists(t *testing.T) {
_, job := helpers.CreateTemplateJob()
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(job)
environment := &NomadEnvironment{"", job, nil}
environment := &NomadEnvironment{"", job, nil, nil}
if assert.Equal(t, 0, len(defaultTaskGroup.Networks)) {
environment.SetNetworkAccess(true, []uint16{})
@ -29,7 +29,7 @@ func TestConfigureNetworkCreatesNewNetworkWhenNoNetworkExists(t *testing.T) {
func TestConfigureNetworkDoesNotCreateNewNetworkWhenNetworkExists(t *testing.T) {
_, job := helpers.CreateTemplateJob()
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(job)
environment := &NomadEnvironment{"", job, nil}
environment := &NomadEnvironment{"", job, nil, nil}
networkResource := &nomadApi.NetworkResource{Mode: "bridge"}
defaultTaskGroup.Networks = []*nomadApi.NetworkResource{networkResource}
@ -58,7 +58,7 @@ func TestConfigureNetworkSetsCorrectValues(t *testing.T) {
_, testJob := helpers.CreateTemplateJob()
testTaskGroup := nomad.FindAndValidateDefaultTaskGroup(testJob)
testTask := nomad.FindAndValidateDefaultTask(testTaskGroup)
testEnvironment := &NomadEnvironment{"", job, nil}
testEnvironment := &NomadEnvironment{"", job, nil, nil}
testEnvironment.SetNetworkAccess(false, ports)
mode, ok := testTask.Config["network_mode"]
@ -73,7 +73,7 @@ func TestConfigureNetworkSetsCorrectValues(t *testing.T) {
_, testJob := helpers.CreateTemplateJob()
testTaskGroup := nomad.FindAndValidateDefaultTaskGroup(testJob)
testTask := nomad.FindAndValidateDefaultTask(testTaskGroup)
testEnvironment := &NomadEnvironment{"", testJob, nil}
testEnvironment := &NomadEnvironment{"", testJob, nil, nil}
testEnvironment.SetNetworkAccess(true, ports)
require.Equal(t, 1, len(testTaskGroup.Networks))
@ -113,9 +113,9 @@ func TestRegisterFailsWhenNomadJobRegistrationFails(t *testing.T) {
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage(), apiClientMock}
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register(apiClientMock)
err := environment.Register()
assert.ErrorIs(t, err, expectedErr)
apiClientMock.AssertNotCalled(t, "MonitorEvaluation")
@ -130,9 +130,9 @@ func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage(), apiClientMock}
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register(apiClientMock)
err := environment.Register()
assert.NoError(t, err)
}
@ -146,22 +146,22 @@ func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage(), apiClientMock}
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register(apiClientMock)
err := environment.Register()
assert.ErrorIs(t, err, tests.ErrDefault)
}
func TestParseJob(t *testing.T) {
t.Run("parses the given default job", func(t *testing.T) {
environment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
environment, err := NewNomadEnvironment(nil, templateEnvironmentJobHCL)
assert.NoError(t, err)
assert.NotNil(t, environment.job)
})
t.Run("returns error when given wrong job", func(t *testing.T) {
environment, err := NewNomadEnvironment("")
environment, err := NewNomadEnvironment(nil, "")
assert.Error(t, err)
assert.Nil(t, environment)
})
@ -172,7 +172,7 @@ func TestTwoSampleAddExactlyTwoRunners(t *testing.T) {
apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil)
_, job := helpers.CreateTemplateJob()
environment := &NomadEnvironment{templateEnvironmentJobHCL, job, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{templateEnvironmentJobHCL, job, runner.NewLocalRunnerStorage(), apiMock}
runner1 := &runner.RunnerMock{}
runner1.On("ID").Return(tests.DefaultRunnerID)
runner2 := &runner.RunnerMock{}
@ -181,9 +181,9 @@ func TestTwoSampleAddExactlyTwoRunners(t *testing.T) {
environment.AddRunner(runner1)
environment.AddRunner(runner2)
_, ok := environment.Sample(apiMock)
_, ok := environment.Sample()
require.True(t, ok)
_, ok = environment.Sample(apiMock)
_, ok = environment.Sample()
require.True(t, ok)
<-time.After(tests.ShortTimeout) // New Runners are requested asynchronously
@ -205,12 +205,12 @@ func TestSampleDoesNotSetForcePullFlag(t *testing.T) {
})
_, job := helpers.CreateTemplateJob()
environment := &NomadEnvironment{templateEnvironmentJobHCL, job, runner.NewLocalRunnerStorage()}
environment := &NomadEnvironment{templateEnvironmentJobHCL, job, runner.NewLocalRunnerStorage(), apiMock}
runner1 := &runner.RunnerMock{}
runner1.On("ID").Return(tests.DefaultRunnerID)
environment.AddRunner(runner1)
_, ok := environment.Sample(apiMock)
_, ok := environment.Sample()
require.True(t, ok)
<-time.After(tests.ShortTimeout) // New Runners are requested asynchronously
}

View File

@ -91,14 +91,14 @@ func (m *NomadEnvironmentManager) CreateOrUpdate(id dto.EnvironmentID, request d
if isExistingEnvironment {
// Remove existing environment to force downloading the newest Docker image.
// See https://github.com/openHPI/poseidon/issues/69
err = environment.Delete(m.api)
err = environment.Delete()
if err != nil {
return false, fmt.Errorf("failed to remove the environment: %w", err)
}
}
// Create a new environment with the given request options.
environment, err = NewNomadEnvironmentFromRequest(m.templateEnvironmentHCL, id, request)
environment, err = NewNomadEnvironmentFromRequest(m.api, m.templateEnvironmentHCL, id, request)
if err != nil {
return false, fmt.Errorf("error creating Nomad environment: %w", err)
}
@ -107,13 +107,13 @@ func (m *NomadEnvironmentManager) CreateOrUpdate(id dto.EnvironmentID, request d
m.runnerManager.StoreEnvironment(environment)
// Register template Job with Nomad.
err = environment.Register(m.api)
err = environment.Register()
if err != nil {
return false, fmt.Errorf("error registering template job in API: %w", err)
}
// Launch idle runners based on the template job.
err = environment.ApplyPrewarmingPoolSize(m.api)
err = environment.ApplyPrewarmingPoolSize()
if err != nil {
return false, fmt.Errorf("error scaling template job in API: %w", err)
}
@ -127,7 +127,7 @@ func (m *NomadEnvironmentManager) Delete(id dto.EnvironmentID) (bool, error) {
return false, nil
}
m.runnerManager.DeleteEnvironment(id)
err := executionEnvironment.Delete(m.api)
err := executionEnvironment.Delete()
if err != nil {
return true, fmt.Errorf("could not delete environment: %w", err)
}
@ -159,6 +159,7 @@ func (m *NomadEnvironmentManager) Load() error {
jobHCL: templateEnvironmentJobHCL,
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
apiClient: m.api,
}
m.runnerManager.StoreEnvironment(environment)
jobLogger.Info("Successfully recovered environment")
@ -197,6 +198,7 @@ func fetchEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI) (runner
jobHCL: templateEnvironmentJobHCL,
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
apiClient: apiClient,
}
}
}

View File

@ -107,7 +107,7 @@ func TestNewNomadEnvironmentManager(t *testing.T) {
t.Run("loads template environment job from file", func(t *testing.T) {
templateJobHCL := "job \"test\" {}"
_, err := NewNomadEnvironment(templateJobHCL)
_, err := NewNomadEnvironment(nil, templateJobHCL)
require.NoError(t, err)
f := createTempFile(t, templateJobHCL)
defer os.Remove(f.Name())
@ -125,7 +125,7 @@ func TestNewNomadEnvironmentManager(t *testing.T) {
m, err := NewNomadEnvironmentManager(runnerManagerMock, executorAPIMock, f.Name())
require.NoError(t, err)
_, err = NewNomadEnvironment(m.templateEnvironmentHCL)
_, err = NewNomadEnvironment(nil, m.templateEnvironmentHCL)
assert.Error(t, err)
})
@ -152,7 +152,7 @@ func TestNomadEnvironmentManager_Get(t *testing.T) {
})
t.Run("Returns environment when it was added before", func(t *testing.T) {
expectedEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
expectedEnvironment, err := NewNomadEnvironment(apiMock, templateEnvironmentJobHCL)
expectedEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
require.NoError(t, err)
runnerManager.StoreEnvironment(expectedEnvironment)
@ -170,7 +170,7 @@ func TestNomadEnvironmentManager_Get(t *testing.T) {
})
t.Run("Updates values when environment already known by Poseidon", func(t *testing.T) {
fetchedEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
fetchedEnvironment, err := NewNomadEnvironment(nil, templateEnvironmentJobHCL)
require.NoError(t, err)
fetchedEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
fetchedEnvironment.SetImage("random docker image")
@ -178,7 +178,7 @@ func TestNomadEnvironmentManager_Get(t *testing.T) {
call.ReturnArguments = mock.Arguments{[]*nomadApi.Job{fetchedEnvironment.job}, nil}
})
localEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
localEnvironment, err := NewNomadEnvironment(nil, templateEnvironmentJobHCL)
require.NoError(t, err)
localEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
runnerManager.StoreEnvironment(localEnvironment)
@ -194,7 +194,7 @@ func TestNomadEnvironmentManager_Get(t *testing.T) {
runnerManager.DeleteEnvironment(tests.DefaultEnvironmentIDAsInteger)
t.Run("Adds environment when not already known by Poseidon", func(t *testing.T) {
fetchedEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
fetchedEnvironment, err := NewNomadEnvironment(nil, templateEnvironmentJobHCL)
require.NoError(t, err)
fetchedEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
fetchedEnvironment.SetImage("random docker image")
@ -231,7 +231,7 @@ func TestNomadEnvironmentManager_List(t *testing.T) {
})
t.Run("Returns added environment", func(t *testing.T) {
localEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
localEnvironment, err := NewNomadEnvironment(apiMock, templateEnvironmentJobHCL)
require.NoError(t, err)
localEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
runnerManager.StoreEnvironment(localEnvironment)
@ -244,7 +244,7 @@ func TestNomadEnvironmentManager_List(t *testing.T) {
runnerManager.DeleteEnvironment(tests.DefaultEnvironmentIDAsInteger)
t.Run("Fetches new Runners via the api client", func(t *testing.T) {
fetchedEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
fetchedEnvironment, err := NewNomadEnvironment(apiMock, templateEnvironmentJobHCL)
require.NoError(t, err)
fetchedEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
status := structs.JobStatusRunning