diff --git a/main.go b/main.go index 8ebab8f..9d11863 100644 --- a/main.go +++ b/main.go @@ -46,7 +46,7 @@ func initServer() *http.Server { log.WithError(err).WithField("nomad url", config.Config.NomadAPIURL()).Fatal("Error parsing the nomad url") } - runnerManager := runner.NewNomadRunnerManager(nomadAPIClient) + runnerManager := runner.NewNomadRunnerManager(nomadAPIClient, context.Background()) environmentManager := environment.NewNomadEnvironmentManager(runnerManager, nomadAPIClient) return &http.Server{ diff --git a/nomad/api_querier.go b/nomad/api_querier.go index ac2fbab..9d30430 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -38,6 +38,9 @@ type apiQuerier interface { // EvaluationStream returns a Nomad event stream filtered to return only events belonging to the // given evaluation ID. EvaluationStream(evalID string, ctx context.Context) (<-chan *nomadApi.Events, error) + + // AllocationStream returns a Nomad event stream filtered to return only allocation events. + AllocationStream(ctx context.Context) (<-chan *nomadApi.Events, error) } // nomadApiClient implements the nomadApiQuerier interface and provides access to a real Nomad API. @@ -109,3 +112,14 @@ func (nc *nomadApiClient) EvaluationStream(evalID string, ctx context.Context) ( nc.queryOptions) return } + +func (nc *nomadApiClient) AllocationStream(ctx context.Context) (stream <-chan *nomadApi.Events, err error) { + stream, err = nc.client.EventStream().Stream( + ctx, + map[nomadApi.Topic][]string{ + nomadApi.TopicAllocation: {}, + }, + 0, + nc.queryOptions) + return +} diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index b8b3b14..36b9075 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.8.0. DO NOT EDIT. package nomad @@ -19,6 +19,29 @@ type apiQuerierMock struct { mock.Mock } +// AllocationStream provides a mock function with given fields: ctx +func (_m *apiQuerierMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) { + ret := _m.Called(ctx) + + var r0 <-chan *api.Events + if rf, ok := ret.Get(0).(func(context.Context) <-chan *api.Events); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *api.Events) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // DeleteRunner provides a mock function with given fields: runnerId func (_m *apiQuerierMock) DeleteRunner(runnerId string) error { ret := _m.Called(runnerId) diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 97f90e2..adefe26 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -19,6 +19,29 @@ type ExecutorApiMock struct { mock.Mock } +// AllocationStream provides a mock function with given fields: ctx +func (_m *ExecutorApiMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) { + ret := _m.Called(ctx) + + var r0 <-chan *api.Events + if rf, ok := ret.Get(0).(func(context.Context) <-chan *api.Events); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *api.Events) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // DeleteRunner provides a mock function with given fields: runnerId func (_m *ExecutorApiMock) DeleteRunner(runnerId string) error { ret := _m.Called(runnerId) @@ -193,6 +216,20 @@ func (_m *ExecutorApiMock) SetJobScale(jobId string, count uint, reason string) return r0 } +// WatchAllocations provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation +func (_m *ExecutorApiMock) WatchAllocations(ctx context.Context, onNewAllocation allocationProcessor, onDeletedAllocation allocationProcessor) error { + ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, allocationProcessor, allocationProcessor) error); ok { + r0 = rf(ctx, onNewAllocation, onDeletedAllocation) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // init provides a mock function with given fields: nomadURL, nomadNamespace func (_m *ExecutorApiMock) init(nomadURL *url.URL, nomadNamespace string) error { ret := _m.Called(nomadURL, nomadNamespace) diff --git a/nomad/nomad.go b/nomad/nomad.go index 695f15b..335a1c1 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -9,6 +9,7 @@ import ( "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "net/url" "strings" + "time" ) var ( @@ -16,6 +17,8 @@ var ( ErrorExecutorCommunicationFailed = errors.New("communication with executor failed") ) +type allocationProcessor func(*nomadApi.Allocation) + // ExecutorApi provides access to an container orchestration solution type ExecutorApi interface { apiQuerier @@ -28,6 +31,10 @@ type ExecutorApi interface { // If the evaluation was not successful, an error containing the failures is returned. // See also https://github.com/hashicorp/nomad/blob/7d5a9ecde95c18da94c9b6ace2565afbfdd6a40d/command/monitor.go#L175 MonitorEvaluation(evalID string, ctx context.Context) error + + // WatchAllocations listens on the Nomad event stream for allocation events. + // Depending on the incoming event, any of the given function is executed. + WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation allocationProcessor) error } // ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values. @@ -74,26 +81,100 @@ func (a *ApiClient) MonitorEvaluation(evalID string, ctx context.Context) error return err } // If ctx is cancelled, the stream will be closed by Nomad and we exit the for loop. + return receiveAndHandleNomadAPIEvents(stream, handleEvaluationEvent) +} + +func (a *ApiClient) WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation allocationProcessor) error { + startTime := time.Now().UnixNano() + stream, err := a.AllocationStream(ctx) + if err != nil { + return fmt.Errorf("failed retrieving allocation stream: %w", err) + } + waitingToRun := make(map[string]bool) + + handler := func(event nomadApi.Event) error { + return handleAllocationEvent(startTime, waitingToRun, event, onNewAllocation, onDeletedAllocation) + } + + err = receiveAndHandleNomadAPIEvents(stream, handler) + return err +} + +type nomadAPIEventHandler func(event nomadApi.Event) error + +// receiveAndHandleNomadAPIEvents receives events from the Nomad event stream and calls the handler function for each received +// event. It skips heartbeat events and returns an error if the received events contain an error. +func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler nomadAPIEventHandler) error { + // If original context is cancelled, the stream will be closed by Nomad and we exit the for loop. for events := range stream { if events.IsHeartbeat() { continue } if err := events.Err; err != nil { - log.WithError(err).Warn("Error monitoring evaluation") - return err + return fmt.Errorf("error receiving events: %w", err) } for _, event := range events.Events { - eval, err := event.Evaluation() - if err != nil { - log.WithError(err).Warn("Error retrieving evaluation from streamed event") + // TODO: we can't break out of this function from inside the handler + if err := handler(event); err != nil { return err } - switch eval.Status { - case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed: - return checkEvaluation(eval) - default: + } + } + return nil +} + +// handleEvaluationEvent is a nomadAPIEventHandler that returns the status of an evaluation in the event. +func handleEvaluationEvent(event nomadApi.Event) error { + eval, err := event.Evaluation() + if err != nil { + return fmt.Errorf("failed monitoring evaluation: %w", err) + } + switch eval.Status { + case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed: + return checkEvaluation(eval) + default: + } + return nil +} + +// handleAllocationEvent is a nomadAPIEventHandler that processes allocation events. +// If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation +// is called. The waitingToRun map is used to store allocations that are pending but not started yet. Using the map +// the state is persisted between multiple calls of this function. +func handleAllocationEvent(startTime int64, waitingToRun map[string]bool, event nomadApi.Event, + onNewAllocation, onDeletedAllocation allocationProcessor) error { + alloc, err := event.Allocation() + if err != nil { + return fmt.Errorf("failed retrieving allocation from event %v: %w", event, err) + } + if alloc == nil || event.Type == structs.TypePlanResult { + return nil + } + + if event.Type == structs.TypeAllocationUpdated { + // When starting the API and listening on the Nomad event stream we might get events that already + // happened from Nomad as it seems to buffer them for a certain duration. + // Ignore old events here. + if alloc.ModifyTime < startTime { + return nil + } + + if alloc.ClientStatus == structs.AllocClientStatusRunning { + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusStop: + onDeletedAllocation(alloc) + case structs.AllocDesiredStatusRun: + // first event that marks the transition between pending and running + _, ok := pendingAllocations[alloc.ID] + if ok { + onNewAllocation(alloc) + delete(pendingAllocations, alloc.ID) } } + if alloc.ClientStatus == structs.AllocClientStatusPending && alloc.DesiredStatus == structs.AllocDesiredStatusRun { + // allocation is started, wait until it runs and add to our list afterwards + waitingToRun[alloc.ID] = true + } } return nil } diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index 198aacd..a46bcd0 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -321,7 +321,8 @@ func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { eventsProcessed, err := runEvaluationMonitoring(t, c.streamedEvents) - assert.Equal(t, c.expectedError, err) + require.NotNil(t, err) + assert.Contains(t, err.Error(), c.expectedError.Error()) assert.Equal(t, c.expectedEventsProcessed, eventsProcessed) }) } diff --git a/runner/manager.go b/runner/manager.go index ca1b508..13baccc 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -1,9 +1,12 @@ package runner import ( + "context" "errors" + nomadApi "github.com/hashicorp/nomad/api" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "strconv" "time" ) @@ -49,12 +52,14 @@ type NomadRunnerManager struct { usedRunners Storage } -func NewNomadRunnerManager(apiClient nomad.ExecutorApi) *NomadRunnerManager { - return &NomadRunnerManager{ +func NewNomadRunnerManager(apiClient nomad.ExecutorApi, ctx context.Context) *NomadRunnerManager { + m := &NomadRunnerManager{ apiClient, NewLocalNomadJobStorage(), NewLocalRunnerStorage(), } + go m.updateRunners(ctx) + return m } type NomadJob struct { @@ -113,6 +118,41 @@ func (m *NomadRunnerManager) Return(r Runner) (err error) { return } +func (m *NomadRunnerManager) updateRunners(ctx context.Context) { + onCreate := func(alloc *nomadApi.Allocation) { + log.WithField("id", alloc.ID).Debug("Allocation started") + + intJobID, err := strconv.Atoi(alloc.JobID) + if err != nil { + return + } + + job, ok := m.jobs.Get(EnvironmentId(intJobID)) + if ok { + job.idleRunners.Add(NewRunner(alloc.ID)) + } + } + onStop := func(alloc *nomadApi.Allocation) { + log.WithField("id", alloc.ID).Debug("Allocation stopped") + + intJobID, err := strconv.Atoi(alloc.JobID) + if err != nil { + return + } + + job, ok := m.jobs.Get(EnvironmentId(intJobID)) + if ok { + job.idleRunners.Delete(alloc.ID) + m.usedRunners.Delete(alloc.ID) + } + } + + err := m.apiClient.WatchAllocations(ctx, onCreate, onStop) + if err != nil { + log.WithError(err).Error("Failed updating runners") + } +} + // Refresh Big ToDo: Improve this function!! State out that it also rescales the job; Provide context to be terminable... func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentId) { job, ok := m.jobs.Get(id) diff --git a/runner/manager_test.go b/runner/manager_test.go index 69031ce..639377a 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -1,6 +1,7 @@ package runner import ( + "context" "errors" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -28,7 +29,7 @@ type ManagerTestSuite struct { func (s *ManagerTestSuite) SetupTest() { s.apiMock = &nomad.ExecutorApiMock{} - s.nomadRunnerManager = NewNomadRunnerManager(s.apiMock) + s.nomadRunnerManager = NewNomadRunnerManager(s.apiMock, context.Background()) s.exerciseRunner = NewRunner(tests.DefaultRunnerId) s.mockRunnerQueries([]string{}) s.registerDefaultEnvironment() @@ -37,6 +38,7 @@ func (s *ManagerTestSuite) SetupTest() { func (s *ManagerTestSuite) mockRunnerQueries(returnedRunnerIds []string) { // reset expected calls to allow new mocked return values s.apiMock.ExpectedCalls = []*mock.Call{} + s.apiMock.On("WatchAllocations", mock.Anything, mock.Anything, mock.Anything).Return(nil) s.apiMock.On("LoadRunners", tests.DefaultJobId).Return(returnedRunnerIds, nil) s.apiMock.On("JobScale", tests.DefaultJobId).Return(uint(len(returnedRunnerIds)), nil) s.apiMock.On("SetJobScale", tests.DefaultJobId, mock.AnythingOfType("uint"), "Runner Requested").Return(nil)