diff --git a/cmd/poseidon/main.go b/cmd/poseidon/main.go index 814cca8..88a668f 100644 --- a/cmd/poseidon/main.go +++ b/cmd/poseidon/main.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "fmt" "github.com/getsentry/sentry-go" sentryhttp "github.com/getsentry/sentry-go/http" "github.com/openHPI/poseidon/internal/api" @@ -197,7 +198,7 @@ func createNomadManager(ctx context.Context) (runner.Manager, environment.Manage // API initialization nomadAPIClient, err := nomad.NewExecutorAPI(&config.Config.Nomad) if err != nil { - log.WithError(err).WithField("nomad config", config.Config.Nomad).Fatal("Error creating Nomad API client") + log.WithError(err).WithField("nomad_config", config.Config.Nomad).Fatal("Error creating Nomad API client") } runnerManager := runner.NewNomadRunnerManager(nomadAPIClient, ctx) @@ -206,10 +207,35 @@ func createNomadManager(ctx context.Context) (runner.Manager, environment.Manage if err != nil { log.WithError(err).Fatal("Error initializing environment manager") } - go environmentManager.KeepEnvironmentsSynced(runnerManager.SynchronizeRunners, ctx) + + synchronizeNomad(ctx, environmentManager, runnerManager) return runnerManager, environmentManager } +// synchronizeNomad starts the asynchronous synchronization background task and waits for the first environment and runner recovery. +func synchronizeNomad(ctx context.Context, environmentManager *environment.NomadEnvironmentManager, runnerManager *runner.NomadRunnerManager) { + firstRecoveryDone := make(chan struct{}) + go environmentManager.KeepEnvironmentsSynced(func(ctx context.Context) error { + runnerManager.Load() + + select { + case firstRecoveryDone <- struct{}{}: + log.Info("First Recovery Done") + default: + } + + if err := runnerManager.SynchronizeRunners(ctx); err != nil { + return fmt.Errorf("synchronize runners failed: %w", err) + } + return nil + }, ctx) + + select { + case <-firstRecoveryDone: + case <-ctx.Done(): + } +} + func createAWSManager(ctx context.Context) ( runnerManager runner.Manager, environmentManager environment.ManagerHandler) { runnerManager = runner.NewAWSRunnerManager(ctx) diff --git a/cmd/poseidon/main_test.go b/cmd/poseidon/main_test.go index 9150ca8..2694454 100644 --- a/cmd/poseidon/main_test.go +++ b/cmd/poseidon/main_test.go @@ -2,9 +2,12 @@ package main import ( "context" + "github.com/hashicorp/nomad/api" "github.com/openHPI/poseidon/internal/environment" + "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/tests" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "golang.org/x/sys/unix" "testing" @@ -63,3 +66,18 @@ func (s *MainTestSuite) TestShutdownOnOSSignal_Profiling() { s.True(called) } + +func (s *MainTestSuite) TestLoadNomadEnvironmentsBeforeStartingWebserver() { + apiMock := &nomad.ExecutorAPIMock{} + apiMock.On("LoadEnvironmentJobs").Return([]*api.Job{}, nil) + apiMock.On("WatchEventStream", mock.Anything, mock.Anything).Run(func(_ mock.Arguments) { + <-s.TestCtx.Done() + }).Return(nil).Maybe() + + runnerManager := runner.NewNomadRunnerManager(apiMock, s.TestCtx) + environmentManager, err := environment.NewNomadEnvironmentManager(runnerManager, apiMock, "") + s.Require().NoError(err) + + synchronizeNomad(s.TestCtx, environmentManager, runnerManager) + apiMock.AssertExpectations(s.T()) +} diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index 90bc26f..022c7ef 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -84,11 +84,27 @@ func (m *NomadRunnerManager) Return(r Runner) error { return err } -// SynchronizeRunners loads all runners and keeps them synchronized (without a retry mechanism). -func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error { +// Load recovers all runners for all existing environments. +func (m *NomadRunnerManager) Load() { log.Info("Loading runners") - m.load() + newUsedRunners := storage.NewLocalStorage[Runner]() + for _, environment := range m.ListEnvironments() { + usedRunners, err := m.loadEnvironment(environment) + if err != nil { + log.WithError(err).WithField(dto.KeyEnvironmentID, environment.ID().ToString()). + Warn("Failed loading environment. Skipping...") + continue + } + for _, r := range usedRunners.List() { + newUsedRunners.Add(r.ID(), r) + } + } + m.updateUsedRunners(newUsedRunners, true) +} + +// SynchronizeRunners connect once (without retry) to Nomad to receive status updates regarding runners. +func (m *NomadRunnerManager) SynchronizeRunners(ctx context.Context) error { // Watch for changes regarding the existing or new runners. log.Info("Watching Event Stream") err := m.apiClient.WatchEventStream(ctx, @@ -167,24 +183,6 @@ func (m *NomadRunnerManager) checkPrewarmingPoolAlert(environment ExecutionEnvir } } -// Load recovers all runners for all existing environments. -func (m *NomadRunnerManager) load() { - newUsedRunners := storage.NewLocalStorage[Runner]() - for _, environment := range m.ListEnvironments() { - usedRunners, err := m.loadEnvironment(environment) - if err != nil { - log.WithError(err).WithField(dto.KeyEnvironmentID, environment.ID().ToString()). - Warn("Failed loading environment. Skipping...") - continue - } - for _, r := range usedRunners.List() { - newUsedRunners.Add(r.ID(), r) - } - } - - m.updateUsedRunners(newUsedRunners, true) -} - func (m *NomadRunnerManager) loadEnvironment(environment ExecutionEnvironment) (used storage.Storage[Runner], err error) { used = storage.NewLocalStorage[Runner]() runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID()) diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 72b4c6e..8e3d515 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -265,7 +265,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() { log.WithError(err).Error("failed to synchronize runners") } - s.Require().Equal(3, len(hook.Entries)) + s.Require().Equal(2, len(hook.Entries)) s.Equal(logrus.ErrorLevel, hook.LastEntry().Level) err, ok := hook.LastEntry().Data[logrus.ErrorKey].(error) s.Require().True(ok) @@ -531,7 +531,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { s.ExpectedGoroutingIncrease++ // We dont care about destroying the created runner. call.Return([]*nomadApi.Job{job}, nil) - runnerManager.load() + runnerManager.Load() environmentMock.AssertExpectations(s.T()) }) @@ -548,7 +548,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { call.Return([]*nomadApi.Job{job}, nil) s.Require().Zero(runnerManager.usedRunners.Length()) - runnerManager.load() + runnerManager.Load() _, ok := runnerManager.usedRunners.Get(tests.DefaultRunnerID) s.True(ok) }) @@ -570,7 +570,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { call.Return([]*nomadApi.Job{job}, nil) s.Require().Zero(runnerManager.usedRunners.Length()) - runnerManager.load() + runnerManager.Load() s.Require().NotZero(runnerManager.usedRunners.Length()) <-time.After(time.Duration(timeout*2) * time.Second)