Fix Nomad event stream is ignoring errors
when an event stream could be established once.
This commit is contained in:
@ -211,11 +211,10 @@ type nomadAPIEventHandler func(event *nomadApi.Event) (done bool, err error)
|
|||||||
func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler nomadAPIEventHandler) error {
|
func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler nomadAPIEventHandler) error {
|
||||||
// If original context is canceled, the stream will be closed by Nomad and we exit the for loop.
|
// If original context is canceled, the stream will be closed by Nomad and we exit the for loop.
|
||||||
for events := range stream {
|
for events := range stream {
|
||||||
if events.IsHeartbeat() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := events.Err; err != nil {
|
if err := events.Err; err != nil {
|
||||||
return fmt.Errorf("error receiving events: %w", err)
|
return fmt.Errorf("error receiving events: %w", err)
|
||||||
|
} else if events.IsHeartbeat() {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
for _, event := range events.Events {
|
for _, event := range events.Events {
|
||||||
// Don't take the address of the loop variable as the underlying value might change
|
// Don't take the address of the loop variable as the underlying value might change
|
||||||
|
@ -3,6 +3,7 @@ package nomad
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
nomadApi "github.com/hashicorp/nomad/api"
|
nomadApi "github.com/hashicorp/nomad/api"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
@ -26,6 +27,7 @@ var (
|
|||||||
OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
|
OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
|
||||||
OnDeleted: func(_ *nomadApi.Allocation) {},
|
OnDeleted: func(_ *nomadApi.Allocation) {},
|
||||||
}
|
}
|
||||||
|
ErrUnexpectedEOF = errors.New("unexpected EOF")
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLoadRunnersTestSuite(t *testing.T) {
|
func TestLoadRunnersTestSuite(t *testing.T) {
|
||||||
@ -543,6 +545,13 @@ func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationCannotBeRetrievedWi
|
|||||||
assert.Equal(t, 1, eventsProcessed)
|
assert.Equal(t, 1, eventsProcessed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAPIClient_WatchAllocationsReturnsErrorOnUnexpectedEOF(t *testing.T) {
|
||||||
|
events := []*nomadApi.Events{{Err: ErrUnexpectedEOF}, {}}
|
||||||
|
eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessoring)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Equal(t, 1, eventsProcessed)
|
||||||
|
}
|
||||||
|
|
||||||
func assertWatchAllocation(t *testing.T, events []*nomadApi.Events,
|
func assertWatchAllocation(t *testing.T, events []*nomadApi.Events,
|
||||||
expectedNewAllocations, expectedDeletedAllocations []*nomadApi.Allocation) {
|
expectedNewAllocations, expectedDeletedAllocations []*nomadApi.Allocation) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
@ -131,6 +131,18 @@ func WriteInfluxPoint(p *write.Point) {
|
|||||||
if influxClient != nil {
|
if influxClient != nil {
|
||||||
p.AddTag("stage", config.Config.InfluxDB.Stage)
|
p.AddTag("stage", config.Config.InfluxDB.Stage)
|
||||||
influxClient.WritePoint(p)
|
influxClient.WritePoint(p)
|
||||||
|
} else {
|
||||||
|
entry := log.WithField("name", p.Name())
|
||||||
|
for _, tag := range p.TagList() {
|
||||||
|
if tag.Key == "event_type" && tag.Value == "periodically" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry = entry.WithField(tag.Key, tag.Value)
|
||||||
|
}
|
||||||
|
for _, field := range p.FieldList() {
|
||||||
|
entry = entry.WithField(field.Key, field.Value)
|
||||||
|
}
|
||||||
|
entry.Debug("Influx data point")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user