Refactor MemoryLeakTestSuite

as we identified two issues where the goroutine count from before differs from after the test.

1) It seemed like a Go runtime specific Goroutine appeared in rare cases before the test. To avoid this, we introduced a short timeout before looking up the Goroutines.
Another solution might be to do the lookup twice and check if the count matches.

2) A Goroutine that periodically monitors some storage unexpectedly got closed in rare cases. As we could not identify the cause for this, we removed the leaking Goroutines by properly cleaning up.
This commit is contained in:
Maximilian Paß
2024-02-27 21:42:18 +01:00
parent 80b8c27924
commit ab938bfc22
6 changed files with 56 additions and 33 deletions

View File

@ -51,8 +51,8 @@ func (s *MainTestSuite) TestShutdownOnOSSignal_Profiling() {
disableRecovery, cancel := context.WithCancel(context.Background()) disableRecovery, cancel := context.WithCancel(context.Background())
cancel() cancel()
s.ExpectedGoroutingIncrease++ // The shutdownOnOSSignal waits for an exit after stopping the profiling. s.ExpectedGoroutineIncrease++ // The shutdownOnOSSignal waits for an exit after stopping the profiling.
s.ExpectedGoroutingIncrease++ // The shutdownOnOSSignal triggers a os.Signal Goroutine. s.ExpectedGoroutineIncrease++ // The shutdownOnOSSignal triggers a os.Signal Goroutine.
server := initServer(initRouter(disableRecovery)) server := initServer(initRouter(disableRecovery))
go shutdownOnOSSignal(server, context.Background(), func() { go shutdownOnOSSignal(server, context.Background(), func() {

View File

@ -90,14 +90,16 @@ func (s *EnvironmentControllerTestSuite) TestList() {
}) })
s.Run("returns multiple environments", func() { s.Run("returns multiple environments", func() {
s.ExpectedGoroutingIncrease++ // We dont care to delete the created environment. apiMock := &nomad.ExecutorAPIMock{}
s.ExpectedGoroutingIncrease++ // Also not about the second. apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
var firstEnvironment, secondEnvironment *environment.NomadEnvironment
call.Run(func(args mock.Arguments) { call.Run(func(args mock.Arguments) {
firstEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, firstEnvironment, err = environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, apiMock,
fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger)))
s.Require().NoError(err) s.Require().NoError(err)
secondEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, secondEnvironment, err = environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, apiMock,
fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger)))
s.Require().NoError(err) s.Require().NoError(err)
call.ReturnArguments = mock.Arguments{[]runner.ExecutionEnvironment{firstEnvironment, secondEnvironment}, nil} call.ReturnArguments = mock.Arguments{[]runner.ExecutionEnvironment{firstEnvironment, secondEnvironment}, nil}
@ -114,6 +116,11 @@ func (s *EnvironmentControllerTestSuite) TestList() {
environments, ok := environmentsInterface.([]interface{}) environments, ok := environmentsInterface.([]interface{})
s.Require().True(ok) s.Require().True(ok)
s.Equal(2, len(environments)) s.Equal(2, len(environments))
err = firstEnvironment.Delete(tests.ErrCleanupDestroyReason)
s.NoError(err)
err = secondEnvironment.Delete(tests.ErrCleanupDestroyReason)
s.NoError(err)
}) })
} }
@ -155,10 +162,13 @@ func (s *EnvironmentControllerTestSuite) TestGet() {
s.manager.Calls = []mock.Call{} s.manager.Calls = []mock.Call{}
s.Run("returns environment", func() { s.Run("returns environment", func() {
s.ExpectedGoroutingIncrease++ // We dont care to delete the created environment. apiMock := &nomad.ExecutorAPIMock{}
apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
var testEnvironment *environment.NomadEnvironment
call.Run(func(args mock.Arguments) { call.Run(func(args mock.Arguments) {
testEnvironment, err := environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, nil, testEnvironment, err = environment.NewNomadEnvironment(tests.DefaultEnvironmentIDAsInteger, apiMock,
fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger))) fmt.Sprintf(jobHCLBasicFormat, nomad.TemplateJobID(tests.DefaultEnvironmentIDAsInteger)))
s.Require().NoError(err) s.Require().NoError(err)
call.ReturnArguments = mock.Arguments{testEnvironment, nil} call.ReturnArguments = mock.Arguments{testEnvironment, nil}
@ -176,6 +186,9 @@ func (s *EnvironmentControllerTestSuite) TestGet() {
idFloat, ok := idInterface.(float64) idFloat, ok := idInterface.(float64)
s.Require().True(ok) s.Require().True(ok)
s.Equal(tests.DefaultEnvironmentIDAsInteger, int(idFloat)) s.Equal(tests.DefaultEnvironmentIDAsInteger, int(idFloat))
err = testEnvironment.Delete(tests.ErrCleanupDestroyReason)
s.NoError(err)
}) })
} }

View File

@ -294,7 +294,8 @@ func (s *MainTestSuite) TestWebsocketTLS() {
func (s *MainTestSuite) TestWebSocketProxyStopsReadingTheWebSocketAfterClosingIt() { func (s *MainTestSuite) TestWebSocketProxyStopsReadingTheWebSocketAfterClosingIt() {
apiMock := &nomad.ExecutorAPIMock{} apiMock := &nomad.ExecutorAPIMock{}
executionID := tests.DefaultExecutionID executionID := tests.DefaultExecutionID
r, wsURL := newRunnerWithNotMockedRunnerManager(s, apiMock, executionID) r, wsURL, cleanup := newRunnerWithNotMockedRunnerManager(s, apiMock, executionID)
defer cleanup()
logger, hook := test.NewNullLogger() logger, hook := test.NewNullLogger()
log = logger.WithField("pkg", "api") log = logger.WithField("pkg", "api")
@ -329,9 +330,10 @@ func newNomadAllocationWithMockedAPIClient(runnerID string) (runner.Runner, *nom
} }
func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.ExecutorAPIMock, executionID string) ( func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.ExecutorAPIMock, executionID string) (
r runner.Runner, wsURL *url.URL) { r runner.Runner, wsURL *url.URL, cleanup func()) {
s.T().Helper() s.T().Helper()
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil) apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil) apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil)
call := apiMock.On("WatchEventStream", mock.Anything, mock.Anything, mock.Anything) call := apiMock.On("WatchEventStream", mock.Anything, mock.Anything, mock.Anything)
@ -342,13 +344,11 @@ func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.Execut
runnerManager := runner.NewNomadRunnerManager(apiMock, s.TestCtx) runnerManager := runner.NewNomadRunnerManager(apiMock, s.TestCtx)
router := NewRouter(runnerManager, nil) router := NewRouter(runnerManager, nil)
s.ExpectedGoroutingIncrease++ // We don't care about closing the server at this point. s.ExpectedGoroutineIncrease++ // The server is not closing properly. Therefore, we don't even try.
server := httptest.NewServer(router) server := httptest.NewServer(router)
runnerID := tests.DefaultRunnerID runnerID := tests.DefaultRunnerID
s.ExpectedGoroutingIncrease++ // We don't care about removing the runner at this place.
runnerJob := runner.NewNomadJob(runnerID, nil, apiMock, nil) runnerJob := runner.NewNomadJob(runnerID, nil, apiMock, nil)
s.ExpectedGoroutingIncrease++ // We don't care about removing the environment at this place.
e, err := environment.NewNomadEnvironment(0, apiMock, "job \"template-0\" {}") e, err := environment.NewNomadEnvironment(0, apiMock, "job \"template-0\" {}")
s.Require().NoError(err) s.Require().NoError(err)
eID, err := nomad.EnvironmentIDFromRunnerID(runnerID) eID, err := nomad.EnvironmentIDFromRunnerID(runnerID)
@ -362,7 +362,13 @@ func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.Execut
s.Require().NoError(err) s.Require().NoError(err)
wsURL, err = webSocketURL("ws", server, router, r.ID(), executionID) wsURL, err = webSocketURL("ws", server, router, r.ID(), executionID)
s.Require().NoError(err) s.Require().NoError(err)
return r, wsURL
return r, wsURL, func() {
err = r.Destroy(tests.ErrCleanupDestroyReason)
s.NoError(err)
err = e.Delete(tests.ErrCleanupDestroyReason)
s.NoError(err)
}
} }
func webSocketURL(scheme string, server *httptest.Server, router *mux.Router, func webSocketURL(scheme string, server *httptest.Server, router *mux.Router,

View File

@ -59,7 +59,7 @@ func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentRe
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false) s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false)
s.runnerManagerMock.On("StoreEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true) s.runnerManagerMock.On("StoreEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true)
s.ExpectedGoroutingIncrease++ // We don't care about removing the created environment. s.ExpectedGoroutineIncrease++ // We don't care about removing the created environment.
_, err := s.manager.CreateOrUpdate( _, err := s.manager.CreateOrUpdate(
dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request, context.Background()) dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request, context.Background())
s.ErrorIs(err, tests.ErrDefault) s.ErrorIs(err, tests.ErrDefault)
@ -89,7 +89,7 @@ func (s *CreateOrUpdateTestSuite) TestCreateOrUpdatesSetsForcePullFlag() {
call.ReturnArguments = mock.Arguments{nil} call.ReturnArguments = mock.Arguments{nil}
}) })
s.ExpectedGoroutingIncrease++ // We dont care about removing the created environment at this point. s.ExpectedGoroutineIncrease++ // We dont care about removing the created environment at this point.
_, err := s.manager.CreateOrUpdate( _, err := s.manager.CreateOrUpdate(
dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request, context.Background()) dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request, context.Background())
s.NoError(err) s.NoError(err)

