diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index ad1a0a0..3cc82d6 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -25,16 +25,21 @@ var ( ErrRunnerNotFound = errors.New("no runner found with this id") ) +type alertData struct { + *sync.Mutex + cancel context.CancelFunc +} + type NomadRunnerManager struct { *AbstractManager apiClient nomad.ExecutorAPI - reloadingEnvironment map[dto.EnvironmentID]*sync.Mutex + reloadingEnvironment storage.Storage[*alertData] } // NewNomadRunnerManager creates a new runner manager that keeps track of all runners. // KeepRunnersSynced has to be started separately. func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager { - return &NomadRunnerManager{NewAbstractManager(ctx), apiClient, make(map[dto.EnvironmentID]*sync.Mutex)} + return &NomadRunnerManager{NewAbstractManager(ctx), apiClient, storage.NewLocalStorage[*alertData]()} } func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) { @@ -43,7 +48,6 @@ func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int return nil, ErrUnknownExecutionEnvironment } runner, ok := environment.Sample() - go m.checkPrewarmingPoolAlert(environment) if !ok { return nil, ErrNoRunnersAvailable } @@ -55,44 +59,6 @@ func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int return runner, nil } -// checkPrewarmingPoolAlert checks if the prewarming pool contains enough idle runners as specified by the PrewarmingPoolThreshold -// if not it starts an environment reload mechanism according to the PrewarmingPoolReloadTimeout. -func (m *NomadRunnerManager) checkPrewarmingPoolAlert(environment ExecutionEnvironment) { - mutex := m.reloadingEnvironment[environment.ID()] - if !mutex.TryLock() { - // The environment is already about to be reloaded - return - } - defer mutex.Unlock() - - prewarmingPoolThreshold := config.Config.Server.Alert.PrewarmingPoolThreshold - reloadTimeout := config.Config.Server.Alert.PrewarmingPoolReloadTimeout - - if reloadTimeout == 0 || float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold { - return - } - - log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Checking again..") - <-time.After(time.Duration(reloadTimeout) * time.Second) - - if float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold { - return - } - - log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Reloading environment") - err := util.RetryExponential(func() error { - usedRunners, err := m.loadEnvironment(environment) - if err != nil { - return err - } - m.updateUsedRunners(usedRunners, false) - return nil - }) - if err != nil { - log.WithField(dto.KeyEnvironmentID, environment.ID()).Error("Failed to reload environment") - } -} - func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) { err := util.RetryExponential(func() (err error) { if err = m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration); err != nil { @@ -136,12 +102,69 @@ func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error { func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) { m.AbstractManager.StoreEnvironment(environment) - m.reloadingEnvironment[environment.ID()] = &sync.Mutex{} + m.reloadingEnvironment.Add(environment.ID().ToString(), &alertData{Mutex: &sync.Mutex{}, cancel: nil}) } func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) { m.AbstractManager.DeleteEnvironment(id) - delete(m.reloadingEnvironment, id) + m.reloadingEnvironment.Delete(id.ToString()) +} + +// checkPrewarmingPoolAlert checks if the prewarming pool contains enough idle runners as specified by the PrewarmingPoolThreshold +// if not it starts an environment reload mechanism according to the PrewarmingPoolReloadTimeout. +func (m *NomadRunnerManager) checkPrewarmingPoolAlert(environment ExecutionEnvironment, runnerAdded bool) { + data, ok := m.reloadingEnvironment.Get(environment.ID().ToString()) + if !ok { + log.WithField(dto.KeyEnvironmentID, environment.ID()).Error("reloadingEnvironment not initialized") + return + } + + if runnerAdded && data.cancel != nil { + data.cancel() + data.cancel = nil + m.checkPrewarmingPoolAlert(environment, false) + return + } + + // With this hard lock we collect/block goroutines waiting for one reload to be done. + // However, in practice its likely that only up to PrewarmingPoolSize/2 goroutines are waiting. + // We could avoid the waiting, but we use it to solve the race conditions of the recursive call above. + data.Lock() + defer data.Unlock() + + prewarmingPoolThreshold := config.Config.Server.Alert.PrewarmingPoolThreshold + reloadTimeout := config.Config.Server.Alert.PrewarmingPoolReloadTimeout + + if reloadTimeout == 0 || float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold { + return + } + + ctx, cancel := context.WithCancel(context.Background()) + data.cancel = cancel + + log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Checking again..") + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(reloadTimeout) * time.Second): + } + + if float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold { + return + } + + log.WithField(dto.KeyEnvironmentID, environment.ID()).Warn("Prewarming Pool Alert. Reloading environment") + err := util.RetryExponential(func() error { + usedRunners, err := m.loadEnvironment(environment) + if err != nil { + return err + } + m.updateUsedRunners(usedRunners, false) + return nil + }) + if err != nil { + log.WithField(dto.KeyEnvironmentID, environment.ID()).Error("Failed to reload environment") + } } // Load recovers all runners for all existing environments. @@ -264,6 +287,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, start mappedPorts = alloc.AllocatedResources.Shared.Ports } environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m.onRunnerDestroyed)) + go m.checkPrewarmingPoolAlert(environment, true) monitorAllocationStartupDuration(startup, alloc.JobID, environmentID) } } @@ -301,6 +325,7 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error) environment, ok := m.GetEnvironment(environmentID) if ok { stillActive = stillActive || environment.DeleteRunner(runnerID) + go m.checkPrewarmingPoolAlert(environment, false) } return !stillActive diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index fb377d8..3da7bb5 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -594,25 +594,6 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() { apiMock := &nomad.ExecutorAPIMock{} m := NewNomadRunnerManager(apiMock, s.TestCtx) m.StoreEnvironment(environment) - - s.Run("does not allow concurrent calls", func() { - environment.On("PrewarmingPoolSize").Return(uint(1)).Once() - - secondCallDone := make(chan struct{}) - environment.On("IdleRunnerCount").Run(func(_ mock.Arguments) { - <-secondCallDone - }).Return(uint(1)).Once() - - go m.checkPrewarmingPoolAlert(environment) - <-time.After(tests.ShortTimeout) - go func() { - m.checkPrewarmingPoolAlert(environment) - close(secondCallDone) - }() - - <-time.After(tests.ShortTimeout) - environment.AssertExpectations(s.T()) - }) s.Run("checks the alert condition again after the reload timeout", func() { environment.On("PrewarmingPoolSize").Return(uint(1)).Once() environment.On("IdleRunnerCount").Return(uint(0)).Once() @@ -621,7 +602,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() { checkDone := make(chan struct{}) go func() { - m.checkPrewarmingPoolAlert(environment) + m.checkPrewarmingPoolAlert(environment, false) close(checkDone) }() @@ -646,7 +627,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() { checkDone := make(chan struct{}) go func() { - m.checkPrewarmingPoolAlert(environment) + m.checkPrewarmingPoolAlert(environment, false) close(checkDone) }() @@ -657,6 +638,28 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() { } environment.AssertExpectations(s.T()) }) + s.Run("is canceled by an added runner", func() { + environment.On("PrewarmingPoolSize").Return(uint(1)).Twice() + environment.On("IdleRunnerCount").Return(uint(0)).Once() + environment.On("IdleRunnerCount").Return(uint(1)).Once() + + checkDone := make(chan struct{}) + go func() { + m.checkPrewarmingPoolAlert(environment, false) + close(checkDone) + }() + + <-time.After(tests.ShortTimeout) + go m.checkPrewarmingPoolAlert(environment, true) + <-time.After(tests.ShortTimeout) + + select { + case <-time.After(100 * time.Duration(timeout) * time.Second): + s.Fail("checkPrewarmingPoolAlert was not canceled") + case <-checkDone: + } + environment.AssertExpectations(s.T()) + }) } func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert_reloadsRunners() { @@ -697,7 +700,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert_reloadsR s.NoError(err) }).Return().Once() - m.checkPrewarmingPoolAlert(environment) + m.checkPrewarmingPoolAlert(environment, false) r, ok := m.usedRunners.Get(tests.DefaultRunnerID) s.Require().True(ok)