Implement review suggestions

This commit is contained in:
Maximilian Paß
2021-12-17 17:09:01 +01:00
parent 0571b10b5c
commit d57a0c07b8
10 changed files with 104 additions and 106 deletions

View File

@ -3,6 +3,7 @@ package environment
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/google/uuid"
nomadApi "github.com/hashicorp/nomad/api"
@ -18,6 +19,8 @@ const (
portNumberBase = 10
)
var ErrScaleDown = errors.New("cannot scale down the environment")
type NomadEnvironment struct {
jobHCL string
job *nomadApi.Job
@ -33,6 +36,23 @@ func NewNomadEnvironment(jobHCL string) (*NomadEnvironment, error) {
return &NomadEnvironment{jobHCL, job, runner.NewLocalRunnerStorage()}, nil
}
func NewNomadEnvironmentFromRequest(jobHCL string, id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (
*NomadEnvironment, error) {
environment, err := NewNomadEnvironment(jobHCL)
if err != nil {
return nil, err
}
environment.SetID(id)
// Set options according to request
environment.SetPrewarmingPoolSize(request.PrewarmingPoolSize)
environment.SetCPULimit(request.CPULimit)
environment.SetMemoryLimit(request.MemoryLimit)
environment.SetImage(request.Image)
environment.SetNetworkAccess(request.NetworkAccess, request.ExposedPorts)
return environment, nil
}
func (n *NomadEnvironment) ID() dto.EnvironmentID {
id, err := nomad.EnvironmentIDFromTemplateJobID(*n.job.ID)
if err != nil {
@ -169,11 +189,6 @@ func (n *NomadEnvironment) SetNetworkAccess(allow bool, exposedPorts []uint16) {
// Register creates a Nomad job based on the default job configuration and the given parameters.
// It registers the job with Nomad and waits until the registration completes.
func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error {
// To avoid docker image issues. See https://github.com/openHPI/poseidon/issues/69
if err := n.Delete(apiClient); err != nil {
return fmt.Errorf("failed to remove the environment: %w", err)
}
nomad.SetForcePullFlag(n.job, true) // This must be the default as otherwise new runners could have different images.
evalID, err := apiClient.RegisterNomadJob(n.job)
if err != nil {
@ -200,14 +215,13 @@ func (n *NomadEnvironment) Delete(apiClient nomad.ExecutorAPI) error {
return nil
}
func (n *NomadEnvironment) Scale(apiClient nomad.ExecutorAPI) error {
func (n *NomadEnvironment) ApplyPrewarmingPoolSize(apiClient nomad.ExecutorAPI) error {
required := int(n.PrewarmingPoolSize()) - n.idleRunners.Length()
if required > 0 {
return n.createRunners(apiClient, uint(required), true)
} else {
return n.removeIdleRunners(apiClient, uint(-required))
if required < 0 {
return fmt.Errorf("%w. Runners to remove: %d", ErrScaleDown, -required)
}
return n.createRunners(apiClient, uint(required), true)
}
func (n *NomadEnvironment) Sample(apiClient nomad.ExecutorAPI) (runner.Runner, bool) {
@ -322,32 +336,18 @@ func (n *NomadEnvironment) createRunner(apiClient nomad.ExecutorAPI, forcePull b
return nil
}
func (n *NomadEnvironment) removeIdleRunners(apiClient nomad.ExecutorAPI, count uint) error {
log.WithField("runnersToDelete", count).WithField("id", n.ID()).Debug("Removing idle runners")
for i := 0; i < int(count); i++ {
r, ok := n.idleRunners.Sample()
if !ok {
return fmt.Errorf("could not delete expected idle runner: %w", runner.ErrRunnerNotFound)
}
err := apiClient.DeleteJob(r.ID())
if err != nil {
return fmt.Errorf("could not delete expected Nomad idle runner: %w", err)
}
}
return nil
}
// removeRunners removes all (idle and used) runners for the given environment n.
func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI) error {
// Only to avoid timing issues as an idle runner is also removed when Nomad has deleted the allocation.
for _, r := range n.idleRunners.List() {
n.idleRunners.Delete(r.ID())
}
// This prevents a race condition where the number of required runners is miscalculated in the up-scaling process
// based on the number of allocation that has been stopped at the moment of the scaling.
n.idleRunners.Purge()
ids, err := apiClient.LoadRunnerIDs(nomad.RunnerJobID(n.ID(), ""))
if err != nil {
return fmt.Errorf("failed to load runner ids: %w", err)
}
// Block execution until Nomad confirmed all deletion requests.
var wg sync.WaitGroup
for _, id := range ids {
wg.Add(1)

View File

@ -88,7 +88,7 @@ func (m *NomadEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (
}
ok = false
case !ok:
m.runnerManager.SetEnvironment(fetchedEnvironment)
m.runnerManager.StoreEnvironment(fetchedEnvironment)
executionEnvironment = fetchedEnvironment
ok = true
default:
@ -114,32 +114,39 @@ func (m *NomadEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironmen
func (m *NomadEnvironmentManager) CreateOrUpdate(id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (
created bool, err error) {
environment, ok := m.runnerManager.GetEnvironment(id)
if !ok {
environment, err = NewNomadEnvironment(m.templateEnvironmentHCL)
// Check if execution environment is already existing (in the local memory).
environment, isExistingEnvironment := m.runnerManager.GetEnvironment(id)
if isExistingEnvironment {
// Remove existing environment to force downloading the newest Docker image.
// See https://github.com/openHPI/poseidon/issues/69
err = environment.Delete(m.api)
if err != nil {
return false, fmt.Errorf("error creating Nomad environment: %w", err)
return false, fmt.Errorf("failed to remove the environment: %w", err)
}
environment.SetID(id)
}
environment.SetPrewarmingPoolSize(request.PrewarmingPoolSize)
environment.SetCPULimit(request.CPULimit)
environment.SetMemoryLimit(request.MemoryLimit)
environment.SetImage(request.Image)
environment.SetNetworkAccess(request.NetworkAccess, request.ExposedPorts)
created = m.runnerManager.SetEnvironment(environment)
// Create a new environment with the given request options.
environment, err = NewNomadEnvironmentFromRequest(m.templateEnvironmentHCL, id, request)
if err != nil {
return false, fmt.Errorf("error creating Nomad environment: %w", err)
}
// Keep a copy of environment specification in memory.
m.runnerManager.StoreEnvironment(environment)
// Register template Job with Nomad.
err = environment.Register(m.api)
if err != nil {
return false, fmt.Errorf("error registering template job in API: %w", err)
}
err = environment.Scale(m.api)
// Launch idle runners based on the template job.
err = environment.ApplyPrewarmingPoolSize(m.api)
if err != nil {
return false, fmt.Errorf("error scaling template job in API: %w", err)
}
return created, nil
return !isExistingEnvironment, nil
}
func (m *NomadEnvironmentManager) Delete(id dto.EnvironmentID) (bool, error) {
@ -181,7 +188,7 @@ func (m *NomadEnvironmentManager) Load() error {
job: job,
idleRunners: runner.NewLocalRunnerStorage(),
}
m.runnerManager.SetEnvironment(environment)
m.runnerManager.StoreEnvironment(environment)
jobLogger.Info("Successfully recovered environment")
}
return nil

View File

@ -57,7 +57,7 @@ func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentRe
s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false)
s.runnerManagerMock.On("SetEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true)
s.runnerManagerMock.On("StoreEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true)
_, err := s.manager.CreateOrUpdate(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request)
s.ErrorIs(err, tests.ErrDefault)
}
@ -67,7 +67,7 @@ func (s *CreateOrUpdateTestSuite) TestCreateOrUpdatesSetsForcePullFlag() {
s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false)
s.runnerManagerMock.On("SetEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true)
s.runnerManagerMock.On("StoreEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true)
s.apiMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil)
s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
call := s.apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job"))
@ -155,7 +155,7 @@ func TestNomadEnvironmentManager_Get(t *testing.T) {
expectedEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
expectedEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
require.NoError(t, err)
runnerManager.SetEnvironment(expectedEnvironment)
runnerManager.StoreEnvironment(expectedEnvironment)
environment, err := m.Get(tests.DefaultEnvironmentIDAsInteger, false)
assert.NoError(t, err)
@ -181,7 +181,7 @@ func TestNomadEnvironmentManager_Get(t *testing.T) {
localEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
require.NoError(t, err)
localEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
runnerManager.SetEnvironment(localEnvironment)
runnerManager.StoreEnvironment(localEnvironment)
environment, err := m.Get(tests.DefaultEnvironmentIDAsInteger, false)
assert.NoError(t, err)
@ -234,7 +234,7 @@ func TestNomadEnvironmentManager_List(t *testing.T) {
localEnvironment, err := NewNomadEnvironment(templateEnvironmentJobHCL)
require.NoError(t, err)
localEnvironment.SetID(tests.DefaultEnvironmentIDAsInteger)
runnerManager.SetEnvironment(localEnvironment)
runnerManager.StoreEnvironment(localEnvironment)
environments, err := m.List(false)
assert.NoError(t, err)