Exit MonitorEvaluation once stream is closed
As we pass the context to the Nomad API event stream, they close the event stream once the passed context is cancelled. We use this to exit our receive loop on the event stream once the stream is closed, instead of having to check the context manually.
This commit is contained in:

committed by
Tobias Kantusch

parent
a891d72c4f
commit
6084b00e23
@ -66,17 +66,12 @@ func (a *ApiClient) LoadRunners(jobId string) (runnerIds []string, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *ApiClient) MonitorEvaluation(evalID string, ctx context.Context) error {
|
func (a *ApiClient) MonitorEvaluation(evalID string, ctx context.Context) error {
|
||||||
var events *nomadApi.Events
|
|
||||||
stream, err := a.EvaluationStream(evalID, ctx)
|
stream, err := a.EvaluationStream(evalID, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for {
|
// If ctx is cancelled, the stream will be closed by Nomad and we exit the for loop.
|
||||||
select {
|
for events := range stream {
|
||||||
case events = <-stream:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if events.IsHeartbeat() {
|
if events.IsHeartbeat() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -97,6 +92,7 @@ func (a *ApiClient) MonitorEvaluation(evalID string, ctx context.Context) error
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkEvaluation checks whether the given evaluation failed.
|
// checkEvaluation checks whether the given evaluation failed.
|
||||||
|
@ -154,21 +154,33 @@ func TestNewExecutorApiCanBeCreatedWithoutError(t *testing.T) {
|
|||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestApiClient_MonitorEvaluationReturnsNilWhenContextCancelled(t *testing.T) {
|
// asynchronouslyMonitorEvaluation creates an ApiClient with mocked Nomad API and
|
||||||
stream := make(<-chan *nomadApi.Events)
|
// runs the MonitorEvaluation method in a goroutine. The mock returns a read-only
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
// version of the given stream to simulate an event stream gotten from the real
|
||||||
|
// Nomad API.
|
||||||
|
func asynchronouslyMonitorEvaluation(stream chan *nomadApi.Events) chan error {
|
||||||
|
ctx := context.Background()
|
||||||
|
// We can only get a read-only channel once we return it from a function.
|
||||||
|
readOnlyStream := func() <-chan *nomadApi.Events { return stream }()
|
||||||
|
|
||||||
apiMock := &apiQuerierMock{}
|
apiMock := &apiQuerierMock{}
|
||||||
apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(stream, nil)
|
apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(readOnlyStream, nil)
|
||||||
apiClient := &ApiClient{apiMock}
|
apiClient := &ApiClient{apiMock}
|
||||||
|
|
||||||
var err error
|
|
||||||
errChan := make(chan error)
|
errChan := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
errChan <- apiClient.MonitorEvaluation("id", ctx)
|
errChan <- apiClient.MonitorEvaluation("id", ctx)
|
||||||
}()
|
}()
|
||||||
cancel()
|
return errChan
|
||||||
// If cancel doesn't terminate MonitorEvaluation, this test won't complete without a timeout.
|
}
|
||||||
|
|
||||||
|
func TestApiClient_MonitorEvaluationReturnsNilWhenStreamIsClosed(t *testing.T) {
|
||||||
|
stream := make(chan *nomadApi.Events)
|
||||||
|
errChan := asynchronouslyMonitorEvaluation(stream)
|
||||||
|
|
||||||
|
close(stream)
|
||||||
|
var err error
|
||||||
|
// If close doesn't terminate MonitorEvaluation, this test won't complete without a timeout.
|
||||||
select {
|
select {
|
||||||
case err = <-errChan:
|
case err = <-errChan:
|
||||||
case <-time.After(time.Millisecond * 10):
|
case <-time.After(time.Millisecond * 10):
|
||||||
@ -206,23 +218,12 @@ func eventForEvaluation(t *testing.T, eval nomadApi.Evaluation) nomadApi.Event {
|
|||||||
return event
|
return event
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// runEvaluationMonitoring simulates events streamed from the Nomad event stream
|
||||||
|
// to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine
|
||||||
|
// and sequentially transfers the events from the given array to a channel simulating the stream.
|
||||||
func runEvaluationMonitoring(t *testing.T, events []*nomadApi.Events) (eventsProcessed int, err error) {
|
func runEvaluationMonitoring(t *testing.T, events []*nomadApi.Events) (eventsProcessed int, err error) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
stream := make(chan *nomadApi.Events)
|
stream := make(chan *nomadApi.Events)
|
||||||
|
errChan := asynchronouslyMonitorEvaluation(stream)
|
||||||
apiMock := &apiQuerierMock{}
|
|
||||||
|
|
||||||
// Yes it is hacky. However, we can only get a read-only channel once we return it from a function.
|
|
||||||
readOnlyStream := func() <-chan *nomadApi.Events { return stream }()
|
|
||||||
|
|
||||||
apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(readOnlyStream, nil)
|
|
||||||
apiClient := &ApiClient{apiMock}
|
|
||||||
|
|
||||||
errChan := make(chan error)
|
|
||||||
go func() {
|
|
||||||
errChan <- apiClient.MonitorEvaluation("id", ctx)
|
|
||||||
}()
|
|
||||||
|
|
||||||
var e *nomadApi.Events
|
var e *nomadApi.Events
|
||||||
for eventsProcessed, e = range events {
|
for eventsProcessed, e = range events {
|
||||||
|
Reference in New Issue
Block a user