Block Webserver during first Nomad recovery.
No requests are accepted while Poseidon is recovering Nomad environments and runners.
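Before the diff, a condensed sketch (not part of the commit) of the gating pattern the changes below introduce: the background recovery loop signals a channel exactly once via a non-blocking send, and the caller blocks on that signal (or on context cancellation) before the webserver starts accepting requests. The recoverNomad helper, the one-minute resync interval, and the HTTP handler are illustrative placeholders, not Poseidon APIs.

package main

import (
	"context"
	"log"
	"net/http"
	"time"
)

// recoverNomad stands in for recovering environments and runners from Nomad.
func recoverNomad(ctx context.Context) {
	time.Sleep(100 * time.Millisecond)
}

// synchronize starts the background synchronization loop and returns once the
// first recovery has completed (or the context is cancelled).
func synchronize(ctx context.Context) {
	firstRecoveryDone := make(chan struct{})
	go func() {
		for {
			recoverNomad(ctx)
			select {
			case firstRecoveryDone <- struct{}{}: // only the first iteration has a waiting receiver
				log.Println("First Recovery Done")
			default: // later iterations must not block on the channel
			}
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Minute): // keep synchronizing periodically (interval is assumed)
			}
		}
	}()

	// Block the caller until the first recovery finished or the context ended.
	select {
	case <-firstRecoveryDone:
	case <-ctx.Done():
	}
}

func main() {
	ctx := context.Background()
	synchronize(ctx) // no HTTP requests are accepted before this returns
	log.Fatal(http.ListenAndServe(":8080", http.NotFoundHandler()))
}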
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"errors"
+	"fmt"
 	"github.com/getsentry/sentry-go"
 	sentryhttp "github.com/getsentry/sentry-go/http"
 	"github.com/openHPI/poseidon/internal/api"
@@ -197,7 +198,7 @@ func createNomadManager(ctx context.Context) (runner.Manager, environment.Manage
 	// API initialization
 	nomadAPIClient, err := nomad.NewExecutorAPI(&config.Config.Nomad)
 	if err != nil {
-		log.WithError(err).WithField("nomad config", config.Config.Nomad).Fatal("Error creating Nomad API client")
+		log.WithError(err).WithField("nomad_config", config.Config.Nomad).Fatal("Error creating Nomad API client")
 	}
 
 	runnerManager := runner.NewNomadRunnerManager(nomadAPIClient, ctx)
@@ -206,10 +207,35 @@ func createNomadManager(ctx context.Context) (runner.Manager, environment.Manage
 	if err != nil {
 		log.WithError(err).Fatal("Error initializing environment manager")
 	}
-	go environmentManager.KeepEnvironmentsSynced(runnerManager.SynchronizeRunners, ctx)
+	synchronizeNomad(ctx, environmentManager, runnerManager)
 	return runnerManager, environmentManager
 }
 
+// synchronizeNomad starts the asynchronous synchronization background task and waits for the first environment and runner recovery.
+func synchronizeNomad(ctx context.Context, environmentManager *environment.NomadEnvironmentManager, runnerManager *runner.NomadRunnerManager) {
+	firstRecoveryDone := make(chan struct{})
+	go environmentManager.KeepEnvironmentsSynced(func(ctx context.Context) error {
+		runnerManager.Load()
+
+		select {
+		case firstRecoveryDone <- struct{}{}:
+			log.Info("First Recovery Done")
+		default:
+		}
+
+		if err := runnerManager.SynchronizeRunners(ctx); err != nil {
+			return fmt.Errorf("synchronize runners failed: %w", err)
+		}
+		return nil
+	}, ctx)
+
+	select {
+	case <-firstRecoveryDone:
+	case <-ctx.Done():
+	}
+}
+
 func createAWSManager(ctx context.Context) (
 	runnerManager runner.Manager, environmentManager environment.ManagerHandler) {
 	runnerManager = runner.NewAWSRunnerManager(ctx)
@@ -2,9 +2,12 @@ package main
 
 import (
 	"context"
+	"github.com/hashicorp/nomad/api"
 	"github.com/openHPI/poseidon/internal/environment"
+	"github.com/openHPI/poseidon/internal/nomad"
 	"github.com/openHPI/poseidon/internal/runner"
 	"github.com/openHPI/poseidon/tests"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 	"golang.org/x/sys/unix"
 	"testing"
@@ -63,3 +66,18 @@ func (s *MainTestSuite) TestShutdownOnOSSignal_Profiling() {
 
 	s.True(called)
 }
+
+func (s *MainTestSuite) TestLoadNomadEnvironmentsBeforeStartingWebserver() {
+	apiMock := &nomad.ExecutorAPIMock{}
+	apiMock.On("LoadEnvironmentJobs").Return([]*api.Job{}, nil)
+	apiMock.On("WatchEventStream", mock.Anything, mock.Anything).Run(func(_ mock.Arguments) {
+		<-s.TestCtx.Done()
+	}).Return(nil).Maybe()
+
+	runnerManager := runner.NewNomadRunnerManager(apiMock, s.TestCtx)
+	environmentManager, err := environment.NewNomadEnvironmentManager(runnerManager, apiMock, "")
+	s.Require().NoError(err)
+
+	synchronizeNomad(s.TestCtx, environmentManager, runnerManager)
+	apiMock.AssertExpectations(s.T())
+}
@@ -84,11 +84,27 @@ func (m *NomadRunnerManager) Return(r Runner) error {
 	return err
 }
 
-// SynchronizeRunners loads all runners and keeps them synchronized (without a retry mechanism).
-func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
+// Load recovers all runners for all existing environments.
+func (m *NomadRunnerManager) Load() {
 	log.Info("Loading runners")
-	m.load()
+	newUsedRunners := storage.NewLocalStorage[Runner]()
+	for _, environment := range m.ListEnvironments() {
+		usedRunners, err := m.loadEnvironment(environment)
+		if err != nil {
+			log.WithError(err).WithField(dto.KeyEnvironmentID, environment.ID().ToString()).
+				Warn("Failed loading environment. Skipping...")
+			continue
+		}
+		for _, r := range usedRunners.List() {
+			newUsedRunners.Add(r.ID(), r)
+		}
+	}
+
+	m.updateUsedRunners(newUsedRunners, true)
+}
 
+// SynchronizeRunners connect once (without retry) to Nomad to receive status updates regarding runners.
+func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error {
 	// Watch for changes regarding the existing or new runners.
 	log.Info("Watching Event Stream")
 	err := m.apiClient.WatchEventStream(ctx,
@@ -167,24 +183,6 @@ func (m *NomadRunnerManager) checkPrewarmingPoolAlert(environment ExecutionEnvir
 	}
 }
 
-// Load recovers all runners for all existing environments.
-func (m *NomadRunnerManager) load() {
-	newUsedRunners := storage.NewLocalStorage[Runner]()
-	for _, environment := range m.ListEnvironments() {
-		usedRunners, err := m.loadEnvironment(environment)
-		if err != nil {
-			log.WithError(err).WithField(dto.KeyEnvironmentID, environment.ID().ToString()).
-				Warn("Failed loading environment. Skipping...")
-			continue
-		}
-		for _, r := range usedRunners.List() {
-			newUsedRunners.Add(r.ID(), r)
-		}
-	}
-
-	m.updateUsedRunners(newUsedRunners, true)
-}
-
 func (m *NomadRunnerManager) loadEnvironment(environment ExecutionEnvironment) (used storage.Storage[Runner], err error) {
 	used = storage.NewLocalStorage[Runner]()
 	runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
@@ -265,7 +265,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() {
 		log.WithError(err).Error("failed to synchronize runners")
 	}
 
-	s.Require().Equal(3, len(hook.Entries))
+	s.Require().Equal(2, len(hook.Entries))
 	s.Equal(logrus.ErrorLevel, hook.LastEntry().Level)
 	err, ok := hook.LastEntry().Data[logrus.ErrorKey].(error)
 	s.Require().True(ok)
@@ -531,7 +531,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
 		s.ExpectedGoroutingIncrease++ // We dont care about destroying the created runner.
 		call.Return([]*nomadApi.Job{job}, nil)
 
-		runnerManager.load()
+		runnerManager.Load()
 		environmentMock.AssertExpectations(s.T())
 	})
 
@@ -548,7 +548,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
 		call.Return([]*nomadApi.Job{job}, nil)
 
 		s.Require().Zero(runnerManager.usedRunners.Length())
-		runnerManager.load()
+		runnerManager.Load()
 		_, ok := runnerManager.usedRunners.Get(tests.DefaultRunnerID)
 		s.True(ok)
 	})
@@ -570,7 +570,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
 		call.Return([]*nomadApi.Job{job}, nil)
 
 		s.Require().Zero(runnerManager.usedRunners.Length())
-		runnerManager.load()
+		runnerManager.Load()
 		s.Require().NotZero(runnerManager.usedRunners.Length())
 
 		<-time.After(time.Duration(timeout*2) * time.Second)