Fix concurrent map write
in the Nomad `evaluations` map by replacing the simple map with our concurrency-ready storage object.
This commit is contained in:
@ -116,7 +116,7 @@ type ExecutorAPI interface {
|
||||
// Executor API and its return values.
|
||||
type APIClient struct {
|
||||
apiQuerier
|
||||
evaluations map[string]chan error
|
||||
evaluations storage.Storage[chan error]
|
||||
// allocations contain management data for all pending and running allocations.
|
||||
allocations storage.Storage[*allocationData]
|
||||
isListening bool
|
||||
@ -127,7 +127,7 @@ type APIClient struct {
|
||||
func NewExecutorAPI(nomadConfig *config.Nomad) (ExecutorAPI, error) {
|
||||
client := &APIClient{
|
||||
apiQuerier: &nomadAPIClient{},
|
||||
evaluations: map[string]chan error{},
|
||||
evaluations: storage.NewLocalStorage[chan error](),
|
||||
allocations: storage.NewMonitoredLocalStorage[*allocationData](monitoring.MeasurementNomadAllocations,
|
||||
func(p *write.Point, object *allocationData, _ storage.EventType) {
|
||||
p.AddTag(monitoring.InfluxKeyJobID, object.jobID)
|
||||
@ -197,8 +197,9 @@ func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*nomadApi
|
||||
}
|
||||
|
||||
func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) (err error) {
|
||||
a.evaluations[evaluationID] = make(chan error, 1)
|
||||
defer delete(a.evaluations, evaluationID)
|
||||
evaluationErrorChannel := make(chan error, 1)
|
||||
a.evaluations.Add(evaluationID, evaluationErrorChannel)
|
||||
defer a.evaluations.Delete(evaluationID)
|
||||
|
||||
if !a.isListening {
|
||||
var cancel context.CancelFunc
|
||||
@ -217,7 +218,7 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return err
|
||||
case err := <-a.evaluations[evaluationID]:
|
||||
case err := <-evaluationErrorChannel:
|
||||
// At the moment we expect only one error to be sent via this channel.
|
||||
return err
|
||||
}
|
||||
@ -313,14 +314,14 @@ func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler noma
|
||||
|
||||
// handleEvaluationEvent is an event handler that returns whether the evaluation described by the event
|
||||
// was successful.
|
||||
func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Event) error {
|
||||
func handleEvaluationEvent(evaluations storage.Storage[chan error], event *nomadApi.Event) error {
|
||||
eval, err := event.Evaluation()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to monitor evaluation: %w", err)
|
||||
}
|
||||
switch eval.Status {
|
||||
case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed:
|
||||
resultChannel, ok := evaluations[eval.ID]
|
||||
resultChannel, ok := evaluations.Get(eval.ID)
|
||||
if ok {
|
||||
evalErr := checkEvaluation(eval)
|
||||
select {
|
||||
|
@ -183,7 +183,7 @@ func asynchronouslyMonitorEvaluation(stream <-chan *nomadApi.Events) chan error
|
||||
apiMock := &apiQuerierMock{}
|
||||
apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")).
|
||||
Return(readOnlyStream, nil)
|
||||
apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false}
|
||||
apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false}
|
||||
|
||||
errChan := make(chan error)
|
||||
go func() {
|
||||
@ -211,7 +211,7 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationReturnsErrorWhenStreamRet
|
||||
apiMock := &apiQuerierMock{}
|
||||
apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")).
|
||||
Return(nil, tests.ErrDefault)
|
||||
apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false}
|
||||
apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false}
|
||||
err := apiClient.MonitorEvaluation("id", context.Background())
|
||||
s.ErrorIs(err, tests.ErrDefault)
|
||||
}
|
||||
@ -677,7 +677,7 @@ func (s *MainTestSuite) TestHandleAllocationEvent_ReportsOOMKilledStatus() {
|
||||
func (s *MainTestSuite) TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved() {
|
||||
apiMock := &apiQuerierMock{}
|
||||
apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault)
|
||||
apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false}
|
||||
apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false}
|
||||
|
||||
err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessing)
|
||||
s.ErrorIs(err, tests.ErrDefault)
|
||||
@ -752,7 +752,7 @@ func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *All
|
||||
|
||||
apiMock := &apiQuerierMock{}
|
||||
apiMock.On("EventStream", ctx).Return(readOnlyStream, nil)
|
||||
apiClient := &APIClient{apiMock, map[string]chan error{}, storage.NewLocalStorage[*allocationData](), false}
|
||||
apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false}
|
||||
|
||||
errChan := make(chan error)
|
||||
go func() {
|
||||
|
Reference in New Issue
Block a user