From d0a2a1d96c2ba007340a9365856f4245475f6e35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Tue, 1 Jun 2021 19:48:21 +0200 Subject: [PATCH] Add tests for receiving allocation updates from Nomad --- environment/job_test.go | 2 +- main.go | 2 +- nomad/nomad.go | 105 +++++++------- nomad/nomad_test.go | 311 ++++++++++++++++++++++++++++++++++------ runner/manager_test.go | 9 +- 5 files changed, 333 insertions(+), 96 deletions(-) diff --git a/environment/job_test.go b/environment/job_test.go index ab01ad7..b54ad12 100644 --- a/environment/job_test.go +++ b/environment/job_test.go @@ -246,7 +246,7 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) { func TestCreateJobSetsAllGivenArguments(t *testing.T) { testJob, base := createTestJob() - manager := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.ApiClient{}, *base} + manager := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.APIClient{}, *base} job := createJob( manager.defaultJob, *testJob.ID, diff --git a/main.go b/main.go index 9d11863..2f7fdfc 100644 --- a/main.go +++ b/main.go @@ -41,7 +41,7 @@ func runServer(server *http.Server) { func initServer() *http.Server { // API initialization - nomadAPIClient, err := nomad.NewExecutorApi(config.Config.NomadAPIURL(), config.Config.Nomad.Namespace) + nomadAPIClient, err := nomad.NewExecutorAPI(config.Config.NomadAPIURL(), config.Config.Nomad.Namespace) if err != nil { log.WithError(err).WithField("nomad url", config.Config.NomadAPIURL()).Fatal("Error parsing the nomad url") } diff --git a/nomad/nomad.go b/nomad/nomad.go index 335a1c1..66dc0d7 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -8,13 +8,14 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "net/url" - "strings" "time" ) var ( log = logging.GetLogger("nomad") ErrorExecutorCommunicationFailed = errors.New("communication with executor failed") + errEvaluation = errors.New("evaluation could not complete") + errPlacingAllocations = errors.New("failed to place all allocations") ) type allocationProcessor func(*nomadApi.Allocation) @@ -24,7 +25,7 @@ type ExecutorApi interface { apiQuerier // LoadRunners loads all allocations of the specified job which are running and not about to get stopped. - LoadRunners(jobId string) (runnerIds []string, err error) + LoadRunners(jobID string) (runnerIds []string, err error) // MonitorEvaluation monitors the given evaluation ID. // It waits until the evaluation reaches one of the states complete, cancelled or failed. @@ -37,21 +38,21 @@ type ExecutorApi interface { WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation allocationProcessor) error } -// ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values. -type ApiClient struct { +// APIClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values. +type APIClient struct { apiQuerier } -// NewExecutorApi creates a new api client. +// NewExecutorAPI creates a new api client. // One client is usually sufficient for the complete runtime of the API. -func NewExecutorApi(nomadURL *url.URL, nomadNamespace string) (ExecutorApi, error) { - client := &ApiClient{apiQuerier: &nomadApiClient{}} +func NewExecutorAPI(nomadURL *url.URL, nomadNamespace string) (ExecutorApi, error) { + client := &APIClient{apiQuerier: &nomadApiClient{}} err := client.init(nomadURL, nomadNamespace) return client, err } // init prepares an apiClient to be able to communicate to a provided Nomad API. -func (a *ApiClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { +func (a *APIClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { err = a.apiQuerier.init(nomadURL, nomadNamespace) if err != nil { return err @@ -60,9 +61,9 @@ func (a *ApiClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { } // LoadRunners loads the allocations of the specified job. -func (a *ApiClient) LoadRunners(jobId string) (runnerIds []string, err error) { - //list, _, err := apiClient.client.Jobs().Allocations(jobId, true, nil) - list, err := a.loadRunners(jobId) +func (a *APIClient) LoadRunners(jobID string) (runnerIds []string, err error) { + // list, _, err := apiClient.client.Jobs().Allocations(jobID, true, nil) + list, err := a.loadRunners(jobID) if err != nil { return nil, err } @@ -75,16 +76,17 @@ func (a *ApiClient) LoadRunners(jobId string) (runnerIds []string, err error) { return } -func (a *ApiClient) MonitorEvaluation(evalID string, ctx context.Context) error { +func (a *APIClient) MonitorEvaluation(evalID string, ctx context.Context) error { stream, err := a.EvaluationStream(evalID, ctx) if err != nil { return err } - // If ctx is cancelled, the stream will be closed by Nomad and we exit the for loop. + // If ctx is canceled, the stream will be closed by Nomad and we exit the for loop. return receiveAndHandleNomadAPIEvents(stream, handleEvaluationEvent) } -func (a *ApiClient) WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation allocationProcessor) error { +func (a *APIClient) WatchAllocations(ctx context.Context, + onNewAllocation, onDeletedAllocation allocationProcessor) error { startTime := time.Now().UnixNano() stream, err := a.AllocationStream(ctx) if err != nil { @@ -92,20 +94,24 @@ func (a *ApiClient) WatchAllocations(ctx context.Context, onNewAllocation, onDel } waitingToRun := make(map[string]bool) - handler := func(event nomadApi.Event) error { - return handleAllocationEvent(startTime, waitingToRun, event, onNewAllocation, onDeletedAllocation) + handler := func(event *nomadApi.Event) (bool, error) { + return false, handleAllocationEvent(startTime, waitingToRun, event, onNewAllocation, onDeletedAllocation) } err = receiveAndHandleNomadAPIEvents(stream, handler) return err } -type nomadAPIEventHandler func(event nomadApi.Event) error +// nomadAPIEventHandler is a function that receives a nomadApi.Event and processes it. +// It is called by an event listening loop. For each received event, the function is called. +// If done is true, the calling function knows that it should break out of the event listening +// loop. +type nomadAPIEventHandler func(event *nomadApi.Event) (done bool, err error) -// receiveAndHandleNomadAPIEvents receives events from the Nomad event stream and calls the handler function for each received -// event. It skips heartbeat events and returns an error if the received events contain an error. +// receiveAndHandleNomadAPIEvents receives events from the Nomad event stream and calls the handler function for +// each received event. It skips heartbeat events and returns an error if the received events contain an error. func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler nomadAPIEventHandler) error { - // If original context is cancelled, the stream will be closed by Nomad and we exit the for loop. + // If original context is canceled, the stream will be closed by Nomad and we exit the for loop. for events := range stream { if events.IsHeartbeat() { continue @@ -114,8 +120,10 @@ func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler noma return fmt.Errorf("error receiving events: %w", err) } for _, event := range events.Events { - // TODO: we can't break out of this function from inside the handler - if err := handler(event); err != nil { + // Don't take the address of the loop variable as the underlying value might change + localEvent := event + done, err := handler(&localEvent) + if err != nil || done { return err } } @@ -124,40 +132,39 @@ func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler noma } // handleEvaluationEvent is a nomadAPIEventHandler that returns the status of an evaluation in the event. -func handleEvaluationEvent(event nomadApi.Event) error { +func handleEvaluationEvent(event *nomadApi.Event) (bool, error) { eval, err := event.Evaluation() if err != nil { - return fmt.Errorf("failed monitoring evaluation: %w", err) + return true, fmt.Errorf("failed monitoring evaluation: %w", err) } switch eval.Status { case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed: - return checkEvaluation(eval) + return true, checkEvaluation(eval) default: } - return nil + return false, nil } // handleAllocationEvent is a nomadAPIEventHandler that processes allocation events. // If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation // is called. The waitingToRun map is used to store allocations that are pending but not started yet. Using the map // the state is persisted between multiple calls of this function. -func handleAllocationEvent(startTime int64, waitingToRun map[string]bool, event nomadApi.Event, +func handleAllocationEvent(startTime int64, waitingToRun map[string]bool, event *nomadApi.Event, onNewAllocation, onDeletedAllocation allocationProcessor) error { alloc, err := event.Allocation() if err != nil { - return fmt.Errorf("failed retrieving allocation from event %v: %w", event, err) + return fmt.Errorf("failed retrieving allocation from event: %w", err) } - if alloc == nil || event.Type == structs.TypePlanResult { + if alloc == nil || event.Type != structs.TypeAllocationUpdated { return nil } - if event.Type == structs.TypeAllocationUpdated { - // When starting the API and listening on the Nomad event stream we might get events that already - // happened from Nomad as it seems to buffer them for a certain duration. - // Ignore old events here. - if alloc.ModifyTime < startTime { - return nil - } + // When starting the API and listening on the Nomad event stream we might get events that already + // happened from Nomad as it seems to buffer them for a certain duration. + // Ignore old events here. + if alloc.ModifyTime < startTime { + return nil + } if alloc.ClientStatus == structs.AllocClientStatusRunning { switch alloc.DesiredStatus { @@ -171,32 +178,30 @@ func handleAllocationEvent(startTime int64, waitingToRun map[string]bool, event delete(pendingAllocations, alloc.ID) } } - if alloc.ClientStatus == structs.AllocClientStatusPending && alloc.DesiredStatus == structs.AllocDesiredStatusRun { - // allocation is started, wait until it runs and add to our list afterwards - waitingToRun[alloc.ID] = true - } + } + + if alloc.ClientStatus == structs.AllocClientStatusPending && alloc.DesiredStatus == structs.AllocDesiredStatusRun { + // allocation is started, wait until it runs and add to our list afterwards + waitingToRun[alloc.ID] = true } return nil } // checkEvaluation checks whether the given evaluation failed. // If the evaluation failed, it returns an error with a message containing the failure information. -func checkEvaluation(eval *nomadApi.Evaluation) error { +func checkEvaluation(eval *nomadApi.Evaluation) (err error) { if len(eval.FailedTGAllocs) == 0 { - if eval.Status == structs.EvalStatusComplete { - return nil + if eval.Status != structs.EvalStatusComplete { + err = fmt.Errorf("%w: %q", errEvaluation, eval.Status) } - return fmt.Errorf("evaluation could not complete: %q", eval.Status) } else { - messages := []string{ - fmt.Sprintf("Evaluation %q finished with status %q but failed to place all allocations.", eval.ID, eval.Status), - } + err = fmt.Errorf("evaluation %q finished with status %q but %w", eval.ID, eval.Status, errPlacingAllocations) for tg, metrics := range eval.FailedTGAllocs { - messages = append(messages, fmt.Sprintf("%s: %#v", tg, metrics)) + err = fmt.Errorf("%w\n%s: %#v", err, tg, metrics) } if eval.BlockedEval != "" { - messages = append(messages, fmt.Sprintf("Evaluation %q waiting for additional capacity to place remainder", eval.BlockedEval)) + err = fmt.Errorf("%w\nEvaluation %q waiting for additional capacity to place remainder", err, eval.BlockedEval) } - return errors.New(strings.Join(messages, "\n")) } + return err } diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index a46bcd0..a7b50db 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -11,11 +11,14 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "gitlab.hpi.de/codeocean/codemoon/poseidon/tests" "net/url" "testing" "time" ) +var errEvent = errors.New("my events error") + func TestLoadRunnersTestSuite(t *testing.T) { suite.Run(t, new(LoadRunnersTestSuite)) } @@ -24,7 +27,7 @@ type LoadRunnersTestSuite struct { suite.Suite jobId string mock *apiQuerierMock - nomadApiClient ApiClient + nomadApiClient APIClient availableRunner *nomadApi.AllocationListStub anotherAvailableRunner *nomadApi.AllocationListStub stoppedRunner *nomadApi.AllocationListStub @@ -35,7 +38,7 @@ func (suite *LoadRunnersTestSuite) SetupTest() { suite.jobId = "1d-0f-v3ry-sp3c14l-j0b" suite.mock = &apiQuerierMock{} - suite.nomadApiClient = ApiClient{apiQuerier: suite.mock} + suite.nomadApiClient = APIClient{apiQuerier: suite.mock} suite.availableRunner = &nomadApi.AllocationListStub{ ID: "s0m3-r4nd0m-1d", @@ -84,7 +87,8 @@ func (suite *LoadRunnersTestSuite) TestAvailableRunnerIsReturned() { suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return([]*nomadApi.AllocationListStub{suite.availableRunner}, nil) - returnedIds, _ := suite.nomadApiClient.LoadRunners(suite.jobId) + returnedIds, err := suite.nomadApiClient.LoadRunners(suite.jobId) + require.NoError(suite.T(), err) suite.Len(returnedIds, 1) suite.Equal(suite.availableRunner.ID, returnedIds[0]) } @@ -93,7 +97,8 @@ func (suite *LoadRunnersTestSuite) TestStoppedRunnerIsNotReturned() { suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return([]*nomadApi.AllocationListStub{suite.stoppedRunner}, nil) - returnedIds, _ := suite.nomadApiClient.LoadRunners(suite.jobId) + returnedIds, err := suite.nomadApiClient.LoadRunners(suite.jobId) + require.NoError(suite.T(), err) suite.Empty(returnedIds) } @@ -101,7 +106,8 @@ func (suite *LoadRunnersTestSuite) TestStoppingRunnerIsNotReturned() { suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return([]*nomadApi.AllocationListStub{suite.stoppingRunner}, nil) - returnedIds, _ := suite.nomadApiClient.LoadRunners(suite.jobId) + returnedIds, err := suite.nomadApiClient.LoadRunners(suite.jobId) + require.NoError(suite.T(), err) suite.Empty(returnedIds) } @@ -131,13 +137,13 @@ var ( const TestNamespace = "unit-tests" func TestApiClient_init(t *testing.T) { - client := &ApiClient{apiQuerier: &nomadApiClient{}} + client := &APIClient{apiQuerier: &nomadApiClient{}} err := client.init(&TestURL, TestNamespace) require.Nil(t, err) } func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) { - client := &ApiClient{apiQuerier: &nomadApiClient{}} + client := &APIClient{apiQuerier: &nomadApiClient{}} err := client.init(&url.URL{ Scheme: "http", Host: "http://127.0.0.1:4646", @@ -146,15 +152,15 @@ func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) { } func TestNewExecutorApiCanBeCreatedWithoutError(t *testing.T) { - expectedClient := &ApiClient{apiQuerier: &nomadApiClient{}} + expectedClient := &APIClient{apiQuerier: &nomadApiClient{}} err := expectedClient.init(&TestURL, TestNamespace) require.Nil(t, err) - _, err = NewExecutorApi(&TestURL, TestNamespace) + _, err = NewExecutorAPI(&TestURL, TestNamespace) require.Nil(t, err) } -// asynchronouslyMonitorEvaluation creates an ApiClient with mocked Nomad API and +// asynchronouslyMonitorEvaluation creates an APIClient with mocked Nomad API and // runs the MonitorEvaluation method in a goroutine. The mock returns a read-only // version of the given stream to simulate an event stream gotten from the real // Nomad API. @@ -165,7 +171,7 @@ func asynchronouslyMonitorEvaluation(stream chan *nomadApi.Events) chan error { apiMock := &apiQuerierMock{} apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(readOnlyStream, nil) - apiClient := &ApiClient{apiMock} + apiClient := &APIClient{apiMock} errChan := make(chan error) go func() { @@ -194,13 +200,14 @@ func TestApiClient_MonitorEvaluationReturnsErrorWhenStreamReturnsError(t *testin expectedErr := errors.New("test error") apiMock.On("EvaluationStream", mock.AnythingOfType("string"), mock.AnythingOfType("*context.emptyCtx")). Return(nil, expectedErr) - apiClient := &ApiClient{apiMock} + apiClient := &APIClient{apiMock} err := apiClient.MonitorEvaluation("id", context.Background()) - assert.Equal(t, expectedErr, err) + assert.ErrorIs(t, err, expectedErr) } type eventPayload struct { Evaluation *nomadApi.Evaluation + Allocation *nomadApi.Allocation } // eventForEvaluation takes an evaluation and creates an Event with the given evaluation @@ -209,39 +216,40 @@ type eventPayload struct { func eventForEvaluation(t *testing.T, eval nomadApi.Evaluation) nomadApi.Event { payload := make(map[string]interface{}) - err := mapstructure.Decode(eventPayload{&eval}, &payload) + err := mapstructure.Decode(eventPayload{Evaluation: &eval}, &payload) if err != nil { - t.Fatalf("Couldn't encode evaluation %v", eval) + t.Fatalf("Couldn't decode evaluation %v into payload map", eval) return nomadApi.Event{} } event := nomadApi.Event{Topic: nomadApi.TopicEvaluation, Payload: payload} return event } +// simulateNomadEventStream streams the given events sequentially to the stream channel. +// It returns how many events have been processed until an error occurred. +func simulateNomadEventStream(stream chan *nomadApi.Events, errChan chan error, events []*nomadApi.Events) (int, error) { + eventsProcessed := 0 + var e *nomadApi.Events + for _, e = range events { + select { + case err := <-errChan: + close(stream) + return eventsProcessed, err + case stream <- e: + eventsProcessed++ + } + } + err := <-errChan + return eventsProcessed, err +} + // runEvaluationMonitoring simulates events streamed from the Nomad event stream // to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine // and sequentially transfers the events from the given array to a channel simulating the stream. -func runEvaluationMonitoring(t *testing.T, events []*nomadApi.Events) (eventsProcessed int, err error) { +func runEvaluationMonitoring(events []*nomadApi.Events) (eventsProcessed int, err error) { stream := make(chan *nomadApi.Events) errChan := asynchronouslyMonitorEvaluation(stream) - - var e *nomadApi.Events - for eventsProcessed, e = range events { - select { - case err = <-errChan: - return - case stream <- e: - } - } - // wait for error after streaming final event - select { - case err = <-errChan: - case <-time.After(time.Millisecond * 10): - t.Fatal("MonitorEvaluation didn't finish as expected") - } - // Increment once as range starts at 0 - eventsProcessed++ - return + return simulateNomadEventStream(stream, errChan, events) } func TestApiClient_MonitorEvaluationWithSuccessfulEvent(t *testing.T) { @@ -276,7 +284,7 @@ func TestApiClient_MonitorEvaluationWithSuccessfulEvent(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - eventsProcessed, err := runEvaluationMonitoring(t, c.streamedEvents) + eventsProcessed, err := runEvaluationMonitoring(c.streamedEvents) assert.Nil(t, err) assert.Equal(t, c.expectedEventsProcessed, eventsProcessed) }) @@ -289,14 +297,13 @@ func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) { require.NotNil(t, evalErr) pendingEval := nomadApi.Evaluation{Status: structs.EvalStatusPending} - eventsErr := errors.New("my events error") events := nomadApi.Events{Events: []nomadApi.Event{eventForEvaluation(t, eval)}} pendingEvaluationEvents := nomadApi.Events{Events: []nomadApi.Event{eventForEvaluation(t, pendingEval)}} multipleEventsWithPending := nomadApi.Events{Events: []nomadApi.Event{ eventForEvaluation(t, pendingEval), eventForEvaluation(t, eval), }} - eventsWithErr := nomadApi.Events{Err: eventsErr, Events: []nomadApi.Event{{}}} + eventsWithErr := nomadApi.Events{Err: errEvent, Events: []nomadApi.Event{{}}} var cases = []struct { streamedEvents []*nomadApi.Events @@ -314,13 +321,13 @@ func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) { "it skips pending evaluation and fail"}, {[]*nomadApi.Events{&multipleEventsWithPending}, 1, evalErr, "it handles multiple events per received event and fails"}, - {[]*nomadApi.Events{&eventsWithErr}, 1, eventsErr, + {[]*nomadApi.Events{&eventsWithErr}, 1, errEvent, "it fails with event error when event has error"}, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - eventsProcessed, err := runEvaluationMonitoring(t, c.streamedEvents) + eventsProcessed, err := runEvaluationMonitoring(c.streamedEvents) require.NotNil(t, err) assert.Contains(t, err.Error(), c.expectedError.Error()) assert.Equal(t, c.expectedEventsProcessed, eventsProcessed) @@ -336,7 +343,7 @@ func TestApiClient_MonitorEvaluationFailsWhenFailingToDecodeEvaluation(t *testin } _, err := event.Evaluation() require.NotNil(t, err) - eventsProcessed, err := runEvaluationMonitoring(t, []*nomadApi.Events{{Events: []nomadApi.Event{event}}}) + eventsProcessed, err := runEvaluationMonitoring([]*nomadApi.Events{{Events: []nomadApi.Event{event}}}) assert.Equal(t, err, err) assert.Equal(t, 1, eventsProcessed) } @@ -383,7 +390,8 @@ func TestCheckEvaluationWithoutFailedAllocations(t *testing.T) { }) t.Run("when evaluation status not complete", func(t *testing.T) { - for _, status := range []string{structs.EvalStatusFailed, structs.EvalStatusCancelled, structs.EvalStatusBlocked, structs.EvalStatusPending} { + incompleteStates := []string{structs.EvalStatusFailed, structs.EvalStatusCancelled, structs.EvalStatusBlocked, structs.EvalStatusPending} + for _, status := range incompleteStates { evaluation.Status = status err := checkEvaluation(&evaluation) require.NotNil(t, err) @@ -391,3 +399,224 @@ func TestCheckEvaluationWithoutFailedAllocations(t *testing.T) { } }) } + +func TestApiClient_WatchAllocationsIgnoresOldAllocations(t *testing.T) { + oldStoppedAllocation := createOldAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusStop) + oldPendingAllocation := createOldAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + oldRunningAllocation := createOldAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) + oldAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{ + eventForAllocation(t, oldStoppedAllocation), + eventForAllocation(t, oldPendingAllocation), + eventForAllocation(t, oldRunningAllocation), + }} + + assertWatchAllocation(t, []*nomadApi.Events{&oldAllocationEvents}, + []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil)) +} + +func createOldAllocation(clientStatus, desiredStatus string) *nomadApi.Allocation { + return createAllocation(time.Now().Add(-time.Minute).UnixNano(), clientStatus, desiredStatus) +} + +func TestApiClient_WatchAllocationsIgnoresUnhandledEvents(t *testing.T) { + nodeEvents := nomadApi.Events{Events: []nomadApi.Event{ + { + Topic: nomadApi.TopicNode, + Type: structs.TypeNodeEvent, + }, + }} + assertWatchAllocation(t, []*nomadApi.Events{&nodeEvents}, []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil)) + + planEvents := nomadApi.Events{Events: []nomadApi.Event{ + { + Topic: nomadApi.TopicAllocation, + Type: structs.TypePlanResult, + }, + }} + assertWatchAllocation(t, []*nomadApi.Events{&planEvents}, []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil)) +} + +func TestApiClient_WatchAllocationsHandlesEvents(t *testing.T) { + newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + newPendingEvents := nomadApi.Events{Events: []nomadApi.Event{eventForAllocation(t, newPendingAllocation)}} + + newStartedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun) + startAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{ + eventForAllocation(t, newPendingAllocation), + eventForAllocation(t, newStartedAllocation), + }} + + newStoppedAllocation := createRecentAllocation(structs.AllocClientStatusRunning, structs.AllocDesiredStatusStop) + stopAllocationEvents := nomadApi.Events{Events: []nomadApi.Event{ + eventForAllocation(t, newPendingAllocation), + eventForAllocation(t, newStartedAllocation), + eventForAllocation(t, newStoppedAllocation), + }} + + var cases = []struct { + streamedEvents []*nomadApi.Events + expectedNewAllocations []*nomadApi.Allocation + expectedDeletedAllocations []*nomadApi.Allocation + name string + }{ + {[]*nomadApi.Events{&newPendingEvents}, + []*nomadApi.Allocation(nil), []*nomadApi.Allocation(nil), + "it does not add allocation when client status is pending"}, + {[]*nomadApi.Events{&startAllocationEvents}, + []*nomadApi.Allocation{newStartedAllocation}, + []*nomadApi.Allocation(nil), + "it adds allocation with matching events"}, + {[]*nomadApi.Events{{}, &startAllocationEvents}, + []*nomadApi.Allocation{newStartedAllocation}, + []*nomadApi.Allocation(nil), + "it skips heartbeat and adds allocation with matching events"}, + {[]*nomadApi.Events{&stopAllocationEvents}, + []*nomadApi.Allocation{newStartedAllocation}, + []*nomadApi.Allocation{newStoppedAllocation}, + "it adds and deletes the allocation"}, + {[]*nomadApi.Events{&startAllocationEvents, &startAllocationEvents}, + []*nomadApi.Allocation{newStartedAllocation, newStartedAllocation}, + []*nomadApi.Allocation(nil), + "it handles multiple events"}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + assertWatchAllocation(t, c.streamedEvents, + c.expectedNewAllocations, c.expectedDeletedAllocations) + }) + } +} + +func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { + newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) + newPendingEvent := eventForAllocation(t, newPendingAllocation) + + pendingMap := make(map[string]bool) + var doNothing allocationProcessor = func(allocation *nomadApi.Allocation) {} + + err := handleAllocationEvent(time.Now().UnixNano(), pendingMap, &newPendingEvent, doNothing, doNothing) + require.NoError(t, err) + + assert.True(t, pendingMap[newPendingAllocation.ID]) +} + +func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) { + testErr := errors.New("test error") + apiMock := &apiQuerierMock{} + apiMock.On("AllocationStream", mock.Anything).Return(nil, testErr) + apiClient := &APIClient{apiMock} + + noop := func(a *nomadApi.Allocation) {} + err := apiClient.WatchAllocations(context.Background(), noop, noop) + assert.ErrorIs(t, err, testErr) +} + +func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationCannotBeRetrievedWithoutReceivingFurtherEvents(t *testing.T) { + noop := func(a *nomadApi.Allocation) {} + event := nomadApi.Event{ + Topic: nomadApi.TopicAllocation, + // This should fail decoding, as Allocation.ID is expected to be a string, not int + Payload: map[string]interface{}{"Allocation": map[string]interface{}{"ID": 1}}, + } + _, err := event.Allocation() + require.Error(t, err) + + events := []*nomadApi.Events{{Events: []nomadApi.Event{event}}, {}} + eventsProcessed, err := runAllocationWatching(t, events, noop, noop, context.Background()) + assert.Error(t, err) + assert.Equal(t, 1, eventsProcessed) +} + +func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, + expectedNewAllocations, expectedDeletedAllocations []*nomadApi.Allocation) { + t.Helper() + var newAllocations []*nomadApi.Allocation + onNewAllocation := func(alloc *nomadApi.Allocation) { + newAllocations = append(newAllocations, alloc) + } + + var deletedAllocations []*nomadApi.Allocation + onDeletedAllocation := func(alloc *nomadApi.Allocation) { + deletedAllocations = append(deletedAllocations, alloc) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + + eventsProcessed, err := runAllocationWatching(t, events, onNewAllocation, onDeletedAllocation, ctx) + assert.NoError(t, err) + assert.Equal(t, len(events), eventsProcessed) + + assert.Equal(t, expectedNewAllocations, newAllocations) + assert.Equal(t, expectedDeletedAllocations, deletedAllocations) +} + +// runAllocationWatching simulates events streamed from the Nomad event stream +// to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine +// and sequentially transfers the events from the given array to a channel simulating the stream. +func runAllocationWatching(t *testing.T, events []*nomadApi.Events, + onNewAllocation, onDeletedAllocation allocationProcessor, ctx context.Context) (eventsProcessed int, err error) { + t.Helper() + stream := make(chan *nomadApi.Events) + go func() { + <-ctx.Done() + close(stream) + }() + errChan := asynchronouslyWatchAllocations(stream, onNewAllocation, onDeletedAllocation) + return simulateNomadEventStream(stream, errChan, events) +} + +// asynchronouslyMonitorEvaluation creates an APIClient with mocked Nomad API and +// runs the MonitorEvaluation method in a goroutine. The mock returns a read-only +// version of the given stream to simulate an event stream gotten from the real +// Nomad API. +func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, + onNewAllocation, onDeletedAllocation allocationProcessor) chan error { + ctx := context.Background() + // We can only get a read-only channel once we return it from a function. + readOnlyStream := func() <-chan *nomadApi.Events { return stream }() + + apiMock := &apiQuerierMock{} + apiMock.On("AllocationStream", ctx).Return(readOnlyStream, nil) + apiClient := &APIClient{apiMock} + + errChan := make(chan error) + go func() { + errChan <- apiClient.WatchAllocations(ctx, onNewAllocation, onDeletedAllocation) + }() + return errChan +} + +// eventForEvaluation takes an evaluation and creates an Event with the given evaluation +// as its payload. Nomad uses the mapstructure library to decode the payload, which we +// simply reverse here. +func eventForAllocation(t *testing.T, alloc *nomadApi.Allocation) nomadApi.Event { + t.Helper() + payload := make(map[string]interface{}) + + err := mapstructure.Decode(eventPayload{Allocation: alloc}, &payload) + if err != nil { + t.Fatalf("Couldn't encode allocation %v", err) + return nomadApi.Event{} + } + event := nomadApi.Event{ + Topic: nomadApi.TopicAllocation, + Type: structs.TypeAllocationUpdated, + Payload: payload, + } + return event +} + +func createAllocation(modifyTime int64, clientStatus, desiredStatus string) *nomadApi.Allocation { + return &nomadApi.Allocation{ + ID: tests.AllocationID, + ModifyTime: modifyTime, + ClientStatus: clientStatus, + DesiredStatus: desiredStatus, + } +} + +func createRecentAllocation(clientStatus, desiredStatus string) *nomadApi.Allocation { + return createAllocation(time.Now().Add(time.Minute).UnixNano(), clientStatus, desiredStatus) +} diff --git a/runner/manager_test.go b/runner/manager_test.go index 639377a..060bcdd 100644 --- a/runner/manager_test.go +++ b/runner/manager_test.go @@ -95,8 +95,10 @@ func (s *ManagerTestSuite) TestClaimDoesNotReturnTheSameRunnerTwice() { s.AddIdleRunnerForDefaultEnvironment(s.exerciseRunner) s.AddIdleRunnerForDefaultEnvironment(NewRunner(tests.AnotherRunnerId)) - firstReceivedRunner, _ := s.nomadRunnerManager.Claim(defaultEnvironmentId) - secondReceivedRunner, _ := s.nomadRunnerManager.Claim(defaultEnvironmentId) + firstReceivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentId) + require.NoError(s.T(), err) + secondReceivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentId) + require.NoError(s.T(), err) s.NotEqual(firstReceivedRunner, secondReceivedRunner) } @@ -109,7 +111,8 @@ func (s *ManagerTestSuite) TestClaimThrowsAnErrorIfNoRunnersAvailable() { func (s *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() { s.mockRunnerQueries([]string{tests.DefaultRunnerId}) s.waitForRunnerRefresh() - receivedRunner, _ := s.nomadRunnerManager.Claim(defaultEnvironmentId) + receivedRunner, err := s.nomadRunnerManager.Claim(defaultEnvironmentId) + require.NoError(s.T(), err) savedRunner, ok := s.nomadRunnerManager.usedRunners.Get(receivedRunner.Id()) s.True(ok) s.Equal(savedRunner, receivedRunner)