diff --git a/configuration.example.yaml b/configuration.example.yaml
index 3f2c5f5..44e072b 100644
--- a/configuration.example.yaml
+++ b/configuration.example.yaml
@@ -19,10 +19,16 @@ server:
   interactivestderr: true
   # If set, the file at the given path overwrites the default Nomad job file in internal/environment/template-environment-job.hcl
   # templatejobfile: ./poseidon.hcl
-  # The prewarming pool alert threshold [0, 1) defines which part of the prewarming pool should always be filled.
-  # Setting it to 0 will disable the alert.
-  # If the prewarming pool is filled for less than, i.e., 50%, the health route of Poseidon will return a warning.
-  prewarmingpoolalertthreshold: 0.5
+  # alert defines how Poseidon should handle specific risks.
+  alert:
+    # The prewarming pool threshold [0, 1) defines which part of the prewarming pool should always be filled.
+    # Setting it to 0 will disable the alert.
+    # If the prewarming pool is filled to less than this share, e.g., 50%, the health route of Poseidon will return a warning.
+    prewarmingpoolthreshold: 0.5
+    # The prewarming pool reload timeout (in seconds) defines for how long the low prewarming pool warning (above)
+    # should be active before Poseidon automatically reloads the environment.
+    # Setting it to 0 will disable the automatic reload.
+    prewarmingpoolreloadtimeout: 300
 
 # Configuration of the used Nomad cluster
 nomad:
diff --git a/internal/api/api_test.go b/internal/api/api_test.go
index a8f3db0..38f0f65 100644
--- a/internal/api/api_test.go
+++ b/internal/api/api_test.go
@@ -3,6 +3,8 @@ package api
 import (
 	"github.com/gorilla/mux"
 	"github.com/openHPI/poseidon/internal/config"
+	"github.com/openHPI/poseidon/internal/environment"
+	"github.com/openHPI/poseidon/pkg/dto"
 	"github.com/openHPI/poseidon/tests"
 	"github.com/stretchr/testify/suite"
 	"net/http"
@@ -25,7 +27,9 @@ func TestMainTestSuite(t *testing.T) {
 func (s *MainTestSuite) TestNewRouterV1WithAuthenticationDisabled() {
 	config.Config.Server.Token = ""
 	router := mux.NewRouter()
-	configureV1Router(router, nil, nil)
+	m := &environment.ManagerHandlerMock{}
+	m.On("Statistics").Return(make(map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData))
+	configureV1Router(router, nil, m)
 
 	s.Run("health route is accessible", func() {
 		request, err := http.NewRequest(http.MethodGet, "/api/v1/health", http.NoBody)
@@ -52,7 +56,9 @@ func (s *MainTestSuite) TestNewRouterV1WithAuthenticationDisabled() {
 func (s *MainTestSuite) TestNewRouterV1WithAuthenticationEnabled() {
 	config.Config.Server.Token = "TestToken"
 	router := mux.NewRouter()
-	configureV1Router(router, nil, nil)
+	m := &environment.ManagerHandlerMock{}
+	m.On("Statistics").Return(make(map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData))
+	configureV1Router(router, nil, m)
 
 	s.Run("health route is accessible", func() {
 		request, err := http.NewRequest(http.MethodGet, "/api/v1/health", http.NoBody)
diff --git a/internal/api/health.go b/internal/api/health.go
index 05acc61..8349a59 100644
--- a/internal/api/health.go
+++ b/internal/api/health.go
@@ -30,7 +30,7 @@ func Health(manager environment.Manager) http.HandlerFunc {
 func checkPrewarmingPool(manager environment.Manager) error {
 	var depletingEnvironments []int
 	for _, data := range manager.Statistics() {
-		if float64(data.IdleRunners)/float64(data.PrewarmingPoolSize) < config.Config.Server.PrewarmingPoolAlertThreshold {
+		if float64(data.IdleRunners)/float64(data.PrewarmingPoolSize) < config.Config.Server.Alert.PrewarmingPoolThreshold {
 			depletingEnvironments = append(depletingEnvironments, data.ID)
 		}
 	}
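For orientation, here is a minimal, self-contained sketch of the depletion check that the patched health route performs. The type and helper names are made up for this sketch; only the idle-runners-to-pool-size ratio and the comparison against the configured threshold are taken from the hunk above.

```go
package main

import "fmt"

// environmentStats mirrors just the fields the health check reads; the type name is illustrative.
type environmentStats struct {
	ID                 int
	IdleRunners        uint
	PrewarmingPoolSize uint
}

// depletedEnvironments collects the IDs of all environments whose share of idle runners
// has dropped below the configured threshold, as checkPrewarmingPool does above.
func depletedEnvironments(stats []environmentStats, threshold float64) (ids []int) {
	for _, data := range stats {
		if float64(data.IdleRunners)/float64(data.PrewarmingPoolSize) < threshold {
			ids = append(ids, data.ID)
		}
	}
	return ids
}

func main() {
	stats := []environmentStats{
		{ID: 1, IdleRunners: 1, PrewarmingPoolSize: 4}, // 25% idle: below a 0.5 threshold
		{ID: 2, IdleRunners: 3, PrewarmingPoolSize: 4}, // 75% idle: healthy
	}
	fmt.Println(depletedEnvironments(stats, 0.5)) // [1]
}
```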
diff --git a/internal/api/health_test.go b/internal/api/health_test.go
index 4338b50..89e2b3a 100644
--- a/internal/api/health_test.go
+++ b/internal/api/health_test.go
@@ -39,7 +39,7 @@ func (s *MainTestSuite) TestHealth() {
 				IdleRunners:        1,
 			},
 		})
-		config.Config.Server.PrewarmingPoolAlertThreshold = 0.5
+		config.Config.Server.Alert.PrewarmingPoolThreshold = 0.5
 
 		Health(manager).ServeHTTP(recorder, request)
 
 		s.Equal(http.StatusServiceUnavailable, recorder.Code)
diff --git a/internal/config/config.go b/internal/config/config.go
index bb7d4f4..5a203bc 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -29,9 +29,12 @@ var (
 			CertFile: "",
 			KeyFile:  "",
 		},
-		InteractiveStderr:            true,
-		TemplateJobFile:              "",
-		PrewarmingPoolAlertThreshold: 0,
+		InteractiveStderr: true,
+		TemplateJobFile:   "",
+		Alert: alert{
+			PrewarmingPoolThreshold:     0,
+			PrewarmingPoolReloadTimeout: 0,
+		},
 	},
 	Nomad: Nomad{
 		Enabled: true,
@@ -80,15 +83,20 @@ var (
 	ErrConfigInitialized = errors.New("configuration is already initialized")
 )
 
+type alert struct {
+	PrewarmingPoolThreshold     float64
+	PrewarmingPoolReloadTimeout uint
+}
+
 // server configures the Poseidon webserver.
 type server struct {
-	Address                      string
-	Port                         int
-	Token                        string
-	TLS                          TLS
-	InteractiveStderr            bool
-	TemplateJobFile              string
-	PrewarmingPoolAlertThreshold float64
+	Address           string
+	Port              int
+	Token             string
+	TLS               TLS
+	InteractiveStderr bool
+	TemplateJobFile   string
+	Alert             alert
 }
 
 // URL returns the URL of the Poseidon webserver.
diff --git a/internal/nomad/job.go b/internal/nomad/job.go
index ca1c442..3dda0b6 100644
--- a/internal/nomad/job.go
+++ b/internal/nomad/job.go
@@ -30,11 +30,13 @@ const (
 	ConfigMetaPoolSizeKey = "prewarmingPoolSize"
 	TemplateJobNameParts  = 2
 	RegisterTimeout       = 10 * time.Second
+	RunnerTimeoutFallback = 60 * time.Second
 )
 
 var (
-	ErrorInvalidJobID = errors.New("invalid job id")
-	TaskArgs          = []string{"infinity"}
+	ErrorInvalidJobID     = errors.New("invalid job id")
+	ErrorMissingTaskGroup = errors.New("couldn't find config task group in job")
+	TaskArgs              = []string{"infinity"}
 )
 
 func (a *APIClient) RegisterRunnerJob(template *nomadApi.Job) error {
diff --git a/internal/runner/aws_manager.go b/internal/runner/aws_manager.go
index 500211b..8c72991 100644
--- a/internal/runner/aws_manager.go
+++ b/internal/runner/aws_manager.go
@@ -17,7 +17,7 @@ func NewAWSRunnerManager(ctx context.Context) *AWSRunnerManager {
 }
 
 func (a AWSRunnerManager) Claim(id dto.EnvironmentID, duration int) (Runner, error) {
-	environment, ok := a.environments.Get(id.ToString())
+	environment, ok := a.GetEnvironment(id)
 	if !ok {
 		r, err := a.NextHandler().Claim(id, duration)
 		if err != nil {
diff --git a/internal/runner/aws_manager_test.go b/internal/runner/aws_manager_test.go
index 529d8c1..cf1f91a 100644
--- a/internal/runner/aws_manager_test.go
+++ b/internal/runner/aws_manager_test.go
@@ -112,5 +112,7 @@ func createBasicEnvironmentMock(id dto.EnvironmentID) *ExecutionEnvironmentMock
 	environment.On("NetworkAccess").Return(false, nil)
 	environment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
 	environment.On("ApplyPrewarmingPoolSize").Return(nil)
+	environment.On("IdleRunnerCount").Return(uint(1)).Maybe()
+	environment.On("PrewarmingPoolSize").Return(uint(1)).Maybe()
 	return environment
 }
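To show how the new nested keys in configuration.example.yaml line up with the `alert` struct added in internal/config/config.go, here is a small standalone sketch. It assumes plain gopkg.in/yaml.v3 unmarshalling with its default lowercased field names; Poseidon's actual configuration loader may apply additional defaults and overrides, so treat this only as an illustration of the key-to-field mapping.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// alert and server reproduce only the fields relevant to the new settings;
// see internal/config/config.go above for the full struct.
type alert struct {
	PrewarmingPoolThreshold     float64
	PrewarmingPoolReloadTimeout uint
}

type server struct {
	Alert alert
}

func main() {
	// The same keys as in configuration.example.yaml. yaml.v3 matches them by
	// lowercasing the Go field names, so no struct tags are required.
	raw := []byte(`
alert:
  prewarmingpoolthreshold: 0.5
  prewarmingpoolreloadtimeout: 300
`)

	var s server
	if err := yaml.Unmarshal(raw, &s); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", s.Alert) // {PrewarmingPoolThreshold:0.5 PrewarmingPoolReloadTimeout:300}
}
```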
diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go
index a579fd1..ad1a0a0 100644
--- a/internal/runner/nomad_manager.go
+++ b/internal/runner/nomad_manager.go
@@ -6,14 +6,15 @@
 import (
 	"fmt"
 	nomadApi "github.com/hashicorp/nomad/api"
 	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
+	"github.com/openHPI/poseidon/internal/config"
 	"github.com/openHPI/poseidon/internal/nomad"
 	"github.com/openHPI/poseidon/pkg/dto"
 	"github.com/openHPI/poseidon/pkg/logging"
 	"github.com/openHPI/poseidon/pkg/monitoring"
 	"github.com/openHPI/poseidon/pkg/storage"
 	"github.com/openHPI/poseidon/pkg/util"
-	"github.com/sirupsen/logrus"
 	"strconv"
+	"sync"
 	"time"
 )
@@ -26,21 +27,23 @@ var (
 
 type NomadRunnerManager struct {
 	*AbstractManager
-	apiClient nomad.ExecutorAPI
+	apiClient            nomad.ExecutorAPI
+	reloadingEnvironment map[dto.EnvironmentID]*sync.Mutex
 }
 
 // NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
 // KeepRunnersSynced has to be started separately.
 func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
-	return &NomadRunnerManager{NewAbstractManager(ctx), apiClient}
+	return &NomadRunnerManager{NewAbstractManager(ctx), apiClient, make(map[dto.EnvironmentID]*sync.Mutex)}
 }
 
 func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) {
-	environment, ok := m.environments.Get(environmentID.ToString())
+	environment, ok := m.GetEnvironment(environmentID)
 	if !ok {
 		return nil, ErrUnknownExecutionEnvironment
 	}
 	runner, ok := environment.Sample()
+	go m.checkPrewarmingPoolAlert(environment)
 	if !ok {
 		return nil, ErrNoRunnersAvailable
 	}
@@ -52,6 +55,44 @@ func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int
 	return runner, nil
 }
 
+// checkPrewarmingPoolAlert checks if the prewarming pool contains enough idle runners as specified by the PrewarmingPoolThreshold.
+// If not, it starts an environment reload mechanism according to the PrewarmingPoolReloadTimeout.
+func (m *NomadRunnerManager) checkPrewarmingPoolAlert(environment ExecutionEnvironment) {
+	mutex := m.reloadingEnvironment[environment.ID()]
+	if !mutex.TryLock() {
+		// The environment is already about to be reloaded
+		return
+	}
+	defer mutex.Unlock()
+
+	prewarmingPoolThreshold := config.Config.Server.Alert.PrewarmingPoolThreshold
+	reloadTimeout := config.Config.Server.Alert.PrewarmingPoolReloadTimeout
+
+	if reloadTimeout == 0 || float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold {
+		return
+	}
+
+	log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Checking again..")
+	<-time.After(time.Duration(reloadTimeout) * time.Second)
+
+	if float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold {
+		return
+	}
+
+	log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Reloading environment")
+	err := util.RetryExponential(func() error {
+		usedRunners, err := m.loadEnvironment(environment)
+		if err != nil {
+			return err
+		}
+		m.updateUsedRunners(usedRunners, false)
+		return nil
+	})
+	if err != nil {
+		log.WithField(dto.KeyEnvironmentID, environment.ID()).Error("Failed to reload environment")
+	}
+}
+
 func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) {
 	err := util.RetryExponential(func() (err error) {
 		if err = m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration); err != nil {
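The per-environment `sync.Mutex` together with `TryLock` (available since Go 1.18) acts as a debounce: while one alert check for an environment is sleeping through the reload timeout, further `Claim` calls return immediately instead of queuing up additional checks. A reduced sketch of just that pattern, with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// reloadGuards mimics the reloadingEnvironment map above: one mutex per environment ID.
type reloadGuards map[int]*sync.Mutex

// checkOnce runs work for the given environment unless a check is already in progress.
func (g reloadGuards) checkOnce(environmentID int, work func()) {
	mutex := g[environmentID]
	if !mutex.TryLock() { // another goroutine already handles this environment
		return
	}
	defer mutex.Unlock()
	work()
}

func main() {
	guards := reloadGuards{1: {}}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			guards.checkOnce(1, func() {
				fmt.Println("checking prewarming pool") // typically printed once; concurrent calls bail out
				time.Sleep(100 * time.Millisecond)
			})
		}()
	}
	wg.Wait()
}
```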
@@ -80,9 +121,7 @@ func (m *NomadRunnerManager) Return(r Runner) error {
 // SynchronizeRunners loads all runners and keeps them synchronized (without a retry mechanism).
 func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
 	log.Info("Loading runners")
-	if err := m.load(); err != nil {
-		return fmt.Errorf("failed loading runners: %w", err)
-	}
+	m.load()
 
 	// Watch for changes regarding the existing or new runners.
 	log.Info("Watching Event Stream")
@@ -95,69 +134,101 @@ func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
 	return err
 }
 
-// Load recovers all runners for all existing environments.
-func (m *NomadRunnerManager) load() error {
-	newUsedRunners := storage.NewLocalStorage[Runner]()
-	for _, environment := range m.environments.List() {
-		environmentLogger := log.WithField(dto.KeyEnvironmentID, environment.ID().ToString())
-
-		runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
-		if err != nil {
-			return fmt.Errorf("failed fetching the runner jobs: %w", err)
-		}
-		for _, job := range runnerJobs {
-			m.loadSingleJob(job, environmentLogger, environment, newUsedRunners)
-		}
-		err = environment.ApplyPrewarmingPoolSize()
-		if err != nil {
-			return fmt.Errorf("couldn't scale environment: %w", err)
-		}
-	}
-
-	m.updateUsedRunners(newUsedRunners)
-	return nil
+func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) {
+	m.AbstractManager.StoreEnvironment(environment)
+	m.reloadingEnvironment[environment.ID()] = &sync.Mutex{}
 }
 
-func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger *logrus.Entry,
-	environment ExecutionEnvironment, newUsedRunners storage.Storage[Runner]) {
+func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) {
+	m.AbstractManager.DeleteEnvironment(id)
+	delete(m.reloadingEnvironment, id)
+}
+
+// Load recovers all runners for all existing environments.
+func (m *NomadRunnerManager) load() {
+	newUsedRunners := storage.NewLocalStorage[Runner]()
+	for _, environment := range m.ListEnvironments() {
+		usedRunners, err := m.loadEnvironment(environment)
+		if err != nil {
+			log.WithError(err).WithField(dto.KeyEnvironmentID, environment.ID().ToString()).
+				Warn("Failed loading environment. Skipping ...")
+			continue
+		}
+		for _, r := range usedRunners.List() {
+			newUsedRunners.Add(r.ID(), r)
+		}
+	}
+
+	m.updateUsedRunners(newUsedRunners, true)
+}
+
+func (m *NomadRunnerManager) loadEnvironment(environment ExecutionEnvironment) (used storage.Storage[Runner], err error) {
+	used = storage.NewLocalStorage[Runner]()
+	runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
+	if err != nil {
+		return nil, fmt.Errorf("failed fetching the runner jobs: %w", err)
+	}
+	for _, job := range runnerJobs {
+		r, isUsed, err := m.loadSingleJob(job, environment)
+		if err != nil {
+			log.WithError(err).WithField(dto.KeyEnvironmentID, environment.ID().ToString()).
+				WithField("used", isUsed).Warn("Failed loading job. Skipping ...")
Skipping ...") + continue + } else if isUsed { + used.Add(r.ID(), r) + } + } + err = environment.ApplyPrewarmingPoolSize() + if err != nil { + return used, fmt.Errorf("couldn't scale environment: %w", err) + } + return used, nil +} + +func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environment ExecutionEnvironment) (r Runner, isUsed bool, err error) { configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName) if configTaskGroup == nil { - environmentLogger.Warnf("Couldn't find config task group in job %s, skipping ...", *job.ID) - return + return nil, false, fmt.Errorf("%w, %s", nomad.ErrorMissingTaskGroup, *job.ID) } - isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue + isUsed = configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID) if err != nil { - environmentLogger.WithError(err).Warn("Error loading runner portMappings, skipping ...") - return + return nil, false, fmt.Errorf("error loading runner portMappings: %w", err) } + newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.onRunnerDestroyed) log.WithField("isUsed", isUsed).WithField(dto.KeyRunnerID, newJob.ID()).Debug("Recovered Runner") if isUsed { - newUsedRunners.Add(newJob.ID(), newJob) timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey]) if err != nil { - environmentLogger.WithError(err).Warn("Error loading timeout from meta values") - } else { - newJob.SetupTimeout(time.Duration(timeout) * time.Second) + log.WithField(dto.KeyRunnerID, newJob.ID()).WithError(err).Warn("failed loading timeout from meta values") + timeout = int(nomad.RunnerTimeoutFallback.Seconds()) + go m.markRunnerAsUsed(newJob, timeout) } + newJob.SetupTimeout(time.Duration(timeout) * time.Second) } else { environment.AddRunner(newJob) } + return newJob, isUsed, nil } -func (m *NomadRunnerManager) updateUsedRunners(newUsedRunners storage.Storage[Runner]) { +// updateUsedRunners handles the cleanup process of updating the used runner storage. +// This includes the clean deletion of the local references to the (replaced/deleted) runners. +// Only if removeDeleted is set, the runners that are only in newUsedRunners (and not in the main m.usedRunners) will be removed. 
@@ -186,7 +257,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, start
 		return
 	}
 
-	environment, ok := m.environments.Get(environmentID.ToString())
+	environment, ok := m.GetEnvironment(environmentID)
 	if ok {
 		var mappedPorts []nomadApi.PortMapping
 		if alloc.AllocatedResources != nil {
@@ -227,7 +298,7 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error)
 		}
 	}
 
-	environment, ok := m.environments.Get(environmentID.ToString())
+	environment, ok := m.GetEnvironment(environmentID)
 	if ok {
 		stillActive = stillActive || environment.DeleteRunner(runnerID)
 	}
@@ -240,7 +311,7 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error)
 
 func (m *NomadRunnerManager) onRunnerDestroyed(r Runner) error {
 	m.usedRunners.Delete(r.ID())
-	environment, ok := m.environments.Get(r.Environment().ToString())
+	environment, ok := m.GetEnvironment(r.Environment())
 	if ok {
 		environment.DeleteRunner(r.ID())
 	}
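One detail from loadSingleJob above that is easy to miss: when the stored timeout meta value cannot be parsed, the recovered used runner no longer ends up without an inactivity timeout; it falls back to nomad.RunnerTimeoutFallback and is re-marked as used with that value. A hedged sketch of just that fallback path; `repair` stands in for markRunnerAsUsed and is not the real API:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// runnerTimeoutFallback mirrors nomad.RunnerTimeoutFallback introduced above.
const runnerTimeoutFallback = 60 * time.Second

// recoverTimeout follows the fallback path of loadSingleJob: an unparsable meta value
// yields the fallback duration, and repair (standing in for markRunnerAsUsed) is only
// called when the stored value had to be replaced.
func recoverTimeout(meta string, repair func(seconds int)) time.Duration {
	timeout, err := strconv.Atoi(meta)
	if err != nil {
		timeout = int(runnerTimeoutFallback.Seconds())
		repair(timeout)
	}
	return time.Duration(timeout) * time.Second
}

func main() {
	repair := func(seconds int) { fmt.Println("re-marking runner as used with timeout", seconds) }

	fmt.Println(recoverTimeout("42", repair))     // 42s, the stored value is fine
	fmt.Println(recoverTimeout("broken", repair)) // prints the repair notice, then 1m0s
}
```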
diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go
index 502fc40..fb377d8 100644
--- a/internal/runner/nomad_manager_test.go
+++ b/internal/runner/nomad_manager_test.go
@@ -3,6 +3,7 @@ package runner
 import (
 	"context"
 	nomadApi "github.com/hashicorp/nomad/api"
+	"github.com/openHPI/poseidon/internal/config"
 	"github.com/openHPI/poseidon/internal/nomad"
 	"github.com/openHPI/poseidon/pkg/dto"
 	"github.com/openHPI/poseidon/pkg/storage"
@@ -533,13 +534,12 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
 		s.ExpectedGoroutingIncrease++ // We dont care about destroying the created runner.
 		call.Return([]*nomadApi.Job{job}, nil)
 
-		err := runnerManager.load()
-		s.NoError(err)
-
+		runnerManager.load()
 		environmentMock.AssertExpectations(s.T())
 	})
 
 	s.Run("Stores used runner", func() {
+		apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil)
 		_, job := helpers.CreateTemplateJob()
 		jobID := tests.DefaultRunnerID
 		job.ID = &jobID
@@ -547,14 +547,11 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
 		configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
 		s.Require().NotNil(configTaskGroup)
 		configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue
-		s.ExpectedGoroutingIncrease++ // We dont care about destroying the created runner.
+		s.ExpectedGoroutingIncrease++ // We don't care about destroying the created runner.
 		call.Return([]*nomadApi.Job{job}, nil)
 
 		s.Require().Zero(runnerManager.usedRunners.Length())
-
-		err := runnerManager.load()
-		s.NoError(err)
-
+		runnerManager.load()
 		_, ok := runnerManager.usedRunners.Get(tests.DefaultRunnerID)
 		s.True(ok)
 	})
@@ -576,10 +573,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
 		call.Return([]*nomadApi.Job{job}, nil)
 
 		s.Require().Zero(runnerManager.usedRunners.Length())
-
-		err := runnerManager.load()
-		s.NoError(err)
-
+		runnerManager.load()
 		s.Require().NotZero(runnerManager.usedRunners.Length())
 
 		<-time.After(time.Duration(timeout*2) * time.Second)
@@ -587,6 +581,132 @@
 	})
 }
 
+func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() {
+	timeout := uint(1)
+	config.Config.Server.Alert.PrewarmingPoolReloadTimeout = timeout
+	config.Config.Server.Alert.PrewarmingPoolThreshold = 0.5
+	environment := &ExecutionEnvironmentMock{}
+	environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger))
+	environment.On("Image").Return("")
+	environment.On("CPULimit").Return(uint(0))
+	environment.On("MemoryLimit").Return(uint(0))
+	environment.On("NetworkAccess").Return(false, nil)
+	apiMock := &nomad.ExecutorAPIMock{}
+	m := NewNomadRunnerManager(apiMock, s.TestCtx)
+	m.StoreEnvironment(environment)
+
+	s.Run("does not allow concurrent calls", func() {
+		environment.On("PrewarmingPoolSize").Return(uint(1)).Once()
+
+		secondCallDone := make(chan struct{})
+		environment.On("IdleRunnerCount").Run(func(_ mock.Arguments) {
+			<-secondCallDone
+		}).Return(uint(1)).Once()
+
+		go m.checkPrewarmingPoolAlert(environment)
+		<-time.After(tests.ShortTimeout)
+		go func() {
+			m.checkPrewarmingPoolAlert(environment)
+			close(secondCallDone)
+		}()
+
+		<-time.After(tests.ShortTimeout)
+		environment.AssertExpectations(s.T())
+	})
+	s.Run("checks the alert condition again after the reload timeout", func() {
+		environment.On("PrewarmingPoolSize").Return(uint(1)).Once()
+		environment.On("IdleRunnerCount").Return(uint(0)).Once()
+		environment.On("PrewarmingPoolSize").Return(uint(1)).Once()
+		environment.On("IdleRunnerCount").Return(uint(1)).Once()
+
+		checkDone := make(chan struct{})
+		go func() {
+			m.checkPrewarmingPoolAlert(environment)
+			close(checkDone)
+		}()
+
+		select {
+		case <-checkDone:
+			s.Fail("checkPrewarmingPoolAlert returned before the reload timeout")
+		case <-time.After(time.Duration(timeout) * time.Second / 2):
+		}
+
+		select {
+		case <-time.After(time.Duration(timeout) * time.Second):
+			s.Fail("checkPrewarmingPoolAlert did not return after checking the alert condition again")
+		case <-checkDone:
+		}
+		environment.AssertExpectations(s.T())
+	})
+	s.Run("reloads the environment if the pool is still depleted after the reload timeout", func() {
+		environment.On("PrewarmingPoolSize").Return(uint(1)).Twice()
+		environment.On("IdleRunnerCount").Return(uint(0)).Twice()
+		apiMock.On("LoadRunnerJobs", environment.ID()).Return([]*nomadApi.Job{}, nil).Once()
+		environment.On("ApplyPrewarmingPoolSize").Return(nil).Once()
+
+		checkDone := make(chan struct{})
+		go func() {
+			m.checkPrewarmingPoolAlert(environment)
+			close(checkDone)
+		}()
+
+		select {
+		case <-time.After(time.Duration(timeout) * time.Second * 2):
+			s.Fail("checkPrewarmingPoolAlert did not return")
+		case <-checkDone:
+		}
+		environment.AssertExpectations(s.T())
+	})
+}
+
+func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert_reloadsRunners() {
+	config.Config.Server.Alert.PrewarmingPoolReloadTimeout = uint(1)
+	config.Config.Server.Alert.PrewarmingPoolThreshold = 0.5
+	environment := &ExecutionEnvironmentMock{}
+	environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger))
+	environment.On("Image").Return("")
+	environment.On("CPULimit").Return(uint(0))
+	environment.On("MemoryLimit").Return(uint(0))
+	environment.On("NetworkAccess").Return(false, nil)
+	apiMock := &nomad.ExecutorAPIMock{}
+	m := NewNomadRunnerManager(apiMock, s.TestCtx)
+	m.StoreEnvironment(environment)
+
+	environment.On("PrewarmingPoolSize").Return(uint(1)).Twice()
+	environment.On("IdleRunnerCount").Return(uint(0)).Twice()
+	environment.On("DeleteRunner", mock.Anything).Return(false).Once()
+
+	s.Require().Empty(m.usedRunners.Length())
+	_, usedJob := helpers.CreateTemplateJob()
+	id := tests.DefaultRunnerID
+	usedJob.ID = &id
+	configTaskGroup := nomad.FindTaskGroup(usedJob, nomad.ConfigTaskGroupName)
+	configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue
+	configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey] = "42"
+	_, idleJob := helpers.CreateTemplateJob()
+	idleID := tests.AnotherRunnerID
+	idleJob.ID = &idleID
+	nomad.FindTaskGroup(idleJob, nomad.ConfigTaskGroupName).Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUnusedValue
+	apiMock.On("LoadRunnerJobs", environment.ID()).Return([]*nomadApi.Job{usedJob, idleJob}, nil).Once()
+	apiMock.On("LoadRunnerPortMappings", mock.Anything).Return(nil, nil).Twice()
+	environment.On("ApplyPrewarmingPoolSize").Return(nil).Once()
+	environment.On("AddRunner", mock.Anything).Run(func(args mock.Arguments) {
+		job, ok := args[0].(*NomadJob)
+		s.Require().True(ok)
+		err := job.Destroy(ErrLocalDestruction)
+		s.NoError(err)
+	}).Return().Once()
+
+	m.checkPrewarmingPoolAlert(environment)
+
+	r, ok := m.usedRunners.Get(tests.DefaultRunnerID)
+	s.Require().True(ok)
+	err := r.Destroy(ErrLocalDestruction)
+	s.NoError(err)
+
+	environment.AssertExpectations(s.T())
+}
+
 func mockWatchAllocations(ctx context.Context, apiMock *nomad.ExecutorAPIMock) {
 	call := apiMock.On("WatchEventStream", mock.Anything, mock.Anything, mock.Anything)
 	call.Run(func(args mock.Arguments) {