From cbcd5f233e3469de83268db31c8eadecd8f7a6ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Thu, 30 May 2024 12:14:20 +0200 Subject: [PATCH] Fix idle runner being memory leaked when its allocation is restarted by Nomad. Fix logic created in 354c16cc. --- internal/environment/aws_environment.go | 2 +- internal/environment/nomad_environment.go | 6 +- internal/runner/aws_manager_test.go | 2 +- internal/runner/execution_environment.go | 5 +- internal/runner/execution_environment_mock.go | 80 +++++++++++++++++-- internal/runner/nomad_manager.go | 19 +++-- internal/runner/nomad_manager_test.go | 18 ++--- 7 files changed, 102 insertions(+), 30 deletions(-) diff --git a/internal/environment/aws_environment.go b/internal/environment/aws_environment.go index c9e1a45..e6fcbb8 100644 --- a/internal/environment/aws_environment.go +++ b/internal/environment/aws_environment.go @@ -116,6 +116,6 @@ func (a *AWSEnvironment) AddRunner(_ runner.Runner) { panic("not supported") } -func (a *AWSEnvironment) DeleteRunner(_ string) (ok bool) { +func (a *AWSEnvironment) DeleteRunner(_ string) (r runner.Runner, ok bool) { panic("not supported") } diff --git a/internal/environment/nomad_environment.go b/internal/environment/nomad_environment.go index 8fc2056..a080cda 100644 --- a/internal/environment/nomad_environment.go +++ b/internal/environment/nomad_environment.go @@ -271,10 +271,10 @@ func (n *NomadEnvironment) AddRunner(r runner.Runner) { n.idleRunners.Add(r.ID(), r) } -func (n *NomadEnvironment) DeleteRunner(id string) (ok bool) { - _, ok = n.idleRunners.Get(id) +func (n *NomadEnvironment) DeleteRunner(id string) (r runner.Runner, ok bool) { + r, ok = n.idleRunners.Get(id) n.idleRunners.Delete(id) - return ok + return r, ok } func (n *NomadEnvironment) IdleRunnerCount() uint { diff --git a/internal/runner/aws_manager_test.go b/internal/runner/aws_manager_test.go index cf1f91a..8aee34b 100644 --- a/internal/runner/aws_manager_test.go +++ b/internal/runner/aws_manager_test.go @@ -110,7 +110,7 @@ func createBasicEnvironmentMock(id dto.EnvironmentID) *ExecutionEnvironmentMock environment.On("CPULimit").Return(uint(0)) environment.On("MemoryLimit").Return(uint(0)) environment.On("NetworkAccess").Return(false, nil) - environment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false) + environment.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil, false) environment.On("ApplyPrewarmingPoolSize").Return(nil) environment.On("IdleRunnerCount").Return(uint(1)).Maybe() environment.On("PrewarmingPoolSize").Return(uint(1)).Maybe() diff --git a/internal/runner/execution_environment.go b/internal/runner/execution_environment.go index f3dd8e9..73c583f 100644 --- a/internal/runner/execution_environment.go +++ b/internal/runner/execution_environment.go @@ -46,9 +46,10 @@ type ExecutionEnvironment interface { Sample() (r Runner, ok bool) // AddRunner adds an existing runner to the idle runners of the environment. AddRunner(r Runner) - // DeleteRunner removes an idle runner from the environment. + // DeleteRunner removes an idle runner from the environment and returns it. + // This function handles only the environment. The runner has to be destroyed separately. // ok is true iff the runner was found (and deleted). - DeleteRunner(id string) (ok bool) + DeleteRunner(id string) (r Runner, ok bool) // IdleRunnerCount returns the number of idle runners of the environment. IdleRunnerCount() uint } diff --git a/internal/runner/execution_environment_mock.go b/internal/runner/execution_environment_mock.go index 2b328cb..101213f 100644 --- a/internal/runner/execution_environment_mock.go +++ b/internal/runner/execution_environment_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.36.0. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. package runner @@ -21,6 +21,10 @@ func (_m *ExecutionEnvironmentMock) AddRunner(r Runner) { func (_m *ExecutionEnvironmentMock) ApplyPrewarmingPoolSize() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for ApplyPrewarmingPoolSize") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -35,6 +39,10 @@ func (_m *ExecutionEnvironmentMock) ApplyPrewarmingPoolSize() error { func (_m *ExecutionEnvironmentMock) CPULimit() uint { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for CPULimit") + } + var r0 uint if rf, ok := ret.Get(0).(func() uint); ok { r0 = rf() @@ -49,6 +57,10 @@ func (_m *ExecutionEnvironmentMock) CPULimit() uint { func (_m *ExecutionEnvironmentMock) Delete(reason DestroyReason) error { ret := _m.Called(reason) + if len(ret) == 0 { + panic("no return value specified for Delete") + } + var r0 error if rf, ok := ret.Get(0).(func(DestroyReason) error); ok { r0 = rf(reason) @@ -60,23 +72,43 @@ func (_m *ExecutionEnvironmentMock) Delete(reason DestroyReason) error { } // DeleteRunner provides a mock function with given fields: id -func (_m *ExecutionEnvironmentMock) DeleteRunner(id string) bool { +func (_m *ExecutionEnvironmentMock) DeleteRunner(id string) (Runner, bool) { ret := _m.Called(id) - var r0 bool - if rf, ok := ret.Get(0).(func(string) bool); ok { - r0 = rf(id) - } else { - r0 = ret.Get(0).(bool) + if len(ret) == 0 { + panic("no return value specified for DeleteRunner") } - return r0 + var r0 Runner + var r1 bool + if rf, ok := ret.Get(0).(func(string) (Runner, bool)); ok { + return rf(id) + } + if rf, ok := ret.Get(0).(func(string) Runner); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(Runner) + } + } + + if rf, ok := ret.Get(1).(func(string) bool); ok { + r1 = rf(id) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 } // ID provides a mock function with given fields: func (_m *ExecutionEnvironmentMock) ID() dto.EnvironmentID { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for ID") + } + var r0 dto.EnvironmentID if rf, ok := ret.Get(0).(func() dto.EnvironmentID); ok { r0 = rf() @@ -91,6 +123,10 @@ func (_m *ExecutionEnvironmentMock) ID() dto.EnvironmentID { func (_m *ExecutionEnvironmentMock) IdleRunnerCount() uint { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IdleRunnerCount") + } + var r0 uint if rf, ok := ret.Get(0).(func() uint); ok { r0 = rf() @@ -105,6 +141,10 @@ func (_m *ExecutionEnvironmentMock) IdleRunnerCount() uint { func (_m *ExecutionEnvironmentMock) Image() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Image") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -119,6 +159,10 @@ func (_m *ExecutionEnvironmentMock) Image() string { func (_m *ExecutionEnvironmentMock) MarshalJSON() ([]byte, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for MarshalJSON") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func() ([]byte, error)); ok { @@ -145,6 +189,10 @@ func (_m *ExecutionEnvironmentMock) MarshalJSON() ([]byte, error) { func (_m *ExecutionEnvironmentMock) MemoryLimit() uint { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for MemoryLimit") + } + var r0 uint if rf, ok := ret.Get(0).(func() uint); ok { r0 = rf() @@ -159,6 +207,10 @@ func (_m *ExecutionEnvironmentMock) MemoryLimit() uint { func (_m *ExecutionEnvironmentMock) NetworkAccess() (bool, []uint16) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for NetworkAccess") + } + var r0 bool var r1 []uint16 if rf, ok := ret.Get(0).(func() (bool, []uint16)); ok { @@ -185,6 +237,10 @@ func (_m *ExecutionEnvironmentMock) NetworkAccess() (bool, []uint16) { func (_m *ExecutionEnvironmentMock) PrewarmingPoolSize() uint { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for PrewarmingPoolSize") + } + var r0 uint if rf, ok := ret.Get(0).(func() uint); ok { r0 = rf() @@ -199,6 +255,10 @@ func (_m *ExecutionEnvironmentMock) PrewarmingPoolSize() uint { func (_m *ExecutionEnvironmentMock) Register() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Register") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -213,6 +273,10 @@ func (_m *ExecutionEnvironmentMock) Register() error { func (_m *ExecutionEnvironmentMock) Sample() (Runner, bool) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Sample") + } + var r0 Runner var r1 bool if rf, ok := ret.Get(0).(func() (Runner, bool)); ok { diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index 022c7ef..c08cffc 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -312,8 +312,8 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error) return false } - r, stillActive := m.usedRunners.Get(runnerID) - if stillActive { + r, stillUsed := m.usedRunners.Get(runnerID) + if stillUsed { m.usedRunners.Delete(runnerID) if err := r.Destroy(reason); err != nil { log.WithError(err).Warn("Runner of stopped allocation cannot be destroyed") @@ -321,12 +321,19 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error) } environment, ok := m.GetEnvironment(environmentID) - if ok { - stillActive = stillActive || environment.DeleteRunner(runnerID) - go m.checkPrewarmingPoolAlert(environment, false) + if !ok { + return !stillUsed } - return !stillActive + r, stillIdle := environment.DeleteRunner(runnerID) + if stillIdle { + if err := r.Destroy(reason); err != nil { + log.WithError(err).Warn("Runner of stopped allocation cannot be destroyed") + } + } + go m.checkPrewarmingPoolAlert(environment, false) + + return !(stillUsed || stillIdle) } // onRunnerDestroyed is the callback when the runner destroys itself. diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 6b877ac..5dea5c0 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -91,8 +91,8 @@ func mockIdleRunners(environmentMock *ExecutionEnvironmentMock) { if !ok { log.Fatal("Cannot parse ID") } - _, ok = idleRunner.Get(id) - deleteCall.ReturnArguments = mock.Arguments{ok} + r, ok := idleRunner.Get(id) + deleteCall.ReturnArguments = mock.Arguments{r, ok} if !ok { return } @@ -167,7 +167,7 @@ func (s *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() { func (s *ManagerTestSuite) TestClaimRemovesRunnerWhenMarkAsUsedFails() { s.exerciseEnvironment.On("Sample", mock.Anything).Return(s.exerciseRunner, true) - s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false) + s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil, false) s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) util.MaxConnectionRetriesExponential = 1 modifyMockedCall(s.apiMock, "MarkRunnerAsUsed", func(call *mock.Call) { @@ -199,7 +199,7 @@ func (s *ManagerTestSuite) TestGetReturnsErrorIfRunnerNotFound() { func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() { s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) - s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false) + s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil, false) s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner.ID(), s.exerciseRunner) err := s.nomadRunnerManager.Return(s.exerciseRunner) s.Nil(err) @@ -209,7 +209,7 @@ func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() { func (s *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() { s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) - s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false) + s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil, false) err := s.nomadRunnerManager.Return(s.exerciseRunner) s.Nil(err) s.apiMock.AssertCalled(s.T(), "DeleteJob", s.exerciseRunner.ID()) @@ -220,7 +220,7 @@ func (s *ManagerTestSuite) TestReturnReturnsErrorWhenApiCallFailed() { s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(tests.ErrDefault) defer s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) defer tests.RemoveMethodFromMock(&s.apiMock.Mock, "DeleteJob") - s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false) + s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil, false) util.MaxConnectionRetriesExponential = 1 util.InitialWaitingDuration = 2 * tests.ShortTimeout @@ -442,7 +442,7 @@ func (s *ManagerTestSuite) TestOnAllocationStopped() { environment.AddRunner(r) alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(tests.DefaultRunnerID, nil) s.False(alreadyRemoved) - s.NoError(r.Destroy(nil)) + s.Error(r.ctx.Err(), "The runner should be destroyed and its context canceled") }) s.Run("returns false and stops inactivity timer", func() { runner, runnerDestroyed := testStoppedInactivityTimer(s) @@ -556,7 +556,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { runnerManager.usedRunners.Purge() s.Run("Restart timeout of used runner", func() { apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) - environmentMock.On("DeleteRunner", mock.AnythingOfType("string")).Once().Return(false) + environmentMock.On("DeleteRunner", mock.AnythingOfType("string")).Once().Return(nil, false) timeout := 1 _, job := helpers.CreateTemplateJob() @@ -674,7 +674,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert_reloadsR environment.On("PrewarmingPoolSize").Return(uint(1)).Twice() environment.On("IdleRunnerCount").Return(uint(0)).Twice() - environment.On("DeleteRunner", mock.Anything).Return(false).Once() + environment.On("DeleteRunner", mock.Anything).Return(nil, false).Once() s.Require().Empty(m.usedRunners.Length()) _, usedJob := helpers.CreateTemplateJob()