#110 Add periodical monitoring events.

This commit is contained in:
Maximilian Paß
2022-08-17 12:45:31 +02:00
parent c3460317a4
commit 5590c50e14
9 changed files with 74 additions and 26 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/storage"
"strconv" "strconv"
"sync" "sync"
"time"
) )
const ( const (
@ -37,7 +38,7 @@ func NewNomadEnvironment(apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvi
} }
return &NomadEnvironment{apiClient, jobHCL, job, storage.NewMonitoredLocalStorage[runner.Runner]( return &NomadEnvironment{apiClient, jobHCL, job, storage.NewMonitoredLocalStorage[runner.Runner](
monitoring.MeasurementIdleRunnerNomad, runner.MonitorRunnersEnvironmentID)}, nil monitoring.MeasurementIdleRunnerNomad, runner.MonitorRunnersEnvironmentID, time.Minute)}, nil
} }
func NewNomadEnvironmentFromRequest( func NewNomadEnvironmentFromRequest(

View File

@ -12,6 +12,7 @@ import (
"github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/storage"
"os" "os"
"time"
) )
// templateEnvironmentJobHCL holds our default job in HCL format. // templateEnvironmentJobHCL holds our default job in HCL format.
@ -155,7 +156,7 @@ func newNomadEnvironmetFromJob(job *nomadApi.Job, apiClient nomad.ExecutorAPI) *
jobHCL: templateEnvironmentJobHCL, jobHCL: templateEnvironmentJobHCL,
job: job, job: job,
idleRunners: storage.NewMonitoredLocalStorage[runner.Runner]( idleRunners: storage.NewMonitoredLocalStorage[runner.Runner](
monitoring.MeasurementIdleRunnerNomad, runner.MonitorRunnersEnvironmentID), monitoring.MeasurementIdleRunnerNomad, runner.MonitorRunnersEnvironmentID, time.Minute),
} }
} }

View File

@ -7,6 +7,7 @@ import (
"github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/storage"
"time"
) )
var ErrNullObject = errors.New("functionality not available for the null object") var ErrNullObject = errors.New("functionality not available for the null object")
@ -24,13 +25,14 @@ type AbstractManager struct {
func NewAbstractManager() *AbstractManager { func NewAbstractManager() *AbstractManager {
return &AbstractManager{ return &AbstractManager{
environments: storage.NewMonitoredLocalStorage[ExecutionEnvironment]( environments: storage.NewMonitoredLocalStorage[ExecutionEnvironment](
monitoring.MeasurementEnvironments, monitorEnvironmentData), monitoring.MeasurementEnvironments, monitorEnvironmentData, 0),
usedRunners: storage.NewMonitoredLocalStorage[Runner](monitoring.MeasurementUsedRunner, MonitorRunnersEnvironmentID), usedRunners: storage.NewMonitoredLocalStorage[Runner](
monitoring.MeasurementUsedRunner, MonitorRunnersEnvironmentID, time.Hour),
} }
} }
// MonitorRunnersEnvironmentID passes the id of the environment e into the monitoring Point p. // MonitorRunnersEnvironmentID passes the id of the environment e into the monitoring Point p.
func MonitorRunnersEnvironmentID(p *write.Point, e Runner, _ bool) { func MonitorRunnersEnvironmentID(p *write.Point, e Runner, _ storage.EventType) {
if e != nil { if e != nil {
p.AddTag(monitoring.InfluxKeyEnvironmentID, e.Environment().ToString()) p.AddTag(monitoring.InfluxKeyEnvironmentID, e.Environment().ToString())
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/storage"
"io" "io"
"time"
) )
var ErrWrongMessageType = errors.New("received message that is not a text message") var ErrWrongMessageType = errors.New("received message that is not a text message")
@ -53,7 +54,7 @@ func NewAWSFunctionWorkload(
environment: environment, environment: environment,
} }
workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id)) monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute)
workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error { workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error {
return workload.Destroy() return workload.Destroy()
}) })

