From 25f92e5f947c56e4dc6d184a4bb6ed9c9614b332 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Tue, 12 Apr 2022 21:16:17 +0200 Subject: [PATCH] Add environment specific data to the influxdb data. --- internal/api/api.go | 3 +- internal/api/runners.go | 18 +-- internal/api/websocket.go | 3 +- pkg/logging/influxdb2_middleware.go | 106 ------------------ pkg/logging/logging.go | 8 +- pkg/monitoring/influxdb2_middleware.go | 146 +++++++++++++++++++++++++ 6 files changed, 160 insertions(+), 124 deletions(-) delete mode 100644 pkg/logging/influxdb2_middleware.go create mode 100644 pkg/monitoring/influxdb2_middleware.go diff --git a/internal/api/api.go b/internal/api/api.go index 4882d4c..ea68b83 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -9,6 +9,7 @@ import ( "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/logging" + "github.com/openHPI/poseidon/pkg/monitoring" "net/http" ) @@ -35,7 +36,7 @@ func NewRouter(runnerManager runner.Manager, environmentManager environment.Mana // `router.Host(...)` and to HTTPS with `router.Schemes("https")` configureV1Router(router, runnerManager, environmentManager) router.Use(logging.HTTPLoggingMiddleware) - router.Use(logging.InfluxDB2Middleware(influxClient)) + router.Use(monitoring.InfluxDB2Middleware(influxClient, environmentManager)) return router } diff --git a/internal/api/runners.go b/internal/api/runners.go index d9ec81b..2ce7d1a 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -8,7 +8,7 @@ import ( "github.com/openHPI/poseidon/internal/config" "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" - "github.com/openHPI/poseidon/pkg/logging" + "github.com/openHPI/poseidon/pkg/monitoring" "io" "net/http" "net/url" @@ -65,21 +65,21 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req } return } - 
addMonitoringData(request, nextRunner) + monitoring.AddRunnerMonitoringData(request, nextRunner) sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK) } // updateFileSystem handles the files API route. // It takes an dto.UpdateFileSystemRequest and sends it to the runner for processing. func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request *http.Request) { - logging.AddRequestSize(request) + monitoring.AddRequestSize(request) fileCopyRequest := new(dto.UpdateFileSystemRequest) if err := parseJSONRequestBody(writer, request, fileCopyRequest); err != nil { return } targetRunner, _ := runner.FromContext(request.Context()) - addMonitoringData(request, targetRunner) + monitoring.AddRunnerMonitoringData(request, targetRunner) if err := targetRunner.UpdateFileSystem(fileCopyRequest); err != nil { log.WithError(err).Error("Could not perform the requested updateFileSystem.") writeInternalServerError(writer, err, dto.ErrorUnknown) @@ -105,7 +105,7 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req scheme = "ws" } targetRunner, _ := runner.FromContext(request.Context()) - addMonitoringData(request, targetRunner) + monitoring.AddRunnerMonitoringData(request, targetRunner) path, err := r.runnerRouter.Get(WebsocketPath).URL(RunnerIDKey, targetRunner.ID()) if err != nil { @@ -157,7 +157,7 @@ func (r *RunnerController) findRunnerMiddleware(next http.Handler) http.Handler // It destroys the given runner on the executor and removes it from the used runners list. 
func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Request) { targetRunner, _ := runner.FromContext(request.Context()) - addMonitoringData(request, targetRunner) + monitoring.AddRunnerMonitoringData(request, targetRunner) err := r.manager.Return(targetRunner) if err != nil { @@ -171,9 +171,3 @@ func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Requ writer.WriteHeader(http.StatusNoContent) } - -// addMonitoringData adds the data of the runner and environment we want to monitor. -func addMonitoringData(request *http.Request, r runner.Runner) { - logging.AddRunnerID(request, r.ID()) - logging.AddEnvironmentID(request, r.Environment()) -} diff --git a/internal/api/websocket.go b/internal/api/websocket.go index b2939b5..cf77e45 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -8,6 +8,7 @@ import ( "github.com/gorilla/websocket" "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/monitoring" "io" "net/http" "sync" @@ -336,7 +337,7 @@ func (wp *webSocketProxy) writeMessage(messageType int, data []byte) error { // connectToRunner is the endpoint for websocket connections. 
func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) { targetRunner, _ := runner.FromContext(request.Context()) - addMonitoringData(request, targetRunner) + monitoring.AddRunnerMonitoringData(request, targetRunner) executionID := request.URL.Query().Get(ExecutionIDKey) if !targetRunner.ExecutionExists(executionID) { diff --git a/pkg/logging/influxdb2_middleware.go b/pkg/logging/influxdb2_middleware.go deleted file mode 100644 index 074f927..0000000 --- a/pkg/logging/influxdb2_middleware.go +++ /dev/null @@ -1,106 +0,0 @@ -package logging - -import ( - "bytes" - "context" - "github.com/gorilla/mux" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - influxdb2API "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" - "github.com/openHPI/poseidon/pkg/dto" - "io" - "net/http" - "strconv" - "time" -) - -// InfluxdbContextKey is a key to reference the influxdb data point in the request context. -const InfluxdbContextKey = "influxdb data point" - -// InfluxdbMeasurementPrefix allows easier filtering in influxdb. -const InfluxdbMeasurementPrefix = "poseidon_" - -// InfluxDB2Middleware is a middleware to send events to an influx database. 
-func InfluxDB2Middleware(influxClient influxdb2API.WriteAPI) mux.MiddlewareFunc { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - route := mux.CurrentRoute(r).GetName() - p := influxdb2.NewPointWithMeasurement(InfluxdbMeasurementPrefix + route) - - start := time.Now().UTC() - p.SetTime(time.Now()) - - requestWithPoint := r.WithContext(newContextWithPoint(r.Context(), p)) - lrw := NewLoggingResponseWriter(w) - next.ServeHTTP(lrw, requestWithPoint) - - p.AddField("duration", time.Now().UTC().Sub(start).Nanoseconds()) - p.AddTag("status", strconv.Itoa(lrw.statusCode)) - - if influxClient != nil { - influxClient.WritePoint(p) - } - }) - } -} - -// AddEnvironmentID adds the environment id to the influx data point for the current request. -func AddEnvironmentID(r *http.Request, id dto.EnvironmentID) { - p := pointFromContext(r.Context()) - p.AddTag("environment_id", strconv.Itoa(int(id))) -} - -// AddEnvironmentType adds the type of the used environment to the influxdb data point. -func AddEnvironmentType(r *http.Request, t string) { - p := pointFromContext(r.Context()) - p.AddTag("environment_type", t) -} - -// AddRunnerID adds the runner id to the influx data point for the current request. -func AddRunnerID(r *http.Request, id string) { - p := pointFromContext(r.Context()) - p.AddTag("runner_id", id) -} - -// AddIdleRunner adds the count of idle runners of the used environment to the influxdb data point. -func AddIdleRunner(r *http.Request, count int) { - p := pointFromContext(r.Context()) - p.AddField("idle_runner", strconv.Itoa(count)) -} - -// AddPrewarmingPoolSize adds the prewarming pool size of the used environment to the influxdb data point. 
-func AddPrewarmingPoolSize(r *http.Request, count uint) { - p := pointFromContext(r.Context()) - p.AddField("prewarming_pool_size", strconv.Itoa(int(count))) -} - -// AddRequestSize adds the size of the request body to the influx data point for the current request. -func AddRequestSize(r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - log.WithError(err).Warn("Failed to read request body") - } - - err = r.Body.Close() - if err != nil { - log.WithError(err).Warn("Failed to close request body") - } - r.Body = io.NopCloser(bytes.NewBuffer(body)) - - p := pointFromContext(r.Context()) - p.AddField("request_size", strconv.Itoa(len(body))) -} - -// newContextWithPoint creates a context containing an InfluxDB data point. -func newContextWithPoint(ctx context.Context, p *write.Point) context.Context { - return context.WithValue(ctx, InfluxdbContextKey, p) -} - -// pointFromContext returns an InfluxDB data point from a context. -func pointFromContext(ctx context.Context) *write.Point { - p, ok := ctx.Value(InfluxdbContextKey).(*write.Point) - if !ok { - log.Errorf("InfluxDB data point not stored in context.") - } - return p -} diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index eea6ada..e2e2470 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -46,7 +46,7 @@ func GetLogger(pkg string) *logrus.Entry { // that is written. 
type loggingResponseWriter struct { http.ResponseWriter - statusCode int + StatusCode int } func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter { @@ -54,7 +54,7 @@ func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter { } func (writer *loggingResponseWriter) WriteHeader(code int) { - writer.statusCode = code + writer.StatusCode = code writer.ResponseWriter.WriteHeader(code) } @@ -77,13 +77,13 @@ func HTTPLoggingMiddleware(next http.Handler) http.Handler { latency := time.Now().UTC().Sub(start) logEntry := log.WithFields(logrus.Fields{ - "code": lrw.statusCode, + "code": lrw.StatusCode, "method": r.Method, "path": path, "duration": latency, "user_agent": r.UserAgent(), }) - if lrw.statusCode >= http.StatusInternalServerError { + if lrw.StatusCode >= http.StatusInternalServerError { logEntry.Error("Failing " + path) } else { logEntry.Debug() diff --git a/pkg/monitoring/influxdb2_middleware.go b/pkg/monitoring/influxdb2_middleware.go new file mode 100644 index 0000000..1231281 --- /dev/null +++ b/pkg/monitoring/influxdb2_middleware.go @@ -0,0 +1,146 @@ +package monitoring + +import ( + "bytes" + "context" + "github.com/gorilla/mux" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + influxdb2API "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/openHPI/poseidon/internal/environment" + "github.com/openHPI/poseidon/internal/runner" + "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/logging" + "io" + "net/http" + "reflect" + "strconv" + "time" +) + +var log = logging.GetLogger("monitoring") + +const ( + // influxdbContextKey is a key to reference the influxdb data point in the request context. + influxdbContextKey runner.ContextKey = "influxdb data point" + // influxdbMeasurementPrefix allows easier filtering in influxdb. + influxdbMeasurementPrefix = "poseidon_" + + // The keys for the monitored tags and fields. 
+ influxKeyRunnerID = "runner_id" + influxKeyEnvironmentID = "environment_id" + influxKeyEnvironmentType = "environment_type" + influxKeyEnvironmentIdleRunner = "idle_runner" + influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size" + influxKeyRequestSize = "request_size" +) + +// InfluxDB2Middleware is a middleware that creates one influxdb data point per request, stores it in the request context, adds the duration field and the status tag, adds environment data if the point carries a numeric environment id tag and manager is not nil, and writes the point if influxClient is not nil. +func InfluxDB2Middleware(influxClient influxdb2API.WriteAPI, manager environment.Manager) mux.MiddlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + route := mux.CurrentRoute(r).GetName() + p := influxdb2.NewPointWithMeasurement(influxdbMeasurementPrefix + route) + + start := time.Now().UTC() + p.SetTime(time.Now()) + + ctx := context.WithValue(r.Context(), influxdbContextKey, p) + requestWithPoint := r.WithContext(ctx) + lrw := logging.NewLoggingResponseWriter(w) + next.ServeHTTP(lrw, requestWithPoint) + + p.AddField("duration", time.Now().UTC().Sub(start).Nanoseconds()) + p.AddTag("status", strconv.Itoa(lrw.StatusCode)) + + environmentID, err := strconv.Atoi(getEnvironmentID(p)) + if err == nil && manager != nil { + addEnvironmentData(p, manager, dto.EnvironmentID(environmentID)) + } + + if influxClient != nil { + influxClient.WritePoint(p) + } + }) + } +} + +// AddRunnerMonitoringData adds the runner id and the environment id of the passed runner as tags to the influx data point for the current request. +func AddRunnerMonitoringData(request *http.Request, r runner.Runner) { + addRunnerID(request, r.ID()) + addEnvironmentID(request, r.Environment()) +} + +// addRunnerID adds the runner id to the influx data point for the current request. +func addRunnerID(r *http.Request, id string) { + addInfluxDBTag(r, influxKeyRunnerID, id) +} + +// addEnvironmentID adds the environment id to the influx data point for the current request.
+func addEnvironmentID(r *http.Request, id dto.EnvironmentID) { + addInfluxDBTag(r, influxKeyEnvironmentID, strconv.Itoa(int(id))) +} + +// AddRequestSize reads the whole request body, adds its size in bytes to the influx data point for the current request, and replaces the body with a buffered copy so that it can be read again. +func AddRequestSize(r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + log.WithError(err).Warn("Failed to read request body") + } + + err = r.Body.Close() + if err != nil { + log.WithError(err).Warn("Failed to close request body") + } + r.Body = io.NopCloser(bytes.NewBuffer(body)) + + addInfluxDBField(r, influxKeyRequestSize, len(body)) +} + +// addInfluxDBTag adds a tag to the influxdb data point in the request. +func addInfluxDBTag(r *http.Request, key, value string) { + dataPointFromRequest(r).AddTag(key, value) +} + +// addInfluxDBField adds a field to the influxdb data point in the request. +func addInfluxDBField(r *http.Request, key string, value interface{}) { + dataPointFromRequest(r).AddField(key, value) +} + +// dataPointFromRequest returns the data point stored in the context of the passed request. It logs an error and returns nil if the context holds no data point. +func dataPointFromRequest(r *http.Request) *write.Point { + p, ok := r.Context().Value(influxdbContextKey).(*write.Point) + if !ok { + log.Error("All http request must contain an influxdb data point!") + } + return p +} + +// getEnvironmentID returns the value of the environment id tag of the influxdb data point or an empty string if the tag is not set. +func getEnvironmentID(p *write.Point) string { + for _, tag := range p.TagList() { + if tag.Key == influxKeyEnvironmentID { + return tag.Value + } + } + return "" +} + +// addEnvironmentData adds environment specific data to the influxdb data point.
+func addEnvironmentData(p *write.Point, manager environment.Manager, id dto.EnvironmentID) { + e, err := manager.Get(id, false) + if err == nil { + p.AddTag(influxKeyEnvironmentType, getType(e)) + p.AddField(influxKeyEnvironmentIdleRunner, e.IdleRunnerCount()) + p.AddField(influxKeyEnvironmentPrewarmingPoolSize, e.PrewarmingPoolSize()) + } +} + +// getType returns the type name of the passed execution environment, dereferencing the type once if it is a pointer. +func getType(e runner.ExecutionEnvironment) string { + if t := reflect.TypeOf(e); t.Kind() == reflect.Ptr { + return t.Elem().Name() + } else { + return t.Name() + } +}