Infinite busy waiting for lost event (#31)
* Close evaluation stream for Nomad Job creation when set event handler have been finished * Remove evaluation event stream requests by handling the events via the main Nomad event handler.
This commit is contained in:
@ -177,7 +177,9 @@ func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't register job: %w", err)
|
return fmt.Errorf("couldn't register job: %w", err)
|
||||||
}
|
}
|
||||||
err = apiClient.MonitorEvaluation(evalID, context.Background())
|
ctx, cancel := context.WithTimeout(context.Background(), nomad.RegisterTimeout)
|
||||||
|
defer cancel()
|
||||||
|
err = apiClient.MonitorEvaluation(evalID, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error during the monitoring of the environment job: %w", err)
|
return fmt.Errorf("error during the monitoring of the environment job: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -116,24 +116,15 @@ func TestRegisterFailsWhenNomadJobRegistrationFails(t *testing.T) {
|
|||||||
err := environment.Register(apiClientMock)
|
err := environment.Register(apiClientMock)
|
||||||
|
|
||||||
assert.ErrorIs(t, err, expectedErr)
|
assert.ErrorIs(t, err, expectedErr)
|
||||||
apiClientMock.AssertNotCalled(t, "EvaluationStream")
|
apiClientMock.AssertNotCalled(t, "MonitorEvaluation")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) {
|
func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) {
|
||||||
apiClientMock := &nomad.ExecutorAPIMock{}
|
apiClientMock := &nomad.ExecutorAPIMock{}
|
||||||
evaluationID := "id"
|
evaluationID := "id"
|
||||||
|
|
||||||
stream := make(chan *nomadApi.Events)
|
|
||||||
readonlyStream := func() <-chan *nomadApi.Events {
|
|
||||||
return stream
|
|
||||||
}()
|
|
||||||
// Immediately close stream to avoid any reading from it resulting in endless wait
|
|
||||||
close(stream)
|
|
||||||
|
|
||||||
apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil)
|
apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil)
|
||||||
apiClientMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil)
|
apiClientMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil)
|
||||||
apiClientMock.On("EvaluationStream", evaluationID, mock.AnythingOfType("*context.emptyCtx")).
|
|
||||||
Return(readonlyStream, nil)
|
|
||||||
|
|
||||||
environment := &NomadEnvironment{"", &nomadApi.Job{}, nil}
|
environment := &NomadEnvironment{"", &nomadApi.Job{}, nil}
|
||||||
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
|
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
|
||||||
|
@ -232,7 +232,7 @@ func TestNomadEnvironmentManager_List(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func mockWatchAllocations(apiMock *nomad.ExecutorAPIMock) {
|
func mockWatchAllocations(apiMock *nomad.ExecutorAPIMock) {
|
||||||
call := apiMock.On("WatchAllocations", mock.Anything, mock.Anything, mock.Anything)
|
call := apiMock.On("WatchEventStream", mock.Anything, mock.Anything, mock.Anything)
|
||||||
call.Run(func(args mock.Arguments) {
|
call.Run(func(args mock.Arguments) {
|
||||||
<-time.After(10 * time.Minute) // 10 minutes is the default test timeout
|
<-time.After(10 * time.Minute) // 10 minutes is the default test timeout
|
||||||
call.ReturnArguments = mock.Arguments{nil}
|
call.ReturnArguments = mock.Arguments{nil}
|
||||||
|
@ -47,12 +47,8 @@ type apiQuerier interface {
|
|||||||
// It returns the evaluation ID that can be used when listening to the Nomad event stream.
|
// It returns the evaluation ID that can be used when listening to the Nomad event stream.
|
||||||
RegisterNomadJob(job *nomadApi.Job) (string, error)
|
RegisterNomadJob(job *nomadApi.Job) (string, error)
|
||||||
|
|
||||||
// EvaluationStream returns a Nomad event stream filtered to return only events belonging to the
|
// EventStream returns a Nomad event stream filtered to return only allocation and evaluation events.
|
||||||
// given evaluation ID.
|
EventStream(ctx context.Context) (<-chan *nomadApi.Events, error)
|
||||||
EvaluationStream(evalID string, ctx context.Context) (<-chan *nomadApi.Events, error)
|
|
||||||
|
|
||||||
// AllocationStream returns a Nomad event stream filtered to return only allocation events.
|
|
||||||
AllocationStream(ctx context.Context) (<-chan *nomadApi.Events, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// nomadAPIClient implements the nomadApiQuerier interface and provides access to a real Nomad API.
|
// nomadAPIClient implements the nomadApiQuerier interface and provides access to a real Nomad API.
|
||||||
@ -136,24 +132,11 @@ func (nc *nomadAPIClient) RegisterNomadJob(job *nomadApi.Job) (string, error) {
|
|||||||
return resp.EvalID, nil
|
return resp.EvalID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *nomadAPIClient) EvaluationStream(evalID string, ctx context.Context) (<-chan *nomadApi.Events, error) {
|
func (nc *nomadAPIClient) EventStream(ctx context.Context) (<-chan *nomadApi.Events, error) {
|
||||||
stream, err := nc.client.EventStream().Stream(
|
|
||||||
ctx,
|
|
||||||
map[nomadApi.Topic][]string{
|
|
||||||
nomadApi.TopicEvaluation: {evalID},
|
|
||||||
},
|
|
||||||
0,
|
|
||||||
nc.queryOptions())
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error retrieving Nomad Evaluation event stream: %w", err)
|
|
||||||
}
|
|
||||||
return stream, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nc *nomadAPIClient) AllocationStream(ctx context.Context) (<-chan *nomadApi.Events, error) {
|
|
||||||
stream, err := nc.client.EventStream().Stream(
|
stream, err := nc.client.EventStream().Stream(
|
||||||
ctx,
|
ctx,
|
||||||
map[nomadApi.Topic][]string{
|
map[nomadApi.Topic][]string{
|
||||||
|
nomadApi.TopicEvaluation: {"*"},
|
||||||
nomadApi.TopicAllocation: {
|
nomadApi.TopicAllocation: {
|
||||||
// Necessary to have the "topic" URL param show up in the HTTP request to Nomad.
|
// Necessary to have the "topic" URL param show up in the HTTP request to Nomad.
|
||||||
// Without the param, Nomad will try to deliver all event types.
|
// Without the param, Nomad will try to deliver all event types.
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
|
// Code generated by mockery v2.9.4. DO NOT EDIT.
|
||||||
|
|
||||||
package nomad
|
package nomad
|
||||||
|
|
||||||
@ -18,8 +18,22 @@ type apiQuerierMock struct {
|
|||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
// AllocationStream provides a mock function with given fields: ctx
|
// DeleteJob provides a mock function with given fields: jobID
|
||||||
func (_m *apiQuerierMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) {
|
func (_m *apiQuerierMock) DeleteJob(jobID string) error {
|
||||||
|
ret := _m.Called(jobID)
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(string) error); ok {
|
||||||
|
r0 = rf(jobID)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventStream provides a mock function with given fields: ctx
|
||||||
|
func (_m *apiQuerierMock) EventStream(ctx context.Context) (<-chan *api.Events, error) {
|
||||||
ret := _m.Called(ctx)
|
ret := _m.Called(ctx)
|
||||||
|
|
||||||
var r0 <-chan *api.Events
|
var r0 <-chan *api.Events
|
||||||
@ -41,43 +55,6 @@ func (_m *apiQuerierMock) AllocationStream(ctx context.Context) (<-chan *api.Eve
|
|||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRunner provides a mock function with given fields: runnerID
|
|
||||||
func (_m *apiQuerierMock) DeleteJob(runnerID string) error {
|
|
||||||
ret := _m.Called(runnerID)
|
|
||||||
|
|
||||||
var r0 error
|
|
||||||
if rf, ok := ret.Get(0).(func(string) error); ok {
|
|
||||||
r0 = rf(runnerID)
|
|
||||||
} else {
|
|
||||||
r0 = ret.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
return r0
|
|
||||||
}
|
|
||||||
|
|
||||||
// EvaluationStream provides a mock function with given fields: evalID, ctx
|
|
||||||
func (_m *apiQuerierMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) {
|
|
||||||
ret := _m.Called(evalID, ctx)
|
|
||||||
|
|
||||||
var r0 <-chan *api.Events
|
|
||||||
if rf, ok := ret.Get(0).(func(string, context.Context) <-chan *api.Events); ok {
|
|
||||||
r0 = rf(evalID, ctx)
|
|
||||||
} else {
|
|
||||||
if ret.Get(0) != nil {
|
|
||||||
r0 = ret.Get(0).(<-chan *api.Events)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var r1 error
|
|
||||||
if rf, ok := ret.Get(1).(func(string, context.Context) error); ok {
|
|
||||||
r1 = rf(evalID, ctx)
|
|
||||||
} else {
|
|
||||||
r1 = ret.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
return r0, r1
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute provides a mock function with given fields: jobID, ctx, command, tty, stdin, stdout, stderr
|
// Execute provides a mock function with given fields: jobID, ctx, command, tty, stdin, stdout, stderr
|
||||||
func (_m *apiQuerierMock) Execute(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) {
|
func (_m *apiQuerierMock) Execute(jobID string, ctx context.Context, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) (int, error) {
|
||||||
ret := _m.Called(jobID, ctx, command, tty, stdin, stdout, stderr)
|
ret := _m.Called(jobID, ctx, command, tty, stdin, stdout, stderr)
|
||||||
|
@ -20,29 +20,6 @@ type ExecutorAPIMock struct {
|
|||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
// AllocationStream provides a mock function with given fields: ctx
|
|
||||||
func (_m *ExecutorAPIMock) AllocationStream(ctx context.Context) (<-chan *api.Events, error) {
|
|
||||||
ret := _m.Called(ctx)
|
|
||||||
|
|
||||||
var r0 <-chan *api.Events
|
|
||||||
if rf, ok := ret.Get(0).(func(context.Context) <-chan *api.Events); ok {
|
|
||||||
r0 = rf(ctx)
|
|
||||||
} else {
|
|
||||||
if ret.Get(0) != nil {
|
|
||||||
r0 = ret.Get(0).(<-chan *api.Events)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var r1 error
|
|
||||||
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
|
|
||||||
r1 = rf(ctx)
|
|
||||||
} else {
|
|
||||||
r1 = ret.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
return r0, r1
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteJob provides a mock function with given fields: jobID
|
// DeleteJob provides a mock function with given fields: jobID
|
||||||
func (_m *ExecutorAPIMock) DeleteJob(jobID string) error {
|
func (_m *ExecutorAPIMock) DeleteJob(jobID string) error {
|
||||||
ret := _m.Called(jobID)
|
ret := _m.Called(jobID)
|
||||||
@ -57,13 +34,13 @@ func (_m *ExecutorAPIMock) DeleteJob(jobID string) error {
|
|||||||
return r0
|
return r0
|
||||||
}
|
}
|
||||||
|
|
||||||
// EvaluationStream provides a mock function with given fields: evalID, ctx
|
// EventStream provides a mock function with given fields: ctx
|
||||||
func (_m *ExecutorAPIMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) {
|
func (_m *ExecutorAPIMock) EventStream(ctx context.Context) (<-chan *api.Events, error) {
|
||||||
ret := _m.Called(evalID, ctx)
|
ret := _m.Called(ctx)
|
||||||
|
|
||||||
var r0 <-chan *api.Events
|
var r0 <-chan *api.Events
|
||||||
if rf, ok := ret.Get(0).(func(string, context.Context) <-chan *api.Events); ok {
|
if rf, ok := ret.Get(0).(func(context.Context) <-chan *api.Events); ok {
|
||||||
r0 = rf(evalID, ctx)
|
r0 = rf(ctx)
|
||||||
} else {
|
} else {
|
||||||
if ret.Get(0) != nil {
|
if ret.Get(0) != nil {
|
||||||
r0 = ret.Get(0).(<-chan *api.Events)
|
r0 = ret.Get(0).(<-chan *api.Events)
|
||||||
@ -71,8 +48,8 @@ func (_m *ExecutorAPIMock) EvaluationStream(evalID string, ctx context.Context)
|
|||||||
}
|
}
|
||||||
|
|
||||||
var r1 error
|
var r1 error
|
||||||
if rf, ok := ret.Get(1).(func(string, context.Context) error); ok {
|
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
|
||||||
r1 = rf(evalID, ctx)
|
r1 = rf(ctx)
|
||||||
} else {
|
} else {
|
||||||
r1 = ret.Error(1)
|
r1 = ret.Error(1)
|
||||||
}
|
}
|
||||||
@ -335,8 +312,8 @@ func (_m *ExecutorAPIMock) SetJobScale(jobID string, count uint, reason string)
|
|||||||
return r0
|
return r0
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchAllocations provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation
|
// WatchEventStream provides a mock function with given fields: ctx, onNewAllocation, onDeletedAllocation
|
||||||
func (_m *ExecutorAPIMock) WatchAllocations(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error {
|
func (_m *ExecutorAPIMock) WatchEventStream(ctx context.Context, onNewAllocation AllocationProcessor, onDeletedAllocation AllocationProcessor) error {
|
||||||
ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation)
|
ret := _m.Called(ctx, onNewAllocation, onDeletedAllocation)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/openHPI/poseidon/pkg/dto"
|
"github.com/openHPI/poseidon/pkg/dto"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -27,6 +28,7 @@ const (
|
|||||||
ConfigMetaTimeoutKey = "timeout"
|
ConfigMetaTimeoutKey = "timeout"
|
||||||
ConfigMetaPoolSizeKey = "prewarmingPoolSize"
|
ConfigMetaPoolSizeKey = "prewarmingPoolSize"
|
||||||
TemplateJobNameParts = 2
|
TemplateJobNameParts = 2
|
||||||
|
RegisterTimeout = 10 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -44,7 +46,10 @@ func (a *APIClient) RegisterRunnerJob(template *nomadApi.Job) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't register runner job: %w", err)
|
return fmt.Errorf("couldn't register runner job: %w", err)
|
||||||
}
|
}
|
||||||
return a.MonitorEvaluation(evalID, context.Background())
|
|
||||||
|
registerTimeout, cancel := context.WithTimeout(context.Background(), RegisterTimeout)
|
||||||
|
defer cancel()
|
||||||
|
return a.MonitorEvaluation(evalID, registerTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
func FindTaskGroup(job *nomadApi.Job, name string) *nomadApi.TaskGroup {
|
func FindTaskGroup(job *nomadApi.Job, name string) *nomadApi.TaskGroup {
|
||||||
|
@ -24,6 +24,9 @@ var (
|
|||||||
ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found")
|
ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// resultChannelWriteTimeout is to detect the error when more element are written into a channel than expected.
|
||||||
|
const resultChannelWriteTimeout = 10 * time.Millisecond
|
||||||
|
|
||||||
type AllocationProcessor func(*nomadApi.Allocation)
|
type AllocationProcessor func(*nomadApi.Allocation)
|
||||||
|
|
||||||
// ExecutorAPI provides access to a container orchestration solution.
|
// ExecutorAPI provides access to a container orchestration solution.
|
||||||
@ -53,9 +56,10 @@ type ExecutorAPI interface {
|
|||||||
// See also https://github.com/hashicorp/nomad/blob/7d5a9ecde95c18da94c9b6ace2565afbfdd6a40d/command/monitor.go#L175
|
// See also https://github.com/hashicorp/nomad/blob/7d5a9ecde95c18da94c9b6ace2565afbfdd6a40d/command/monitor.go#L175
|
||||||
MonitorEvaluation(evaluationID string, ctx context.Context) error
|
MonitorEvaluation(evaluationID string, ctx context.Context) error
|
||||||
|
|
||||||
// WatchAllocations listens on the Nomad event stream for allocation events.
|
// WatchEventStream listens on the Nomad event stream for allocation and evaluation events.
|
||||||
// Depending on the incoming event, any of the given function is executed.
|
// Depending on the incoming event, any of the given function is executed.
|
||||||
WatchAllocations(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error
|
// Do not run multiple times simultaneously.
|
||||||
|
WatchEventStream(ctx context.Context, onNewAllocation, onDeletedAllocation AllocationProcessor) error
|
||||||
|
|
||||||
// ExecuteCommand executes the given command in the allocation with the given id.
|
// ExecuteCommand executes the given command in the allocation with the given id.
|
||||||
// It writes the output of the command to stdout/stderr and reads input from stdin.
|
// It writes the output of the command to stdout/stderr and reads input from stdin.
|
||||||
@ -71,12 +75,14 @@ type ExecutorAPI interface {
|
|||||||
// Executor API and its return values.
|
// Executor API and its return values.
|
||||||
type APIClient struct {
|
type APIClient struct {
|
||||||
apiQuerier
|
apiQuerier
|
||||||
|
evaluations map[string]chan error
|
||||||
|
isListening bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExecutorAPI creates a new api client.
|
// NewExecutorAPI creates a new api client.
|
||||||
// One client is usually sufficient for the complete runtime of the API.
|
// One client is usually sufficient for the complete runtime of the API.
|
||||||
func NewExecutorAPI(nomadConfig *config.Nomad) (ExecutorAPI, error) {
|
func NewExecutorAPI(nomadConfig *config.Nomad) (ExecutorAPI, error) {
|
||||||
client := &APIClient{apiQuerier: &nomadAPIClient{}}
|
client := &APIClient{apiQuerier: &nomadAPIClient{}, evaluations: map[string]chan error{}}
|
||||||
err := client.init(nomadConfig)
|
err := client.init(nomadConfig)
|
||||||
return client, err
|
return client, err
|
||||||
}
|
}
|
||||||
@ -136,29 +142,53 @@ func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*nomadApi
|
|||||||
return jobs, occurredError
|
return jobs, occurredError
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) error {
|
func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) (err error) {
|
||||||
stream, err := a.apiQuerier.EvaluationStream(evaluationID, ctx)
|
a.evaluations[evaluationID] = make(chan error, 1)
|
||||||
if err != nil {
|
defer delete(a.evaluations, evaluationID)
|
||||||
return fmt.Errorf("failed retrieving evaluation stream: %w", err)
|
|
||||||
|
if !a.isListening {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithCancel(ctx)
|
||||||
|
defer cancel() // cancel the WatchEventStream when the evaluation result was read.
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err = a.WatchEventStream(ctx, func(_ *nomadApi.Allocation) {}, func(_ *nomadApi.Allocation) {})
|
||||||
|
cancel() // cancel the waiting for an evaluation result if watching the event stream ends.
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return err
|
||||||
|
case err := <-a.evaluations[evaluationID]:
|
||||||
|
// At the moment we expect only one error to be sent via this channel.
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
// If ctx is canceled, the stream will be closed by Nomad and we exit the for loop.
|
|
||||||
return receiveAndHandleNomadAPIEvents(stream, handleEvaluationEvent)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *APIClient) WatchAllocations(ctx context.Context,
|
func (a *APIClient) WatchEventStream(ctx context.Context,
|
||||||
onNewAllocation, onDeletedAllocation AllocationProcessor) error {
|
onNewAllocation, onDeletedAllocation AllocationProcessor) error {
|
||||||
startTime := time.Now().UnixNano()
|
startTime := time.Now().UnixNano()
|
||||||
stream, err := a.AllocationStream(ctx)
|
stream, err := a.EventStream(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed retrieving allocation stream: %w", err)
|
return fmt.Errorf("failed retrieving allocation stream: %w", err)
|
||||||
}
|
}
|
||||||
pendingAllocations := make(map[string]bool)
|
pendingAllocations := make(map[string]bool)
|
||||||
|
|
||||||
handler := func(event *nomadApi.Event) (bool, error) {
|
handler := func(event *nomadApi.Event) (bool, error) {
|
||||||
return false, handleAllocationEvent(startTime, pendingAllocations, event, onNewAllocation, onDeletedAllocation)
|
switch event.Topic {
|
||||||
|
case nomadApi.TopicEvaluation:
|
||||||
|
return false, handleEvaluationEvent(a.evaluations, event)
|
||||||
|
case nomadApi.TopicAllocation:
|
||||||
|
return false, handleAllocationEvent(startTime, pendingAllocations, event, onNewAllocation, onDeletedAllocation)
|
||||||
|
default:
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
a.isListening = true
|
||||||
err = receiveAndHandleNomadAPIEvents(stream, handler)
|
err = receiveAndHandleNomadAPIEvents(stream, handler)
|
||||||
|
a.isListening = false
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,21 +221,29 @@ func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler noma
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleEvaluationEvent is a nomadAPIEventHandler that returns whether the evaluation described by the event
|
// handleEvaluationEvent is an event handler that returns whether the evaluation described by the event
|
||||||
// was successful.
|
// was successful.
|
||||||
func handleEvaluationEvent(event *nomadApi.Event) (bool, error) {
|
func handleEvaluationEvent(evaluations map[string]chan error, event *nomadApi.Event) error {
|
||||||
eval, err := event.Evaluation()
|
eval, err := event.Evaluation()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, fmt.Errorf("failed to monitor evaluation: %w", err)
|
return fmt.Errorf("failed to monitor evaluation: %w", err)
|
||||||
}
|
}
|
||||||
switch eval.Status {
|
switch eval.Status {
|
||||||
case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed:
|
case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed:
|
||||||
return true, checkEvaluation(eval)
|
resultChannel, ok := evaluations[eval.ID]
|
||||||
|
if ok {
|
||||||
|
select {
|
||||||
|
case resultChannel <- checkEvaluation(eval):
|
||||||
|
close(resultChannel)
|
||||||
|
case <-time.After(resultChannelWriteTimeout):
|
||||||
|
log.WithField("eval", eval).Error("Full evaluation channel")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleAllocationEvent is a nomadAPIEventHandler that processes allocation events.
|
// handleAllocationEvent is an event handler that processes allocation events.
|
||||||
// If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation
|
// If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation
|
||||||
// is called. The pendingAllocations map is used to store allocations that are pending but not started yet. Using the
|
// is called. The pendingAllocations map is used to store allocations that are pending but not started yet. Using the
|
||||||
// map the state is persisted between multiple calls of this function.
|
// map the state is persisted between multiple calls of this function.
|
||||||
|
@ -124,6 +124,7 @@ func (s *LoadRunnersTestSuite) TestReturnsAllAvailableRunners() {
|
|||||||
const TestNamespace = "unit-tests"
|
const TestNamespace = "unit-tests"
|
||||||
const TestNomadToken = "n0m4d-t0k3n"
|
const TestNomadToken = "n0m4d-t0k3n"
|
||||||
const TestDefaultAddress = "127.0.0.1"
|
const TestDefaultAddress = "127.0.0.1"
|
||||||
|
const evaluationID = "evaluation-id"
|
||||||
|
|
||||||
func NomadTestConfig(address string) *config.Nomad {
|
func NomadTestConfig(address string) *config.Nomad {
|
||||||
return &config.Nomad{
|
return &config.Nomad{
|
||||||
@ -168,12 +169,13 @@ func asynchronouslyMonitorEvaluation(stream chan *nomadApi.Events) chan error {
|
|||||||
readOnlyStream := func() <-chan *nomadApi.Events { return stream }()
|
readOnlyStream := func() <-chan *nomadApi.Events { return stream }()
|
||||||
|
|
||||||
apiMock := &apiQuerierMock{}
|
apiMock := &apiQuerierMock{}
|
||||||
apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(readOnlyStream, nil)
|
apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")).
|
||||||
apiClient := &APIClient{apiMock}
|
Return(readOnlyStream, nil)
|
||||||
|
apiClient := &APIClient{apiMock, map[string]chan error{}, false}
|
||||||
|
|
||||||
errChan := make(chan error)
|
errChan := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
errChan <- apiClient.MonitorEvaluation("id", ctx)
|
errChan <- apiClient.MonitorEvaluation(evaluationID, ctx)
|
||||||
}()
|
}()
|
||||||
return errChan
|
return errChan
|
||||||
}
|
}
|
||||||
@ -195,9 +197,9 @@ func TestApiClient_MonitorEvaluationReturnsNilWhenStreamIsClosed(t *testing.T) {
|
|||||||
|
|
||||||
func TestApiClient_MonitorEvaluationReturnsErrorWhenStreamReturnsError(t *testing.T) {
|
func TestApiClient_MonitorEvaluationReturnsErrorWhenStreamReturnsError(t *testing.T) {
|
||||||
apiMock := &apiQuerierMock{}
|
apiMock := &apiQuerierMock{}
|
||||||
apiMock.On("EvaluationStream", mock.AnythingOfType("string"), mock.AnythingOfType("*context.emptyCtx")).
|
apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")).
|
||||||
Return(nil, tests.ErrDefault)
|
Return(nil, tests.ErrDefault)
|
||||||
apiClient := &APIClient{apiMock}
|
apiClient := &APIClient{apiMock, map[string]chan error{}, false}
|
||||||
err := apiClient.MonitorEvaluation("id", context.Background())
|
err := apiClient.MonitorEvaluation("id", context.Background())
|
||||||
assert.ErrorIs(t, err, tests.ErrDefault)
|
assert.ErrorIs(t, err, tests.ErrDefault)
|
||||||
}
|
}
|
||||||
@ -278,8 +280,8 @@ func TestApiClient_MonitorEvaluationWithSuccessfulEvent(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
{[]*nomadApi.Events{&events}, 1,
|
{[]*nomadApi.Events{&events}, 1,
|
||||||
"it completes with successful event"},
|
"it completes with successful event"},
|
||||||
{[]*nomadApi.Events{&events, &events}, 1,
|
{[]*nomadApi.Events{&events, &events}, 2,
|
||||||
"it completes at first successful event"},
|
"it keeps listening after first successful event"},
|
||||||
{[]*nomadApi.Events{{}, &events}, 2,
|
{[]*nomadApi.Events{{}, &events}, 2,
|
||||||
"it skips heartbeat and completes"},
|
"it skips heartbeat and completes"},
|
||||||
{[]*nomadApi.Events{&pendingEvaluationEvents, &events}, 2,
|
{[]*nomadApi.Events{&pendingEvaluationEvents, &events}, 2,
|
||||||
@ -298,7 +300,7 @@ func TestApiClient_MonitorEvaluationWithSuccessfulEvent(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) {
|
func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) {
|
||||||
eval := nomadApi.Evaluation{Status: structs.EvalStatusFailed}
|
eval := nomadApi.Evaluation{ID: evaluationID, Status: structs.EvalStatusFailed}
|
||||||
evalErr := checkEvaluation(&eval)
|
evalErr := checkEvaluation(&eval)
|
||||||
require.NotNil(t, evalErr)
|
require.NotNil(t, evalErr)
|
||||||
|
|
||||||
@ -318,9 +320,9 @@ func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
}{
|
}{
|
||||||
{[]*nomadApi.Events{&events}, 1, evalErr,
|
{[]*nomadApi.Events{&events}, 1, evalErr,
|
||||||
"it fails with failing event"},
|
"it completes with failing event"},
|
||||||
{[]*nomadApi.Events{&events, &events}, 1, evalErr,
|
{[]*nomadApi.Events{&events, &events}, 1, evalErr,
|
||||||
"it fails at first failing event"},
|
"it does not fail after first failing event"},
|
||||||
{[]*nomadApi.Events{{}, &events}, 2, evalErr,
|
{[]*nomadApi.Events{{}, &events}, 2, evalErr,
|
||||||
"it skips heartbeat and fail"},
|
"it skips heartbeat and fail"},
|
||||||
{[]*nomadApi.Events{&pendingEvaluationEvents, &events}, 2, evalErr,
|
{[]*nomadApi.Events{&pendingEvaluationEvents, &events}, 2, evalErr,
|
||||||
@ -510,11 +512,11 @@ func TestHandleAllocationEventBuffersPendingAllocation(t *testing.T) {
|
|||||||
|
|
||||||
func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) {
|
func TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved(t *testing.T) {
|
||||||
apiMock := &apiQuerierMock{}
|
apiMock := &apiQuerierMock{}
|
||||||
apiMock.On("AllocationStream", mock.Anything).Return(nil, tests.ErrDefault)
|
apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault)
|
||||||
apiClient := &APIClient{apiMock}
|
apiClient := &APIClient{apiMock, map[string]chan error{}, false}
|
||||||
|
|
||||||
noop := func(a *nomadApi.Allocation) {}
|
noop := func(a *nomadApi.Allocation) {}
|
||||||
err := apiClient.WatchAllocations(context.Background(), noop, noop)
|
err := apiClient.WatchEventStream(context.Background(), noop, noop)
|
||||||
assert.ErrorIs(t, err, tests.ErrDefault)
|
assert.ErrorIs(t, err, tests.ErrDefault)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -579,12 +581,12 @@ func asynchronouslyWatchAllocations(stream chan *nomadApi.Events,
|
|||||||
readOnlyStream := func() <-chan *nomadApi.Events { return stream }()
|
readOnlyStream := func() <-chan *nomadApi.Events { return stream }()
|
||||||
|
|
||||||
apiMock := &apiQuerierMock{}
|
apiMock := &apiQuerierMock{}
|
||||||
apiMock.On("AllocationStream", ctx).Return(readOnlyStream, nil)
|
apiMock.On("EventStream", ctx).Return(readOnlyStream, nil)
|
||||||
apiClient := &APIClient{apiMock}
|
apiClient := &APIClient{apiMock, map[string]chan error{}, false}
|
||||||
|
|
||||||
errChan := make(chan error)
|
errChan := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
errChan <- apiClient.WatchAllocations(ctx, onNewAllocation, onDeletedAllocation)
|
errChan <- apiClient.WatchEventStream(ctx, onNewAllocation, onDeletedAllocation)
|
||||||
}()
|
}()
|
||||||
return errChan
|
return errChan
|
||||||
}
|
}
|
||||||
|
@ -228,7 +228,7 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger
|
|||||||
func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
|
func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
|
||||||
retries := 0
|
retries := 0
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
err := m.apiClient.WatchAllocations(ctx, m.onAllocationAdded, m.onAllocationStopped)
|
err := m.apiClient.WatchEventStream(ctx, m.onAllocationAdded, m.onAllocationStopped)
|
||||||
retries += 1
|
retries += 1
|
||||||
log.WithError(err).Errorf("Stopped updating the runners! Retry %v", retries)
|
log.WithError(err).Errorf("Stopped updating the runners! Retry %v", retries)
|
||||||
<-time.After(time.Second)
|
<-time.After(time.Second)
|
||||||
|
@ -42,7 +42,7 @@ func (s *ManagerTestSuite) SetupTest() {
|
|||||||
func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []string) {
|
func mockRunnerQueries(apiMock *nomad.ExecutorAPIMock, returnedRunnerIds []string) {
|
||||||
// reset expected calls to allow new mocked return values
|
// reset expected calls to allow new mocked return values
|
||||||
apiMock.ExpectedCalls = []*mock.Call{}
|
apiMock.ExpectedCalls = []*mock.Call{}
|
||||||
call := apiMock.On("WatchAllocations", mock.Anything, mock.Anything, mock.Anything)
|
call := apiMock.On("WatchEventStream", mock.Anything, mock.Anything, mock.Anything)
|
||||||
call.Run(func(args mock.Arguments) {
|
call.Run(func(args mock.Arguments) {
|
||||||
<-time.After(10 * time.Minute) // 10 minutes is the default test timeout
|
<-time.After(10 * time.Minute) // 10 minutes is the default test timeout
|
||||||
call.ReturnArguments = mock.Arguments{nil}
|
call.ReturnArguments = mock.Arguments{nil}
|
||||||
@ -206,7 +206,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersLogsErrorFromWatchAllocation() {
|
|||||||
var hook *test.Hook
|
var hook *test.Hook
|
||||||
logger, hook := test.NewNullLogger()
|
logger, hook := test.NewNullLogger()
|
||||||
log = logger.WithField("pkg", "runner")
|
log = logger.WithField("pkg", "runner")
|
||||||
modifyMockedCall(s.apiMock, "WatchAllocations", func(call *mock.Call) {
|
modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) {
|
||||||
call.Run(func(args mock.Arguments) {
|
call.Run(func(args mock.Arguments) {
|
||||||
call.ReturnArguments = mock.Arguments{tests.ErrDefault}
|
call.ReturnArguments = mock.Arguments{tests.ErrDefault}
|
||||||
})
|
})
|
||||||
@ -232,7 +232,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersAddsIdleRunner() {
|
|||||||
_, ok = environment.Sample(s.apiMock)
|
_, ok = environment.Sample(s.apiMock)
|
||||||
s.Require().False(ok)
|
s.Require().False(ok)
|
||||||
|
|
||||||
modifyMockedCall(s.apiMock, "WatchAllocations", func(call *mock.Call) {
|
modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) {
|
||||||
call.Run(func(args mock.Arguments) {
|
call.Run(func(args mock.Arguments) {
|
||||||
onCreate, ok := args.Get(1).(nomad.AllocationProcessor)
|
onCreate, ok := args.Get(1).(nomad.AllocationProcessor)
|
||||||
s.Require().True(ok)
|
s.Require().True(ok)
|
||||||
@ -260,7 +260,7 @@ func (s *ManagerTestSuite) TestUpdateRunnersRemovesIdleAndUsedRunner() {
|
|||||||
environment.AddRunner(testRunner)
|
environment.AddRunner(testRunner)
|
||||||
s.nomadRunnerManager.usedRunners.Add(testRunner)
|
s.nomadRunnerManager.usedRunners.Add(testRunner)
|
||||||
|
|
||||||
modifyMockedCall(s.apiMock, "WatchAllocations", func(call *mock.Call) {
|
modifyMockedCall(s.apiMock, "WatchEventStream", func(call *mock.Call) {
|
||||||
call.Run(func(args mock.Arguments) {
|
call.Run(func(args mock.Arguments) {
|
||||||
onDelete, ok := args.Get(2).(nomad.AllocationProcessor)
|
onDelete, ok := args.Get(2).(nomad.AllocationProcessor)
|
||||||
s.Require().True(ok)
|
s.Require().True(ok)
|
||||||
|
Reference in New Issue
Block a user