diff --git a/cmd/poseidon/main.go b/cmd/poseidon/main.go index 0093dc2..3a0a340 100644 --- a/cmd/poseidon/main.go +++ b/cmd/poseidon/main.go @@ -4,14 +4,13 @@ import ( "context" "errors" "github.com/getsentry/sentry-go" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - influxdb2API "github.com/influxdata/influxdb-client-go/v2/api" "github.com/openHPI/poseidon/internal/api" "github.com/openHPI/poseidon/internal/config" "github.com/openHPI/poseidon/internal/environment" "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/logging" + "github.com/openHPI/poseidon/pkg/monitoring" "net/http" "os" "os/signal" @@ -37,20 +36,6 @@ func shutdownSentry() { } } -func initInfluxDB(db *config.InfluxDB) (writeAPI influxdb2API.WriteAPI, cancel func()) { - if db.URL == "" { - return nil, func() {} - } - - client := influxdb2.NewClient(db.URL, db.Token) - writeAPI = client.WriteAPI(db.Organization, db.Bucket) - cancel = func() { - writeAPI.Flush() - client.Close() - } - return writeAPI, cancel -} - func runServer(server *http.Server) { log.WithField("address", server.Addr).Info("Starting server") var err error @@ -111,7 +96,7 @@ func createAWSManager() (runnerManager runner.Manager, environmentManager enviro } // initServer builds the http server and configures it with the chain of responsibility for multiple managers. -func initServer(influxClient influxdb2API.WriteAPI) *http.Server { +func initServer() *http.Server { runnerManager, environmentManager := createManagerHandler(createNomadManager, config.Config.Nomad.Enabled, nil, nil) runnerManager, environmentManager = createManagerHandler(createAWSManager, config.Config.AWS.Enabled, @@ -121,7 +106,7 @@ func initServer(influxClient influxdb2API.WriteAPI) *http.Server { Addr: config.Config.Server.URL().Host, ReadTimeout: time.Second * 15, IdleTimeout: time.Second * 60, - Handler: api.NewRouter(runnerManager, environmentManager, influxClient), + Handler: api.NewRouter(runnerManager, environmentManager), } } @@ -149,10 +134,10 @@ func main() { initSentry(&config.Config.Sentry) defer shutdownSentry() - influxDB, cancel := initInfluxDB(&config.Config.InfluxDB) + cancel := monitoring.InitializeInfluxDB(&config.Config.InfluxDB) defer cancel() - server := initServer(influxDB) + server := initServer() go runServer(server) shutdownOnOSSignal(server) } diff --git a/internal/api/api.go b/internal/api/api.go index ea68b83..4ea0501 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -2,7 +2,6 @@ package api import ( "github.com/gorilla/mux" - influxdb2API "github.com/influxdata/influxdb-client-go/v2/api" "github.com/openHPI/poseidon/internal/api/auth" "github.com/openHPI/poseidon/internal/config" "github.com/openHPI/poseidon/internal/environment" @@ -29,14 +28,13 @@ const ( // always returns a router for the newest version of our API. We // use gorilla/mux because it is more convenient than net/http, e.g. // when extracting path parameters. -func NewRouter(runnerManager runner.Manager, environmentManager environment.ManagerHandler, - influxClient influxdb2API.WriteAPI) *mux.Router { +func NewRouter(runnerManager runner.Manager, environmentManager environment.ManagerHandler) *mux.Router { router := mux.NewRouter() // this can later be restricted to a specific host with // `router.Host(...)` and to HTTPS with `router.Schemes("https")` configureV1Router(router, runnerManager, environmentManager) router.Use(logging.HTTPLoggingMiddleware) - router.Use(monitoring.InfluxDB2Middleware(influxClient, environmentManager)) + router.Use(monitoring.InfluxDB2Middleware) return router } diff --git a/internal/api/environments_test.go b/internal/api/environments_test.go index 1d71814..6babb4a 100644 --- a/internal/api/environments_test.go +++ b/internal/api/environments_test.go @@ -31,7 +31,7 @@ func TestEnvironmentControllerTestSuite(t *testing.T) { func (s *EnvironmentControllerTestSuite) SetupTest() { s.manager = &environment.ManagerHandlerMock{} - s.router = NewRouter(nil, s.manager, nil) + s.router = NewRouter(nil, s.manager) } func (s *EnvironmentControllerTestSuite) TestList() { diff --git a/internal/api/runners.go b/internal/api/runners.go index ae3fc23..9e8a4c8 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -68,7 +68,7 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req } return } - monitoring.AddRunnerMonitoringData(request, nextRunner) + monitoring.AddRunnerMonitoringData(request, nextRunner.ID(), nextRunner.Environment()) sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK) } @@ -82,7 +82,7 @@ func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request } targetRunner, _ := runner.FromContext(request.Context()) - monitoring.AddRunnerMonitoringData(request, targetRunner) + monitoring.AddRunnerMonitoringData(request, targetRunner.ID(), targetRunner.Environment()) if err := targetRunner.UpdateFileSystem(fileCopyRequest); err != nil { log.WithError(err).Error("Could not perform the requested updateFileSystem.") writeInternalServerError(writer, err, dto.ErrorUnknown) @@ -108,7 +108,7 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req scheme = "ws" } targetRunner, _ := runner.FromContext(request.Context()) - monitoring.AddRunnerMonitoringData(request, targetRunner) + monitoring.AddRunnerMonitoringData(request, targetRunner.ID(), targetRunner.Environment()) path, err := r.runnerRouter.Get(WebsocketPath).URL(RunnerIDKey, targetRunner.ID()) if err != nil { @@ -160,7 +160,7 @@ func (r *RunnerController) findRunnerMiddleware(next http.Handler) http.Handler // It destroys the given runner on the executor and removes it from the used runners list. func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Request) { targetRunner, _ := runner.FromContext(request.Context()) - monitoring.AddRunnerMonitoringData(request, targetRunner) + monitoring.AddRunnerMonitoringData(request, targetRunner.ID(), targetRunner.Environment()) err := r.manager.Return(targetRunner) if err != nil { diff --git a/internal/api/runners_test.go b/internal/api/runners_test.go index 6891ca8..005f82a 100644 --- a/internal/api/runners_test.go +++ b/internal/api/runners_test.go @@ -107,7 +107,7 @@ type RunnerRouteTestSuite struct { func (s *RunnerRouteTestSuite) SetupTest() { s.runnerManager = &runner.ManagerMock{} - s.router = NewRouter(s.runnerManager, nil, nil) + s.router = NewRouter(s.runnerManager, nil) s.runner = runner.NewNomadJob("some-id", nil, nil, nil) s.executionID = "execution" s.runner.StoreExecution(s.executionID, &dto.ExecutionRequest{}) diff --git a/internal/api/websocket.go b/internal/api/websocket.go index 1370c00..e073bc5 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -76,7 +76,7 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti // connectToRunner is the endpoint for websocket connections. func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) { targetRunner, _ := runner.FromContext(request.Context()) - monitoring.AddRunnerMonitoringData(request, targetRunner) + monitoring.AddRunnerMonitoringData(request, targetRunner.ID(), targetRunner.Environment()) executionID := request.URL.Query().Get(ExecutionIDKey) if !targetRunner.ExecutionExists(executionID) { diff --git a/internal/api/websocket_test.go b/internal/api/websocket_test.go index 186accf..98e17c5 100644 --- a/internal/api/websocket_test.go +++ b/internal/api/websocket_test.go @@ -51,7 +51,7 @@ func (s *WebSocketTestSuite) SetupTest() { runnerManager := &runner.ManagerMock{} runnerManager.On("Get", s.runner.ID()).Return(s.runner, nil) - s.router = NewRouter(runnerManager, nil, nil) + s.router = NewRouter(runnerManager, nil) s.server = httptest.NewServer(s.router) } @@ -257,7 +257,7 @@ func TestWebsocketTLS(t *testing.T) { runnerManager := &runner.ManagerMock{} runnerManager.On("Get", r.ID()).Return(r, nil) - router := NewRouter(runnerManager, nil, nil) + router := NewRouter(runnerManager, nil) server, err := helpers.StartTLSServer(t, router) require.NoError(t, err) @@ -327,7 +327,7 @@ func newRunnerWithNotMockedRunnerManager(t *testing.T, apiMock *nomad.ExecutorAP call.ReturnArguments = mock.Arguments{nil} }) runnerManager := runner.NewNomadRunnerManager(apiMock, context.Background()) - router := NewRouter(runnerManager, nil, nil) + router := NewRouter(runnerManager, nil) server := httptest.NewServer(router) runnerID := tests.DefaultRunnerID diff --git a/internal/environment/aws_environment.go b/internal/environment/aws_environment.go index 8c68630..5f742be 100644 --- a/internal/environment/aws_environment.go +++ b/internal/environment/aws_environment.go @@ -87,7 +87,7 @@ func (a *AWSEnvironment) CPULimit() uint { func (a *AWSEnvironment) SetCPULimit(_ uint) {} func (a *AWSEnvironment) MemoryLimit() uint { - panic("not supported") + return 0 } func (a *AWSEnvironment) SetMemoryLimit(_ uint) { @@ -95,7 +95,7 @@ func (a *AWSEnvironment) SetMemoryLimit(_ uint) { } func (a *AWSEnvironment) NetworkAccess() (enabled bool, mappedPorts []uint16) { - panic("not supported") + return false, nil } func (a *AWSEnvironment) SetNetworkAccess(_ bool, _ []uint16) { diff --git a/internal/environment/nomad_environment.go b/internal/environment/nomad_environment.go index 6e9cc54..68e4575 100644 --- a/internal/environment/nomad_environment.go +++ b/internal/environment/nomad_environment.go @@ -11,6 +11,7 @@ import ( "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "strconv" "sync" @@ -35,7 +36,8 @@ func NewNomadEnvironment(apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvi return nil, fmt.Errorf("error parsing Nomad job: %w", err) } - return &NomadEnvironment{apiClient, jobHCL, job, storage.NewLocalStorage[runner.Runner]()}, nil + return &NomadEnvironment{apiClient, jobHCL, job, + storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad, nil)}, nil } func NewNomadEnvironmentFromRequest( @@ -80,6 +82,7 @@ func (n *NomadEnvironment) PrewarmingPoolSize() uint { } func (n *NomadEnvironment) SetPrewarmingPoolSize(count uint) { + monitoring.ChangedPrewarmingPoolSize(n.ID(), count) taskGroup := nomad.FindAndValidateConfigTaskGroup(n.job) if taskGroup.Meta == nil { diff --git a/internal/environment/nomad_manager.go b/internal/environment/nomad_manager.go index ee089ce..8fc6895 100644 --- a/internal/environment/nomad_manager.go +++ b/internal/environment/nomad_manager.go @@ -8,6 +8,7 @@ import ( "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/logging" + "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "os" ) @@ -142,7 +143,7 @@ func (m *NomadEnvironmentManager) Load() error { apiClient: m.api, jobHCL: templateEnvironmentJobHCL, job: job, - idleRunners: storage.NewLocalStorage[runner.Runner](), + idleRunners: storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad, nil), } m.runnerManager.StoreEnvironment(environment) jobLogger.Info("Successfully recovered environment") @@ -181,7 +182,7 @@ func fetchEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI) (runner apiClient: apiClient, jobHCL: templateEnvironmentJobHCL, job: job, - idleRunners: storage.NewLocalStorage[runner.Runner](), + idleRunners: storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad, nil), } } } diff --git a/internal/runner/abstract_manager.go b/internal/runner/abstract_manager.go index 09dbcf4..dd29282 100644 --- a/internal/runner/abstract_manager.go +++ b/internal/runner/abstract_manager.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" ) @@ -21,8 +22,9 @@ type AbstractManager struct { // NewAbstractManager creates a new abstract runner manager that keeps track of all runners of one kind. func NewAbstractManager() *AbstractManager { return &AbstractManager{ - environments: storage.NewLocalStorage[ExecutionEnvironment](), - usedRunners: storage.NewLocalStorage[Runner](), + environments: storage.NewMonitoredLocalStorage[ExecutionEnvironment]( + monitoring.MeasurementEnvironments, monitorEnvironmentData), + usedRunners: storage.NewMonitoredLocalStorage[Runner](monitoring.MeasurementUsedRunner, nil), } } diff --git a/internal/runner/aws_manager_test.go b/internal/runner/aws_manager_test.go index 1e8db04..389a94e 100644 --- a/internal/runner/aws_manager_test.go +++ b/internal/runner/aws_manager_test.go @@ -14,8 +14,7 @@ func TestAWSRunnerManager_EnvironmentAccessor(t *testing.T) { environments := m.ListEnvironments() assert.Empty(t, environments) - environment := &ExecutionEnvironmentMock{} - environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger)) + environment := createBasicEnvironmentMock(defaultEnvironmentID) m.StoreEnvironment(environment) environments = m.ListEnvironments() @@ -32,8 +31,7 @@ func TestAWSRunnerManager_EnvironmentAccessor(t *testing.T) { func TestAWSRunnerManager_Claim(t *testing.T) { m := NewAWSRunnerManager() - environment := &ExecutionEnvironmentMock{} - environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger)) + environment := createBasicEnvironmentMock(defaultEnvironmentID) r, err := NewAWSFunctionWorkload(environment, nil) assert.NoError(t, err) environment.On("Sample").Return(r, true) @@ -59,8 +57,7 @@ func TestAWSRunnerManager_Claim(t *testing.T) { func TestAWSRunnerManager_Return(t *testing.T) { m := NewAWSRunnerManager() - environment := &ExecutionEnvironmentMock{} - environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger)) + environment := createBasicEnvironmentMock(defaultEnvironmentID) m.StoreEnvironment(environment) r, err := NewAWSFunctionWorkload(environment, nil) assert.NoError(t, err) @@ -85,3 +82,13 @@ func TestAWSRunnerManager_Return(t *testing.T) { nextHandler.AssertCalled(t, "Return", nonAWSRunner) }) } + +func createBasicEnvironmentMock(id dto.EnvironmentID) *ExecutionEnvironmentMock { + environment := &ExecutionEnvironmentMock{} + environment.On("ID").Return(id) + environment.On("Image").Return("") + environment.On("CPULimit").Return(uint(0)) + environment.On("MemoryLimit").Return(uint(0)) + environment.On("NetworkAccess").Return(false, nil) + return environment +} diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go index 22e395c..14dfda4 100644 --- a/internal/runner/aws_runner.go +++ b/internal/runner/aws_runner.go @@ -10,6 +10,7 @@ import ( "github.com/openHPI/poseidon/internal/config" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/execution" + "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "io" ) @@ -47,7 +48,7 @@ func NewAWSFunctionWorkload( workload := &AWSFunctionWorkload{ id: newUUID.String(), fs: make(map[dto.FilePath][]byte), - executions: storage.NewLocalStorage[*dto.ExecutionRequest](), + executions: storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](monitoring.MeasurementExecutionsAWS, nil), runningExecutions: make(map[execution.ID]context.CancelFunc), onDestroy: onDestroy, environment: environment, diff --git a/internal/runner/execution_environment.go b/internal/runner/execution_environment.go index 6bf33a0..b9a8bae 100644 --- a/internal/runner/execution_environment.go +++ b/internal/runner/execution_environment.go @@ -2,7 +2,9 @@ package runner import ( "encoding/json" + "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/openHPI/poseidon/pkg/dto" + "strconv" ) // ExecutionEnvironment are groups of runner that share the configuration stored in the environment. @@ -47,3 +49,14 @@ type ExecutionEnvironment interface { // IdleRunnerCount returns the number of idle runners of the environment. IdleRunnerCount() uint } + +// monitorEnvironmentData passes the configuration of the environment e into the monitoring Point p. +func monitorEnvironmentData(p *write.Point, e ExecutionEnvironment, isDeletion bool) { + if !isDeletion && e != nil { + p.AddTag("image", e.Image()) + p.AddTag("cpu_limit", strconv.Itoa(int(e.CPULimit()))) + p.AddTag("memory_limit", strconv.Itoa(int(e.MemoryLimit()))) + hasNetworkAccess, _ := e.NetworkAccess() + p.AddTag("network_access", strconv.FormatBool(hasNetworkAccess)) + } +} diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 61239ff..0f5079f 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -36,8 +36,8 @@ func (s *ManagerTestSuite) SetupTest() { s.nomadRunnerManager = NewNomadRunnerManager(s.apiMock, ctx) s.exerciseRunner = NewRunner(tests.DefaultRunnerID, s.nomadRunnerManager) - s.exerciseEnvironment = &ExecutionEnvironmentMock{} - s.setDefaultEnvironment() + s.exerciseEnvironment = createBasicEnvironmentMock(defaultEnvironmentID) + s.nomadRunnerManager.StoreEnvironment(s.exerciseEnvironment) } func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []string) { @@ -81,18 +81,12 @@ func mockIdleRunners(environmentMock *ExecutionEnvironmentMock) { }) } -func (s *ManagerTestSuite) setDefaultEnvironment() { - s.exerciseEnvironment.On("ID").Return(defaultEnvironmentID) - s.nomadRunnerManager.StoreEnvironment(s.exerciseEnvironment) -} - func (s *ManagerTestSuite) waitForRunnerRefresh() { <-time.After(100 * time.Millisecond) } func (s *ManagerTestSuite) TestSetEnvironmentAddsNewEnvironment() { - anotherEnvironment := &ExecutionEnvironmentMock{} - anotherEnvironment.On("ID").Return(anotherEnvironmentID) + anotherEnvironment := createBasicEnvironmentMock(anotherEnvironmentID) s.nomadRunnerManager.StoreEnvironment(anotherEnvironment) job, ok := s.nomadRunnerManager.environments.Get(anotherEnvironmentID.ToString()) diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 3f2c015..816445b 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -11,18 +11,16 @@ import ( nomadApi "github.com/hashicorp/nomad/api" "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "io" "strings" "time" ) -// ContextKey is the type for keys in a request context. -type ContextKey string - const ( // runnerContextKey is the key used to store runners in context.Context. - runnerContextKey ContextKey = "runner" + runnerContextKey dto.ContextKey = "runner" // SIGQUIT is the character that causes a tty to send the SIGQUIT signal to the controlled process. SIGQUIT = 0x1c // executionTimeoutGracePeriod is the time to wait after sending a SIGQUIT signal to a timed out execution. @@ -55,7 +53,7 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping, id: id, portMappings: portMappings, api: apiClient, - executions: storage.NewLocalStorage[*dto.ExecutionRequest](), + executions: storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](monitoring.MeasurementExecutionsNomad, nil), onDestroy: onDestroy, } job.InactivityTimer = NewInactivityTimer(job, onDestroy) diff --git a/pkg/dto/dto.go b/pkg/dto/dto.go index d3371c2..b0521c6 100644 --- a/pkg/dto/dto.go +++ b/pkg/dto/dto.go @@ -136,6 +136,9 @@ func (f File) ByteContent() []byte { } } +// ContextKey is the type for keys in a request context that is used for passing data to the next handler. +type ContextKey string + // WebSocketMessageType is the type for the messages from Poseidon to the client. type WebSocketMessageType string diff --git a/pkg/monitoring/influxdb2_middleware.go b/pkg/monitoring/influxdb2_middleware.go index 9ee5af2..34e38f5 100644 --- a/pkg/monitoring/influxdb2_middleware.go +++ b/pkg/monitoring/influxdb2_middleware.go @@ -8,69 +8,77 @@ import ( influxdb2API "github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/openHPI/poseidon/internal/config" - "github.com/openHPI/poseidon/internal/environment" - "github.com/openHPI/poseidon/internal/runner" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/logging" "io" "net/http" - "reflect" "strconv" "time" ) -var log = logging.GetLogger("monitoring") - const ( - // influxdbContextKey is a key to reference the influxdb data point in the request context. - influxdbContextKey runner.ContextKey = "influxdb data point" - // influxdbMeasurementPrefix allows easier filtering in influxdb. - influxdbMeasurementPrefix = "poseidon_" + // influxdbContextKey is a key (runner.ContextKey) to reference the influxdb data point in the request context. + influxdbContextKey dto.ContextKey = "influxdb data point" + // measurementPrefix allows easier filtering in influxdb. + measurementPrefix = "poseidon_" + measurementPoolSize = measurementPrefix + "poolsize" + MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners" + MeasurementExecutionsAWS = measurementPrefix + "aws_executions" + MeasurementExecutionsNomad = measurementPrefix + "nomad_executions" + MeasurementEnvironments = measurementPrefix + "environments" + MeasurementUsedRunner = measurementPrefix + "used_runners" // The keys for the monitored tags and fields. influxKeyRunnerID = "runner_id" influxKeyEnvironmentID = "environment_id" - influxKeyEnvironmentType = "environment_type" - influxKeyEnvironmentIdleRunner = "idle_runner" influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size" influxKeyRequestSize = "request_size" ) -// InfluxDB2Middleware is a middleware to send events to an influx database. -func InfluxDB2Middleware(influxClient influxdb2API.WriteAPI, manager environment.Manager) mux.MiddlewareFunc { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - route := mux.CurrentRoute(r).GetName() - p := influxdb2.NewPointWithMeasurement(influxdbMeasurementPrefix + route) - p.AddTag("stage", config.Config.InfluxDB.Stage) +var ( + log = logging.GetLogger("monitoring") + influxClient influxdb2API.WriteAPI +) - start := time.Now().UTC() - p.SetTime(time.Now()) - - ctx := context.WithValue(r.Context(), influxdbContextKey, p) - requestWithPoint := r.WithContext(ctx) - lrw := logging.NewLoggingResponseWriter(w) - next.ServeHTTP(lrw, requestWithPoint) - - p.AddField("duration", time.Now().UTC().Sub(start).Nanoseconds()) - p.AddTag("status", strconv.Itoa(lrw.StatusCode)) - - environmentID, err := strconv.Atoi(getEnvironmentID(p)) - if err == nil && manager != nil { - addEnvironmentData(p, manager, dto.EnvironmentID(environmentID)) - } - - if influxClient != nil { - influxClient.WritePoint(p) - } - }) +func InitializeInfluxDB(db *config.InfluxDB) (cancel func()) { + if db.URL == "" { + return func() {} } + + client := influxdb2.NewClient(db.URL, db.Token) + influxClient = client.WriteAPI(db.Organization, db.Bucket) + cancel = func() { + influxClient.Flush() + client.Close() + } + return cancel +} + +// InfluxDB2Middleware is a middleware to send events to an influx database. +func InfluxDB2Middleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + route := mux.CurrentRoute(r).GetName() + p := influxdb2.NewPointWithMeasurement(measurementPrefix + route) + + start := time.Now().UTC() + p.SetTime(time.Now()) + + ctx := context.WithValue(r.Context(), influxdbContextKey, p) + requestWithPoint := r.WithContext(ctx) + lrw := logging.NewLoggingResponseWriter(w) + next.ServeHTTP(lrw, requestWithPoint) + + p.AddField("duration", time.Now().UTC().Sub(start).Nanoseconds()) + p.AddTag("status", strconv.Itoa(lrw.StatusCode)) + + WriteInfluxPoint(p) + }) } // AddRunnerMonitoringData adds the data of the runner we want to monitor. -func AddRunnerMonitoringData(request *http.Request, r runner.Runner) { - addRunnerID(request, r.ID()) - addEnvironmentID(request, r.Environment()) +func AddRunnerMonitoringData(request *http.Request, runnerID string, environmentID dto.EnvironmentID) { + addRunnerID(request, runnerID) + addEnvironmentID(request, environmentID) } // addRunnerID adds the runner id to the influx data point for the current request. @@ -99,6 +107,23 @@ func AddRequestSize(r *http.Request) { addInfluxDBField(r, influxKeyRequestSize, len(body)) } +func ChangedPrewarmingPoolSize(id dto.EnvironmentID, count uint) { + p := influxdb2.NewPointWithMeasurement(measurementPoolSize) + + p.AddTag(influxKeyEnvironmentID, strconv.Itoa(int(id))) + p.AddField(influxKeyEnvironmentPrewarmingPoolSize, count) + + WriteInfluxPoint(p) +} + +// WriteInfluxPoint schedules the indlux data point to be sent. +func WriteInfluxPoint(p *write.Point) { + if influxClient != nil { + p.AddTag("stage", config.Config.InfluxDB.Stage) + influxClient.WritePoint(p) + } +} + // addInfluxDBTag adds a tag to the influxdb data point in the request. func addInfluxDBTag(r *http.Request, key, value string) { dataPointFromRequest(r).AddTag(key, value) @@ -117,32 +142,3 @@ func dataPointFromRequest(r *http.Request) *write.Point { } return p } - -// getEnvironmentID tries to find the environment id in the influxdb data point. -func getEnvironmentID(p *write.Point) string { - for _, tag := range p.TagList() { - if tag.Key == influxKeyEnvironmentID { - return tag.Value - } - } - return "" -} - -// addEnvironmentData adds environment specific data to the influxdb data point. -func addEnvironmentData(p *write.Point, manager environment.Manager, id dto.EnvironmentID) { - e, err := manager.Get(id, false) - if err == nil { - p.AddTag(influxKeyEnvironmentType, getType(e)) - p.AddField(influxKeyEnvironmentIdleRunner, e.IdleRunnerCount()) - p.AddField(influxKeyEnvironmentPrewarmingPoolSize, e.PrewarmingPoolSize()) - } -} - -// Get type returns the type of the passed execution environment. -func getType(e runner.ExecutionEnvironment) string { - if t := reflect.TypeOf(e); t.Kind() == reflect.Ptr { - return t.Elem().Name() - } else { - return t.Name() - } -} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index bb4e0b7..d12c644 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,6 +1,10 @@ package storage import ( + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/openHPI/poseidon/pkg/monitoring" + "strconv" "sync" ) @@ -36,10 +40,14 @@ type Storage[T any] interface { Sample() (o T, ok bool) } +type WriteCallback[T any] func(p *write.Point, object T, isDeletion bool) + // localStorage stores objects in the local application memory. type localStorage[T any] struct { sync.RWMutex - objects map[string]T + objects map[string]T + measurement string + callback WriteCallback[T] } // NewLocalStorage responds with a Storage implementation. @@ -50,6 +58,17 @@ func NewLocalStorage[T any]() *localStorage[T] { } } +// NewMonitoredLocalStorage responds with a Storage implementation. +// All write operations are monitored in the passed measurement. +// Iff callback is set, it will be called on a write operation. +func NewMonitoredLocalStorage[T any](measurement string, callback WriteCallback[T]) *localStorage[T] { + return &localStorage[T]{ + objects: make(map[string]T), + measurement: measurement, + callback: callback, + } +} + func (s *localStorage[T]) List() (o []T) { s.RLock() defer s.RUnlock() @@ -63,6 +82,7 @@ func (s *localStorage[T]) Add(id string, o T) { s.Lock() defer s.Unlock() s.objects[id] = o + s.sendMonitoringData(id, o, false) } func (s *localStorage[T]) Get(id string) (o T, ok bool) { @@ -75,6 +95,7 @@ func (s *localStorage[T]) Get(id string) (o T, ok bool) { func (s *localStorage[T]) Delete(id string) { s.Lock() defer s.Unlock() + s.sendMonitoringData(id, s.objects[id], true) delete(s.objects, id) } @@ -87,6 +108,9 @@ func (s *localStorage[T]) Pop(id string) (T, bool) { func (s *localStorage[T]) Purge() { s.Lock() defer s.Unlock() + for key, object := range s.objects { + s.sendMonitoringData(key, object, true) + } s.objects = make(map[string]T) } @@ -94,6 +118,7 @@ func (s *localStorage[T]) Sample() (o T, ok bool) { s.Lock() defer s.Unlock() for key, object := range s.objects { + s.sendMonitoringData(key, object, true) delete(s.objects, key) return object, true } @@ -105,3 +130,18 @@ func (s *localStorage[T]) Length() uint { defer s.RUnlock() return uint(len(s.objects)) } + +func (s *localStorage[T]) sendMonitoringData(id string, o T, isDeletion bool) { + if s.measurement != "" { + p := influxdb2.NewPointWithMeasurement(s.measurement) + p.AddTag("id", id) + p.AddTag("isDeletion", strconv.FormatBool(isDeletion)) + p.AddField("count", 1) + + if s.callback != nil { + s.callback(p, o, isDeletion) + } + + monitoring.WriteInfluxPoint(p) + } +}