Fix not canceling monitoring events for removed environments and runners.
This commit is contained in:
Maximilian Paß
2022-10-13 22:17:45 +01:00
committed by Sebastian Serth
parent 5d54b0f786
commit 7119f3e012
9 changed files with 59 additions and 31 deletions

View File

@ -29,6 +29,8 @@ type NomadEnvironment struct {
jobHCL string jobHCL string
job *nomadApi.Job job *nomadApi.Job
idleRunners storage.Storage[runner.Runner] idleRunners storage.Storage[runner.Runner]
ctx context.Context
cancel context.CancelFunc
} }
func NewNomadEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvironment, error) { func NewNomadEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvironment, error) {
@ -37,9 +39,10 @@ func NewNomadEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI, jobH
return nil, fmt.Errorf("error parsing Nomad job: %w", err) return nil, fmt.Errorf("error parsing Nomad job: %w", err)
} }
e := &NomadEnvironment{apiClient, jobHCL, job, nil} ctx, cancel := context.WithCancel(context.Background())
e := &NomadEnvironment{apiClient, jobHCL, job, nil, ctx, cancel}
e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad, e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad,
runner.MonitorEnvironmentID[runner.Runner](id), time.Minute) runner.MonitorEnvironmentID[runner.Runner](id), time.Minute, ctx)
return e, nil return e, nil
} }
@ -218,6 +221,7 @@ func (n *NomadEnvironment) Register() error {
} }
func (n *NomadEnvironment) Delete() error { func (n *NomadEnvironment) Delete() error {
n.cancel()
err := n.removeRunners() err := n.removeRunners()
if err != nil { if err != nil {
return err return err

View File

@ -18,7 +18,7 @@ import (
func TestConfigureNetworkCreatesNewNetworkWhenNoNetworkExists(t *testing.T) { func TestConfigureNetworkCreatesNewNetworkWhenNoNetworkExists(t *testing.T) {
_, job := helpers.CreateTemplateJob() _, job := helpers.CreateTemplateJob()
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(job) defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(job)
environment := &NomadEnvironment{nil, "", job, nil} environment := &NomadEnvironment{nil, "", job, nil, nil, nil}
if assert.Equal(t, 0, len(defaultTaskGroup.Networks)) { if assert.Equal(t, 0, len(defaultTaskGroup.Networks)) {
environment.SetNetworkAccess(true, []uint16{}) environment.SetNetworkAccess(true, []uint16{})
@ -30,7 +30,7 @@ func TestConfigureNetworkCreatesNewNetworkWhenNoNetworkExists(t *testing.T) {
func TestConfigureNetworkDoesNotCreateNewNetworkWhenNetworkExists(t *testing.T) { func TestConfigureNetworkDoesNotCreateNewNetworkWhenNetworkExists(t *testing.T) {
_, job := helpers.CreateTemplateJob() _, job := helpers.CreateTemplateJob()
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(job) defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(job)
environment := &NomadEnvironment{nil, "", job, nil} environment := &NomadEnvironment{nil, "", job, nil, nil, nil}
networkResource := &nomadApi.NetworkResource{Mode: "cni/secure-bridge"} networkResource := &nomadApi.NetworkResource{Mode: "cni/secure-bridge"}
defaultTaskGroup.Networks = []*nomadApi.NetworkResource{networkResource} defaultTaskGroup.Networks = []*nomadApi.NetworkResource{networkResource}
@ -59,7 +59,7 @@ func TestConfigureNetworkSetsCorrectValues(t *testing.T) {
_, testJob := helpers.CreateTemplateJob() _, testJob := helpers.CreateTemplateJob()
testTaskGroup := nomad.FindAndValidateDefaultTaskGroup(testJob) testTaskGroup := nomad.FindAndValidateDefaultTaskGroup(testJob)
testTask := nomad.FindAndValidateDefaultTask(testTaskGroup) testTask := nomad.FindAndValidateDefaultTask(testTaskGroup)
testEnvironment := &NomadEnvironment{nil, "", job, nil} testEnvironment := &NomadEnvironment{nil, "", job, nil, nil, nil}
testEnvironment.SetNetworkAccess(false, ports) testEnvironment.SetNetworkAccess(false, ports)
mode, ok := testTask.Config["network_mode"] mode, ok := testTask.Config["network_mode"]
@ -74,7 +74,7 @@ func TestConfigureNetworkSetsCorrectValues(t *testing.T) {
_, testJob := helpers.CreateTemplateJob() _, testJob := helpers.CreateTemplateJob()
testTaskGroup := nomad.FindAndValidateDefaultTaskGroup(testJob) testTaskGroup := nomad.FindAndValidateDefaultTaskGroup(testJob)
testTask := nomad.FindAndValidateDefaultTask(testTaskGroup) testTask := nomad.FindAndValidateDefaultTask(testTaskGroup)
testEnvironment := &NomadEnvironment{nil, "", testJob, nil} testEnvironment := &NomadEnvironment{nil, "", testJob, nil, nil, nil}
testEnvironment.SetNetworkAccess(true, ports) testEnvironment.SetNetworkAccess(true, ports)
require.Equal(t, 1, len(testTaskGroup.Networks)) require.Equal(t, 1, len(testTaskGroup.Networks))
@ -114,7 +114,8 @@ func TestRegisterFailsWhenNomadJobRegistrationFails(t *testing.T) {
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, storage.NewLocalStorage[runner.Runner]()} environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{},
storage.NewLocalStorage[runner.Runner](), nil, nil}
environment.SetID(tests.DefaultEnvironmentIDAsInteger) environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register() err := environment.Register()
@ -131,7 +132,8 @@ func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, storage.NewLocalStorage[runner.Runner]()} environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{},
storage.NewLocalStorage[runner.Runner](), nil, nil}
environment.SetID(tests.DefaultEnvironmentIDAsInteger) environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register() err := environment.Register()
@ -147,7 +149,8 @@ func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil) apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{}, storage.NewLocalStorage[runner.Runner]()} environment := &NomadEnvironment{apiClientMock, "", &nomadApi.Job{},
storage.NewLocalStorage[runner.Runner](), nil, nil}
environment.SetID(tests.DefaultEnvironmentIDAsInteger) environment.SetID(tests.DefaultEnvironmentIDAsInteger)
err := environment.Register() err := environment.Register()
@ -173,7 +176,8 @@ func TestTwoSampleAddExactlyTwoRunners(t *testing.T) {
apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil) apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil)
_, job := helpers.CreateTemplateJob() _, job := helpers.CreateTemplateJob()
environment := &NomadEnvironment{apiMock, templateEnvironmentJobHCL, job, storage.NewLocalStorage[runner.Runner]()} environment := &NomadEnvironment{apiMock, templateEnvironmentJobHCL, job,
storage.NewLocalStorage[runner.Runner](), nil, nil}
runner1 := &runner.RunnerMock{} runner1 := &runner.RunnerMock{}
runner1.On("ID").Return(tests.DefaultRunnerID) runner1.On("ID").Return(tests.DefaultRunnerID)
runner2 := &runner.RunnerMock{} runner2 := &runner.RunnerMock{}
@ -206,7 +210,8 @@ func TestSampleDoesNotSetForcePullFlag(t *testing.T) {
}) })
_, job := helpers.CreateTemplateJob() _, job := helpers.CreateTemplateJob()
environment := &NomadEnvironment{apiMock, templateEnvironmentJobHCL, job, storage.NewLocalStorage[runner.Runner]()} environment := &NomadEnvironment{apiMock, templateEnvironmentJobHCL, job,
storage.NewLocalStorage[runner.Runner](), nil, nil}
runner1 := &runner.RunnerMock{} runner1 := &runner.RunnerMock{}
runner1.On("ID").Return(tests.DefaultRunnerID) runner1.On("ID").Return(tests.DefaultRunnerID)
environment.AddRunner(runner1) environment.AddRunner(runner1)

View File

@ -1,6 +1,7 @@
package environment package environment
import ( import (
"context"
_ "embed" _ "embed"
"fmt" "fmt"
nomadApi "github.com/hashicorp/nomad/api" nomadApi "github.com/hashicorp/nomad/api"
@ -151,13 +152,16 @@ func (m *NomadEnvironmentManager) Load() error {
// newNomadEnvironmetFromJob creates a Nomad environment from the passed Nomad job definition. // newNomadEnvironmetFromJob creates a Nomad environment from the passed Nomad job definition.
func newNomadEnvironmetFromJob(job *nomadApi.Job, apiClient nomad.ExecutorAPI) *NomadEnvironment { func newNomadEnvironmetFromJob(job *nomadApi.Job, apiClient nomad.ExecutorAPI) *NomadEnvironment {
ctx, cancel := context.WithCancel(context.Background())
e := &NomadEnvironment{ e := &NomadEnvironment{
apiClient: apiClient, apiClient: apiClient,
jobHCL: templateEnvironmentJobHCL, jobHCL: templateEnvironmentJobHCL,
job: job, job: job,
ctx: ctx,
cancel: cancel,
} }
e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad, e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad,
runner.MonitorEnvironmentID[runner.Runner](e.ID()), time.Minute) runner.MonitorEnvironmentID[runner.Runner](e.ID()), time.Minute, ctx)
return e return e
} }

View File

@ -1,6 +1,7 @@
package runner package runner
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"
@ -22,12 +23,13 @@ type AbstractManager struct {
} }
// NewAbstractManager creates a new abstract runner manager that keeps track of all runners of one kind. // NewAbstractManager creates a new abstract runner manager that keeps track of all runners of one kind.
// Since this manager is currently directly bound to the lifespan of Poseidon, it does not need a context cancel.
func NewAbstractManager() *AbstractManager { func NewAbstractManager() *AbstractManager {
return &AbstractManager{ return &AbstractManager{
environments: storage.NewMonitoredLocalStorage[ExecutionEnvironment]( environments: storage.NewMonitoredLocalStorage[ExecutionEnvironment](
monitoring.MeasurementEnvironments, monitorEnvironmentData, 0), monitoring.MeasurementEnvironments, monitorEnvironmentData, 0, context.Background()),
usedRunners: storage.NewMonitoredLocalStorage[Runner]( usedRunners: storage.NewMonitoredLocalStorage[Runner](
monitoring.MeasurementUsedRunner, MonitorRunnersEnvironmentID, time.Hour), monitoring.MeasurementUsedRunner, MonitorRunnersEnvironmentID, time.Hour, context.Background()),
} }
} }

View File

@ -37,6 +37,8 @@ type AWSFunctionWorkload struct {
runningExecutions map[execution.ID]context.CancelFunc runningExecutions map[execution.ID]context.CancelFunc
onDestroy DestroyRunnerHandler onDestroy DestroyRunnerHandler
environment ExecutionEnvironment environment ExecutionEnvironment
ctx context.Context
cancel context.CancelFunc
} }
// NewAWSFunctionWorkload creates a new AWSFunctionWorkload with the provided id. // NewAWSFunctionWorkload creates a new AWSFunctionWorkload with the provided id.
@ -47,15 +49,18 @@ func NewAWSFunctionWorkload(
return nil, fmt.Errorf("failed generating runner id: %w", err) return nil, fmt.Errorf("failed generating runner id: %w", err)
} }
ctx, cancel := context.WithCancel(context.Background())
workload := &AWSFunctionWorkload{ workload := &AWSFunctionWorkload{
id: newUUID.String(), id: newUUID.String(),
fs: make(map[dto.FilePath][]byte), fs: make(map[dto.FilePath][]byte),
runningExecutions: make(map[execution.ID]context.CancelFunc), runningExecutions: make(map[execution.ID]context.CancelFunc),
onDestroy: onDestroy, onDestroy: onDestroy,
environment: environment, environment: environment,
ctx: ctx,
cancel: cancel,
} }
workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute) monitoring.MeasurementExecutionsAWS, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute, ctx)
workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error { workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error {
return workload.Destroy() return workload.Destroy()
}) })
@ -92,7 +97,7 @@ func (w *AWSFunctionWorkload) ExecuteInteractively(id string, _ io.ReadWriter, s
} }
hideEnvironmentVariables(request, "AWS") hideEnvironmentVariables(request, "AWS")
request.PrivilegedExecution = true // AWS does not support multiple users at this moment. request.PrivilegedExecution = true // AWS does not support multiple users at this moment.
command, ctx, cancel := prepareExecution(request) command, ctx, cancel := prepareExecution(request, w.ctx)
exitInternal := make(chan ExitInfo) exitInternal := make(chan ExitInfo)
exit := make(chan ExitInfo, 1) exit := make(chan ExitInfo, 1)
@ -131,9 +136,7 @@ func (w *AWSFunctionWorkload) GetFileContent(_ string, _ http.ResponseWriter, _
} }
func (w *AWSFunctionWorkload) Destroy() error { func (w *AWSFunctionWorkload) Destroy() error {
for _, cancel := range w.runningExecutions { w.cancel()
cancel()
}
if err := w.onDestroy(w); err != nil { if err := w.onDestroy(w); err != nil {
return fmt.Errorf("error while destroying aws runner: %w", err) return fmt.Errorf("error while destroying aws runner: %w", err)
} }

View File

@ -47,6 +47,8 @@ type NomadJob struct {
portMappings []nomadApi.PortMapping portMappings []nomadApi.PortMapping
api nomad.ExecutorAPI api nomad.ExecutorAPI
onDestroy DestroyRunnerHandler onDestroy DestroyRunnerHandler
ctx context.Context
cancel context.CancelFunc
} }
// NewNomadJob creates a new NomadJob with the provided id. // NewNomadJob creates a new NomadJob with the provided id.
@ -55,14 +57,17 @@ type NomadJob struct {
func NewNomadJob(id string, portMappings []nomadApi.PortMapping, func NewNomadJob(id string, portMappings []nomadApi.PortMapping,
apiClient nomad.ExecutorAPI, onDestroy DestroyRunnerHandler, apiClient nomad.ExecutorAPI, onDestroy DestroyRunnerHandler,
) *NomadJob { ) *NomadJob {
ctx, cancel := context.WithCancel(context.Background())
job := &NomadJob{ job := &NomadJob{
id: id, id: id,
portMappings: portMappings, portMappings: portMappings,
api: apiClient, api: apiClient,
onDestroy: onDestroy, onDestroy: onDestroy,
ctx: ctx,
cancel: cancel,
} }
job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest]( job.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id), time.Minute) monitoring.MeasurementExecutionsNomad, monitorExecutionsRunnerID(job.Environment(), id), time.Minute, ctx)
job.InactivityTimer = NewInactivityTimer(job, onDestroy) job.InactivityTimer = NewInactivityTimer(job, onDestroy)
return job return job
} }
@ -111,10 +116,10 @@ func (r *NomadJob) ExecuteInteractively(
r.ResetTimeout() r.ResetTimeout()
command, ctx, cancel := prepareExecution(request) command, ctx, cancel := prepareExecution(request, r.ctx)
exitInternal := make(chan ExitInfo) exitInternal := make(chan ExitInfo)
exit := make(chan ExitInfo, 1) exit := make(chan ExitInfo, 1)
ctxExecute, cancelExecute := context.WithCancel(context.Background()) ctxExecute, cancelExecute := context.WithCancel(r.ctx)
go r.executeCommand(ctxExecute, command, request.PrivilegedExecution, stdin, stdout, stderr, exitInternal) go r.executeCommand(ctxExecute, command, request.PrivilegedExecution, stdin, stdout, stderr, exitInternal)
go r.handleExitOrContextDone(ctx, cancelExecute, exitInternal, exit, stdin) go r.handleExitOrContextDone(ctx, cancelExecute, exitInternal, exit, stdin)
@ -203,20 +208,21 @@ func (r *NomadJob) GetFileContent(
} }
func (r *NomadJob) Destroy() error { func (r *NomadJob) Destroy() error {
r.cancel()
if err := r.onDestroy(r); err != nil { if err := r.onDestroy(r); err != nil {
return fmt.Errorf("error while destroying runner: %w", err) return fmt.Errorf("error while destroying runner: %w", err)
} }
return nil return nil
} }
func prepareExecution(request *dto.ExecutionRequest) ( func prepareExecution(request *dto.ExecutionRequest, environmentCtx context.Context) (
command []string, ctx context.Context, cancel context.CancelFunc, command []string, ctx context.Context, cancel context.CancelFunc,
) { ) {
command = request.FullCommand() command = request.FullCommand()
if request.TimeLimit == 0 { if request.TimeLimit == 0 {
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(environmentCtx)
} else { } else {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(request.TimeLimit)*time.Second) ctx, cancel = context.WithTimeout(environmentCtx, time.Duration(request.TimeLimit)*time.Second)
} }
return command, ctx, cancel return command, ctx, cancel
} }

