diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index 1358521..9f3667c 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -211,11 +211,10 @@ type nomadAPIEventHandler func(event *nomadApi.Event) (done bool, err error) func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler nomadAPIEventHandler) error { // If original context is canceled, the stream will be closed by Nomad and we exit the for loop. for events := range stream { - if events.IsHeartbeat() { - continue - } if err := events.Err; err != nil { return fmt.Errorf("error receiving events: %w", err) + } else if events.IsHeartbeat() { + continue } for _, event := range events.Events { // Don't take the address of the loop variable as the underlying value might change diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index 39cefff..f9b2560 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -3,6 +3,7 @@ package nomad import ( "bytes" "context" + "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" @@ -26,6 +27,7 @@ var ( OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, OnDeleted: func(_ *nomadApi.Allocation) {}, } + ErrUnexpectedEOF = errors.New("unexpected EOF") ) func TestLoadRunnersTestSuite(t *testing.T) { @@ -543,6 +545,13 @@ func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationCannotBeRetrievedWi assert.Equal(t, 1, eventsProcessed) } +func TestAPIClient_WatchAllocationsReturnsErrorOnUnexpectedEOF(t *testing.T) { + events := []*nomadApi.Events{{Err: ErrUnexpectedEOF}, {}} + eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessoring) + assert.Error(t, err) + assert.Equal(t, 1, eventsProcessed) +} + func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, expectedNewAllocations, expectedDeletedAllocations []*nomadApi.Allocation) { t.Helper() diff --git a/pkg/monitoring/influxdb2_middleware.go b/pkg/monitoring/influxdb2_middleware.go index 6fc4239..cbf7a9c 100644 --- a/pkg/monitoring/influxdb2_middleware.go +++ b/pkg/monitoring/influxdb2_middleware.go @@ -131,6 +131,18 @@ func WriteInfluxPoint(p *write.Point) { if influxClient != nil { p.AddTag("stage", config.Config.InfluxDB.Stage) influxClient.WritePoint(p) + } else { + entry := log.WithField("name", p.Name()) + for _, tag := range p.TagList() { + if tag.Key == "event_type" && tag.Value == "periodically" { + return + } + entry = entry.WithField(tag.Key, tag.Value) + } + for _, field := range p.FieldList() { + entry = entry.WithField(field.Key, field.Value) + } + entry.Debug("Influx data point") } }