From fff67246d6cc0bad92f403f55dd77dc5bcf62b6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Wed, 10 Nov 2021 09:57:40 +0100 Subject: [PATCH] Infinite busy waiting for lost event (#31) * Close evaluation stream for Nomad Job creation when set event handler have been finished * Remove evaluation event stream requests by handling the events via the main Nomad event handler. --- internal/environment/environment.go | 4 +- internal/environment/environment_test.go | 11 +--- internal/environment/manager_test.go | 2 +- internal/nomad/api_querier.go | 25 ++------ internal/nomad/api_querier_mock.go | 57 ++++++------------ internal/nomad/executor_api_mock.go | 41 +++---------- internal/nomad/job.go | 7 ++- internal/nomad/nomad.go | 74 ++++++++++++++++++------ internal/nomad/nomad_test.go | 34 ++++++----- internal/runner/manager.go | 2 +- internal/runner/manager_test.go | 8 +-- 11 files changed, 120 insertions(+), 145 deletions(-) diff --git a/internal/environment/environment.go b/internal/environment/environment.go index 5f01c5c..96fedc1 100644 --- a/internal/environment/environment.go +++ b/internal/environment/environment.go @@ -177,7 +177,9 @@ func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error { if err != nil { return fmt.Errorf("couldn't register job: %w", err) } - err = apiClient.MonitorEvaluation(evalID, context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), nomad.RegisterTimeout) + defer cancel() + err = apiClient.MonitorEvaluation(evalID, ctx) if err != nil { return fmt.Errorf("error during the monitoring of the environment job: %w", err) } diff --git a/internal/environment/environment_test.go b/internal/environment/environment_test.go index 6c824b4..ce54d65 100644 --- a/internal/environment/environment_test.go +++ b/internal/environment/environment_test.go @@ -116,24 +116,15 @@ func TestRegisterFailsWhenNomadJobRegistrationFails(t *testing.T) { err := environment.Register(apiClientMock) assert.ErrorIs(t, err, expectedErr) - apiClientMock.AssertNotCalled(t, "EvaluationStream") + apiClientMock.AssertNotCalled(t, "MonitorEvaluation") } func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) { apiClientMock := &nomad.ExecutorAPIMock{} evaluationID := "id" - stream := make(chan *nomadApi.Events) - readonlyStream := func() <-chan *nomadApi.Events { - return stream - }() - // Immediately close stream to avoid any reading from it resulting in endless wait - close(stream) - apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil) apiClientMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil) - apiClientMock.On("EvaluationStream", evaluationID, mock.AnythingOfType("*context.emptyCtx")). - Return(readonlyStream, nil) environment := &NomadEnvironment{"", &nomadApi.Job{}, nil} environment.SetID(tests.DefaultEnvironmentIDAsInteger) diff --git a/internal/environment/manager_test.go b/internal/environment/manager_test.go index 12983d6..a33d888 100644 --- a/internal/environment/manager_test.go +++ b/internal/environment/manager_test.go @@ -232,7 +232,7 @@ func TestNomadEnvironmentManager_List(t *testing.T) { } func mockWatchAllocations(apiMock *nomad.ExecutorAPIMock) { - call := apiMock.On("WatchAllocations", mock.Anything, mock.Anything, mock.Anything) + call := apiMock.On("WatchEventStream", mock.Anything, mock.Anything, mock.Anything) call.Run(func(args mock.Arguments) { <-time.After(10 * time.Minute) // 10 minutes is the default test timeout call.ReturnArguments = mock.Arguments{nil} diff --git a/internal/nomad/api_querier.go b/internal/nomad/api_querier.go index 7684228..d818506 100644 --- a/internal/nomad/api_querier.go +++ b/internal/nomad/api_querier.go @@ -47,12 +47,8 @@ type apiQuerier interface { // It returns the evaluation ID that can be used when listening to the Nomad event stream. RegisterNomadJob(job *nomadApi.Job) (string, error) - // EvaluationStream returns a Nomad event stream filtered to return only events belonging to the - // given evaluation ID. - EvaluationStream(evalID string, ctx context.Context) (<-chan *nomadApi.Events, error) - - // AllocationStream returns a Nomad event stream filtered to return only allocation events. - AllocationStream(ctx context.Context) (<-chan *nomadApi.Events, error) + // EventStream returns a Nomad event stream filtered to return only allocation and evaluation events. + EventStream(ctx context.Context) (<-chan *nomadApi.Events, error) } // nomadAPIClient implements the nomadApiQuerier interface and provides access to a real Nomad API. @@ -136,24 +132,11 @@ func (nc *nomadAPIClient) RegisterNomadJob(job *nomadApi.Job) (string, error) { return resp.EvalID, nil } -func (nc *nomadAPIClient) EvaluationStream(evalID string, ctx context.Context) (<-chan *nomadApi.Events, error) { - stream, err := nc.client.EventStream().Stream( - ctx, - map[nomadApi.Topic][]string{ - nomadApi.TopicEvaluation: {evalID}, - }, - 0, - nc.queryOptions()) - if err != nil { - return nil, fmt.Errorf("error retrieving Nomad Evaluation event stream: %w", err) - } - return stream, nil -} - -func (nc *nomadAPIClient) AllocationStream(ctx context.Context) (<-chan *nomadApi.Events, error) { +func (nc *nomadAPIClient) EventStream(ctx context.Context) (<-chan *nomadApi.Events, error) { stream, err := nc.client.EventStream().Stream( ctx, map[nomadApi.Topic][]string{ + nomadApi.TopicEvaluation: {"*"}, nomadApi.TopicAllocation: { // Necessary to have the "topic" URL param show up in the HTTP request to Nomad. // Without the param, Nomad will try to deliver all event types. diff --git a/internal/nomad/api_querier_mock.go b/internal/nomad/api_querier_mock.go index d27e0e2..11c45a5 100644 --- a/internal/nomad/api_querier_mock.go +++ b/internal/nomad/api_querier_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.9.4. DO NOT EDIT. package nomad @@ -18,8 +18,22 @@ type apiQuerierMock struct { mock.Mock } -// AllocationStream provides a mock function with given fields: ctx -func (_m *apiQuerierMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) { +// DeleteJob provides a mock function with given fields: jobID +func (_m *apiQuerierMock) DeleteJob(jobID string) error { + ret := _m.Called(jobID) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(jobID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventStream provides a mock function with given fields: ctx +func (_m *apiQuerierMock) EventStream(ctx context.Context) (<-chan *api.Events, error) { ret := _m.Called(ctx) var r0 <-chan *api.Events @@ -41,43 +55,6 @@ func (_m *apiQuerierMock) AllocationStream(ctx context.Context) (<-chan *api.Eve return r0, r1 } -// DeleteRunner provides a mock function with given fields: runnerID -func (_m *apiQuerierMock) DeleteJob(runnerID string) error { - ret := _m.Called(runnerID) - - var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(runnerID) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// EvaluationStream provides a mock function with given fields: evalID, ctx -func (_m *apiQuerierMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) { - ret := _m.Called(evalID, ctx) - - var r0 <-chan *api.Events - if rf, ok := ret.Get(0).(func(string, context.Context) <-chan *api.Events); ok { - r0 = rf(evalID, ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan *api.Events) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(string, context.Context) error); ok { - r1 = rf(evalID, ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // Execute provides a mock function with given fields: jobID, ctx, command, tty, stdin, stdout, stderr func (_m *apiQuerierMock) Execute(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) { ret := _m.Called(jobID, ctx, command, tty, stdin, stdout, stderr) diff --git a/internal/nomad/executor_api_mock.go b/internal/nomad/executor_api_mock.go index 0e21ed3..81a73d4 100644 --- a/internal/nomad/executor_api_mock.go +++ b/internal/nomad/executor_api_mock.go @@ -20,29 +20,6 @@ type ExecutorAPIMock struct { mock.Mock } -// AllocationStream provides a mock function with given fields: ctx -func (_m *ExecutorAPIMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) { - ret := _m.Called(ctx) - - var r0 <-chan *api.Events - if rf, ok := ret.Get(0).(func(context.Context) <-chan *api.Events); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan *api.Events) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // DeleteJob provides a mock function with given fields: jobID func (_m *ExecutorAPIMock) DeleteJob(jobID string) error { ret := _m.Called(jobID) @@ -57,13 +34,13 @@ func (_m *ExecutorAPIMock) DeleteJob(jobID string) error { return r0 } -// EvaluationStream provides a mock function with given fields: evalID, ctx -func (_m *ExecutorAPIMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) { - ret := _m.Called(evalID, ctx) +// EventStream provides a mock function with given fields: ctx +func (_m *ExecutorAPIMock) EventStream(ctx context.Context) (<-chan *api.Events, error) { + ret := _m.Called(ctx) var r0 <-chan *api.Events - if rf, ok := ret.Get(0).(func(string, context.Context) <-chan *api.Events); ok { - r0 = rf(evalID, ctx) + if rf, ok := ret.Get(0).(func(context.Context) <-chan *api.Events); ok { + r0 = rf(ctx) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(<-chan *api.Events) @@ -71,8 +48,8 @@ func (_m *ExecutorAPIMock) EvaluationStream(evalID string, ctx context.Context) } var r1 error - if rf, ok := ret.Get(1).(func(string, context.Context) error); ok { - r1 = rf(evalID, ctx) + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) } else { r1 = ret.Error(1) } @@ -335,8 +312,8 @@ func (_m *ExecutorAPIMock) SetJobScale(jobID string, count uint, reason string) return r0 } -// WatchAllocations provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation -func (_m *ExecutorAPIMock) WatchAllocations(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error { +// WatchEventStream provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation +func (_m *ExecutorAPIMock) WatchEventStream(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error { ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation) var r0 error diff --git a/internal/nomad/job.go b/internal/nomad/job.go index 7f18a2f..4848b5e 100644 --- a/internal/nomad/job.go +++ b/internal/nomad/job.go @@ -8,6 +8,7 @@ import ( "github.com/openHPI/poseidon/pkg/dto" "strconv" "strings" + "time" ) const ( @@ -27,6 +28,7 @@ const ( ConfigMetaTimeoutKey = "timeout" ConfigMetaPoolSizeKey = "prewarmingPoolSize" TemplateJobNameParts = 2 + RegisterTimeout = 10 * time.Second ) var ( @@ -44,7 +46,10 @@ func (a *APIClient) RegisterRunnerJob(template *nomadApi.Job) error { if err != nil { return fmt.Errorf("couldn't register runner job: %w", err) } - return a.MonitorEvaluation(evalID, context.Background()) + + registerTimeout, cancel := context.WithTimeout(context.Background(), RegisterTimeout) + defer cancel() + return a.MonitorEvaluation(evalID, registerTimeout) } func FindTaskGroup(job *nomadApi.Job, name string) *nomadApi.TaskGroup { diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index b00b51b..096175e 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -24,6 +24,9 @@ var ( ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found") ) +// resultChannelWriteTimeout is to detect the error when more element are written into a channel than expected. +const resultChannelWriteTimeout = 10 * time.Millisecond + type AllocationProcessor func(*nomadApi.Allocation) // ExecutorAPI provides access to a container orchestration solution. @@ -53,9 +56,10 @@ type ExecutorAPI interface { // See also https://github.com/hashicorp/nomad/blob/7d5a9ecde95c18da94c9b6ace2565afbfdd6a40d/command/monitor.go#L175 MonitorEvaluation(evaluationID string, ctx context.Context) error - // WatchAllocations listens on the Nomad event stream for allocation events. + // WatchEventStream listens on the Nomad event stream for allocation and evaluation events. // Depending on the incoming event, any of the given function is executed. - WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error + // Do not run multiple times simultaneously. + WatchEventStream(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error // ExecuteCommand executes the given command in the allocation with the given id. // It writes the output of the command to stdout/stderr and reads input from stdin. @@ -71,12 +75,14 @@ type ExecutorAPI interface { // Executor API and its return values. type APIClient struct { apiQuerier + evaluations map[string]chan error + isListening bool } // NewExecutorAPI creates a new api client. // One client is usually sufficient for the complete runtime of the API. func NewExecutorAPI(nomadConfig *config.Nomad) (ExecutorAPI, error) { - client := &APIClient{apiQuerier: &nomadAPIClient{}} + client := &APIClient{apiQuerier: &nomadAPIClient{}, evaluations: map[string]chan error{}} err := client.init(nomadConfig) return client, err } @@ -136,29 +142,53 @@ func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*nomadApi return jobs, occurredError } -func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) error { - stream, err := a.apiQuerier.EvaluationStream(evaluationID, ctx) - if err != nil { - return fmt.Errorf("failed retrieving evaluation stream: %w", err) +func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) (err error) { + a.evaluations[evaluationID] = make(chan error, 1) + defer delete(a.evaluations, evaluationID) + + if !a.isListening { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + defer cancel() // cancel the WatchEventStream when the evaluation result was read. + + go func() { + err = a.WatchEventStream(ctx, func(_ *nomadApi.Allocation) {}, func(_ *nomadApi.Allocation) {}) + cancel() // cancel the waiting for an evaluation result if watching the event stream ends. + }() + } + + select { + case <-ctx.Done(): + return err + case err := <-a.evaluations[evaluationID]: + // At the moment we expect only one error to be sent via this channel. + return err } - // If ctx is canceled, the stream will be closed by Nomad and we exit the for loop. - return receiveAndHandleNomadAPIEvents(stream, handleEvaluationEvent) } -func (a *APIClient) WatchAllocations(ctx context.Context, +func (a *APIClient) WatchEventStream(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error { startTime := time.Now().UnixNano() - stream, err := a.AllocationStream(ctx) + stream, err := a.EventStream(ctx) if err != nil { return fmt.Errorf("failed retrieving allocation stream: %w", err) } pendingAllocations := make(map[string]bool) handler := func(event *nomadApi.Event) (bool, error) { - return false, handleAllocationEvent(startTime, pendingAllocations, event, onNewAllocation, onDeletedAllocation) + switch event.Topic { + case nomadApi.TopicEvaluation: + return false, handleEvaluationEvent(a.evaluations, event) + case nomadApi.TopicAllocation: + return false, handleAllocationEvent(startTime, pendingAllocations, event, onNewAllocation, onDeletedAllocation) + default: + return false, nil + } } + a.isListening = true err = receiveAndHandleNomadAPIEvents(stream, handler) + a.isListening = false return err } @@ -191,21 +221,29 @@ func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler noma return nil } -// handleEvaluationEvent is a nomadAPIEventHandler that returns whether the evaluation described by the event +// handleEvaluationEvent is an event handler that returns whether the evaluation described by the event // was successful. -func handleEvaluationEvent(event *nomadApi.Event) (bool, error) { +func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Event) error { eval, err := event.Evaluation() if err != nil { - return true, fmt.Errorf("failed to monitor evaluation: %w", err) + return fmt.Errorf("failed to monitor evaluation: %w", err) } switch eval.Status { case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed: - return true, checkEvaluation(eval) + resultChannel, ok := evaluations[eval.ID] + if ok { + select { + case resultChannel <- checkEvaluation(eval): + close(resultChannel) + case <-time.After(resultChannelWriteTimeout): + log.WithField("eval", eval).Error("Full evaluation channel") + } + } } - return false, nil + return nil } -// handleAllocationEvent is a nomadAPIEventHandler that processes allocation events. +// handleAllocationEvent is an event handler that processes allocation events. // If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation // is called. The pendingAllocations map is used to store allocations that are pending but not started yet. Using the // map the state is persisted between multiple calls of this function. diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index 77ec38e..7ee7a69 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -124,6 +124,7 @@ func (s *LoadRunnersTestSuite) TestReturnsAllAvailableRunners() { const TestNamespace = "unit-tests" const TestNomadToken = "n0m4d-t0k3n" const TestDefaultAddress = "127.0.0.1" +const evaluationID = "evaluation-id" func NomadTestConfig(address string) *config.Nomad { return &config.Nomad{ @@ -168,12 +169,13 @@ func asynchronouslyMonitorEvaluation(stream chan *nomadApi.Events) chan error { readOnlyStream := func() <-chan *nomadApi.Events { return stream }() apiMock := &apiQuerierMock{} - apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(readOnlyStream, nil) - apiClient := &APIClient{apiMock} + apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")). + Return(readOnlyStream, nil) + apiClient := &APIClient{apiMock, map[string]chan error{}, false} errChan := make(chan error) go func() { - errChan <- apiClient.MonitorEvaluation("id", ctx) + errChan <- apiClient.MonitorEvaluation(evaluationID, ctx) }() return errChan } @@ -195,9 +197,9 @@ func TestApiClient_MonitorEvaluationReturnsNilWhenStreamIsClosed(t *testing.T) { func TestApiClient_MonitorEvaluationReturnsErrorWhenStreamReturnsError(t *testing.T) { apiMock := &apiQuerierMock{} - apiMock.On("EvaluationStream", mock.AnythingOfType("string"), mock.AnythingOfType("*context.emptyCtx")). + apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")). Return(nil, tests.ErrDefault) - apiClient := &APIClient{apiMock} + apiClient := &APIClient{apiMock, map[string]chan error{}, false} err := apiClient.MonitorEvaluation("id", context.Background()) assert.ErrorIs(t, err, tests.ErrDefault) } @@ -278,8 +280,8 @@ func TestApiClient_MonitorEvaluationWithSuccessfulEvent(t *testing.T) { }{ {[]*nomadApi.Events{&events}, 1, "it completes with successful event"}, - {[]*nomadApi.Events{&events, &events}, 1, - "it completes at first successful event"}, + {[]*nomadApi.Events{&events, &events}, 2, + "it keeps listening after first successful event"}, {[]*nomadApi.Events{{}, &events}, 2, "it skips heartbeat and completes"}, {[]*nomadApi.Events{&pendingEvaluationEvents, &events}, 2, @@ -298,7 +300,7 @@ func TestApiClient_MonitorEvaluationWithSuccessfulEvent(t *testing.T) { } func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) { - eval := nomadApi.Evaluation{Status: structs.EvalStatusFailed} + eval := nomadApi.Evaluation{ID: evaluationID, Status: structs.EvalStatusFailed} evalErr := checkEvaluation(&eval) require.NotNil(t, evalErr) @@ -318,9 +320,9 @@ func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) { name string }{ {[]*nomadApi.Events{&events}, 1, evalErr, - "it fails with failing event"}, + "it completes with failing event"}, {[]*nomadApi.Events{&events, &events}, 1, evalErr, - "it fails at first failing event"}, + "it does not fail after first failing event"}, {[]*nomadApi.Events{{}, &events}, 2, evalErr, "it skips heartbeat and fail"}, {[]*nomadApi.Events{&pendingEvaluationEvents, &events}, 2, evalErr, @@ -510,11 +512,11 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) { apiMock := &apiQuerierMock{} - apiMock.On("AllocationStream", mock.Anything).Return(nil, tests.ErrDefault) - apiClient := &APIClient{apiMock} + apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault) + apiClient := &APIClient{apiMock, map[string]chan error{}, false} noop := func(a *nomadApi.Allocation) {} - err := apiClient.WatchAllocations(context.Background(), noop, noop) + err := apiClient.WatchEventStream(context.Background(), noop, noop) assert.ErrorIs(t, err, tests.ErrDefault) } @@ -579,12 +581,12 @@ func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, readOnlyStream := func() <-chan *nomadApi.Events { return stream }() apiMock := &apiQuerierMock{} - apiMock.On("AllocationStream", ctx).Return(readOnlyStream, nil) - apiClient := &APIClient{apiMock} + apiMock.On("EventStream", ctx).Return(readOnlyStream, nil) + apiClient := &APIClient{apiMock, map[string]chan error{}, false} errChan := make(chan error) go func() { - errChan <- apiClient.WatchAllocations(ctx, onNewAllocation, onDeletedAllocation) + errChan <- apiClient.WatchEventStream(ctx, onNewAllocation, onDeletedAllocation) }() return errChan } diff --git a/internal/runner/manager.go b/internal/runner/manager.go index 6b52de0..3b68878 100644 --- a/internal/runner/manager.go +++ b/internal/runner/manager.go @@ -228,7 +228,7 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) { retries := 0 for ctx.Err() == nil { - err := m.apiClient.WatchAllocations(ctx, m.onAllocationAdded, m.onAllocationStopped) + err := m.apiClient.WatchEventStream(ctx, m.onAllocationAdded, m.onAllocationStopped) retries += 1 log.WithError(err).Errorf("Stopped updating the runners! Retry %v", retries) <-time.After(time.Second) diff --git a/internal/runner/manager_test.go b/internal/runner/manager_test.go index ed15ffe..b929f42 100644 --- a/internal/runner/manager_test.go +++ b/internal/runner/manager_test.go @@ -42,7 +42,7 @@ func (s *ManagerTestSuite) SetupTest() { func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []string) { // reset expected calls to allow new mocked return values apiMock.ExpectedCalls = []*mock.Call{} - call := apiMock.On("WatchAllocations", mock.Anything, mock.Anything, mock.Anything) + call := apiMock.On("WatchEventStream", mock.Anything, mock.Anything, mock.Anything) call.Run(func(args mock.Arguments) { <-time.After(10 * time.Minute) // 10 minutes is the default test timeout call.ReturnArguments = mock.Arguments{nil} @@ -206,7 +206,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() { var hook *test.Hook logger, hook := test.NewNullLogger() log = logger.WithField("pkg", "runner") - modifyMockedCall(s.apiMock, "WatchAllocations", func(call *mock.Call) { + modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) { call.Run(func(args mock.Arguments) { call.ReturnArguments = mock.Arguments{tests.ErrDefault} }) @@ -232,7 +232,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { _, ok = environment.Sample(s.apiMock) s.Require().False(ok) - modifyMockedCall(s.apiMock, "WatchAllocations", func(call *mock.Call) { + modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) { call.Run(func(args mock.Arguments) { onCreate, ok := args.Get(1).(nomad.AllocationProcessor) s.Require().True(ok) @@ -260,7 +260,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { environment.AddRunner(testRunner) s.nomadRunnerManager.usedRunners.Add(testRunner) - modifyMockedCall(s.apiMock, "WatchAllocations", func(call *mock.Call) { + modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) { call.Run(func(args mock.Arguments) { onDelete, ok := args.Get(2).(nomad.AllocationProcessor) s.Require().True(ok)