View File

@ -127,6 +127,7 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() {
id: tests.DefaultRunnerID, id: tests.DefaultRunnerID,
api: s.apiMock, api: s.apiMock,
onDestroy: s.manager.Return, onDestroy: s.manager.Return,
ctx: context.Background(),
} }
} }
@ -207,6 +208,7 @@ func (s *ExecuteInteractivelyTestSuite) TestDestroysRunnerAfterTimeoutAndSignal(
}) })
timeLimit := 1 timeLimit := 1
executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit} executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit}
s.runner.cancel = func() {}
s.runner.StoreExecution(defaultExecutionID, executionRequest) s.runner.StoreExecution(defaultExecutionID, executionRequest)
_, _, err := s.runner.ExecuteInteractively(defaultExecutionID, bytes.NewBuffer(make([]byte, 1)), nil, nil) _, _, err := s.runner.ExecuteInteractively(defaultExecutionID, bytes.NewBuffer(make([]byte, 1)), nil, nil)
s.Require().NoError(err) s.Require().NoError(err)

View File

@ -75,14 +75,14 @@ func NewLocalStorage[T any]() *localStorage[T] {
// Iff callback is set, it will be called on a write operation. // Iff callback is set, it will be called on a write operation.
// Iff additionalEvents not zero, the duration will be used to periodically send additional monitoring events. // Iff additionalEvents not zero, the duration will be used to periodically send additional monitoring events.
func NewMonitoredLocalStorage[T any]( func NewMonitoredLocalStorage[T any](
measurement string, callback WriteCallback[T], additionalEvents time.Duration) *localStorage[T] { measurement string, callback WriteCallback[T], additionalEvents time.Duration, ctx context.Context) *localStorage[T] {
s := &localStorage[T]{ s := &localStorage[T]{
objects: make(map[string]T), objects: make(map[string]T),
measurement: measurement, measurement: measurement,
callback: callback, callback: callback,
} }
if additionalEvents != 0 { if additionalEvents != 0 {
go s.periodicallySendMonitoringData(additionalEvents) go s.periodicallySendMonitoringData(additionalEvents, ctx)
} }
return s return s
} }
@ -172,8 +172,7 @@ func (s *localStorage[T]) sendMonitoringData(id string, o T, eventType EventType
} }
} }
func (s *localStorage[T]) periodicallySendMonitoringData(d time.Duration) { func (s *localStorage[T]) periodicallySendMonitoringData(d time.Duration, ctx context.Context) {
ctx := context.Background()
for ctx.Err() == nil { for ctx.Err() == nil {
stub := new(T) stub := new(T)
s.sendMonitoringData("", *stub, Periodically, s.Length()) s.sendMonitoringData("", *stub, Periodically, s.Length())

View File

@ -1,6 +1,7 @@
package storage package storage
import ( import (
"context"
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/tests" "github.com/openHPI/poseidon/tests"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -123,7 +124,7 @@ func TestNewMonitoredLocalStorage_Callback(t *testing.T) {
} else if eventType == Creation { } else if eventType == Creation {
callbackAdditions++ callbackAdditions++
} }
}, 0) }, 0, context.Background())
assertCallbackCounts := func(test func(), totalCalls, additions, deletions int) { assertCallbackCounts := func(test func(), totalCalls, additions, deletions int) {
beforeTotal := callbackCalls beforeTotal := callbackCalls
@ -174,11 +175,13 @@ func TestNewMonitoredLocalStorage_Callback(t *testing.T) {
} }
func TestNewMonitoredLocalStorage_Periodically(t *testing.T) { func TestNewMonitoredLocalStorage_Periodically(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
callbackCalls := 0 callbackCalls := 0
NewMonitoredLocalStorage[string]("testMeasurement", func(p *write.Point, o string, eventType EventType) { NewMonitoredLocalStorage[string]("testMeasurement", func(p *write.Point, o string, eventType EventType) {
callbackCalls++ callbackCalls++
assert.Equal(t, Periodically, eventType) assert.Equal(t, Periodically, eventType)
}, 200*time.Millisecond) }, 200*time.Millisecond, ctx)
time.Sleep(tests.ShortTimeout) time.Sleep(tests.ShortTimeout)
assert.Equal(t, 1, callbackCalls) assert.Equal(t, 1, callbackCalls)