Save the runner and environment id for executions monitoring.

This commit is contained in:
Maximilian Paß
2022-07-17 13:50:47 +02:00
parent 00f3d9b77b
commit 49c7a2d405
5 changed files with 41 additions and 18 deletions

View File

@ -48,11 +48,12 @@ func NewAWSFunctionWorkload(
workload := &AWSFunctionWorkload{
id: newUUID.String(),
fs: make(map[dto.FilePath][]byte),
executions: storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](monitoring.MeasurementExecutionsAWS, nil),
runningExecutions: make(map[execution.ID]context.CancelFunc),
onDestroy: onDestroy,
environment: environment,
}
workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id))
workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error {
return workload.Destroy()
})

View File

@ -18,16 +18,18 @@ import (
)
func TestAWSExecutionRequestIsStored(t *testing.T) {
r, err := NewAWSFunctionWorkload(nil, nil)
environment := &ExecutionEnvironmentMock{}
environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger))
r, err := NewAWSFunctionWorkload(environment, nil)
assert.NoError(t, err)
executionRequest := &dto.ExecutionRequest{
Command: "command",
TimeLimit: 10,
Environment: nil,
}
r.StoreExecution(defaultExecutionID, executionRequest)
assert.True(t, r.ExecutionExists(defaultExecutionID))
storedExecutionRunner, ok := r.executions.Pop(defaultExecutionID)
r.StoreExecution(tests.DefaultEnvironmentIDAsString, executionRequest)
assert.True(t, r.ExecutionExists(tests.DefaultEnvironmentIDAsString))
storedExecutionRunner, ok := r.executions.Pop(tests.DefaultEnvironmentIDAsString)
assert.True(t, ok, "Getting an execution should not return ok false")
assert.Equal(t, executionRequest, storedExecutionRunner)
}
@ -58,6 +60,7 @@ func (a *awsEndpointMock) handler(w http.ResponseWriter, r *http.Request) {
func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) {
environment := &ExecutionEnvironmentMock{}
environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger))
environment.On("Image").Return("testImage or AWS endpoint")
r, err := NewAWSFunctionWorkload(environment, nil)
require.NoError(t, err)
@ -72,8 +75,8 @@ func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) {
awsMock.ctx, cancel = context.WithCancel(context.Background())
cancel()
r.StoreExecution(defaultExecutionID, &dto.ExecutionRequest{})
exit, _, err := r.ExecuteInteractively(defaultExecutionID, nil, io.Discard, io.Discard)
r.StoreExecution(tests.DefaultEnvironmentIDAsString, &dto.ExecutionRequest{})
exit, _, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard)
require.NoError(t, err)
<-exit
assert.True(t, awsMock.hasConnected)
@ -84,9 +87,9 @@ func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) {
defer cancel()
command := "sl"
request := &dto.ExecutionRequest{Command: command}
r.StoreExecution(defaultExecutionID, request)
r.StoreExecution(tests.DefaultEnvironmentIDAsString, request)
_, cancel, err := r.ExecuteInteractively(defaultExecutionID, nil, io.Discard, io.Discard)
_, cancel, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard)
require.NoError(t, err)
<-time.After(tests.ShortTimeout)
cancel()
@ -100,6 +103,7 @@ func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) {
func TestAWSFunctionWorkload_UpdateFileSystem(t *testing.T) {
environment := &ExecutionEnvironmentMock{}
environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger))
environment.On("Image").Return("testImage or AWS endpoint")
r, err := NewAWSFunctionWorkload(environment, nil)
require.NoError(t, err)
@ -114,12 +118,12 @@ func TestAWSFunctionWorkload_UpdateFileSystem(t *testing.T) {
defer cancel()
command := "sl"
request := &dto.ExecutionRequest{Command: command}
r.StoreExecution(defaultExecutionID, request)
r.StoreExecution(tests.DefaultEnvironmentIDAsString, request)
myFile := dto.File{Path: "myPath", Content: []byte("myContent")}
err = r.UpdateFileSystem(&dto.UpdateFileSystemRequest{Copy: []dto.File{myFile}})
assert.NoError(t, err)
_, execCancel, err := r.ExecuteInteractively(defaultExecutionID, nil, io.Discard, io.Discard)
_, execCancel, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard)
require.NoError(t, err)
<-time.After(tests.ShortTimeout)
execCancel()
@ -131,8 +135,10 @@ func TestAWSFunctionWorkload_UpdateFileSystem(t *testing.T) {
}
func TestAWSFunctionWorkload_Destroy(t *testing.T) {
environment := &ExecutionEnvironmentMock{}
environment.On("ID").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger))
hasDestroyBeenCalled := false
r, err := NewAWSFunctionWorkload(nil, func(_ Runner) error {
r, err := NewAWSFunctionWorkload(environment, func(_ Runner) error {
hasDestroyBeenCalled = true
return nil
})

View File

@ -53,9 +53,10 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping,
id: id,
portMappings: portMappings,
api: apiClient,
executions: storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](monitoring.MeasurementExecutionsNomad, nil),
onDestroy: onDestroy,
}
job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id))
job.InactivityTimer = NewInactivityTimer(job, onDestroy)
return job
}

