Fix missing rescheduled idle runners.

In today's unattended upgrade, we have seen how the prewarming pool size dropped to (near) zero. This was based on lost Nomad allocations. The allocations got rescheduled, but not added again to Poseidon.

The reason for this is a miscommunication between the Event Handling and the Nomad Manager. `removedByPoseidon` was true even if the runner was not removed by the manager, but an idle runner.
This commit is contained in:
Maximilian Paß
2023-08-29 23:27:38 +02:00
committed by Sebastian Serth
parent 67297ec5a2
commit 354c16cc37
7 changed files with 46 additions and 21 deletions

View File

@ -115,6 +115,6 @@ func (a *AWSEnvironment) AddRunner(_ runner.Runner) {
panic("not supported")
}
func (a *AWSEnvironment) DeleteRunner(_ string) {
func (a *AWSEnvironment) DeleteRunner(_ string) (ok bool) {
panic("not supported")
}

View File

@ -261,8 +261,10 @@ func (n *NomadEnvironment) AddRunner(r runner.Runner) {
n.idleRunners.Add(r.ID(), r)
}
func (n *NomadEnvironment) DeleteRunner(id string) {
func (n *NomadEnvironment) DeleteRunner(id string) (ok bool) {
_, ok = n.idleRunners.Get(id)
n.idleRunners.Delete(id)
return ok
}
func (n *NomadEnvironment) IdleRunnerCount() uint {

View File

@ -47,6 +47,9 @@ type AllocationProcessing struct {
OnDeleted DeletedAllocationProcessor
}
type RunnerDeletedReason error
// DeletedAllocationProcessor is a handler that will be called for each deleted allocation.
// removedByPoseidon should be true iff the Nomad Manager has removed the runner before.
type DeletedAllocationProcessor func(jobID string, RunnerDeletedReason error) (removedByPoseidon bool)
type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration)

View File

@ -46,7 +46,8 @@ type ExecutionEnvironment interface {
// AddRunner adds an existing runner to the idle runners of the environment.
AddRunner(r Runner)
// DeleteRunner removes an idle runner from the environment.
DeleteRunner(id string)
// ok is true iff the runner was found (and deleted).
DeleteRunner(id string) (ok bool)
// IdleRunnerCount returns the number of idle runners of the environment.
IdleRunnerCount() uint
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.13.1. DO NOT EDIT.
// Code generated by mockery v2.33.1. DO NOT EDIT.
package runner
@ -60,8 +60,17 @@ func (_m *ExecutionEnvironmentMock) Delete() error {
}
// DeleteRunner provides a mock function with given fields: id
func (_m *ExecutionEnvironmentMock) DeleteRunner(id string) {
_m.Called(id)
func (_m *ExecutionEnvironmentMock) DeleteRunner(id string) bool {
ret := _m.Called(id)
var r0 bool
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(id)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// ID provides a mock function with given fields:
@ -111,6 +120,10 @@ func (_m *ExecutionEnvironmentMock) MarshalJSON() ([]byte, error) {
ret := _m.Called()
var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func() ([]byte, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []byte); ok {
r0 = rf()
} else {
@ -119,7 +132,6 @@ func (_m *ExecutionEnvironmentMock) MarshalJSON() ([]byte, error) {
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
@ -148,13 +160,16 @@ func (_m *ExecutionEnvironmentMock) NetworkAccess() (bool, []uint16) {
ret := _m.Called()
var r0 bool
var r1 []uint16
if rf, ok := ret.Get(0).(func() (bool, []uint16)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
var r1 []uint16
if rf, ok := ret.Get(1).(func() []uint16); ok {
r1 = rf()
} else {
@ -199,6 +214,10 @@ func (_m *ExecutionEnvironmentMock) Sample() (Runner, bool) {
ret := _m.Called()
var r0 Runner
var r1 bool
if rf, ok := ret.Get(0).(func() (Runner, bool)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() Runner); ok {
r0 = rf()
} else {
@ -207,7 +226,6 @@ func (_m *ExecutionEnvironmentMock) Sample() (Runner, bool) {
}
}
var r1 bool
if rf, ok := ret.Get(1).(func() bool); ok {
r1 = rf()
} else {
@ -252,13 +270,12 @@ func (_m *ExecutionEnvironmentMock) SetPrewarmingPoolSize(count uint) {
_m.Called(count)
}
type mockConstructorTestingTNewExecutionEnvironmentMock interface {
// NewExecutionEnvironmentMock creates a new instance of ExecutionEnvironmentMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewExecutionEnvironmentMock(t interface {
mock.TestingT
Cleanup(func())
}
// NewExecutionEnvironmentMock creates a new instance of ExecutionEnvironmentMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewExecutionEnvironmentMock(t mockConstructorTestingTNewExecutionEnvironmentMock) *ExecutionEnvironmentMock {
}) *ExecutionEnvironmentMock {
mock := &ExecutionEnvironmentMock{}
mock.Mock.Test(t)

View File

@ -198,7 +198,7 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error)
environment, ok := m.environments.Get(environmentID.ToString())
if ok {
environment.DeleteRunner(runnerID)
stillActive = stillActive || environment.DeleteRunner(runnerID)
}
return !stillActive

View File

@ -79,6 +79,7 @@ func mockIdleRunners(environmentMock *ExecutionEnvironmentMock) {
deleteCall := environmentMock.On("DeleteRunner", mock.AnythingOfType("string"))
deleteCall.Run(func(args mock.Arguments) {
id, ok := args.Get(0).(string)
deleteCall.ReturnArguments = mock.Arguments{ok}
if !ok {
return
}
@ -150,7 +151,7 @@ func (s *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() {
func (s *ManagerTestSuite) TestClaimRemovesRunnerWhenMarkAsUsedFails() {
s.exerciseEnvironment.On("Sample", mock.Anything).Return(s.exerciseRunner, true)
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return()
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
util.MaxConnectionRetriesExponential = 1
modifyMockedCall(s.apiMock, "MarkRunnerAsUsed", func(call *mock.Call) {
@ -182,7 +183,7 @@ func (s *ManagerTestSuite) TestGetReturnsErrorIfRunnerNotFound() {
func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() {
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return()
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner.ID(), s.exerciseRunner)
err := s.nomadRunnerManager.Return(s.exerciseRunner)
s.Nil(err)
@ -192,7 +193,7 @@ func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() {
func (s *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() {
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return()
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
err := s.nomadRunnerManager.Return(s.exerciseRunner)
s.Nil(err)
s.apiMock.AssertCalled(s.T(), "DeleteJob", s.exerciseRunner.ID())
@ -200,7 +201,7 @@ func (s *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() {
func (s *ManagerTestSuite) TestReturnReturnsErrorWhenApiCallFailed() {
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(tests.ErrDefault)
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return()
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
err := s.nomadRunnerManager.Return(s.exerciseRunner)
s.Error(err)
}
@ -378,7 +379,8 @@ func (s *ManagerTestSuite) TestOnAllocationStopped() {
s.Require().True(ok)
mockIdleRunners(environment.(*ExecutionEnvironmentMock))
environment.AddRunner(NewNomadJob(tests.DefaultRunnerID, []nomadApi.PortMapping{}, s.apiMock, func(r Runner) error { return nil }))
environment.AddRunner(
NewNomadJob(tests.DefaultRunnerID, []nomadApi.PortMapping{}, s.apiMock, func(r Runner) error { return nil }))
alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(tests.DefaultRunnerID, nil)
s.False(alreadyRemoved)
})
@ -465,7 +467,7 @@ func TestNomadRunnerManager_Load(t *testing.T) {
runnerManager.usedRunners.Purge()
t.Run("Restart timeout of used runner", func(t *testing.T) {
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
environmentMock.On("DeleteRunner", mock.AnythingOfType("string")).Once().Return()
environmentMock.On("DeleteRunner", mock.AnythingOfType("string")).Once().Return(false)
timeout := 1
_, job := helpers.CreateTemplateJob()