View File

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/storage"
"strconv" "strconv"
) )
@ -51,8 +52,8 @@ type ExecutionEnvironment interface {
} }
// monitorEnvironmentData passes the configuration of the environment e into the monitoring Point p. // monitorEnvironmentData passes the configuration of the environment e into the monitoring Point p.
func monitorEnvironmentData(p *write.Point, e ExecutionEnvironment, isDeletion bool) { func monitorEnvironmentData(p *write.Point, e ExecutionEnvironment, eventType storage.EventType) {
if !isDeletion && e != nil { if eventType == storage.Creation && e != nil {
p.AddTag("image", e.Image()) p.AddTag("image", e.Image())
p.AddTag("cpu_limit", strconv.Itoa(int(e.CPULimit()))) p.AddTag("cpu_limit", strconv.Itoa(int(e.CPULimit())))
p.AddTag("memory_limit", strconv.Itoa(int(e.MemoryLimit()))) p.AddTag("memory_limit", strconv.Itoa(int(e.MemoryLimit())))

View File

@ -56,7 +56,7 @@ func NewNomadJob(id string, portMappings []nomadApi.PortMapping,
onDestroy: onDestroy, onDestroy: onDestroy,
} }
job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id)) monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id), time.Minute)
job.InactivityTimer = NewInactivityTimer(job, onDestroy) job.InactivityTimer = NewInactivityTimer(job, onDestroy)
return job return job
} }

View File

