186 lines
4.7 KiB
Go
186 lines
4.7 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
|
"github.com/openHPI/poseidon/pkg/monitoring"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Storage is an interface for storing objects.
|
|
type Storage[T any] interface {
|
|
// List returns all objects from the storage.
|
|
List() []T
|
|
|
|
// Add adds an object to the storage.
|
|
// It overwrites the old object if one with the same id was already stored.
|
|
Add(id string, o T)
|
|
|
|
// Get returns an object from the storage.
|
|
// Iff the object does not exist in the storage, ok will be false.
|
|
Get(id string) (o T, ok bool)
|
|
|
|
// Delete deletes the object with the passed id from the storage.
|
|
// It does nothing if no object with the id is present in the store.
|
|
Delete(id string)
|
|
|
|
// Pop deletes the object with the given id from the storage and returns it.
|
|
// Iff no such execution exists, ok is false and true otherwise.
|
|
Pop(id string) (o T, ok bool)
|
|
|
|
// Purge removes all objects from the storage.
|
|
Purge()
|
|
|
|
// Length returns the number of currently stored objects in the storage.
|
|
Length() uint
|
|
|
|
// Sample returns and removes an arbitrary object from the storage.
|
|
// ok is true iff an object was returned.
|
|
Sample() (o T, ok bool)
|
|
}
|
|
|
|
// EventType is an enum type to declare the different causes of a monitoring event.
|
|
type EventType string
|
|
|
|
const (
|
|
Creation EventType = "creation"
|
|
Deletion EventType = "deletion"
|
|
Periodically EventType = "periodically"
|
|
)
|
|
|
|
// WriteCallback is called before an event gets monitored.
|
|
// Iff eventType is Periodically it is no object provided.
|
|
type WriteCallback[T any] func(p *write.Point, object T, eventType EventType)
|
|
|
|
// localStorage stores objects in the local application memory.
|
|
type localStorage[T any] struct {
|
|
sync.RWMutex
|
|
objects map[string]T
|
|
measurement string
|
|
callback WriteCallback[T]
|
|
}
|
|
|
|
// NewLocalStorage responds with a Storage implementation.
|
|
// This implementation stores the data thread-safe in the local application memory.
|
|
func NewLocalStorage[T any]() *localStorage[T] {
|
|
return &localStorage[T]{
|
|
objects: make(map[string]T),
|
|
}
|
|
}
|
|
|
|
// NewMonitoredLocalStorage responds with a Storage implementation.
|
|
// All write operations are monitored in the passed measurement.
|
|
// Iff callback is set, it will be called on a write operation.
|
|
// Iff additionalEvents not zero, the duration will be used to periodically send additional monitoring events.
|
|
func NewMonitoredLocalStorage[T any](
|
|
measurement string, callback WriteCallback[T], additionalEvents time.Duration, ctx context.Context) *localStorage[T] {
|
|
s := &localStorage[T]{
|
|
objects: make(map[string]T),
|
|
measurement: measurement,
|
|
callback: callback,
|
|
}
|
|
if additionalEvents != 0 {
|
|
go s.periodicallySendMonitoringData(additionalEvents, ctx)
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (s *localStorage[T]) List() (o []T) {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
for _, value := range s.objects {
|
|
o = append(o, value)
|
|
}
|
|
return o
|
|
}
|
|
|
|
func (s *localStorage[T]) Add(id string, o T) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
s.objects[id] = o
|
|
s.sendMonitoringData(id, o, Creation, s.unsafeLength())
|
|
}
|
|
|
|
func (s *localStorage[T]) Get(id string) (o T, ok bool) {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
o, ok = s.objects[id]
|
|
return
|
|
}
|
|
|
|
func (s *localStorage[T]) Delete(id string) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
o, ok := s.objects[id]
|
|
if ok {
|
|
delete(s.objects, id)
|
|
s.sendMonitoringData(id, o, Deletion, s.unsafeLength())
|
|
}
|
|
}
|
|
|
|
func (s *localStorage[T]) Pop(id string) (T, bool) {
|
|
o, ok := s.Get(id)
|
|
s.Delete(id)
|
|
return o, ok
|
|
}
|
|
|
|
func (s *localStorage[T]) Purge() {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
for key, object := range s.objects {
|
|
s.sendMonitoringData(key, object, Deletion, 0)
|
|
}
|
|
s.objects = make(map[string]T)
|
|
}
|
|
|
|
func (s *localStorage[T]) Sample() (o T, ok bool) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
for key, object := range s.objects {
|
|
delete(s.objects, key)
|
|
s.sendMonitoringData(key, object, Deletion, s.unsafeLength())
|
|
return object, true
|
|
}
|
|
return o, false
|
|
}
|
|
|
|
func (s *localStorage[T]) Length() uint {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return s.unsafeLength()
|
|
}
|
|
|
|
func (s *localStorage[T]) unsafeLength() uint {
|
|
length := len(s.objects)
|
|
return uint(length)
|
|
}
|
|
|
|
func (s *localStorage[T]) sendMonitoringData(id string, o T, eventType EventType, count uint) {
|
|
if s.measurement != "" {
|
|
p := influxdb2.NewPointWithMeasurement(s.measurement)
|
|
p.AddTag("id", id)
|
|
p.AddTag("event_type", string(eventType))
|
|
p.AddField("count", count)
|
|
|
|
if s.callback != nil {
|
|
s.callback(p, o, eventType)
|
|
}
|
|
|
|
monitoring.WriteInfluxPoint(p)
|
|
}
|
|
}
|
|
|
|
func (s *localStorage[T]) periodicallySendMonitoringData(d time.Duration, ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(d):
|
|
stub := new(T)
|
|
s.sendMonitoringData("", *stub, Periodically, s.Length())
|
|
}
|
|
}
|
|
}
|