diff --git a/nomad/nomad.go b/nomad/nomad.go index 6003163..a5c88cd 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -66,17 +66,12 @@ func (a *ApiClient) LoadRunners(jobId string) (runnerIds []string, err error) { } func (a *ApiClient) MonitorEvaluation(evalID string, ctx context.Context) error { - var events *nomadApi.Events stream, err := a.EvaluationStream(evalID, ctx) if err != nil { return err } - for { - select { - case events = <-stream: - case <-ctx.Done(): - return nil - } + // If ctx is cancelled, the stream will be closed by Nomad and we exit the for loop. + for events := range stream { if events.IsHeartbeat() { continue } @@ -97,6 +92,7 @@ func (a *ApiClient) MonitorEvaluation(evalID string, ctx context.Context) error } } } + return nil } // checkEvaluation checks whether the given evaluation failed. diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index b29fea6..198aacd 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -154,21 +154,33 @@ func TestNewExecutorApiCanBeCreatedWithoutError(t *testing.T) { require.Nil(t, err) } -func TestApiClient_MonitorEvaluationReturnsNilWhenContextCancelled(t *testing.T) { - stream := make(<-chan *nomadApi.Events) - ctx, cancel := context.WithCancel(context.Background()) +// asynchronouslyMonitorEvaluation creates an ApiClient with mocked Nomad API and +// runs the MonitorEvaluation method in a goroutine. The mock returns a read-only +// version of the given stream to simulate an event stream gotten from the real +// Nomad API. +func asynchronouslyMonitorEvaluation(stream chan *nomadApi.Events) chan error { + ctx := context.Background() + // We can only get a read-only channel once we return it from a function. + readOnlyStream := func() <-chan *nomadApi.Events { return stream }() apiMock := &apiQuerierMock{} - apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(stream, nil) + apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(readOnlyStream, nil) apiClient := &ApiClient{apiMock} - var err error errChan := make(chan error) go func() { errChan <- apiClient.MonitorEvaluation("id", ctx) }() - cancel() - // If cancel doesn't terminate MonitorEvaluation, this test won't complete without a timeout. + return errChan +} + +func TestApiClient_MonitorEvaluationReturnsNilWhenStreamIsClosed(t *testing.T) { + stream := make(chan *nomadApi.Events) + errChan := asynchronouslyMonitorEvaluation(stream) + + close(stream) + var err error + // If close doesn't terminate MonitorEvaluation, this test won't complete without a timeout. select { case err = <-errChan: case <-time.After(time.Millisecond * 10): @@ -206,23 +218,12 @@ func eventForEvaluation(t *testing.T, eval nomadApi.Evaluation) nomadApi.Event { return event } +// runEvaluationMonitoring simulates events streamed from the Nomad event stream +// to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine +// and sequentially transfers the events from the given array to a channel simulating the stream. func runEvaluationMonitoring(t *testing.T, events []*nomadApi.Events) (eventsProcessed int, err error) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() stream := make(chan *nomadApi.Events) - - apiMock := &apiQuerierMock{} - - // Yes it is hacky. However, we can only get a read-only channel once we return it from a function. - readOnlyStream := func() <-chan *nomadApi.Events { return stream }() - - apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(readOnlyStream, nil) - apiClient := &ApiClient{apiMock} - - errChan := make(chan error) - go func() { - errChan <- apiClient.MonitorEvaluation("id", ctx) - }() + errChan := asynchronouslyMonitorEvaluation(stream) var e *nomadApi.Events for eventsProcessed, e = range events {