diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index adefe26..6e628c3 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -217,11 +217,11 @@ func (_m *ExecutorApiMock) SetJobScale(jobId string, count uint, reason string) } // WatchAllocations provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation -func (_m *ExecutorApiMock) WatchAllocations(ctx context.Context, onNewAllocation allocationProcessor, onDeletedAllocation allocationProcessor) error { +func (_m *ExecutorApiMock) WatchAllocations(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error { ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, allocationProcessor, allocationProcessor) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, AllocationProcessor, AllocationProcessor) error); ok { r0 = rf(ctx, onNewAllocation, onDeletedAllocation) } else { r0 = ret.Error(0) diff --git a/nomad/nomad.go b/nomad/nomad.go index 66dc0d7..1066cf4 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -18,7 +18,7 @@ var ( errPlacingAllocations = errors.New("failed to place all allocations") ) -type allocationProcessor func(*nomadApi.Allocation) +type AllocationProcessor func(*nomadApi.Allocation) // ExecutorApi provides access to an container orchestration solution type ExecutorApi interface { @@ -35,7 +35,7 @@ type ExecutorApi interface { // WatchAllocations listens on the Nomad event stream for allocation events. // Depending on the incoming event, any of the given function is executed. 
- WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation allocationProcessor) error + WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error } // APIClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values. @@ -62,7 +62,6 @@ func (a *APIClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { // LoadRunners loads the allocations of the specified job. func (a *APIClient) LoadRunners(jobID string) (runnerIds []string, err error) { - // list, _, err := apiClient.client.Jobs().Allocations(jobID, true, nil) list, err := a.loadRunners(jobID) if err != nil { return nil, err @@ -79,23 +78,23 @@ func (a *APIClient) LoadRunners(jobID string) (runnerIds []string, err error) { func (a *APIClient) MonitorEvaluation(evalID string, ctx context.Context) error { stream, err := a.EvaluationStream(evalID, ctx) if err != nil { - return err + return fmt.Errorf("failed retrieving evaluation stream: %w", err) } // If ctx is canceled, the stream will be closed by Nomad and we exit the for loop. 
return receiveAndHandleNomadAPIEvents(stream, handleEvaluationEvent) } func (a *APIClient) WatchAllocations(ctx context.Context, - onNewAllocation, onDeletedAllocation allocationProcessor) error { + onNewAllocation, onDeletedAllocation AllocationProcessor) error { startTime := time.Now().UnixNano() stream, err := a.AllocationStream(ctx) if err != nil { return fmt.Errorf("failed retrieving allocation stream: %w", err) } - waitingToRun := make(map[string]bool) + pendingAllocations := make(map[string]bool) handler := func(event *nomadApi.Event) (bool, error) { - return false, handleAllocationEvent(startTime, waitingToRun, event, onNewAllocation, onDeletedAllocation) + return false, handleAllocationEvent(startTime, pendingAllocations, event, onNewAllocation, onDeletedAllocation) } err = receiveAndHandleNomadAPIEvents(stream, handler) @@ -140,17 +139,16 @@ func handleEvaluationEvent(event *nomadApi.Event) (bool, error) { switch eval.Status { case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed: return true, checkEvaluation(eval) - default: } return false, nil } // handleAllocationEvent is a nomadAPIEventHandler that processes allocation events. // If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation -// is called. The waitingToRun map is used to store allocations that are pending but not started yet. Using the map -// the state is persisted between multiple calls of this function. -func handleAllocationEvent(startTime int64, waitingToRun map[string]bool, event *nomadApi.Event, - onNewAllocation, onDeletedAllocation allocationProcessor) error { +// is called. The pendingAllocations map is used to store allocations that are pending but not started yet. Using the +// map the state is persisted between multiple calls of this function. 
+func handleAllocationEvent(startTime int64, pendingAllocations map[string]bool, event *nomadApi.Event, + onNewAllocation, onDeletedAllocation AllocationProcessor) error { alloc, err := event.Allocation() if err != nil { return fmt.Errorf("failed retrieving allocation from event: %w", err) @@ -182,7 +180,7 @@ func handleAllocationEvent(startTime int64, waitingToRun map[string]bool, event if alloc.ClientStatus == structs.AllocClientStatusPending && alloc.DesiredStatus == structs.AllocDesiredStatusRun { // allocation is started, wait until it runs and add to our list afterwards - waitingToRun[alloc.ID] = true + pendingAllocations[alloc.ID] = true } return nil } diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index a7b50db..ba1339f 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -17,8 +17,6 @@ import ( "time" ) -var errEvent = errors.New("my events error") - func TestLoadRunnersTestSuite(t *testing.T) { suite.Run(t, new(LoadRunnersTestSuite)) } @@ -34,97 +32,94 @@ type LoadRunnersTestSuite struct { stoppingRunner *nomadApi.AllocationListStub } -func (suite *LoadRunnersTestSuite) SetupTest() { - suite.jobId = "1d-0f-v3ry-sp3c14l-j0b" +func (s *LoadRunnersTestSuite) SetupTest() { + s.jobId = "1d-0f-v3ry-sp3c14l-j0b" - suite.mock = &apiQuerierMock{} - suite.nomadApiClient = APIClient{apiQuerier: suite.mock} + s.mock = &apiQuerierMock{} + s.nomadApiClient = APIClient{apiQuerier: s.mock} - suite.availableRunner = &nomadApi.AllocationListStub{ + s.availableRunner = &nomadApi.AllocationListStub{ ID: "s0m3-r4nd0m-1d", ClientStatus: nomadApi.AllocClientStatusRunning, DesiredStatus: nomadApi.AllocDesiredStatusRun, } - suite.anotherAvailableRunner = &nomadApi.AllocationListStub{ + s.anotherAvailableRunner = &nomadApi.AllocationListStub{ ID: "s0m3-s1m1l4r-1d", ClientStatus: nomadApi.AllocClientStatusRunning, DesiredStatus: nomadApi.AllocDesiredStatusRun, } - suite.stoppedRunner = &nomadApi.AllocationListStub{ + s.stoppedRunner = &nomadApi.AllocationListStub{ 
ID: "4n0th3r-1d", ClientStatus: nomadApi.AllocClientStatusComplete, DesiredStatus: nomadApi.AllocDesiredStatusRun, } - suite.stoppingRunner = &nomadApi.AllocationListStub{ + s.stoppingRunner = &nomadApi.AllocationListStub{ ID: "th1rd-1d", ClientStatus: nomadApi.AllocClientStatusRunning, DesiredStatus: nomadApi.AllocDesiredStatusStop, } } -func (suite *LoadRunnersTestSuite) TestErrorOfUnderlyingApiCallIsPropagated() { +func (s *LoadRunnersTestSuite) TestErrorOfUnderlyingApiCallIsPropagated() { errorString := "api errored" - suite.mock.On("loadRunners", mock.AnythingOfType("string")). + s.mock.On("loadRunners", mock.AnythingOfType("string")). Return(nil, errors.New(errorString)) - returnedIds, err := suite.nomadApiClient.LoadRunners(suite.jobId) - suite.Nil(returnedIds) - suite.Error(err) + returnedIds, err := s.nomadApiClient.LoadRunners(s.jobId) + s.Nil(returnedIds) + s.Error(err) } -func (suite *LoadRunnersTestSuite) TestThrowsNoErrorWhenUnderlyingApiCallDoesNot() { - suite.mock.On("loadRunners", mock.AnythingOfType("string")). +func (s *LoadRunnersTestSuite) TestThrowsNoErrorWhenUnderlyingApiCallDoesNot() { + s.mock.On("loadRunners", mock.AnythingOfType("string")). Return([]*nomadApi.AllocationListStub{}, nil) - _, err := suite.nomadApiClient.LoadRunners(suite.jobId) - suite.NoError(err) + _, err := s.nomadApiClient.LoadRunners(s.jobId) + s.NoError(err) } -func (suite *LoadRunnersTestSuite) TestAvailableRunnerIsReturned() { - suite.mock.On("loadRunners", mock.AnythingOfType("string")). - Return([]*nomadApi.AllocationListStub{suite.availableRunner}, nil) +func (s *LoadRunnersTestSuite) TestAvailableRunnerIsReturned() { + s.mock.On("loadRunners", mock.AnythingOfType("string")). 
+ Return([]*nomadApi.AllocationListStub{s.availableRunner}, nil) - returnedIds, err := suite.nomadApiClient.LoadRunners(suite.jobId) - require.NoError(suite.T(), err) - suite.Len(returnedIds, 1) - suite.Equal(suite.availableRunner.ID, returnedIds[0]) + returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) + s.Len(returnedIds, 1) + s.Equal(s.availableRunner.ID, returnedIds[0]) } -func (suite *LoadRunnersTestSuite) TestStoppedRunnerIsNotReturned() { - suite.mock.On("loadRunners", mock.AnythingOfType("string")). - Return([]*nomadApi.AllocationListStub{suite.stoppedRunner}, nil) +func (s *LoadRunnersTestSuite) TestStoppedRunnerIsNotReturned() { + s.mock.On("loadRunners", mock.AnythingOfType("string")). + Return([]*nomadApi.AllocationListStub{s.stoppedRunner}, nil) - returnedIds, err := suite.nomadApiClient.LoadRunners(suite.jobId) - require.NoError(suite.T(), err) - suite.Empty(returnedIds) + returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) + s.Empty(returnedIds) } -func (suite *LoadRunnersTestSuite) TestStoppingRunnerIsNotReturned() { - suite.mock.On("loadRunners", mock.AnythingOfType("string")). - Return([]*nomadApi.AllocationListStub{suite.stoppingRunner}, nil) +func (s *LoadRunnersTestSuite) TestStoppingRunnerIsNotReturned() { + s.mock.On("loadRunners", mock.AnythingOfType("string")). 
+ Return([]*nomadApi.AllocationListStub{s.stoppingRunner}, nil) - returnedIds, err := suite.nomadApiClient.LoadRunners(suite.jobId) - require.NoError(suite.T(), err) - suite.Empty(returnedIds) + returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) + s.Empty(returnedIds) } -func (suite *LoadRunnersTestSuite) TestReturnsAllAvailableRunners() { +func (s *LoadRunnersTestSuite) TestReturnsAllAvailableRunners() { runnersList := []*nomadApi.AllocationListStub{ - suite.availableRunner, - suite.anotherAvailableRunner, - suite.stoppedRunner, - suite.stoppingRunner, + s.availableRunner, + s.anotherAvailableRunner, + s.stoppedRunner, + s.stoppingRunner, } - suite.mock.On("loadRunners", mock.AnythingOfType("string")). + s.mock.On("loadRunners", mock.AnythingOfType("string")). Return(runnersList, nil) - returnedIds, _ := suite.nomadApiClient.LoadRunners(suite.jobId) - suite.Len(returnedIds, 2) - suite.Contains(returnedIds, suite.availableRunner.ID) - suite.Contains(returnedIds, suite.anotherAvailableRunner.ID) + returnedIds, _ := s.nomadApiClient.LoadRunners(s.jobId) + s.Len(returnedIds, 2) + s.Contains(returnedIds, s.availableRunner.ID) + s.Contains(returnedIds, s.anotherAvailableRunner.ID) } var ( @@ -303,7 +298,7 @@ func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) { multipleEventsWithPending := nomadApi.Events{Events: []nomadApi.Event{ eventForEvaluation(t, pendingEval), eventForEvaluation(t, eval), }} - eventsWithErr := nomadApi.Events{Err: errEvent, Events: []nomadApi.Event{{}}} + eventsWithErr := nomadApi.Events{Err: tests.DefaultError, Events: []nomadApi.Event{{}}} var cases = []struct { streamedEvents []*nomadApi.Events @@ -321,7 +316,7 @@ func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) { "it skips pending evaluation and fail"}, {[]*nomadApi.Events{&multipleEventsWithPending}, 1, evalErr, "it handles multiple events per received event and fails"}, - {[]*nomadApi.Events{&eventsWithErr}, 1, errEvent, + 
{[]*nomadApi.Events{&eventsWithErr}, 1, tests.DefaultError, "it fails with event error when event has error"}, } @@ -344,7 +339,7 @@ func TestApiClient_MonitorEvaluationFailsWhenFailingToDecodeEvaluation(t *testin _, err := event.Evaluation() require.NotNil(t, err) eventsProcessed, err := runEvaluationMonitoring([]*nomadApi.Events{{Events: []nomadApi.Event{event}}}) - assert.Equal(t, err, err) + assert.Error(t, err) assert.Equal(t, 1, eventsProcessed) } @@ -438,7 +433,7 @@ func TestApiClient_WatchAllocationsIgnoresUnhandledEvents(t *testing.T) { func TestApiClient_WatchAllocationsHandlesEvents(t *testing.T) { newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) - newPendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, newPendingAllocation)}} + pendingAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, newPendingAllocation)}} newStartedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) startAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{ @@ -459,7 +454,7 @@ func TestApiClient_WatchAllocationsHandlesEvents(t *testing.T) { expectedDeletedAllocations []*nomadApi.Allocation name string }{ - {[]*nomadApi.Events{&newPendingEvents}, + {[]*nomadApi.Events{&pendingAllocationEvents}, []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil), "it does not add allocation when client status is pending"}, {[]*nomadApi.Events{&startAllocationEvents}, @@ -493,23 +488,22 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { newPendingEvent := eventForAllocation(t, newPendingAllocation) pendingMap := make(map[string]bool) - var doNothing allocationProcessor = func(allocation *nomadApi.Allocation) {} + var noop AllocationProcessor = func(allocation *nomadApi.Allocation) {} - err := handleAllocationEvent(time.Now().UnixNano(), pendingMap, &newPendingEvent, doNothing, doNothing) + err := 
handleAllocationEvent(time.Now().UnixNano(), pendingMap, &newPendingEvent, noop, noop) require.NoError(t, err) assert.True(t, pendingMap[newPendingAllocation.ID]) } func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) { - testErr := errors.New("test error") apiMock := &apiQuerierMock{} - apiMock.On("AllocationStream", mock.Anything).Return(nil, testErr) + apiMock.On("AllocationStream", mock.Anything).Return(nil, tests.DefaultError) apiClient := &APIClient{apiMock} noop := func(a *nomadApi.Allocation) {} err := apiClient.WatchAllocations(context.Background(), noop, noop) - assert.ErrorIs(t, err, testErr) + assert.ErrorIs(t, err, tests.DefaultError) } func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationCannotBeRetrievedWithoutReceivingFurtherEvents(t *testing.T) { @@ -556,7 +550,7 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, // to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine // and sequentially transfers the events from the given array to a channel simulating the stream. func runAllocationWatching(t *testing.T, events []*nomadApi.Events, - onNewAllocation, onDeletedAllocation allocationProcessor, ctx context.Context) (eventsProcessed int, err error) { + onNewAllocation, onDeletedAllocation AllocationProcessor, ctx context.Context) (eventsProcessed int, err error) { t.Helper() stream := make(chan *nomadApi.Events) go func() { @@ -572,7 +566,7 @@ func runAllocationWatching(t *testing.T, events []*nomadApi.Events, // version of the given stream to simulate an event stream gotten from the real // Nomad API. func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, - onNewAllocation, onDeletedAllocation allocationProcessor) chan error { + onNewAllocation, onDeletedAllocation AllocationProcessor) chan error { ctx := context.Background() // We can only get a read-only channel once we return it from a function. 
readOnlyStream := func() <-chan *nomadApi.Events { return stream }() diff --git a/runner/manager_test.go b/runner/manager_test.go index 060bcdd..ff51f28 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -3,6 +3,9 @@ package runner import ( "context" "errors" + nomadApi "github.com/hashicorp/nomad/api" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -185,6 +188,79 @@ func (s *ManagerTestSuite) TestRefreshAddsRunnerToPool() { s.Equal(tests.DefaultRunnerId, poolRunner.Id()) } +func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() { + var hook *test.Hook + logger, hook := test.NewNullLogger() + log = logger.WithField("pkg", "runner") + s.modifyMockedCall("WatchAllocations", func(call *mock.Call) { + call.Return(tests.DefaultError) + }) + + s.nomadRunnerManager.updateRunners(context.Background()) + + require.Equal(s.T(), 1, len(hook.Entries)) + s.Equal(logrus.ErrorLevel, hook.LastEntry().Level) + s.Equal(tests.DefaultError, hook.LastEntry().Data[logrus.ErrorKey]) +} + +func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { + allocation := &nomadApi.Allocation{ID: tests.AllocationID} + defaultJob, ok := s.nomadRunnerManager.jobs.Get(defaultEnvironmentId) + require.True(s.T(), ok) + allocation.JobID = 
string(defaultJob.jobId) + + _, ok = defaultJob.idleRunners.Get(allocation.ID) + require.False(s.T(), ok) + + s.modifyMockedCall("WatchAllocations", func(call *mock.Call) { + call.Run(func(args mock.Arguments) { + onCreate, ok := args.Get(1).(nomad.AllocationProcessor) + require.True(s.T(), ok) + onCreate(allocation) + }) + }) + + s.nomadRunnerManager.updateRunners(context.Background()) + + _, ok = defaultJob.idleRunners.Get(allocation.ID) + s.True(ok) +} + +func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { + allocation := &nomadApi.Allocation{ID: tests.AllocationID} + defaultJob, ok := s.nomadRunnerManager.jobs.Get(defaultEnvironmentId) + require.True(s.T(), ok) + allocation.JobID = string(defaultJob.jobId) + + testRunner := NewRunner(allocation.ID) + defaultJob.idleRunners.Add(testRunner) + s.nomadRunnerManager.usedRunners.Add(testRunner) + + s.modifyMockedCall("WatchAllocations", func(call *mock.Call) { + call.Run(func(args mock.Arguments) { + onDelete, ok := args.Get(2).(nomad.AllocationProcessor) + require.True(s.T(), ok) + onDelete(allocation) + }) + }) + + s.nomadRunnerManager.updateRunners(context.Background()) + + _, ok = defaultJob.idleRunners.Get(allocation.ID) + s.False(ok) + _, ok = s.nomadRunnerManager.usedRunners.Get(allocation.ID) + s.False(ok) +} + +// modifyMockedCall applies modifier to every expected call on the API mock whose method name equals method. +func (s *ManagerTestSuite) modifyMockedCall(method string, modifier func(call *mock.Call)) { + for _, c := range s.apiMock.ExpectedCalls { + if c.Method == method { + modifier(c) + } + } +} + func (s *ManagerTestSuite) TestWhenEnvironmentDoesNotExistEnvironmentExistsReturnsFalse() { id := anotherEnvironmentId _, ok := s.nomadRunnerManager.jobs.Get(id)