Extract the WatchEventStream retry mechanism
into the utils package, alongside all other retry mechanisms. This change also fixes an issue where the WatchEventStream goroutine did not stop immediately when the context was done, but only up to one second later.
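For illustration, a minimal standalone sketch (not part of this commit; names are hypothetical) of the waiting pattern the extracted helpers rely on: selecting on ctx.Done() and time.After lets the retry return as soon as the context is cancelled, instead of always sleeping the full second as the previous loop in keepRunnersSynced did.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryConstantSketch retries f with a constant one-second delay until it
// succeeds, the maximum number of attempts is reached, or the context is done.
func retryConstantSketch(ctx context.Context, maxAttempts int, f func() error) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			// Stops immediately on cancellation instead of sleeping first.
			return fmt.Errorf("stopped retrying: %w", ctx.Err())
		case <-time.After(time.Second):
		}
	}
	return err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	err := retryConstantSketch(ctx, 10, func() error { return errors.New("stream ended") })
	fmt.Println(err) // stopped retrying: context deadline exceeded
}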
@@ -245,7 +245,7 @@ func (n *NomadEnvironment) Sample() (runner.Runner, bool) {
 	r, ok := n.idleRunners.Sample()
 	if ok && n.idleRunners.Length() < n.PrewarmingPoolSize() {
 		go func() {
-			err := util.RetryExponentialContext(n.ctx, func() error { return n.createRunner(false) })
+			err := util.RetryExponentialWithContext(n.ctx, func() error { return n.createRunner(false) })
 			if err != nil {
 				log.WithError(err).WithField(dto.KeyEnvironmentID, n.ID().ToString()).
 					Error("Couldn't create new runner for claimed one")
@@ -44,7 +44,7 @@ func NewNomadEnvironmentManager(
 
 	m := &NomadEnvironmentManager{&AbstractManager{nil, runnerManager},
 		apiClient, templateEnvironmentJobHCL}
-	if err := util.RetryExponentialContext(ctx, func() error { return m.Load() }); err != nil {
+	if err := util.RetryExponentialWithContext(ctx, func() error { return m.Load() }); err != nil {
 		log.WithError(err).Error("Error recovering the execution environments")
 	}
 	runnerManager.Load()
@@ -12,6 +12,7 @@ import (
 	"github.com/openHPI/poseidon/pkg/monitoring"
 	"github.com/openHPI/poseidon/pkg/util"
 	"github.com/sirupsen/logrus"
+	"math"
 	"strconv"
 	"time"
 )
@@ -125,13 +126,17 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger
 }
 
 func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
-	retries := 0
-	for ctx.Err() == nil {
+	err := util.RetryConstantAttemptsWithContext(math.MaxInt, ctx, func() error {
 		err := m.apiClient.WatchEventStream(ctx,
 			&nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped})
-		retries += 1
-		log.WithContext(ctx).WithError(err).WithField("count", retries).Errorf("Nomad Event Stream failed! Retrying...")
-		<-time.After(time.Second)
+		if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
+			log.WithContext(ctx).WithError(err).Errorf("Nomad Event Stream failed! Retrying...")
+			err = fmt.Errorf("KeepRunnersSynced: %w", err)
+		}
+		return err
+	})
+	if err != nil && !(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
+		log.WithContext(ctx).WithError(err).Fatal("Stopped Restarting the Nomad Event Stream")
 	}
 }
 
@@ -2,6 +2,7 @@ package util
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"github.com/openHPI/poseidon/pkg/logging"
 	"time"
@@ -15,33 +16,66 @@ var (
 	InitialWaitingDuration = time.Second
 )
 
-// RetryExponentialAttemptsContext executes the passed function
-// with exponentially increasing time in between starting at the passed sleep duration
-// up to a maximum of attempts tries as long as the context is not done.
-func RetryExponentialAttemptsContext(
-	ctx context.Context, attempts int, sleep time.Duration, f func() error) (err error) {
-	for i := 0; i < attempts; i++ {
-		if ctx.Err() != nil {
-			return fmt.Errorf("stopped retry due to: %w", ctx.Err())
-		} else if err = f(); err == nil {
-			return nil
-		} else {
-			log.WithField("count", i).WithError(err).Debug("retrying after error")
-			time.Sleep(sleep)
-			sleep *= 2
+func retryExponential(ctx context.Context, sleep time.Duration, f func() error) func() error {
+	return func() error {
+		err := f()
+
+		if err != nil {
+			select {
+			case <-ctx.Done():
+			case <-time.After(sleep):
+				sleep *= 2
+			}
+		}
+
+		return err
+	}
+}
+
+func retryConstant(ctx context.Context, sleep time.Duration, f func() error) func() error {
+	return func() error {
+		err := f()
+
+		if err != nil {
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("stopped retrying: %w", ctx.Err())
+			case <-time.After(sleep):
+			}
+		}
+
+		return err
+	}
+}
+
+func retryAttempts(maxAttempts int, f func() error) (err error) {
+	for i := 0; i < maxAttempts; i++ {
+		err = f()
+		if err == nil {
+			return nil
+		} else if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
+			return err
 		}
+		log.WithField("count", i).WithError(err).Debug("retrying after error")
 	}
 	return err
 }
 
-func RetryExponentialContext(ctx context.Context, f func() error) error {
-	return RetryExponentialAttemptsContext(ctx, MaxConnectionRetriesExponential, InitialWaitingDuration, f)
-}
-
-func RetryExponentialDuration(sleep time.Duration, f func() error) error {
-	return RetryExponentialAttemptsContext(context.Background(), MaxConnectionRetriesExponential, sleep, f)
+// RetryExponentialWithContext executes the passed function with exponentially increasing time starting with one second
+// up to a default maximum number of attempts as long as the context is not done.
+func RetryExponentialWithContext(ctx context.Context, f func() error) error {
+	return retryAttempts(MaxConnectionRetriesExponential, retryExponential(ctx, InitialWaitingDuration, f))
 }
 
+// RetryExponential executes the passed function with exponentially increasing time starting with one second
+// up to a default maximum number of attempts.
 func RetryExponential(f func() error) error {
-	return RetryExponentialDuration(InitialWaitingDuration, f)
+	return retryAttempts(MaxConnectionRetriesExponential,
+		retryExponential(context.Background(), InitialWaitingDuration, f))
+}
+
+// RetryConstantAttemptsWithContext executes the passed function with a constant retry delay of one second
+// up to the passed maximum number of attempts as long as the context is not done.
+func RetryConstantAttemptsWithContext(attempts int, ctx context.Context, f func() error) error {
+	return retryAttempts(attempts, retryConstant(ctx, InitialWaitingDuration, f))
 }
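For reference, a brief usage sketch of the extracted helpers (illustrative only, not part of the commit; the callbacks are placeholders):

package main

import (
	"context"
	"errors"
	"log"
	"math"
	"time"

	"github.com/openHPI/poseidon/pkg/util"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Exponential backoff, bounded by the default maximum number of attempts
	// and stopped early once the context is done.
	if err := util.RetryExponentialWithContext(ctx, func() error { return nil }); err != nil {
		log.Printf("load failed: %v", err)
	}

	// Constant one-second delay with a practically unlimited number of attempts,
	// mirroring how keepRunnersSynced now restarts the Nomad event stream.
	err := util.RetryConstantAttemptsWithContext(math.MaxInt, ctx, func() error {
		return errors.New("event stream ended")
	})
	if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
		log.Printf("stopped retrying: %v", err)
	}
}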