#110 Refactor influxdb monitoring

to use it as singleton.
This enables the possibility to monitor processes that are independent of an incoming request.
This commit is contained in:
Maximilian Paß
2022-06-29 20:05:19 +02:00
parent eafc01e69a
commit 498e8f5ff5
19 changed files with 174 additions and 133 deletions

View File

@ -136,6 +136,9 @@ func (f File) ByteContent() []byte {
}
}
// ContextKey is the type for keys in a request context that is used for passing data to the next handler.
type ContextKey string
// WebSocketMessageType is the type for the messages from Poseidon to the client.
type WebSocketMessageType string

View File

@ -8,69 +8,77 @@ import (
influxdb2API "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/internal/config"
"github.com/openHPI/poseidon/internal/environment"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"io"
"net/http"
"reflect"
"strconv"
"time"
)
var log = logging.GetLogger("monitoring")
const (
// influxdbContextKey is a key to reference the influxdb data point in the request context.
influxdbContextKey runner.ContextKey = "influxdb data point"
// influxdbMeasurementPrefix allows easier filtering in influxdb.
influxdbMeasurementPrefix = "poseidon_"
// influxdbContextKey is a key (runner.ContextKey) to reference the influxdb data point in the request context.
influxdbContextKey dto.ContextKey = "influxdb data point"
// measurementPrefix allows easier filtering in influxdb.
measurementPrefix = "poseidon_"
measurementPoolSize = measurementPrefix + "poolsize"
MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners"
MeasurementExecutionsAWS = measurementPrefix + "aws_executions"
MeasurementExecutionsNomad = measurementPrefix + "nomad_executions"
MeasurementEnvironments = measurementPrefix + "environments"
MeasurementUsedRunner = measurementPrefix + "used_runners"
// The keys for the monitored tags and fields.
influxKeyRunnerID = "runner_id"
influxKeyEnvironmentID = "environment_id"
influxKeyEnvironmentType = "environment_type"
influxKeyEnvironmentIdleRunner = "idle_runner"
influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size"
influxKeyRequestSize = "request_size"
)
// InfluxDB2Middleware is a middleware to send events to an influx database.
func InfluxDB2Middleware(influxClient influxdb2API.WriteAPI, manager environment.Manager) mux.MiddlewareFunc {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r).GetName()
p := influxdb2.NewPointWithMeasurement(influxdbMeasurementPrefix + route)
p.AddTag("stage", config.Config.InfluxDB.Stage)
var (
log = logging.GetLogger("monitoring")
influxClient influxdb2API.WriteAPI
)
start := time.Now().UTC()
p.SetTime(time.Now())
ctx := context.WithValue(r.Context(), influxdbContextKey, p)
requestWithPoint := r.WithContext(ctx)
lrw := logging.NewLoggingResponseWriter(w)
next.ServeHTTP(lrw, requestWithPoint)
p.AddField("duration", time.Now().UTC().Sub(start).Nanoseconds())
p.AddTag("status", strconv.Itoa(lrw.StatusCode))
environmentID, err := strconv.Atoi(getEnvironmentID(p))
if err == nil && manager != nil {
addEnvironmentData(p, manager, dto.EnvironmentID(environmentID))
}
if influxClient != nil {
influxClient.WritePoint(p)
}
})
func InitializeInfluxDB(db *config.InfluxDB) (cancel func()) {
if db.URL == "" {
return func() {}
}
client := influxdb2.NewClient(db.URL, db.Token)
influxClient = client.WriteAPI(db.Organization, db.Bucket)
cancel = func() {
influxClient.Flush()
client.Close()
}
return cancel
}
// InfluxDB2Middleware is a middleware to send events to an influx database.
func InfluxDB2Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r).GetName()
p := influxdb2.NewPointWithMeasurement(measurementPrefix + route)
start := time.Now().UTC()
p.SetTime(time.Now())
ctx := context.WithValue(r.Context(), influxdbContextKey, p)
requestWithPoint := r.WithContext(ctx)
lrw := logging.NewLoggingResponseWriter(w)
next.ServeHTTP(lrw, requestWithPoint)
p.AddField("duration", time.Now().UTC().Sub(start).Nanoseconds())
p.AddTag("status", strconv.Itoa(lrw.StatusCode))
WriteInfluxPoint(p)
})
}
// AddRunnerMonitoringData adds the data of the runner we want to monitor.
func AddRunnerMonitoringData(request *http.Request, r runner.Runner) {
addRunnerID(request, r.ID())
addEnvironmentID(request, r.Environment())
func AddRunnerMonitoringData(request *http.Request, runnerID string, environmentID dto.EnvironmentID) {
addRunnerID(request, runnerID)
addEnvironmentID(request, environmentID)
}
// addRunnerID adds the runner id to the influx data point for the current request.
@ -99,6 +107,23 @@ func AddRequestSize(r *http.Request) {
addInfluxDBField(r, influxKeyRequestSize, len(body))
}
func ChangedPrewarmingPoolSize(id dto.EnvironmentID, count uint) {
p := influxdb2.NewPointWithMeasurement(measurementPoolSize)
p.AddTag(influxKeyEnvironmentID, strconv.Itoa(int(id)))
p.AddField(influxKeyEnvironmentPrewarmingPoolSize, count)
WriteInfluxPoint(p)
}
// WriteInfluxPoint schedules the indlux data point to be sent.
func WriteInfluxPoint(p *write.Point) {
if influxClient != nil {
p.AddTag("stage", config.Config.InfluxDB.Stage)
influxClient.WritePoint(p)
}
}
// addInfluxDBTag adds a tag to the influxdb data point in the request.
func addInfluxDBTag(r *http.Request, key, value string) {
dataPointFromRequest(r).AddTag(key, value)
@ -117,32 +142,3 @@ func dataPointFromRequest(r *http.Request) *write.Point {
}
return p
}
// getEnvironmentID tries to find the environment id in the influxdb data point.
func getEnvironmentID(p *write.Point) string {
for _, tag := range p.TagList() {
if tag.Key == influxKeyEnvironmentID {
return tag.Value
}
}
return ""
}
// addEnvironmentData adds environment specific data to the influxdb data point.
func addEnvironmentData(p *write.Point, manager environment.Manager, id dto.EnvironmentID) {
e, err := manager.Get(id, false)
if err == nil {
p.AddTag(influxKeyEnvironmentType, getType(e))
p.AddField(influxKeyEnvironmentIdleRunner, e.IdleRunnerCount())
p.AddField(influxKeyEnvironmentPrewarmingPoolSize, e.PrewarmingPoolSize())
}
}
// Get type returns the type of the passed execution environment.
func getType(e runner.ExecutionEnvironment) string {
if t := reflect.TypeOf(e); t.Kind() == reflect.Ptr {
return t.Elem().Name()
} else {
return t.Name()
}
}

View File

@ -1,6 +1,10 @@
package storage
import (
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/pkg/monitoring"
"strconv"
"sync"
)
@ -36,10 +40,14 @@ type Storage[T any] interface {
Sample() (o T, ok bool)
}
type WriteCallback[T any] func(p *write.Point, object T, isDeletion bool)
// localStorage stores objects in the local application memory.
type localStorage[T any] struct {
sync.RWMutex
objects map[string]T
objects map[string]T
measurement string
callback WriteCallback[T]
}
// NewLocalStorage responds with a Storage implementation.
@ -50,6 +58,17 @@ func NewLocalStorage[T any]() *localStorage[T] {
}
}
// NewMonitoredLocalStorage responds with a Storage implementation.
// All write operations are monitored in the passed measurement.
// Iff callback is set, it will be called on a write operation.
func NewMonitoredLocalStorage[T any](measurement string, callback WriteCallback[T]) *localStorage[T] {
return &localStorage[T]{
objects: make(map[string]T),
measurement: measurement,
callback: callback,
}
}
func (s *localStorage[T]) List() (o []T) {
s.RLock()
defer s.RUnlock()
@ -63,6 +82,7 @@ func (s *localStorage[T]) Add(id string, o T) {
s.Lock()
defer s.Unlock()
s.objects[id] = o
s.sendMonitoringData(id, o, false)
}
func (s *localStorage[T]) Get(id string) (o T, ok bool) {
@ -75,6 +95,7 @@ func (s *localStorage[T]) Get(id string) (o T, ok bool) {
func (s *localStorage[T]) Delete(id string) {
s.Lock()
defer s.Unlock()
s.sendMonitoringData(id, s.objects[id], true)
delete(s.objects, id)
}
@ -87,6 +108,9 @@ func (s *localStorage[T]) Pop(id string) (T, bool) {
func (s *localStorage[T]) Purge() {
s.Lock()
defer s.Unlock()
for key, object := range s.objects {
s.sendMonitoringData(key, object, true)
}
s.objects = make(map[string]T)
}
@ -94,6 +118,7 @@ func (s *localStorage[T]) Sample() (o T, ok bool) {
s.Lock()
defer s.Unlock()
for key, object := range s.objects {
s.sendMonitoringData(key, object, true)
delete(s.objects, key)
return object, true
}
@ -105,3 +130,18 @@ func (s *localStorage[T]) Length() uint {
defer s.RUnlock()
return uint(len(s.objects))
}
func (s *localStorage[T]) sendMonitoringData(id string, o T, isDeletion bool) {
if s.measurement != "" {
p := influxdb2.NewPointWithMeasurement(s.measurement)
p.AddTag("id", id)
p.AddTag("isDeletion", strconv.FormatBool(isDeletion))
p.AddField("count", 1)
if s.callback != nil {
s.callback(p, o, isDeletion)
}
monitoring.WriteInfluxPoint(p)
}
}