From 49c7a2d4058c30339d0fd0625ee638e5d43d7b08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Sun, 17 Jul 2022 13:50:47 +0200 Subject: [PATCH] Save the runner and environment id for executions monitoring. --- internal/runner/aws_runner.go | 3 ++- internal/runner/aws_runner_test.go | 28 ++++++++++++++++---------- internal/runner/nomad_runner.go | 3 ++- internal/runner/runner.go | 14 +++++++++++++ pkg/monitoring/influxdb2_middleware.go | 11 +++++----- 5 files changed, 41 insertions(+), 18 deletions(-) diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go index 14dfda4..39a2d89 100644 --- a/internal/runner/aws_runner.go +++ b/internal/runner/aws_runner.go @@ -48,11 +48,12 @@ func NewAWSFunctionWorkload( workload := &AWSFunctionWorkload{ id: newUUID.String(), fs: make(map[dto.FilePath][]byte), - executions: storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](monitoring.MeasurementExecutionsAWS, nil), runningExecutions: make(map[execution.ID]context.CancelFunc), onDestroy: onDestroy, environment: environment, } + workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( + monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id)) workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error { return workload.Destroy() }) diff --git a/internal/runner/aws_runner_test.go b/internal/runner/aws_runner_test.go index 09955e7..9b04f52 100644 --- a/internal/runner/aws_runner_test.go +++ b/internal/runner/aws_runner_test.go @@ -18,16 +18,18 @@ import ( ) func TestAWSExecutionRequestIsStored(t *testing.T) { - r, err := NewAWSFunctionWorkload(nil, nil) + environment := &ExecutionEnvironmentMock{} + environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger)) + r, err := NewAWSFunctionWorkload(environment, nil) assert.NoError(t, err) executionRequest := &dto.ExecutionRequest{ Command: "command", TimeLimit: 10, Environment: nil, } - r.StoreExecution(defaultExecutionID, executionRequest) - assert.True(t, r.ExecutionExists(defaultExecutionID)) - storedExecutionRunner, ok := r.executions.Pop(defaultExecutionID) + r.StoreExecution(tests.DefaultEnvironmentIDAsString, executionRequest) + assert.True(t, r.ExecutionExists(tests.DefaultEnvironmentIDAsString)) + storedExecutionRunner, ok := r.executions.Pop(tests.DefaultEnvironmentIDAsString) assert.True(t, ok, "Getting an execution should not return ok false") assert.Equal(t, executionRequest, storedExecutionRunner) } @@ -58,6 +60,7 @@ func (a *awsEndpointMock) handler(w http.ResponseWriter, r *http.Request) { func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) { environment := &ExecutionEnvironmentMock{} + environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger)) environment.On("Image").Return("testImage or AWS endpoint") r, err := NewAWSFunctionWorkload(environment, nil) require.NoError(t, err) @@ -72,8 +75,8 @@ func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) { awsMock.ctx, cancel = context.WithCancel(context.Background()) cancel() - r.StoreExecution(defaultExecutionID, &dto.ExecutionRequest{}) - exit, _, err := r.ExecuteInteractively(defaultExecutionID, nil, io.Discard, io.Discard) + r.StoreExecution(tests.DefaultEnvironmentIDAsString, &dto.ExecutionRequest{}) + exit, _, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard) require.NoError(t, err) <-exit assert.True(t, awsMock.hasConnected) @@ -84,9 +87,9 @@ func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) { defer cancel() command := "sl" request := &dto.ExecutionRequest{Command: command} - r.StoreExecution(defaultExecutionID, request) + r.StoreExecution(tests.DefaultEnvironmentIDAsString, request) - _, cancel, err := r.ExecuteInteractively(defaultExecutionID, nil, io.Discard, io.Discard) + _, cancel, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard) require.NoError(t, err) <-time.After(tests.ShortTimeout) cancel() @@ -100,6 +103,7 @@ func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) { func TestAWSFunctionWorkload_UpdateFileSystem(t *testing.T) { environment := &ExecutionEnvironmentMock{} + environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger)) environment.On("Image").Return("testImage or AWS endpoint") r, err := NewAWSFunctionWorkload(environment, nil) require.NoError(t, err) @@ -114,12 +118,12 @@ func TestAWSFunctionWorkload_UpdateFileSystem(t *testing.T) { defer cancel() command := "sl" request := &dto.ExecutionRequest{Command: command} - r.StoreExecution(defaultExecutionID, request) + r.StoreExecution(tests.DefaultEnvironmentIDAsString, request) myFile := dto.File{Path: "myPath", Content: []byte("myContent")} err = r.UpdateFileSystem(&dto.UpdateFileSystemRequest{Copy: []dto.File{myFile}}) assert.NoError(t, err) - _, execCancel, err := r.ExecuteInteractively(defaultExecutionID, nil, io.Discard, io.Discard) + _, execCancel, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard) require.NoError(t, err) <-time.After(tests.ShortTimeout) execCancel() @@ -131,8 +135,10 @@ func TestAWSFunctionWorkload_UpdateFileSystem(t *testing.T) { } func TestAWSFunctionWorkload_Destroy(t *testing.T) { + environment := &ExecutionEnvironmentMock{} + environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger)) hasDestroyBeenCalled := false - r, err := NewAWSFunctionWorkload(nil, func(_ Runner) error { + r, err := NewAWSFunctionWorkload(environment, func(_ Runner) error { hasDestroyBeenCalled = true return nil }) diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 816445b..b708e08 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -53,9 +53,10 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping, id: id, portMappings: portMappings, api: apiClient, - executions: storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](monitoring.MeasurementExecutionsNomad, nil), onDestroy: onDestroy, } + job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( + monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id)) job.InactivityTimer = NewInactivityTimer(job, onDestroy) return job } diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 066b784..4bb58a7 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -2,8 +2,12 @@ package runner import ( "context" + "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/monitoring" + "github.com/openHPI/poseidon/pkg/storage" "io" + "strconv" ) type ExitInfo struct { @@ -59,3 +63,13 @@ func FromContext(ctx context.Context) (Runner, bool) { runner, ok := ctx.Value(runnerContextKey).(Runner) return runner, ok } + +// monitorExecutionsRunnerID passes the id of the runner executing the execution into the monitoring Point p. +func monitorExecutionsRunnerID(env dto.EnvironmentID, runnerID string) storage.WriteCallback[*dto.ExecutionRequest] { + return func(p *write.Point, e *dto.ExecutionRequest, isDeletion bool) { + if !isDeletion && e != nil { + p.AddTag(monitoring.InfluxKeyRunnerID, runnerID) + p.AddTag(monitoring.InfluxKeyEnvironmentID, strconv.Itoa(int(env))) + } + } +} diff --git a/pkg/monitoring/influxdb2_middleware.go b/pkg/monitoring/influxdb2_middleware.go index 804d11f..c57bade 100644 --- a/pkg/monitoring/influxdb2_middleware.go +++ b/pkg/monitoring/influxdb2_middleware.go @@ -30,8 +30,9 @@ const ( MeasurementUsedRunner = measurementPrefix + "used_runners" // The keys for the monitored tags and fields. - influxKeyRunnerID = "runner_id" - influxKeyEnvironmentID = "environment_id" + + InfluxKeyRunnerID = "runner_id" + InfluxKeyEnvironmentID = "environment_id" influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size" influxKeyRequestSize = "request_size" ) @@ -90,12 +91,12 @@ func AddRunnerMonitoringData(request *http.Request, runnerID string, environment // addRunnerID adds the runner id to the influx data point for the current request. func addRunnerID(r *http.Request, id string) { - addInfluxDBTag(r, influxKeyRunnerID, id) + addInfluxDBTag(r, InfluxKeyRunnerID, id) } // addEnvironmentID adds the environment id to the influx data point for the current request. func addEnvironmentID(r *http.Request, id dto.EnvironmentID) { - addInfluxDBTag(r, influxKeyEnvironmentID, strconv.Itoa(int(id))) + addInfluxDBTag(r, InfluxKeyEnvironmentID, strconv.Itoa(int(id))) } // AddRequestSize adds the size of the request body to the influx data point for the current request. @@ -117,7 +118,7 @@ func AddRequestSize(r *http.Request) { func ChangedPrewarmingPoolSize(id dto.EnvironmentID, count uint) { p := influxdb2.NewPointWithMeasurement(measurementPoolSize) - p.AddTag(influxKeyEnvironmentID, strconv.Itoa(int(id))) + p.AddTag(InfluxKeyEnvironmentID, strconv.Itoa(int(id))) p.AddField(influxKeyEnvironmentPrewarmingPoolSize, count) WriteInfluxPoint(p)