View File

@ -528,7 +528,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
jobID := tests.DefaultRunnerID jobID := tests.DefaultRunnerID
job.ID = &jobID job.ID = &jobID
job.Name = &jobID job.Name = &jobID
s.ExpectedGoroutingIncrease++ // We dont care about destroying the created runner. s.ExpectedGoroutineIncrease++ // We dont care about destroying the created runner.
call.Return([]*nomadApi.Job{job}, nil) call.Return([]*nomadApi.Job{job}, nil)
runnerManager.Load() runnerManager.Load()
@ -544,7 +544,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName) configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
s.Require().NotNil(configTaskGroup) s.Require().NotNil(configTaskGroup)
configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue
s.ExpectedGoroutingIncrease++ // We don't care about destroying the created runner. s.ExpectedGoroutineIncrease++ // We don't care about destroying the created runner.
call.Return([]*nomadApi.Job{job}, nil) call.Return([]*nomadApi.Job{job}, nil)
s.Require().Zero(runnerManager.usedRunners.Length()) s.Require().Zero(runnerManager.usedRunners.Length())

View File

@ -31,31 +31,35 @@ var numGoroutines = regexp.MustCompile(`^goroutine profile: total (\d*)\n`)
// Be aware not to overwrite the SetupTest or TearDownTest function! // Be aware not to overwrite the SetupTest or TearDownTest function!
type MemoryLeakTestSuite struct { type MemoryLeakTestSuite struct {
suite.Suite suite.Suite
ExpectedGoroutingIncrease int ExpectedGoroutineIncrease int
TestCtx context.Context TestCtx context.Context
testCtxCancel context.CancelFunc testCtxCancel context.CancelFunc
goroutineCountBefore int goroutineCountBefore int
goroutinesBefore *bytes.Buffer goroutinesBefore *bytes.Buffer
} }
func (s *MemoryLeakTestSuite) SetupTest() { func (s *MemoryLeakTestSuite) lookupGoroutines() (debugOutput *bytes.Buffer, goroutineCount int) {
// Without this first line we observed some goroutines just closing. debugOutput = &bytes.Buffer{}
runtime.Gosched() err := pprof.Lookup("goroutine").WriteTo(debugOutput, 1)
s.ExpectedGoroutingIncrease = 0
s.goroutinesBefore = &bytes.Buffer{}
err := pprof.Lookup("goroutine").WriteTo(s.goroutinesBefore, 1)
s.Require().NoError(err) s.Require().NoError(err)
match := numGoroutines.FindSubmatch(s.goroutinesBefore.Bytes()) match := numGoroutines.FindSubmatch(debugOutput.Bytes())
if match == nil { if match == nil {
s.Fail("gouroutines could not be parsed: " + s.goroutinesBefore.String()) s.Fail("gouroutines could not be parsed: " + debugOutput.String())
} }
// We do not use runtime.NumGoroutine() to not create inconsistency to the Lookup. // We do not use runtime.NumGoroutine() to not create inconsistency to the Lookup.
s.goroutineCountBefore, err = strconv.Atoi(string(match[1])) goroutineCount, err = strconv.Atoi(string(match[1]))
if err != nil { if err != nil {
s.Fail("number of goroutines could not be parsed: " + err.Error()) s.Fail("number of goroutines could not be parsed: " + err.Error())
} }
return debugOutput, goroutineCount
}
func (s *MemoryLeakTestSuite) SetupTest() {
runtime.Gosched() // Flush done Goroutines
<-time.After(ShortTimeout) // Just to make sure
s.ExpectedGoroutineIncrease = 0
s.goroutinesBefore, s.goroutineCountBefore = s.lookupGoroutines()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s.TestCtx = ctx s.TestCtx = ctx
@ -66,13 +70,13 @@ func (s *MemoryLeakTestSuite) TearDownTest() {
s.testCtxCancel() s.testCtxCancel()
runtime.Gosched() // Flush done Goroutines runtime.Gosched() // Flush done Goroutines
<-time.After(ShortTimeout) // Just to make sure <-time.After(ShortTimeout) // Just to make sure
goroutinesAfter := runtime.NumGoroutine()
s.Equal(s.goroutineCountBefore+s.ExpectedGoroutingIncrease, goroutinesAfter)
if s.goroutineCountBefore+s.ExpectedGoroutingIncrease != goroutinesAfter { goroutinesAfter, goroutineCountAfter := s.lookupGoroutines()
s.Equal(s.goroutineCountBefore+s.ExpectedGoroutineIncrease, goroutineCountAfter)
if s.goroutineCountBefore+s.ExpectedGoroutineIncrease != goroutineCountAfter {
_, err := io.Copy(os.Stderr, s.goroutinesBefore) _, err := io.Copy(os.Stderr, s.goroutinesBefore)
s.NoError(err) s.NoError(err)
err = pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) _, err = io.Copy(os.Stderr, goroutinesAfter)
s.NoError(err) s.NoError(err)
} }
} }