From 5590c50e14ae8b4c560bfa421fbfe061adee30b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Wed, 17 Aug 2022 12:45:31 +0200 Subject: [PATCH] #110 Add periodical monitoring events. --- internal/environment/nomad_environment.go | 3 +- internal/environment/nomad_manager.go | 3 +- internal/runner/abstract_manager.go | 8 ++-- internal/runner/aws_runner.go | 3 +- internal/runner/execution_environment.go | 5 ++- internal/runner/nomad_runner.go | 2 +- internal/runner/runner.go | 4 +- pkg/storage/storage.go | 49 ++++++++++++++++++----- pkg/storage/storage_test.go | 23 +++++++++-- 9 files changed, 74 insertions(+), 26 deletions(-) diff --git a/internal/environment/nomad_environment.go b/internal/environment/nomad_environment.go index 9eadd3b..775eb26 100644 --- a/internal/environment/nomad_environment.go +++ b/internal/environment/nomad_environment.go @@ -15,6 +15,7 @@ import ( "github.com/openHPI/poseidon/pkg/storage" "strconv" "sync" + "time" ) const ( @@ -37,7 +38,7 @@ func NewNomadEnvironment(apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvi } return &NomadEnvironment{apiClient, jobHCL, job, storage.NewMonitoredLocalStorage[runner.Runner]( - monitoring.MeasurementIdleRunnerNomad, runner.MonitorRunnersEnvironmentID)}, nil + monitoring.MeasurementIdleRunnerNomad, runner.MonitorRunnersEnvironmentID, time.Minute)}, nil } func NewNomadEnvironmentFromRequest( diff --git a/internal/environment/nomad_manager.go b/internal/environment/nomad_manager.go index bd115b7..1657db2 100644 --- a/internal/environment/nomad_manager.go +++ b/internal/environment/nomad_manager.go @@ -12,6 +12,7 @@ import ( "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "os" + "time" ) // templateEnvironmentJobHCL holds our default job in HCL format. 
@@ -155,7 +156,7 @@ func newNomadEnvironmetFromJob(job *nomadApi.Job, apiClient nomad.ExecutorAPI) * jobHCL: templateEnvironmentJobHCL, job: job, idleRunners: storage.NewMonitoredLocalStorage[runner.Runner]( - monitoring.MeasurementIdleRunnerNomad, runner.MonitorRunnersEnvironmentID), + monitoring.MeasurementIdleRunnerNomad, runner.MonitorRunnersEnvironmentID, time.Minute), } } diff --git a/internal/runner/abstract_manager.go b/internal/runner/abstract_manager.go index e3fd209..2593b13 100644 --- a/internal/runner/abstract_manager.go +++ b/internal/runner/abstract_manager.go @@ -7,6 +7,7 @@ import ( "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" + "time" ) var ErrNullObject = errors.New("functionality not available for the null object") @@ -24,13 +25,14 @@ type AbstractManager struct { func NewAbstractManager() *AbstractManager { return &AbstractManager{ environments: storage.NewMonitoredLocalStorage[ExecutionEnvironment]( - monitoring.MeasurementEnvironments, monitorEnvironmentData), - usedRunners: storage.NewMonitoredLocalStorage[Runner](monitoring.MeasurementUsedRunner, MonitorRunnersEnvironmentID), + monitoring.MeasurementEnvironments, monitorEnvironmentData, 0), + usedRunners: storage.NewMonitoredLocalStorage[Runner]( + monitoring.MeasurementUsedRunner, MonitorRunnersEnvironmentID, time.Hour), } } // MonitorRunnersEnvironmentID passes the id of the environment e into the monitoring Point p. 
-func MonitorRunnersEnvironmentID(p *write.Point, e Runner, _ bool) { +func MonitorRunnersEnvironmentID(p *write.Point, e Runner, _ storage.EventType) { if e != nil { p.AddTag(monitoring.InfluxKeyEnvironmentID, e.Environment().ToString()) } diff --git a/internal/runner/aws_runner.go b/internal/runner/aws_runner.go index 39a2d89..7332cc6 100644 --- a/internal/runner/aws_runner.go +++ b/internal/runner/aws_runner.go @@ -13,6 +13,7 @@ import ( "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "io" + "time" ) var ErrWrongMessageType = errors.New("received message that is not a text message") @@ -53,7 +54,7 @@ func NewAWSFunctionWorkload( environment: environment, } workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( - monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id)) + monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute) workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error { return workload.Destroy() }) diff --git a/internal/runner/execution_environment.go b/internal/runner/execution_environment.go index b9a8bae..44298b4 100644 --- a/internal/runner/execution_environment.go +++ b/internal/runner/execution_environment.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/storage" "strconv" ) @@ -51,8 +52,8 @@ type ExecutionEnvironment interface { } // monitorEnvironmentData passes the configuration of the environment e into the monitoring Point p. 
-func monitorEnvironmentData(p *write.Point, e ExecutionEnvironment, isDeletion bool) { - if !isDeletion && e != nil { +func monitorEnvironmentData(p *write.Point, e ExecutionEnvironment, eventType storage.EventType) { + if eventType == storage.Creation && e != nil { p.AddTag("image", e.Image()) p.AddTag("cpu_limit", strconv.Itoa(int(e.CPULimit()))) p.AddTag("memory_limit", strconv.Itoa(int(e.MemoryLimit()))) diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index b708e08..c592089 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -56,7 +56,7 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping, onDestroy: onDestroy, } job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( - monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id)) + monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id), time.Minute) job.InactivityTimer = NewInactivityTimer(job, onDestroy) return job } diff --git a/internal/runner/runner.go b/internal/runner/runner.go index a099a4d..1703eb3 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -65,8 +65,8 @@ func FromContext(ctx context.Context) (Runner, bool) { // monitorExecutionsRunnerID passes the id of the runner executing the execution into the monitoring Point p. 
func monitorExecutionsRunnerID(env dto.EnvironmentID, runnerID string) storage.WriteCallback[*dto.ExecutionRequest] { - return func(p *write.Point, e *dto.ExecutionRequest, isDeletion bool) { - if !isDeletion && e != nil { + return func(p *write.Point, e *dto.ExecutionRequest, eventType storage.EventType) { + if eventType == storage.Creation && e != nil { p.AddTag(monitoring.InfluxKeyRunnerID, runnerID) p.AddTag(monitoring.InfluxKeyEnvironmentID, env.ToString()) } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index ad70d3c..0721bd4 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,12 +1,13 @@ package storage import ( + "context" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/openHPI/poseidon/pkg/logging" "github.com/openHPI/poseidon/pkg/monitoring" - "strconv" "sync" + "time" ) var log = logging.GetLogger("storage") @@ -43,7 +44,18 @@ type Storage[T any] interface { Sample() (o T, ok bool) } -type WriteCallback[T any] func(p *write.Point, object T, isDeletion bool) +// EventType is an enum type to declare the different causes of a monitoring event. +type EventType string + +const ( + Creation EventType = "creation" + Deletion EventType = "deletion" + Periodically EventType = "periodically" +) + +// WriteCallback is called before an event gets monitored. +// Iff eventType is Periodically, no object is provided. +type WriteCallback[T any] func(p *write.Point, object T, eventType EventType) // localStorage stores objects in the local application memory. type localStorage[T any] struct { @@ -64,12 +76,18 @@ func NewLocalStorage[T any]() *localStorage[T] { // NewMonitoredLocalStorage responds with a Storage implementation. // All write operations are monitored in the passed measurement. // Iff callback is set, it will be called on a write operation.
-func NewMonitoredLocalStorage[T any](measurement string, callback WriteCallback[T]) *localStorage[T] { - return &localStorage[T]{ +// Iff additionalEvents is not zero, the duration will be used to periodically send additional monitoring events. +func NewMonitoredLocalStorage[T any]( + measurement string, callback WriteCallback[T], additionalEvents time.Duration) *localStorage[T] { + s := &localStorage[T]{ objects: make(map[string]T), measurement: measurement, callback: callback, } + if additionalEvents != 0 { + go s.periodicallySendMonitoringData(additionalEvents) + } + return s } func (s *localStorage[T]) List() (o []T) { @@ -85,7 +103,7 @@ func (s *localStorage[T]) Add(id string, o T) { s.Lock() defer s.Unlock() s.objects[id] = o - s.sendMonitoringData(id, o, false, s.unsafeLength()) + s.sendMonitoringData(id, o, Creation, s.unsafeLength()) } func (s *localStorage[T]) Get(id string) (o T, ok bool) { @@ -101,7 +119,7 @@ func (s *localStorage[T]) Delete(id string) { o, ok := s.objects[id] if ok { delete(s.objects, id) - s.sendMonitoringData(id, o, true, s.unsafeLength()) + s.sendMonitoringData(id, o, Deletion, s.unsafeLength()) } } @@ -115,7 +133,7 @@ func (s *localStorage[T]) Purge() { s.Lock() defer s.Unlock() for key, object := range s.objects { - s.sendMonitoringData(key, object, true, 0) + s.sendMonitoringData(key, object, Deletion, 0) } s.objects = make(map[string]T) } @@ -125,7 +143,7 @@ func (s *localStorage[T]) Sample() (o T, ok bool) { defer s.Unlock() for key, object := range s.objects { delete(s.objects, key) - s.sendMonitoringData(key, object, true, s.unsafeLength()) + s.sendMonitoringData(key, object, Deletion, s.unsafeLength()) return object, true } return o, false @@ -143,17 +161,26 @@ func (s *localStorage[T]) unsafeLength() uint { return uint(length) } -func (s *localStorage[T]) sendMonitoringData(id string, o T, isDeletion bool, count uint) { +func (s *localStorage[T]) sendMonitoringData(id string, o T, eventType EventType, count uint) { if
s.measurement != "" { p := influxdb2.NewPointWithMeasurement(s.measurement) p.AddTag("id", id) - p.AddTag("isDeletion", strconv.FormatBool(isDeletion)) + p.AddTag("event_type", string(eventType)) p.AddField("count", count) if s.callback != nil { - s.callback(p, o, isDeletion) + s.callback(p, o, eventType) } monitoring.WriteInfluxPoint(p) } } + +func (s *localStorage[T]) periodicallySendMonitoringData(d time.Duration) { + ctx := context.Background() + for ctx.Err() == nil { + stub := new(T) + s.sendMonitoringData("", *stub, Periodically, s.Length()) + <-time.After(d) + } +} diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index e5fa3eb..c08d52b 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -2,9 +2,11 @@ package storage import ( "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/openHPI/poseidon/tests" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "testing" + "time" ) func TestRunnerPoolTestSuite(t *testing.T) { @@ -114,14 +116,14 @@ func TestNewMonitoredLocalStorage_Callback(t *testing.T) { callbackCalls := 0 callbackAdditions := 0 callbackDeletions := 0 - os := NewMonitoredLocalStorage[string]("testMeasurement", func(p *write.Point, o string, isDeletion bool) { + os := NewMonitoredLocalStorage[string]("testMeasurement", func(p *write.Point, o string, eventType EventType) { callbackCalls++ - if isDeletion { + if eventType == Deletion { callbackDeletions++ - } else { + } else if eventType == Creation { callbackAdditions++ } - }) + }, 0) assertCallbackCounts := func(test func(), totalCalls, additions, deletions int) { beforeTotal := callbackCalls @@ -170,3 +172,16 @@ func TestNewMonitoredLocalStorage_Callback(t *testing.T) { }, 2, 0, 2) }) } + +func TestNewMonitoredLocalStorage_Periodically(t *testing.T) { + callbackCalls := 0 + NewMonitoredLocalStorage[string]("testMeasurement", func(p *write.Point, o string, eventType EventType) { + callbackCalls++ + assert.Equal(t, 
Periodically, eventType) + }, 200*time.Millisecond) + + time.Sleep(tests.ShortTimeout) + assert.Equal(t, 1, callbackCalls) + time.Sleep(200 * time.Millisecond) + assert.Equal(t, 2, callbackCalls) +}