View File

@ -2,8 +2,12 @@ package runner
import (
"context"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/storage"
"io"
"strconv"
)
type ExitInfo struct {
@ -59,3 +63,13 @@ func FromContext(ctx context.Context) (Runner, bool) {
runner, ok := ctx.Value(runnerContextKey).(Runner)
return runner, ok
}
// monitorExecutionsRunnerID passes the id of the runner executing the execution into the monitoring Point p.
func monitorExecutionsRunnerID(env dto.EnvironmentID, runnerID string) storage.WriteCallback[*dto.ExecutionRequest] {
return func(p *write.Point, e *dto.ExecutionRequest, isDeletion bool) {
if !isDeletion && e != nil {
p.AddTag(monitoring.InfluxKeyRunnerID, runnerID)
p.AddTag(monitoring.InfluxKeyEnvironmentID, strconv.Itoa(int(env)))
}
}
}

View File

@ -30,8 +30,9 @@ const (
MeasurementUsedRunner = measurementPrefix + "used_runners"
// The keys for the monitored tags and fields.
influxKeyRunnerID = "runner_id"
influxKeyEnvironmentID = "environment_id"
InfluxKeyRunnerID = "runner_id"
InfluxKeyEnvironmentID = "environment_id"
influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size"
influxKeyRequestSize = "request_size"
)
@ -90,12 +91,12 @@ func AddRunnerMonitoringData(request *http.Request, runnerID string, environment
// addRunnerID adds the runner id to the influx data point for the current request.
func addRunnerID(r *http.Request, id string) {
addInfluxDBTag(r, influxKeyRunnerID, id)
addInfluxDBTag(r, InfluxKeyRunnerID, id)
}
// addEnvironmentID adds the environment id to the influx data point for the current request.
func addEnvironmentID(r *http.Request, id dto.EnvironmentID) {
addInfluxDBTag(r, influxKeyEnvironmentID, strconv.Itoa(int(id)))
addInfluxDBTag(r, InfluxKeyEnvironmentID, strconv.Itoa(int(id)))
}
// AddRequestSize adds the size of the request body to the influx data point for the current request.
@ -117,7 +118,7 @@ func AddRequestSize(r *http.Request) {
func ChangedPrewarmingPoolSize(id dto.EnvironmentID, count uint) {
p := influxdb2.NewPointWithMeasurement(measurementPoolSize)
p.AddTag(influxKeyEnvironmentID, strconv.Itoa(int(id)))
p.AddTag(InfluxKeyEnvironmentID, strconv.Itoa(int(id)))
p.AddField(influxKeyEnvironmentPrewarmingPoolSize, count)
WriteInfluxPoint(p)