@ -65,8 +65,8 @@ func FromContext(ctx context.Context) (Runner, bool) {
// monitorExecutionsRunnerID passes the id of the runner executing the execution into the monitoring Point p. // monitorExecutionsRunnerID passes the id of the runner executing the execution into the monitoring Point p.
func monitorExecutionsRunnerID(env dto.EnvironmentID, runnerID string) storage.WriteCallback[*dto.ExecutionRequest] { func monitorExecutionsRunnerID(env dto.EnvironmentID, runnerID string) storage.WriteCallback[*dto.ExecutionRequest] {
return func(p *write.Point, e *dto.ExecutionRequest, isDeletion bool) { return func(p *write.Point, e *dto.ExecutionRequest, eventType storage.EventType) {
if !isDeletion && e != nil { if eventType == storage.Creation && e != nil {
p.AddTag(monitoring.InfluxKeyRunnerID, runnerID) p.AddTag(monitoring.InfluxKeyRunnerID, runnerID)
p.AddTag(monitoring.InfluxKeyEnvironmentID, env.ToString()) p.AddTag(monitoring.InfluxKeyEnvironmentID, env.ToString())
} }

View File

@ -1,12 +1,13 @@
package storage package storage
import ( import (
"context"
influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/pkg/logging" "github.com/openHPI/poseidon/pkg/logging"
"github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/monitoring"
"strconv"
"sync" "sync"
"time"
) )
var log = logging.GetLogger("storage") var log = logging.GetLogger("storage")
@ -43,7 +44,18 @@ type Storage[T any] interface {
Sample() (o T, ok bool) Sample() (o T, ok bool)
} }
type WriteCallback[T any] func(p *write.Point, object T, isDeletion bool) // EventType is an enum type to declare the different causes of a monitoring event.
type EventType string
const (
Creation EventType = "creation"
Deletion EventType = "deletion"
Periodically EventType = "periodically"
)
// WriteCallback is called before an event gets monitored.
// Iff eventType is Periodically it is no object provided.
type WriteCallback[T any] func(p *write.Point, object T, eventType EventType)
// localStorage stores objects in the local application memory. // localStorage stores objects in the local application memory.
type localStorage[T any] struct { type localStorage[T any] struct {
@ -64,12 +76,18 @@ func NewLocalStorage[T any]() *localStorage[T] {
// NewMonitoredLocalStorage responds with a Storage implementation. // NewMonitoredLocalStorage responds with a Storage implementation.
// All write operations are monitored in the passed measurement. // All write operations are monitored in the passed measurement.
// Iff callback is set, it will be called on a write operation. // Iff callback is set, it will be called on a write operation.
func NewMonitoredLocalStorage[T any](measurement string, callback WriteCallback[T]) *localStorage[T] { // Iff additionalEvents not zero, the duration will be used to periodically send additional monitoring events.
return &localStorage[T]{ func NewMonitoredLocalStorage[T any](
measurement string, callback WriteCallback[T], additionalEvents time.Duration) *localStorage[T] {
s := &localStorage[T]{
objects: make(map[string]T), objects: make(map[string]T),
measurement: measurement, measurement: measurement,
callback: callback, callback: callback,
} }
if additionalEvents != 0 {
go s.periodicallySendMonitoringData(additionalEvents)
}
return s
} }
func (s *localStorage[T]) List() (o []T) { func (s *localStorage[T]) List() (o []T) {
@ -85,7 +103,7 @@ func (s *localStorage[T]) Add(id string, o T) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
s.objects[id] = o s.objects[id] = o
s.sendMonitoringData(id, o, false, s.unsafeLength()) s.sendMonitoringData(id, o, Creation, s.unsafeLength())
} }
func (s *localStorage[T]) Get(id string) (o T, ok bool) { func (s *localStorage[T]) Get(id string) (o T, ok bool) {
@ -101,7 +119,7 @@ func (s *localStorage[T]) Delete(id string) {
o, ok := s.objects[id] o, ok := s.objects[id]
if ok { if ok {
delete(s.objects, id) delete(s.objects, id)
s.sendMonitoringData(id, o, true, s.unsafeLength()) s.sendMonitoringData(id, o, Deletion, s.unsafeLength())
} }
} }
@ -115,7 +133,7 @@ func (s *localStorage[T]) Purge() {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
for key, object := range s.objects { for key, object := range s.objects {
s.sendMonitoringData(key, object, true, 0) s.sendMonitoringData(key, object, Deletion, 0)
} }
s.objects = make(map[string]T) s.objects = make(map[string]T)
} }
@ -125,7 +143,7 @@ func (s *localStorage[T]) Sample() (o T, ok bool) {
defer s.Unlock() defer s.Unlock()
for key, object := range s.objects { for key, object := range s.objects {
delete(s.objects, key) delete(s.objects, key)
s.sendMonitoringData(key, object, true, s.unsafeLength()) s.sendMonitoringData(key, object, Deletion, s.unsafeLength())
return object, true return object, true
} }
return o, false return o, false
@ -143,17 +161,26 @@ func (s *localStorage[T]) unsafeLength() uint {
return uint(length) return uint(length)
} }
func (s *localStorage[T]) sendMonitoringData(id string, o T, isDeletion bool, count uint) { func (s *localStorage[T]) sendMonitoringData(id string, o T, eventType EventType, count uint) {
if s.measurement != "" { if s.measurement != "" {
p := influxdb2.NewPointWithMeasurement(s.measurement) p := influxdb2.NewPointWithMeasurement(s.measurement)
p.AddTag("id", id) p.AddTag("id", id)
p.AddTag("isDeletion", strconv.FormatBool(isDeletion)) p.AddTag("event_type", string(eventType))
p.AddField("count", count) p.AddField("count", count)
if s.callback != nil { if s.callback != nil {
s.callback(p, o, isDeletion) s.callback(p, o, eventType)
} }
monitoring.WriteInfluxPoint(p) monitoring.WriteInfluxPoint(p)
} }
} }
func (s *localStorage[T]) periodicallySendMonitoringData(d time.Duration) {
ctx := context.Background()
for ctx.Err() == nil {
stub := new(T)
s.sendMonitoringData("", *stub, Periodically, s.Length())
<-time.After(d)
}
}

View File

@ -2,9 +2,11 @@ package storage
import ( import (
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/tests"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"testing" "testing"
"time"
) )
func TestRunnerPoolTestSuite(t *testing.T) { func TestRunnerPoolTestSuite(t *testing.T) {
@ -114,14 +116,14 @@ func TestNewMonitoredLocalStorage_Callback(t *testing.T) {
callbackCalls := 0 callbackCalls := 0
callbackAdditions := 0 callbackAdditions := 0
callbackDeletions := 0 callbackDeletions := 0
os := NewMonitoredLocalStorage[string]("testMeasurement", func(p *write.Point, o string, isDeletion bool) { os := NewMonitoredLocalStorage[string]("testMeasurement", func(p *write.Point, o string, eventType EventType) {
callbackCalls++ callbackCalls++
if isDeletion { if eventType == Deletion {
callbackDeletions++ callbackDeletions++
} else { } else if eventType == Creation {
callbackAdditions++ callbackAdditions++
} }
}) }, 0)
assertCallbackCounts := func(test func(), totalCalls, additions, deletions int) { assertCallbackCounts := func(test func(), totalCalls, additions, deletions int) {
beforeTotal := callbackCalls beforeTotal := callbackCalls
@ -170,3 +172,16 @@ func TestNewMonitoredLocalStorage_Callback(t *testing.T) {
}, 2, 0, 2) }, 2, 0, 2)
}) })
} }
func TestNewMonitoredLocalStorage_Periodically(t *testing.T) {
callbackCalls := 0
NewMonitoredLocalStorage[string]("testMeasurement", func(p *write.Point, o string, eventType EventType) {
callbackCalls++
assert.Equal(t, Periodically, eventType)
}, 200*time.Millisecond)
time.Sleep(tests.ShortTimeout)
assert.Equal(t, 1, callbackCalls)
time.Sleep(200 * time.Millisecond)
assert.Equal(t, 2, callbackCalls)
}