diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index fe95b6b..a347cbc 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -116,7 +116,7 @@ type ExecutorAPI interface { // Executor API and its return values. type APIClient struct { apiQuerier - evaluations map[string]chan error + evaluations storage.Storage[chan error] // allocations contain management data for all pending and running allocations. allocations storage.Storage[*allocationData] isListening bool @@ -127,7 +127,7 @@ type APIClient struct { func NewExecutorAPI(nomadConfig *config.Nomad) (ExecutorAPI, error) { client := &APIClient{ apiQuerier: &nomadAPIClient{}, - evaluations: map[string]chan error{}, + evaluations: storage.NewLocalStorage[chan error](), allocations: storage.NewMonitoredLocalStorage[*allocationData](monitoring.MeasurementNomadAllocations, func(p *write.Point, object *allocationData, _ storage.EventType) { p.AddTag(monitoring.InfluxKeyJobID, object.jobID) @@ -197,8 +197,9 @@ func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*nomadApi } func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) (err error) { - a.evaluations[evaluationID] = make(chan error, 1) - defer delete(a.evaluations, evaluationID) + evaluationErrorChannel := make(chan error, 1) + a.evaluations.Add(evaluationID, evaluationErrorChannel) + defer a.evaluations.Delete(evaluationID) if !a.isListening { var cancel context.CancelFunc @@ -217,7 +218,7 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) select { case <-ctx.Done(): return err - case err := <-a.evaluations[evaluationID]: + case err := <-evaluationErrorChannel: // At the moment we expect only one error to be sent via this channel. return err } @@ -313,14 +314,14 @@ func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler noma // handleEvaluationEvent is an event handler that returns whether the evaluation described by the event // was successful. -func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Event) error { +func handleEvaluationEvent(evaluations storage.Storage[chan error], event *nomadApi.Event) error { eval, err := event.Evaluation() if err != nil { return fmt.Errorf("failed to monitor evaluation: %w", err) } switch eval.Status { case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed: - resultChannel, ok := evaluations[eval.ID] + resultChannel, ok := evaluations.Get(eval.ID) if ok { evalErr := checkEvaluation(eval) select { diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index 943c314..c2e0e12 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -183,7 +183,7 @@ func asynchronouslyMonitorEvaluation(stream <-chan *nomadApi.Events) chan error apiMock := &apiQuerierMock{} apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")). Return(readOnlyStream, nil) - apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false} + apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false} errChan := make(chan error) go func() { @@ -211,7 +211,7 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationReturnsErrorWhenStreamRet apiMock := &apiQuerierMock{} apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")). Return(nil, tests.ErrDefault) - apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false} + apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false} err := apiClient.MonitorEvaluation("id", context.Background()) s.ErrorIs(err, tests.ErrDefault) } @@ -677,7 +677,7 @@ func (s *MainTestSuite) TestHandleAllocationEvent_ReportsOOMKilledStatus() { func (s *MainTestSuite) TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved() { apiMock := &apiQuerierMock{} apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault) - apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false} + apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false} err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessing) s.ErrorIs(err, tests.ErrDefault) @@ -752,7 +752,7 @@ func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *All apiMock := &apiQuerierMock{} apiMock.On("EventStream", ctx).Return(readOnlyStream, nil) - apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false} + apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false} errChan := make(chan error) go func() {