Fix missing rescheduled idle runners.
In today's unattended upgrade, we have seen how the prewarming pool size dropped to (near) zero. This was based on lost Nomad allocations. The allocations got rescheduled, but not added again to Poseidon. The reason for this is a miscommunication between the Event Handling and the Nomad Manager. `removedByPoseidon` was true even if the runner was not removed by the manager, but an idle runner.
This commit is contained in:

committed by
Sebastian Serth

parent
67297ec5a2
commit
354c16cc37
@ -115,6 +115,6 @@ func (a *AWSEnvironment) AddRunner(_ runner.Runner) {
|
|||||||
panic("not supported")
|
panic("not supported")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AWSEnvironment) DeleteRunner(_ string) {
|
func (a *AWSEnvironment) DeleteRunner(_ string) (ok bool) {
|
||||||
panic("not supported")
|
panic("not supported")
|
||||||
}
|
}
|
||||||
|
@ -261,8 +261,10 @@ func (n *NomadEnvironment) AddRunner(r runner.Runner) {
|
|||||||
n.idleRunners.Add(r.ID(), r)
|
n.idleRunners.Add(r.ID(), r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NomadEnvironment) DeleteRunner(id string) {
|
func (n *NomadEnvironment) DeleteRunner(id string) (ok bool) {
|
||||||
|
_, ok = n.idleRunners.Get(id)
|
||||||
n.idleRunners.Delete(id)
|
n.idleRunners.Delete(id)
|
||||||
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NomadEnvironment) IdleRunnerCount() uint {
|
func (n *NomadEnvironment) IdleRunnerCount() uint {
|
||||||
|
@ -47,6 +47,9 @@ type AllocationProcessing struct {
|
|||||||
OnDeleted DeletedAllocationProcessor
|
OnDeleted DeletedAllocationProcessor
|
||||||
}
|
}
|
||||||
type RunnerDeletedReason error
|
type RunnerDeletedReason error
|
||||||
|
|
||||||
|
// DeletedAllocationProcessor is a handler that will be called for each deleted allocation.
|
||||||
|
// removedByPoseidon should be true iff the Nomad Manager has removed the runner before.
|
||||||
type DeletedAllocationProcessor func(jobID string, RunnerDeletedReason error) (removedByPoseidon bool)
|
type DeletedAllocationProcessor func(jobID string, RunnerDeletedReason error) (removedByPoseidon bool)
|
||||||
type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration)
|
type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration)
|
||||||
|
|
||||||
|
@ -46,7 +46,8 @@ type ExecutionEnvironment interface {
|
|||||||
// AddRunner adds an existing runner to the idle runners of the environment.
|
// AddRunner adds an existing runner to the idle runners of the environment.
|
||||||
AddRunner(r Runner)
|
AddRunner(r Runner)
|
||||||
// DeleteRunner removes an idle runner from the environment.
|
// DeleteRunner removes an idle runner from the environment.
|
||||||
DeleteRunner(id string)
|
// ok is true iff the runner was found (and deleted).
|
||||||
|
DeleteRunner(id string) (ok bool)
|
||||||
// IdleRunnerCount returns the number of idle runners of the environment.
|
// IdleRunnerCount returns the number of idle runners of the environment.
|
||||||
IdleRunnerCount() uint
|
IdleRunnerCount() uint
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Code generated by mockery v2.13.1. DO NOT EDIT.
|
// Code generated by mockery v2.33.1. DO NOT EDIT.
|
||||||
|
|
||||||
package runner
|
package runner
|
||||||
|
|
||||||
@ -60,8 +60,17 @@ func (_m *ExecutionEnvironmentMock) Delete() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRunner provides a mock function with given fields: id
|
// DeleteRunner provides a mock function with given fields: id
|
||||||
func (_m *ExecutionEnvironmentMock) DeleteRunner(id string) {
|
func (_m *ExecutionEnvironmentMock) DeleteRunner(id string) bool {
|
||||||
_m.Called(id)
|
ret := _m.Called(id)
|
||||||
|
|
||||||
|
var r0 bool
|
||||||
|
if rf, ok := ret.Get(0).(func(string) bool); ok {
|
||||||
|
r0 = rf(id)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
}
|
}
|
||||||
|
|
||||||
// ID provides a mock function with given fields:
|
// ID provides a mock function with given fields:
|
||||||
@ -111,6 +120,10 @@ func (_m *ExecutionEnvironmentMock) MarshalJSON() ([]byte, error) {
|
|||||||
ret := _m.Called()
|
ret := _m.Called()
|
||||||
|
|
||||||
var r0 []byte
|
var r0 []byte
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func() ([]byte, error)); ok {
|
||||||
|
return rf()
|
||||||
|
}
|
||||||
if rf, ok := ret.Get(0).(func() []byte); ok {
|
if rf, ok := ret.Get(0).(func() []byte); ok {
|
||||||
r0 = rf()
|
r0 = rf()
|
||||||
} else {
|
} else {
|
||||||
@ -119,7 +132,6 @@ func (_m *ExecutionEnvironmentMock) MarshalJSON() ([]byte, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var r1 error
|
|
||||||
if rf, ok := ret.Get(1).(func() error); ok {
|
if rf, ok := ret.Get(1).(func() error); ok {
|
||||||
r1 = rf()
|
r1 = rf()
|
||||||
} else {
|
} else {
|
||||||
@ -148,13 +160,16 @@ func (_m *ExecutionEnvironmentMock) NetworkAccess() (bool, []uint16) {
|
|||||||
ret := _m.Called()
|
ret := _m.Called()
|
||||||
|
|
||||||
var r0 bool
|
var r0 bool
|
||||||
|
var r1 []uint16
|
||||||
|
if rf, ok := ret.Get(0).(func() (bool, []uint16)); ok {
|
||||||
|
return rf()
|
||||||
|
}
|
||||||
if rf, ok := ret.Get(0).(func() bool); ok {
|
if rf, ok := ret.Get(0).(func() bool); ok {
|
||||||
r0 = rf()
|
r0 = rf()
|
||||||
} else {
|
} else {
|
||||||
r0 = ret.Get(0).(bool)
|
r0 = ret.Get(0).(bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
var r1 []uint16
|
|
||||||
if rf, ok := ret.Get(1).(func() []uint16); ok {
|
if rf, ok := ret.Get(1).(func() []uint16); ok {
|
||||||
r1 = rf()
|
r1 = rf()
|
||||||
} else {
|
} else {
|
||||||
@ -199,6 +214,10 @@ func (_m *ExecutionEnvironmentMock) Sample() (Runner, bool) {
|
|||||||
ret := _m.Called()
|
ret := _m.Called()
|
||||||
|
|
||||||
var r0 Runner
|
var r0 Runner
|
||||||
|
var r1 bool
|
||||||
|
if rf, ok := ret.Get(0).(func() (Runner, bool)); ok {
|
||||||
|
return rf()
|
||||||
|
}
|
||||||
if rf, ok := ret.Get(0).(func() Runner); ok {
|
if rf, ok := ret.Get(0).(func() Runner); ok {
|
||||||
r0 = rf()
|
r0 = rf()
|
||||||
} else {
|
} else {
|
||||||
@ -207,7 +226,6 @@ func (_m *ExecutionEnvironmentMock) Sample() (Runner, bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var r1 bool
|
|
||||||
if rf, ok := ret.Get(1).(func() bool); ok {
|
if rf, ok := ret.Get(1).(func() bool); ok {
|
||||||
r1 = rf()
|
r1 = rf()
|
||||||
} else {
|
} else {
|
||||||
@ -252,13 +270,12 @@ func (_m *ExecutionEnvironmentMock) SetPrewarmingPoolSize(count uint) {
|
|||||||
_m.Called(count)
|
_m.Called(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockConstructorTestingTNewExecutionEnvironmentMock interface {
|
// NewExecutionEnvironmentMock creates a new instance of ExecutionEnvironmentMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||||
|
// The first argument is typically a *testing.T value.
|
||||||
|
func NewExecutionEnvironmentMock(t interface {
|
||||||
mock.TestingT
|
mock.TestingT
|
||||||
Cleanup(func())
|
Cleanup(func())
|
||||||
}
|
}) *ExecutionEnvironmentMock {
|
||||||
|
|
||||||
// NewExecutionEnvironmentMock creates a new instance of ExecutionEnvironmentMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
|
||||||
func NewExecutionEnvironmentMock(t mockConstructorTestingTNewExecutionEnvironmentMock) *ExecutionEnvironmentMock {
|
|
||||||
mock := &ExecutionEnvironmentMock{}
|
mock := &ExecutionEnvironmentMock{}
|
||||||
mock.Mock.Test(t)
|
mock.Mock.Test(t)
|
||||||
|
|
||||||
|
@ -198,7 +198,7 @@ func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error)
|
|||||||
|
|
||||||
environment, ok := m.environments.Get(environmentID.ToString())
|
environment, ok := m.environments.Get(environmentID.ToString())
|
||||||
if ok {
|
if ok {
|
||||||
environment.DeleteRunner(runnerID)
|
stillActive = stillActive || environment.DeleteRunner(runnerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return !stillActive
|
return !stillActive
|
||||||
|
@ -79,6 +79,7 @@ func mockIdleRunners(environmentMock *ExecutionEnvironmentMock) {
|
|||||||
deleteCall := environmentMock.On("DeleteRunner", mock.AnythingOfType("string"))
|
deleteCall := environmentMock.On("DeleteRunner", mock.AnythingOfType("string"))
|
||||||
deleteCall.Run(func(args mock.Arguments) {
|
deleteCall.Run(func(args mock.Arguments) {
|
||||||
id, ok := args.Get(0).(string)
|
id, ok := args.Get(0).(string)
|
||||||
|
deleteCall.ReturnArguments = mock.Arguments{ok}
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -150,7 +151,7 @@ func (s *ManagerTestSuite) TestClaimAddsRunnerToUsedRunners() {
|
|||||||
|
|
||||||
func (s *ManagerTestSuite) TestClaimRemovesRunnerWhenMarkAsUsedFails() {
|
func (s *ManagerTestSuite) TestClaimRemovesRunnerWhenMarkAsUsedFails() {
|
||||||
s.exerciseEnvironment.On("Sample", mock.Anything).Return(s.exerciseRunner, true)
|
s.exerciseEnvironment.On("Sample", mock.Anything).Return(s.exerciseRunner, true)
|
||||||
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return()
|
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
|
||||||
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||||
util.MaxConnectionRetriesExponential = 1
|
util.MaxConnectionRetriesExponential = 1
|
||||||
modifyMockedCall(s.apiMock, "MarkRunnerAsUsed", func(call *mock.Call) {
|
modifyMockedCall(s.apiMock, "MarkRunnerAsUsed", func(call *mock.Call) {
|
||||||
@ -182,7 +183,7 @@ func (s *ManagerTestSuite) TestGetReturnsErrorIfRunnerNotFound() {
|
|||||||
|
|
||||||
func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() {
|
func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() {
|
||||||
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||||
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return()
|
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
|
||||||
s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner.ID(), s.exerciseRunner)
|
s.nomadRunnerManager.usedRunners.Add(s.exerciseRunner.ID(), s.exerciseRunner)
|
||||||
err := s.nomadRunnerManager.Return(s.exerciseRunner)
|
err := s.nomadRunnerManager.Return(s.exerciseRunner)
|
||||||
s.Nil(err)
|
s.Nil(err)
|
||||||
@ -192,7 +193,7 @@ func (s *ManagerTestSuite) TestReturnRemovesRunnerFromUsedRunners() {
|
|||||||
|
|
||||||
func (s *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() {
|
func (s *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() {
|
||||||
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||||
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return()
|
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
|
||||||
err := s.nomadRunnerManager.Return(s.exerciseRunner)
|
err := s.nomadRunnerManager.Return(s.exerciseRunner)
|
||||||
s.Nil(err)
|
s.Nil(err)
|
||||||
s.apiMock.AssertCalled(s.T(), "DeleteJob", s.exerciseRunner.ID())
|
s.apiMock.AssertCalled(s.T(), "DeleteJob", s.exerciseRunner.ID())
|
||||||
@ -200,7 +201,7 @@ func (s *ManagerTestSuite) TestReturnCallsDeleteRunnerApiMethod() {
|
|||||||
|
|
||||||
func (s *ManagerTestSuite) TestReturnReturnsErrorWhenApiCallFailed() {
|
func (s *ManagerTestSuite) TestReturnReturnsErrorWhenApiCallFailed() {
|
||||||
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(tests.ErrDefault)
|
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(tests.ErrDefault)
|
||||||
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return()
|
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(false)
|
||||||
err := s.nomadRunnerManager.Return(s.exerciseRunner)
|
err := s.nomadRunnerManager.Return(s.exerciseRunner)
|
||||||
s.Error(err)
|
s.Error(err)
|
||||||
}
|
}
|
||||||
@ -378,7 +379,8 @@ func (s *ManagerTestSuite) TestOnAllocationStopped() {
|
|||||||
s.Require().True(ok)
|
s.Require().True(ok)
|
||||||
mockIdleRunners(environment.(*ExecutionEnvironmentMock))
|
mockIdleRunners(environment.(*ExecutionEnvironmentMock))
|
||||||
|
|
||||||
environment.AddRunner(NewNomadJob(tests.DefaultRunnerID, []nomadApi.PortMapping{}, s.apiMock, func(r Runner) error { return nil }))
|
environment.AddRunner(
|
||||||
|
NewNomadJob(tests.DefaultRunnerID, []nomadApi.PortMapping{}, s.apiMock, func(r Runner) error { return nil }))
|
||||||
alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(tests.DefaultRunnerID, nil)
|
alreadyRemoved := s.nomadRunnerManager.onAllocationStopped(tests.DefaultRunnerID, nil)
|
||||||
s.False(alreadyRemoved)
|
s.False(alreadyRemoved)
|
||||||
})
|
})
|
||||||
@ -465,7 +467,7 @@ func TestNomadRunnerManager_Load(t *testing.T) {
|
|||||||
runnerManager.usedRunners.Purge()
|
runnerManager.usedRunners.Purge()
|
||||||
t.Run("Restart timeout of used runner", func(t *testing.T) {
|
t.Run("Restart timeout of used runner", func(t *testing.T) {
|
||||||
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||||
environmentMock.On("DeleteRunner", mock.AnythingOfType("string")).Once().Return()
|
environmentMock.On("DeleteRunner", mock.AnythingOfType("string")).Once().Return(false)
|
||||||
timeout := 1
|
timeout := 1
|
||||||
|
|
||||||
_, job := helpers.CreateTemplateJob()
|
_, job := helpers.CreateTemplateJob()
|
||||||
|
Reference in New Issue
Block a user