Refactor PrewarmingPoolAlert triggering from route-based to Nomad-Event-Stream-based.
Maximilian Paß
2023-11-08 14:36:06 +01:00
committed by Sebastian Serth
parent 543939e5cb
commit 7b82300ff7
2 changed files with 93 additions and 65 deletions


@@ -25,16 +25,21 @@ var (
 	ErrRunnerNotFound = errors.New("no runner found with this id")
 )
 
+type alertData struct {
+	*sync.Mutex
+	cancel context.CancelFunc
+}
+
 type NomadRunnerManager struct {
 	*AbstractManager
 	apiClient            nomad.ExecutorAPI
-	reloadingEnvironment map[dto.EnvironmentID]*sync.Mutex
+	reloadingEnvironment storage.Storage[*alertData]
 }
 
 // NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
 // KeepRunnersSynced has to be started separately.
 func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
-	return &NomadRunnerManager{NewAbstractManager(ctx), apiClient, make(map[dto.EnvironmentID]*sync.Mutex)}
+	return &NomadRunnerManager{NewAbstractManager(ctx), apiClient, storage.NewLocalStorage[*alertData]()}
 }
 
 func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) {
@@ -43,7 +48,6 @@ func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int
 		return nil, ErrUnknownExecutionEnvironment
 	}
 	runner, ok := environment.Sample()
-	go m.checkPrewarmingPoolAlert(environment)
 	if !ok {
 		return nil, ErrNoRunnersAvailable
 	}
@@ -55,44 +59,6 @@ func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int
 	return runner, nil
 }
 
-// checkPrewarmingPoolAlert checks if the prewarming pool contains enough idle runners as specified by the PrewarmingPoolThreshold
-// if not it starts an environment reload mechanism according to the PrewarmingPoolReloadTimeout.
-func (m *NomadRunnerManager) checkPrewarmingPoolAlert(environment ExecutionEnvironment) {
-	mutex := m.reloadingEnvironment[environment.ID()]
-	if !mutex.TryLock() {
-		// The environment is already about to be reloaded
-		return
-	}
-	defer mutex.Unlock()
-
-	prewarmingPoolThreshold := config.Config.Server.Alert.PrewarmingPoolThreshold
-	reloadTimeout := config.Config.Server.Alert.PrewarmingPoolReloadTimeout
-
-	if reloadTimeout == 0 || float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold {
-		return
-	}
-
-	log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Checking again..")
-	<-time.After(time.Duration(reloadTimeout) * time.Second)
-
-	if float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold {
-		return
-	}
-
-	log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Reloading environment")
-	err := util.RetryExponential(func() error {
-		usedRunners, err := m.loadEnvironment(environment)
-		if err != nil {
-			return err
-		}
-		m.updateUsedRunners(usedRunners, false)
-		return nil
-	})
-	if err != nil {
-		log.WithField(dto.KeyEnvironmentID, environment.ID()).Error("Failed to reload environment")
-	}
-}
-
 func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) {
 	err := util.RetryExponential(func() (err error) {
 		if err = m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration); err != nil {
@@ -136,12 +102,69 @@ func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
 func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) {
 	m.AbstractManager.StoreEnvironment(environment)
-	m.reloadingEnvironment[environment.ID()] = &sync.Mutex{}
+	m.reloadingEnvironment.Add(environment.ID().ToString(), &alertData{Mutex: &sync.Mutex{}, cancel: nil})
 }
 
 func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) {
 	m.AbstractManager.DeleteEnvironment(id)
-	delete(m.reloadingEnvironment, id)
+	m.reloadingEnvironment.Delete(id.ToString())
+}
+
+// checkPrewarmingPoolAlert checks whether the prewarming pool contains enough idle runners, as specified by the PrewarmingPoolThreshold.
+// If not, it starts an environment reload mechanism according to the PrewarmingPoolReloadTimeout.
+func (m *NomadRunnerManager) checkPrewarmingPoolAlert(environment ExecutionEnvironment, runnerAdded bool) {
+	data, ok := m.reloadingEnvironment.Get(environment.ID().ToString())
+	if !ok {
+		log.WithField(dto.KeyEnvironmentID, environment.ID()).Error("reloadingEnvironment not initialized")
+		return
+	}
+
+	if runnerAdded && data.cancel != nil {
+		data.cancel()
+		data.cancel = nil
+		m.checkPrewarmingPoolAlert(environment, false)
+		return
+	}
+
+	// With this hard lock we collect/block goroutines waiting for one reload to be done.
+	// However, in practice it's likely that only up to PrewarmingPoolSize/2 goroutines are waiting.
+	// We could avoid the waiting, but we use it to solve the race conditions of the recursive call above.
+	data.Lock()
+	defer data.Unlock()
+
+	prewarmingPoolThreshold := config.Config.Server.Alert.PrewarmingPoolThreshold
+	reloadTimeout := config.Config.Server.Alert.PrewarmingPoolReloadTimeout
+
+	if reloadTimeout == 0 || float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold {
+		return
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	data.cancel = cancel
+
+	log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Checking again..")
+	select {
+	case <-ctx.Done():
+		return
+	case <-time.After(time.Duration(reloadTimeout) * time.Second):
+	}
+
+	if float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold {
+		return
+	}
+
+	log.WithField(dto.KeyEnvironmentID, environment.ID()).Warn("Prewarming Pool Alert. Reloading environment")
+	err := util.RetryExponential(func() error {
+		usedRunners, err := m.loadEnvironment(environment)
+		if err != nil {
+			return err
+		}
+		m.updateUsedRunners(usedRunners, false)
+		return nil
+	})
+	if err != nil {
+		log.WithField(dto.KeyEnvironmentID, environment.ID()).Error("Failed to reload environment")
+	}
 }
 
 // Load recovers all runners for all existing environments.
@@ -264,6 +287,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, start
 			mappedPorts = alloc.AllocatedResources.Shared.Ports
 		}
 		environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m.onRunnerDestroyed))
+		go m.checkPrewarmingPoolAlert(environment, true)
 		monitorAllocationStartupDuration(startup, alloc.JobID, environmentID)
 	}
 }
@@ -301,6 +325,7 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error)
 	environment, ok := m.GetEnvironment(environmentID)
 	if ok {
 		stillActive = stillActive || environment.DeleteRunner(runnerID)
+		go m.checkPrewarmingPoolAlert(environment, false)
 	}
 	return !stillActive

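Taken together, the change above replaces a per-request trigger with an event-driven debounce: the first check that finds the pool below PrewarmingPoolThreshold arms a context.CancelFunc and waits out PrewarmingPoolReloadTimeout; a runner arriving through the Nomad event stream cancels that wait instead of letting the reload fire. Below is a minimal, self-contained sketch of just this pattern. All names (environment, alert, check, idleRatio) are hypothetical stand-ins rather than Poseidon's API, and, unlike the diff (which reads data.cancel outside the lock and relies on the hard lock to contain the resulting races), the sketch guards the cancel field with its own mutex:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// environment stands in for Poseidon's ExecutionEnvironment; only the
// idle-runner ratio matters for the alert condition.
type environment struct {
	idleRunners atomic.Int64
	poolSize    int64
}

func (e *environment) idleRatio() float64 {
	return float64(e.idleRunners.Load()) / float64(e.poolSize)
}

// alert mirrors alertData: a mutex serializing checks plus a cancel
// function that an added runner uses to abort a pending reload wait.
type alert struct {
	mu       sync.Mutex
	cancelMu sync.Mutex // sketch-only guard; the diff accesses cancel unlocked
	cancel   context.CancelFunc
}

const threshold = 0.5

func (a *alert) check(env *environment, runnerAdded bool, reloadTimeout time.Duration) {
	if runnerAdded {
		a.cancelMu.Lock()
		if a.cancel != nil {
			a.cancel() // a runner came back: abort the pending wait...
			a.cancel = nil
		}
		a.cancelMu.Unlock()
		a.check(env, false, reloadTimeout) // ...then re-evaluate once, as the diff does
		return
	}

	a.mu.Lock() // concurrent checks pile up here until the current one finishes
	defer a.mu.Unlock()

	if reloadTimeout == 0 || env.idleRatio() >= threshold {
		return // pool is healthy (or the alert is disabled)
	}

	ctx, cancel := context.WithCancel(context.Background())
	a.cancelMu.Lock()
	a.cancel = cancel
	a.cancelMu.Unlock()

	select {
	case <-ctx.Done():
		return // canceled: a runner arrived while we were waiting
	case <-time.After(reloadTimeout):
	}

	if env.idleRatio() >= threshold {
		return // the pool recovered on its own
	}
	fmt.Println("reloading environment") // the real code retries loadEnvironment here
}

func main() {
	env := &environment{poolSize: 2} // zero idle runners: the alert condition holds
	a := &alert{}

	go a.check(env, false, 200*time.Millisecond) // arms the alert and waits
	time.Sleep(50 * time.Millisecond)

	env.idleRunners.Store(2)                 // the pool refills...
	a.check(env, true, 200*time.Millisecond) // ...so the pending alert is canceled

	time.Sleep(300 * time.Millisecond) // no "reloading environment" is printed
}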

@@ -594,25 +594,6 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() {
 	apiMock := &nomad.ExecutorAPIMock{}
 	m := NewNomadRunnerManager(apiMock, s.TestCtx)
 	m.StoreEnvironment(environment)
-	s.Run("does not allow concurrent calls", func() {
-		environment.On("PrewarmingPoolSize").Return(uint(1)).Once()
-		secondCallDone := make(chan struct{})
-		environment.On("IdleRunnerCount").Run(func(_ mock.Arguments) {
-			<-secondCallDone
-		}).Return(uint(1)).Once()
-
-		go m.checkPrewarmingPoolAlert(environment)
-		<-time.After(tests.ShortTimeout)
-
-		go func() {
-			m.checkPrewarmingPoolAlert(environment)
-			close(secondCallDone)
-		}()
-		<-time.After(tests.ShortTimeout)
-
-		environment.AssertExpectations(s.T())
-	})
-
 	s.Run("checks the alert condition again after the reload timeout", func() {
 		environment.On("PrewarmingPoolSize").Return(uint(1)).Once()
 		environment.On("IdleRunnerCount").Return(uint(0)).Once()
@@ -621,7 +602,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() {
 		checkDone := make(chan struct{})
 		go func() {
-			m.checkPrewarmingPoolAlert(environment)
+			m.checkPrewarmingPoolAlert(environment, false)
 			close(checkDone)
 		}()
@@ -646,7 +627,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() {
 		checkDone := make(chan struct{})
 		go func() {
-			m.checkPrewarmingPoolAlert(environment)
+			m.checkPrewarmingPoolAlert(environment, false)
 			close(checkDone)
 		}()
@@ -657,6 +638,28 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() {
 		}
 		environment.AssertExpectations(s.T())
 	})
+
+	s.Run("is canceled by an added runner", func() {
+		environment.On("PrewarmingPoolSize").Return(uint(1)).Twice()
+		environment.On("IdleRunnerCount").Return(uint(0)).Once()
+		environment.On("IdleRunnerCount").Return(uint(1)).Once()
+
+		checkDone := make(chan struct{})
+		go func() {
+			m.checkPrewarmingPoolAlert(environment, false)
+			close(checkDone)
+		}()
+		<-time.After(tests.ShortTimeout)
+
+		go m.checkPrewarmingPoolAlert(environment, true)
+		<-time.After(tests.ShortTimeout)
+		select {
+		case <-time.After(100 * time.Duration(timeout) * time.Second):
+			s.Fail("checkPrewarmingPoolAlert was not canceled")
+		case <-checkDone:
+		}
+		environment.AssertExpectations(s.T())
+	})
 }
 
 func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert_reloadsRunners() {
@@ -697,7 +700,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert_reloadsR
 		s.NoError(err)
 	}).Return().Once()
 
-	m.checkPrewarmingPoolAlert(environment)
+	m.checkPrewarmingPoolAlert(environment, false)
 
 	r, ok := m.usedRunners.Get(tests.DefaultRunnerID)
 	s.Require().True(ok)
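The new cancellation subtest relies on a common Go testing pattern: run the function under test in a goroutine, close a channel when it returns, then select between that channel and a generous timeout. A stripped-down, runnable illustration of just that pattern (hypothetical names, no testify or mocks):

package main

import (
	"fmt"
	"time"
)

// waitOrFail reports whether done closes before the deadline — the shape of
// the select in the "is canceled by an added runner" subtest above.
func waitOrFail(done <-chan struct{}, deadline time.Duration) bool {
	select {
	case <-done:
		return true // the goroutine finished in time: the check was canceled
	case <-time.After(deadline):
		return false // maps to s.Fail("checkPrewarmingPoolAlert was not canceled")
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // stands in for the canceled alert check
		close(done)
	}()
	fmt.Println("canceled in time:", waitOrFail(done, time.Second))
}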