diff --git a/internal/nomad/executor_api_mock.go b/internal/nomad/executor_api_mock.go index 81a73d4..8e1cdf3 100644 --- a/internal/nomad/executor_api_mock.go +++ b/internal/nomad/executor_api_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v2.14.0. DO NOT EDIT. package nomad @@ -312,13 +312,13 @@ func (_m *ExecutorAPIMock) SetJobScale(jobID string, count uint, reason string) return r0 } -// WatchEventStream provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation -func (_m *ExecutorAPIMock) WatchEventStream(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error { - ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation) +// WatchEventStream provides a mock function with given fields: ctx, callbacks +func (_m *ExecutorAPIMock) WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error { + ret := _m.Called(ctx, callbacks) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, AllocationProcessor, AllocationProcessor) error); ok { - r0 = rf(ctx, onNewAllocation, onDeletedAllocation) + if rf, ok := ret.Get(0).(func(context.Context, *AllocationProcessoring) error); ok { + r0 = rf(ctx, callbacks) } else { r0 = ret.Error(0) } @@ -408,3 +408,18 @@ func (_m *ExecutorAPIMock) listJobs(prefix string) ([]*api.JobListStub, error) { return r0, r1 } + +type mockConstructorTestingTNewExecutorAPIMock interface { + mock.TestingT + Cleanup(func()) +} + +// NewExecutorAPIMock creates a new instance of ExecutorAPIMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewExecutorAPIMock(t mockConstructorTestingTNewExecutorAPIMock) *ExecutorAPIMock { + mock := &ExecutorAPIMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/nomad/nomad.go b/internal/nomad/nomad.go index 100d124..1358521 100644 --- a/internal/nomad/nomad.go +++ b/internal/nomad/nomad.go @@ -27,7 +27,13 @@ var ( // resultChannelWriteTimeout is to detect the error when more element are written into a channel than expected. const resultChannelWriteTimeout = 10 * time.Millisecond +// AllocationProcessoring includes the callbacks to interact with allcoation events. +type AllocationProcessoring struct { + OnNew AllocationProcessorMonitored + OnDeleted AllocationProcessor +} type AllocationProcessor func(*nomadApi.Allocation) +type AllocationProcessorMonitored func(*nomadApi.Allocation, time.Duration) // ExecutorAPI provides access to a container orchestration solution. type ExecutorAPI interface { @@ -59,7 +65,7 @@ type ExecutorAPI interface { // WatchEventStream listens on the Nomad event stream for allocation and evaluation events. // Depending on the incoming event, any of the given function is executed. // Do not run multiple times simultaneously. - WatchEventStream(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error + WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error // ExecuteCommand executes the given command in the allocation with the given id. // It writes the output of the command to stdout/stderr and reads input from stdin. @@ -152,7 +158,10 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) defer cancel() // cancel the WatchEventStream when the evaluation result was read. go func() { - err = a.WatchEventStream(ctx, func(_ *nomadApi.Allocation) {}, func(_ *nomadApi.Allocation) {}) + err = a.WatchEventStream(ctx, &AllocationProcessoring{ + OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, + OnDeleted: func(_ *nomadApi.Allocation) {}, + }) cancel() // cancel the waiting for an evaluation result if watching the event stream ends. }() } @@ -166,21 +175,20 @@ func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) } } -func (a *APIClient) WatchEventStream(ctx context.Context, - onNewAllocation, onDeletedAllocation AllocationProcessor) error { +func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationProcessoring) error { startTime := time.Now().UnixNano() stream, err := a.EventStream(ctx) if err != nil { return fmt.Errorf("failed retrieving allocation stream: %w", err) } - pendingAllocations := make(map[string]bool) + pendingAllocations := make(map[string]time.Time) handler := func(event *nomadApi.Event) (bool, error) { switch event.Topic { case nomadApi.TopicEvaluation: return false, handleEvaluationEvent(a.evaluations, event) case nomadApi.TopicAllocation: - return false, handleAllocationEvent(startTime, pendingAllocations, event, onNewAllocation, onDeletedAllocation) + return false, handleAllocationEvent(startTime, pendingAllocations, event, callbacks) default: return false, nil } @@ -247,8 +255,8 @@ func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Ev // If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation // is called. The pendingAllocations map is used to store allocations that are pending but not started yet. Using the // map the state is persisted between multiple calls of this function. -func handleAllocationEvent(startTime int64, pendingAllocations map[string]bool, event *nomadApi.Event, - onNewAllocation, onDeletedAllocation AllocationProcessor) error { +func handleAllocationEvent(startTime int64, pendingAllocations map[string]time.Time, event *nomadApi.Event, + callbacks *AllocationProcessoring) error { if event.Type != structs.TypeAllocationUpdated { return nil } @@ -270,7 +278,7 @@ func handleAllocationEvent(startTime int64, pendingAllocations map[string]bool, case structs.AllocClientStatusPending: handlePendingAllocationEvent(alloc, pendingAllocations) case structs.AllocClientStatusRunning: - handleRunningAllocationEvent(alloc, pendingAllocations, onNewAllocation, onDeletedAllocation) + handleRunningAllocationEvent(alloc, pendingAllocations, callbacks) case structs.AllocClientStatusFailed: handleFailedAllocationEvent(alloc) } @@ -278,24 +286,25 @@ func handleAllocationEvent(startTime int64, pendingAllocations map[string]bool, } // handlePendingAllocationEvent sets flag in pendingAllocations that can be used to filter following events. -func handlePendingAllocationEvent(alloc *nomadApi.Allocation, pendingAllocations map[string]bool) { +func handlePendingAllocationEvent(alloc *nomadApi.Allocation, pendingAllocations map[string]time.Time) { if alloc.DesiredStatus == structs.AllocDesiredStatusRun { // allocation is started, wait until it runs and add to our list afterwards - pendingAllocations[alloc.ID] = true + pendingAllocations[alloc.ID] = time.Now() } } // handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events. -func handleRunningAllocationEvent(alloc *nomadApi.Allocation, - pendingAllocations map[string]bool, onNewAllocation, onDeletedAllocation AllocationProcessor) { +func handleRunningAllocationEvent(alloc *nomadApi.Allocation, pendingAllocations map[string]time.Time, + callbacks *AllocationProcessoring) { switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop: - onDeletedAllocation(alloc) + callbacks.OnDeleted(alloc) case structs.AllocDesiredStatusRun: // is first event that marks the transition between pending and running? - _, ok := pendingAllocations[alloc.ID] + startedAt, ok := pendingAllocations[alloc.ID] if ok { - onNewAllocation(alloc) + startupDuration := time.Since(startedAt) + callbacks.OnNew(alloc, startupDuration) delete(pendingAllocations, alloc.ID) } } diff --git a/internal/nomad/nomad_test.go b/internal/nomad/nomad_test.go index f6822dd..39cefff 100644 --- a/internal/nomad/nomad_test.go +++ b/internal/nomad/nomad_test.go @@ -21,6 +21,13 @@ import ( "time" ) +var ( + noopAllocationProcessoring = &AllocationProcessoring{ + OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {}, + OnDeleted: func(_ *nomadApi.Allocation) {}, + } +) + func TestLoadRunnersTestSuite(t *testing.T) { suite.Run(t, new(LoadRunnersTestSuite)) } @@ -502,13 +509,12 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) { newPendingAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun) newPendingEvent := eventForAllocation(t, newPendingAllocation) - pendingMap := make(map[string]bool) - var noop AllocationProcessor = func(allocation *nomadApi.Allocation) {} - - err := handleAllocationEvent(time.Now().UnixNano(), pendingMap, &newPendingEvent, noop, noop) + pendingMap := make(map[string]time.Time) + err := handleAllocationEvent(time.Now().UnixNano(), pendingMap, &newPendingEvent, noopAllocationProcessoring) require.NoError(t, err) - assert.True(t, pendingMap[newPendingAllocation.ID]) + _, ok := pendingMap[newPendingAllocation.ID] + assert.True(t, ok) } func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) { @@ -516,14 +522,12 @@ func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetri apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault) apiClient := &APIClient{apiMock, map[string]chan error{}, false} - noop := func(a *nomadApi.Allocation) {} - err := apiClient.WatchEventStream(context.Background(), noop, noop) + err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessoring) assert.ErrorIs(t, err, tests.ErrDefault) } func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationCannotBeRetrievedWithoutReceivingFurtherEvents( t *testing.T) { - noop := func(a *nomadApi.Allocation) {} event := nomadApi.Event{ Type: structs.TypeAllocationUpdated, Topic: nomadApi.TopicAllocation, @@ -534,7 +538,7 @@ func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationCannotBeRetrievedWi require.Error(t, err) events := []*nomadApi.Events{{Events: []nomadApi.Event{event}}, {}} - eventsProcessed, err := runAllocationWatching(t, events, noop, noop) + eventsProcessed, err := runAllocationWatching(t, events, noopAllocationProcessoring) assert.Error(t, err) assert.Equal(t, 1, eventsProcessed) } @@ -543,16 +547,17 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, expectedNewAllocations, expectedDeletedAllocations []*nomadApi.Allocation) { t.Helper() var newAllocations []*nomadApi.Allocation - onNewAllocation := func(alloc *nomadApi.Allocation) { - newAllocations = append(newAllocations, alloc) - } - var deletedAllocations []*nomadApi.Allocation - onDeletedAllocation := func(alloc *nomadApi.Allocation) { - deletedAllocations = append(deletedAllocations, alloc) + callbacks := &AllocationProcessoring{ + OnNew: func(alloc *nomadApi.Allocation, _ time.Duration) { + newAllocations = append(newAllocations, alloc) + }, + OnDeleted: func(alloc *nomadApi.Allocation) { + deletedAllocations = append(deletedAllocations, alloc) + }, } - eventsProcessed, err := runAllocationWatching(t, events, onNewAllocation, onDeletedAllocation) + eventsProcessed, err := runAllocationWatching(t, events, callbacks) assert.NoError(t, err) assert.Equal(t, len(events), eventsProcessed) @@ -563,11 +568,11 @@ func assertWatchAllocation(t *testing.T, events []*nomadApi.Events, // runAllocationWatching simulates events streamed from the Nomad event stream // to the MonitorEvaluation method. It starts the MonitorEvaluation function as a goroutine // and sequentially transfers the events from the given array to a channel simulating the stream. -func runAllocationWatching(t *testing.T, events []*nomadApi.Events, - onNewAllocation, onDeletedAllocation AllocationProcessor) (eventsProcessed int, err error) { +func runAllocationWatching(t *testing.T, events []*nomadApi.Events, callbacks *AllocationProcessoring) ( + eventsProcessed int, err error) { t.Helper() stream := make(chan *nomadApi.Events) - errChan := asynchronouslyWatchAllocations(stream, onNewAllocation, onDeletedAllocation) + errChan := asynchronouslyWatchAllocations(stream, callbacks) return simulateNomadEventStream(stream, errChan, events) } @@ -575,8 +580,7 @@ func runAllocationWatching(t *testing.T, events []*nomadApi.Events, // runs the MonitorEvaluation method in a goroutine. The mock returns a read-only // version of the given stream to simulate an event stream gotten from the real // Nomad API. -func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, - onNewAllocation, onDeletedAllocation AllocationProcessor) chan error { +func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *AllocationProcessoring) chan error { ctx := context.Background() // We can only get a read-only channel once we return it from a function. readOnlyStream := func() <-chan *nomadApi.Events { return stream }() @@ -587,7 +591,7 @@ func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, errChan := make(chan error) go func() { - errChan <- apiClient.WatchEventStream(ctx, onNewAllocation, onDeletedAllocation) + errChan <- apiClient.WatchEventStream(ctx, callbacks) }() return errChan } diff --git a/internal/runner/abstract_manager.go b/internal/runner/abstract_manager.go index 83a54f4..03611be 100644 --- a/internal/runner/abstract_manager.go +++ b/internal/runner/abstract_manager.go @@ -7,7 +7,6 @@ import ( "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" - "strconv" ) var ErrNullObject = errors.New("functionality not available for the null object") @@ -33,7 +32,7 @@ func NewAbstractManager() *AbstractManager { // MonitorRunnersEnvironmentID passes the id of the environment e into the monitoring Point p. func MonitorRunnersEnvironmentID(p *write.Point, e Runner, isDeletion bool) { if !isDeletion && e != nil { - p.AddTag(monitoring.InfluxKeyEnvironmentID, strconv.Itoa(int(e.Environment()))) + p.AddTag(monitoring.InfluxKeyEnvironmentID, e.Environment().ToString()) } } diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index ad21e0f..7e627f2 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -5,9 +5,11 @@ import ( "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/logging" + "github.com/openHPI/poseidon/pkg/monitoring" "github.com/sirupsen/logrus" "strconv" "time" @@ -118,15 +120,16 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) { retries := 0 for ctx.Err() == nil { - err := m.apiClient.WatchEventStream(ctx, m.onAllocationAdded, m.onAllocationStopped) + err := m.apiClient.WatchEventStream(ctx, + &nomad.AllocationProcessoring{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped}) retries += 1 log.WithError(err).Errorf("Stopped updating the runners! Retry %v", retries) <-time.After(time.Second) } } -func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { - log.WithField("id", alloc.JobID).Debug("Runner started") +func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, startup time.Duration) { + log.WithField("id", alloc.JobID).WithField("startupDuration", startup).Debug("Runner started") if nomad.IsEnvironmentTemplateID(alloc.JobID) { return @@ -145,9 +148,18 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { mappedPorts = alloc.AllocatedResources.Shared.Ports } environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m.Return)) + monitorAllocationStartupDuration(startup, alloc.JobID, environmentID) } } +func monitorAllocationStartupDuration(startup time.Duration, runnerID string, environmentID dto.EnvironmentID) { + p := influxdb2.NewPointWithMeasurement(monitoring.MeasurementIdleRunnerNomad) + p.AddField(monitoring.InfluxKeyDuration, startup) + p.AddTag(monitoring.InfluxKeyEnvironmentID, environmentID.ToString()) + p.AddTag(monitoring.InfluxKeyRunnerID, runnerID) + monitoring.WriteInfluxPoint(p) +} + func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) { log.WithField("id", alloc.JobID).Debug("Runner stopped") diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 0f5079f..49158e0 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -227,9 +227,9 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() { modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) { call.Run(func(args mock.Arguments) { - onCreate, ok := args.Get(1).(nomad.AllocationProcessor) + callbacks, ok := args.Get(1).(*nomad.AllocationProcessoring) s.Require().True(ok) - onCreate(allocation) + callbacks.OnNew(allocation, 0) call.ReturnArguments = mock.Arguments{nil} }) }) @@ -255,9 +255,9 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() { modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) { call.Run(func(args mock.Arguments) { - onDelete, ok := args.Get(2).(nomad.AllocationProcessor) + callbacks, ok := args.Get(1).(*nomad.AllocationProcessoring) s.Require().True(ok) - onDelete(allocation) + callbacks.OnDeleted(allocation) call.ReturnArguments = mock.Arguments{nil} }) }) @@ -288,7 +288,7 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() { mockIdleRunners(environment.(*ExecutionEnvironmentMock)) alloc := &nomadApi.Allocation{JobID: nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger)} - s.nomadRunnerManager.onAllocationAdded(alloc) + s.nomadRunnerManager.onAllocationAdded(alloc, 0) _, ok = environment.Sample() s.False(ok) @@ -296,7 +296,7 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() { s.Run("does not panic when environment id cannot be parsed", func() { alloc := &nomadApi.Allocation{JobID: ""} s.NotPanics(func() { - s.nomadRunnerManager.onAllocationAdded(alloc) + s.nomadRunnerManager.onAllocationAdded(alloc, 0) }) }) s.Run("does not panic when environment does not exist", func() { @@ -306,7 +306,7 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() { alloc := &nomadApi.Allocation{JobID: nomad.RunnerJobID(nonExistentEnvironment, "1-1-1-1")} s.NotPanics(func() { - s.nomadRunnerManager.onAllocationAdded(alloc) + s.nomadRunnerManager.onAllocationAdded(alloc, 0) }) }) s.Run("adds correct job", func() { @@ -319,7 +319,7 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() { JobID: tests.DefaultRunnerID, AllocatedResources: nil, } - s.nomadRunnerManager.onAllocationAdded(alloc) + s.nomadRunnerManager.onAllocationAdded(alloc, 0) runner, ok := environment.Sample() s.True(ok) @@ -339,7 +339,7 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() { Shared: nomadApi.AllocatedSharedResources{Ports: tests.DefaultPortMappings}, }, } - s.nomadRunnerManager.onAllocationAdded(alloc) + s.nomadRunnerManager.onAllocationAdded(alloc, 0) runner, ok := environment.Sample() s.True(ok) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 4bb58a7..a099a4d 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -7,7 +7,6 @@ import ( "github.com/openHPI/poseidon/pkg/monitoring" "github.com/openHPI/poseidon/pkg/storage" "io" - "strconv" ) type ExitInfo struct { @@ -69,7 +68,7 @@ func monitorExecutionsRunnerID(env dto.EnvironmentID, runnerID string) storage.W return func(p *write.Point, e *dto.ExecutionRequest, isDeletion bool) { if !isDeletion && e != nil { p.AddTag(monitoring.InfluxKeyRunnerID, runnerID) - p.AddTag(monitoring.InfluxKeyEnvironmentID, strconv.Itoa(int(env))) + p.AddTag(monitoring.InfluxKeyEnvironmentID, env.ToString()) } } } diff --git a/pkg/monitoring/influxdb2_middleware.go b/pkg/monitoring/influxdb2_middleware.go index c57bade..58fd37c 100644 --- a/pkg/monitoring/influxdb2_middleware.go +++ b/pkg/monitoring/influxdb2_middleware.go @@ -33,6 +33,7 @@ const ( InfluxKeyRunnerID = "runner_id" InfluxKeyEnvironmentID = "environment_id" + InfluxKeyDuration = "duration" influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size" influxKeyRequestSize = "request_size" ) @@ -76,7 +77,7 @@ func InfluxDB2Middleware(next http.Handler) http.Handler { lrw := logging.NewLoggingResponseWriter(w) next.ServeHTTP(lrw, requestWithPoint) - p.AddField("duration", time.Now().UTC().Sub(start).Nanoseconds()) + p.AddField(InfluxKeyDuration, time.Now().UTC().Sub(start).Nanoseconds()) p.AddTag("status", strconv.Itoa(lrw.StatusCode)) WriteInfluxPoint(p) @@ -96,7 +97,7 @@ func addRunnerID(r *http.Request, id string) { // addEnvironmentID adds the environment id to the influx data point for the current request. func addEnvironmentID(r *http.Request, id dto.EnvironmentID) { - addInfluxDBTag(r, InfluxKeyEnvironmentID, strconv.Itoa(int(id))) + addInfluxDBTag(r, InfluxKeyEnvironmentID, id.ToString()) } // AddRequestSize adds the size of the request body to the influx data point for the current request. @@ -118,7 +119,7 @@ func AddRequestSize(r *http.Request) { func ChangedPrewarmingPoolSize(id dto.EnvironmentID, count uint) { p := influxdb2.NewPointWithMeasurement(measurementPoolSize) - p.AddTag(InfluxKeyEnvironmentID, strconv.Itoa(int(id))) + p.AddTag(InfluxKeyEnvironmentID, id.ToString()) p.AddField(influxKeyEnvironmentPrewarmingPoolSize, count) WriteInfluxPoint(p)