Monitor the Nomad events
and send all Nomad events to Influxdb.
This commit is contained in:
@ -181,6 +181,9 @@ func (nc *nomadAPIClient) EventStream(ctx context.Context) (<-chan *nomadApi.Eve
|
|||||||
// As Poseidon uses no such token, the request will return a permission denied error.
|
// As Poseidon uses no such token, the request will return a permission denied error.
|
||||||
"*",
|
"*",
|
||||||
},
|
},
|
||||||
|
nomadApi.TopicJob: {"*"},
|
||||||
|
nomadApi.TopicNode: {"*"},
|
||||||
|
nomadApi.TopicDeployment: {"*"},
|
||||||
},
|
},
|
||||||
0,
|
0,
|
||||||
nc.queryOptions())
|
nc.queryOptions())
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
nomadApi "github.com/hashicorp/nomad/api"
|
nomadApi "github.com/hashicorp/nomad/api"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||||
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
||||||
"github.com/openHPI/poseidon/internal/config"
|
"github.com/openHPI/poseidon/internal/config"
|
||||||
"github.com/openHPI/poseidon/pkg/dto"
|
"github.com/openHPI/poseidon/pkg/dto"
|
||||||
@ -207,6 +208,7 @@ func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationP
|
|||||||
}
|
}
|
||||||
|
|
||||||
handler := func(event *nomadApi.Event) (bool, error) {
|
handler := func(event *nomadApi.Event) (bool, error) {
|
||||||
|
dumpNomadEventToInflux(event)
|
||||||
switch event.Topic {
|
switch event.Topic {
|
||||||
case nomadApi.TopicEvaluation:
|
case nomadApi.TopicEvaluation:
|
||||||
return false, handleEvaluationEvent(a.evaluations, event)
|
return false, handleEvaluationEvent(a.evaluations, event)
|
||||||
@ -223,6 +225,15 @@ func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationP
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func dumpNomadEventToInflux(event *nomadApi.Event) {
|
||||||
|
p := influxdb2.NewPointWithMeasurement(monitoring.MeasurementNomadEvents)
|
||||||
|
p.AddTag("topic", event.Topic.String())
|
||||||
|
p.AddTag("type", event.Type)
|
||||||
|
p.AddTag("key", event.Key)
|
||||||
|
p.AddField("payload", event.Payload)
|
||||||
|
monitoring.WriteInfluxPoint(p)
|
||||||
|
}
|
||||||
|
|
||||||
func (a *APIClient) initializeAllocations() {
|
func (a *APIClient) initializeAllocations() {
|
||||||
allocationStubs, err := a.listAllocations()
|
allocationStubs, err := a.listAllocations()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -23,6 +23,7 @@ const (
|
|||||||
// measurementPrefix allows easier filtering in influxdb.
|
// measurementPrefix allows easier filtering in influxdb.
|
||||||
measurementPrefix = "poseidon_"
|
measurementPrefix = "poseidon_"
|
||||||
measurementPoolSize = measurementPrefix + "poolsize"
|
measurementPoolSize = measurementPrefix + "poolsize"
|
||||||
|
MeasurementNomadEvents = measurementPrefix + "nomad_events"
|
||||||
MeasurementNomadAllocations = measurementPrefix + "nomad_allocations"
|
MeasurementNomadAllocations = measurementPrefix + "nomad_allocations"
|
||||||
MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners"
|
MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners"
|
||||||
MeasurementExecutionsAWS = measurementPrefix + "aws_executions"
|
MeasurementExecutionsAWS = measurementPrefix + "aws_executions"
|
||||||
|
Reference in New Issue
Block a user