Add independent environment reload
in case the prewarming pool is depleting (see PrewarmingPoolThreshold) and is still depleted after a timeout (PrewarmingPoolReloadTimeout).
committed by Sebastian Serth
parent c46a09eeae
commit 543939e5cb
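The mechanism added in this commit boils down to a ratio check: an environment's prewarming pool counts as depleted when the share of idle runners falls below PrewarmingPoolThreshold, and the environment is only reloaded if the pool is still below that threshold after PrewarmingPoolReloadTimeout seconds have passed. A condensed, self-contained sketch of that decision (the helper name and signature below are illustrative, not part of the commit):

package prewarming

// poolNeedsReload mirrors the threshold check introduced in checkPrewarmingPoolAlert:
// the pool counts as depleted when the idle/desired ratio drops below the configured
// threshold; a reload timeout of zero disables the mechanism entirely.
func poolNeedsReload(idleRunners, poolSize uint, threshold float64, reloadTimeoutSec uint) bool {
    if reloadTimeoutSec == 0 || poolSize == 0 {
        return false
    }
    return float64(idleRunners)/float64(poolSize) < threshold
}

For example, with a threshold of 0.5, an environment that should hold 10 prewarmed runners but has only 1 idle runner (ratio 0.1) arms the alert; if the ratio is still below 0.5 after the timeout, the reload is triggered.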
@@ -6,14 +6,15 @@ import (
     "fmt"
     nomadApi "github.com/hashicorp/nomad/api"
     influxdb2 "github.com/influxdata/influxdb-client-go/v2"
+    "github.com/openHPI/poseidon/internal/config"
     "github.com/openHPI/poseidon/internal/nomad"
     "github.com/openHPI/poseidon/pkg/dto"
     "github.com/openHPI/poseidon/pkg/logging"
     "github.com/openHPI/poseidon/pkg/monitoring"
     "github.com/openHPI/poseidon/pkg/storage"
     "github.com/openHPI/poseidon/pkg/util"
     "github.com/sirupsen/logrus"
     "strconv"
     "sync"
     "time"
 )
@@ -26,21 +27,23 @@ var (
 
 type NomadRunnerManager struct {
     *AbstractManager
-    apiClient nomad.ExecutorAPI
+    apiClient            nomad.ExecutorAPI
+    reloadingEnvironment map[dto.EnvironmentID]*sync.Mutex
 }
 
 // NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
 // KeepRunnersSynced has to be started separately.
 func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
-    return &NomadRunnerManager{NewAbstractManager(ctx), apiClient}
+    return &NomadRunnerManager{NewAbstractManager(ctx), apiClient, make(map[dto.EnvironmentID]*sync.Mutex)}
 }
 
 func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) {
-    environment, ok := m.environments.Get(environmentID.ToString())
+    environment, ok := m.GetEnvironment(environmentID)
     if !ok {
         return nil, ErrUnknownExecutionEnvironment
     }
     runner, ok := environment.Sample()
+    go m.checkPrewarmingPoolAlert(environment)
     if !ok {
         return nil, ErrNoRunnersAvailable
     }
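Claim fires the new check in a goroutine, so claiming a runner never waits for it, and the per-environment sync.Mutex stored in reloadingEnvironment makes the check single-flight: TryLock returns false immediately while another goroutine is already sitting out the reload timeout for the same environment. A self-contained sketch of that guard pattern (identifiers are illustrative, not Poseidon's API):

package main

import (
    "fmt"
    "sync"
    "time"
)

// reloadGuards stands in for reloadingEnvironment: one mutex per environment ID.
var reloadGuards = map[int]*sync.Mutex{42: {}}

// maybeReload returns immediately if a check for this environment is already
// in flight, mirroring the TryLock guard in checkPrewarmingPoolAlert.
func maybeReload(envID int) {
    mutex := reloadGuards[envID]
    if !mutex.TryLock() {
        fmt.Println("reload check already pending, skipping")
        return
    }
    defer mutex.Unlock()
    time.Sleep(50 * time.Millisecond) // stands in for waiting out the reload timeout
    fmt.Println("reload check finished")
}

func main() {
    go maybeReload(42)
    go maybeReload(42) // while the first call sleeps, the second is dropped by TryLock
    time.Sleep(100 * time.Millisecond)
}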
@@ -52,6 +55,44 @@ func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int
     return runner, nil
 }
 
+// checkPrewarmingPoolAlert checks if the prewarming pool contains enough idle runners as specified by the PrewarmingPoolThreshold.
+// If not, it starts an environment reload mechanism according to the PrewarmingPoolReloadTimeout.
+func (m *NomadRunnerManager) checkPrewarmingPoolAlert(environment ExecutionEnvironment) {
+    mutex := m.reloadingEnvironment[environment.ID()]
+    if !mutex.TryLock() {
+        // The environment is already about to be reloaded.
+        return
+    }
+    defer mutex.Unlock()
+
+    prewarmingPoolThreshold := config.Config.Server.Alert.PrewarmingPoolThreshold
+    reloadTimeout := config.Config.Server.Alert.PrewarmingPoolReloadTimeout
+
+    if reloadTimeout == 0 || float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold {
+        return
+    }
+
+    log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Checking again..")
+    <-time.After(time.Duration(reloadTimeout) * time.Second)
+
+    if float64(environment.IdleRunnerCount())/float64(environment.PrewarmingPoolSize()) >= prewarmingPoolThreshold {
+        return
+    }
+
+    log.WithField(dto.KeyEnvironmentID, environment.ID()).Info("Prewarming Pool Alert. Reloading environment")
+    err := util.RetryExponential(func() error {
+        usedRunners, err := m.loadEnvironment(environment)
+        if err != nil {
+            return err
+        }
+        m.updateUsedRunners(usedRunners, false)
+        return nil
+    })
+    if err != nil {
+        log.WithField(dto.KeyEnvironmentID, environment.ID()).Error("Failed to reload environment")
+    }
+}
+
 func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) {
     err := util.RetryExponential(func() (err error) {
         if err = m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration); err != nil {
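The reload itself is wrapped in util.RetryExponential, whose implementation is not part of this diff. As a rough sketch of what such a helper typically does (the attempt limit, base delay, and lack of context-based cancellation here are assumptions, not Poseidon's actual code):

package util

import "time"

// retryExponential retries f with an exponentially growing delay until it
// succeeds or the assumed attempt limit is reached; the real helper may
// differ (for example, unlimited attempts or cancellation via a context).
func retryExponential(f func() error) (err error) {
    const maxAttempts = 5
    delay := time.Second
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if err = f(); err == nil {
            return nil
        }
        time.Sleep(delay)
        delay *= 2
    }
    return err
}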
@@ -80,9 +121,7 @@ func (m *NomadRunnerManager) Return(r Runner) error {
 // SynchronizeRunners loads all runners and keeps them synchronized (without a retry mechanism).
 func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
     log.Info("Loading runners")
-    if err := m.load(); err != nil {
-        return fmt.Errorf("failed loading runners: %w", err)
-    }
+    m.load()
 
     // Watch for changes regarding the existing or new runners.
     log.Info("Watching Event Stream")
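With this hunk, a failing initial load no longer aborts SynchronizeRunners: load (rewritten in the next hunk) logs and skips a broken environment so the Nomad event stream still starts. A minimal generic sketch of that shift from fail-fast to log-and-continue (not the Poseidon API):

package main

import (
    "errors"
    "fmt"
)

// loadAll logs and skips failing environments instead of returning on the
// first error, mirroring the new behavior of load().
func loadAll(envs []string, loadOne func(string) error) {
    for _, env := range envs {
        if err := loadOne(env); err != nil {
            fmt.Printf("failed loading environment %s, skipping: %v\n", env, err)
            continue
        }
        fmt.Printf("recovered environment %s\n", env)
    }
}

func main() {
    loadAll([]string{"java", "python"}, func(env string) error {
        if env == "java" {
            return errors.New("nomad unreachable")
        }
        return nil
    })
}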
@@ -95,69 +134,101 @@ func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
     return err
 }
 
-// Load recovers all runners for all existing environments.
-func (m *NomadRunnerManager) load() error {
-    newUsedRunners := storage.NewLocalStorage[Runner]()
-    for _, environment := range m.environments.List() {
-        environmentLogger := log.WithField(dto.KeyEnvironmentID, environment.ID().ToString())
-
-        runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
-        if err != nil {
-            return fmt.Errorf("failed fetching the runner jobs: %w", err)
-        }
-        for _, job := range runnerJobs {
-            m.loadSingleJob(job, environmentLogger, environment, newUsedRunners)
-        }
-        err = environment.ApplyPrewarmingPoolSize()
-        if err != nil {
-            return fmt.Errorf("couldn't scale environment: %w", err)
-        }
-    }
-
-    m.updateUsedRunners(newUsedRunners)
-    return nil
+func (m *NomadRunnerManager) StoreEnvironment(environment ExecutionEnvironment) {
+    m.AbstractManager.StoreEnvironment(environment)
+    m.reloadingEnvironment[environment.ID()] = &sync.Mutex{}
 }
 
-func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger *logrus.Entry,
-    environment ExecutionEnvironment, newUsedRunners storage.Storage[Runner]) {
+func (m *NomadRunnerManager) DeleteEnvironment(id dto.EnvironmentID) {
+    m.AbstractManager.DeleteEnvironment(id)
+    delete(m.reloadingEnvironment, id)
+}
+
+// Load recovers all runners for all existing environments.
+func (m *NomadRunnerManager) load() {
+    newUsedRunners := storage.NewLocalStorage[Runner]()
+    for _, environment := range m.ListEnvironments() {
+        usedRunners, err := m.loadEnvironment(environment)
+        if err != nil {
+            log.WithError(err).WithField(dto.KeyEnvironmentID, environment.ID().ToString()).
+                Warn("Failed loading environment. Skipping ...")
+            continue
+        }
+        for _, r := range usedRunners.List() {
+            newUsedRunners.Add(r.ID(), r)
+        }
+    }
+
+    m.updateUsedRunners(newUsedRunners, true)
+}
+
+func (m *NomadRunnerManager) loadEnvironment(environment ExecutionEnvironment) (used storage.Storage[Runner], err error) {
+    used = storage.NewLocalStorage[Runner]()
+    runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
+    if err != nil {
+        return nil, fmt.Errorf("failed fetching the runner jobs: %w", err)
+    }
+    for _, job := range runnerJobs {
+        r, isUsed, err := m.loadSingleJob(job, environment)
+        if err != nil {
+            log.WithError(err).WithField(dto.KeyEnvironmentID, environment.ID().ToString()).
+                WithField("used", isUsed).Warn("Failed loading job. Skipping ...")
+            continue
+        } else if isUsed {
+            used.Add(r.ID(), r)
+        }
+    }
+    err = environment.ApplyPrewarmingPoolSize()
+    if err != nil {
+        return used, fmt.Errorf("couldn't scale environment: %w", err)
+    }
+    return used, nil
+}
+
+func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environment ExecutionEnvironment) (r Runner, isUsed bool, err error) {
     configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
     if configTaskGroup == nil {
-        environmentLogger.Warnf("Couldn't find config task group in job %s, skipping ...", *job.ID)
-        return
+        return nil, false, fmt.Errorf("%w, %s", nomad.ErrorMissingTaskGroup, *job.ID)
     }
-    isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
+    isUsed = configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
     portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID)
     if err != nil {
-        environmentLogger.WithError(err).Warn("Error loading runner portMappings, skipping ...")
-        return
+        return nil, false, fmt.Errorf("error loading runner portMappings: %w", err)
     }
 
     newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.onRunnerDestroyed)
     log.WithField("isUsed", isUsed).WithField(dto.KeyRunnerID, newJob.ID()).Debug("Recovered Runner")
     if isUsed {
-        newUsedRunners.Add(newJob.ID(), newJob)
         timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
         if err != nil {
-            environmentLogger.WithError(err).Warn("Error loading timeout from meta values")
-        } else {
-            newJob.SetupTimeout(time.Duration(timeout) * time.Second)
+            log.WithField(dto.KeyRunnerID, newJob.ID()).WithError(err).Warn("failed loading timeout from meta values")
+            timeout = int(nomad.RunnerTimeoutFallback.Seconds())
+            go m.markRunnerAsUsed(newJob, timeout)
         }
+        newJob.SetupTimeout(time.Duration(timeout) * time.Second)
     } else {
         environment.AddRunner(newJob)
     }
+    return newJob, isUsed, nil
 }
 
-func (m *NomadRunnerManager) updateUsedRunners(newUsedRunners storage.Storage[Runner]) {
+// updateUsedRunners handles the cleanup process of updating the used runner storage.
+// This includes the clean deletion of the local references to the (replaced/deleted) runners.
+// Only if removeDeleted is set, the runners that are no longer in newUsedRunners (but still tracked in m.usedRunners) will be removed.
+func (m *NomadRunnerManager) updateUsedRunners(newUsedRunners storage.Storage[Runner], removeDeleted bool) {
     for _, r := range m.usedRunners.List() {
         var reason DestroyReason
         if _, ok := newUsedRunners.Get(r.ID()); ok {
             reason = ErrDestroyedAndReplaced
-        } else {
+        } else if removeDeleted {
             reason = ErrLocalDestruction
             log.WithError(reason).WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered")
         }
-        m.usedRunners.Delete(r.ID())
-        if err := r.Destroy(reason); err != nil {
-            log.WithError(err).WithField(dto.KeyRunnerID, r.ID()).Warn("failed to destroy runner locally")
+        if reason != nil {
+            m.usedRunners.Delete(r.ID())
+            if err := r.Destroy(reason); err != nil {
+                log.WithError(err).WithField(dto.KeyRunnerID, r.ID()).Warn("failed to destroy runner locally")
+            }
         }
     }
 }
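The removeDeleted flag is what lets the full recovery in load (which walks every environment) and the targeted reload triggered by the prewarming pool alert (which sees only one environment) share updateUsedRunners: only the full synchronization may treat a locally known runner that did not reappear as gone. A reduced sketch of the per-runner decision (simplified types, not the actual storage API):

package runner

// destroyDecision mirrors the branch structure of updateUsedRunners for one
// locally used runner. foundAgain means the runner reappeared in newUsedRunners;
// removeDeleted is true only for the full load() synchronization.
func destroyDecision(foundAgain, removeDeleted bool) (destroy bool, reason string) {
    switch {
    case foundAgain:
        return true, "destroyed and replaced by the freshly loaded representation"
    case removeDeleted:
        return true, "local destruction: the runner no longer exists in Nomad"
    default:
        // Partial reload of a single environment: leave runners that simply
        // were not part of this reload untouched.
        return false, ""
    }
}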
@@ -186,7 +257,7 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, start
         return
     }
 
-    environment, ok := m.environments.Get(environmentID.ToString())
+    environment, ok := m.GetEnvironment(environmentID)
     if ok {
         var mappedPorts []nomadApi.PortMapping
         if alloc.AllocatedResources != nil {
@@ -227,7 +298,7 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error)
         }
     }
 
-    environment, ok := m.environments.Get(environmentID.ToString())
+    environment, ok := m.GetEnvironment(environmentID)
     if ok {
         stillActive = stillActive || environment.DeleteRunner(runnerID)
     }
@@ -240,7 +311,7 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error)
 func (m *NomadRunnerManager) onRunnerDestroyed(r Runner) error {
     m.usedRunners.Delete(r.ID())
 
-    environment, ok := m.environments.Get(r.Environment().ToString())
+    environment, ok := m.GetEnvironment(r.Environment())
     if ok {
         environment.DeleteRunner(r.ID())
     }
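The last three hunks replace direct lookups on m.environments with the GetEnvironment accessor; together with the StoreEnvironment and DeleteEnvironment overrides above, all environment bookkeeping now flows through methods the Nomad manager can hook to keep reloadingEnvironment consistent. A self-contained sketch of that embed-and-override pattern (names and types are illustrative, not Poseidon's):

package main

import (
    "fmt"
    "sync"
)

type abstractManager struct{ environments map[int]string }

func (a *abstractManager) StoreEnvironment(id int, env string) { a.environments[id] = env }
func (a *abstractManager) DeleteEnvironment(id int)            { delete(a.environments, id) }

// nomadManager embeds the abstract manager and wraps both methods to keep a
// side table (here: one mutex per environment) in sync, mirroring how
// NomadRunnerManager maintains reloadingEnvironment.
type nomadManager struct {
    abstractManager
    reloading map[int]*sync.Mutex
}

func (n *nomadManager) StoreEnvironment(id int, env string) {
    n.abstractManager.StoreEnvironment(id, env)
    n.reloading[id] = &sync.Mutex{}
}

func (n *nomadManager) DeleteEnvironment(id int) {
    n.abstractManager.DeleteEnvironment(id)
    delete(n.reloading, id)
}

func main() {
    m := &nomadManager{
        abstractManager: abstractManager{environments: map[int]string{}},
        reloading:       map[int]*sync.Mutex{},
    }
    m.StoreEnvironment(7, "python:3.8")
    m.DeleteEnvironment(7)
    fmt.Println(len(m.environments), len(m.reloading)) // 0 0
}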