diff --git a/environment/job.go b/environment/job.go index 31b6d54..935eb18 100644 --- a/environment/job.go +++ b/environment/job.go @@ -1,6 +1,7 @@ package environment import ( + "context" _ "embed" "fmt" nomadApi "github.com/hashicorp/nomad/api" @@ -14,9 +15,46 @@ const ( TaskNameFormat = "%s-task" ) +// defaultJobHCL holds our default job in HCL format. +// The default job is used when creating new job and provides +// common settings that all the jobs share. //go:embed default-job.hcl var defaultJobHCL string +// registerJob creates a Nomad job based on the default job configuration and the given parameters. +// It registers the job with Nomad and waits until the registration completes. +func (m *NomadEnvironmentManager) registerJob( + id string, + prewarmingPoolSize, cpuLimit, memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16) error { + job := createJob(m.defaultJob, id, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + evalID, err := m.api.RegisterNomadJob(job) + if err != nil { + return err + } + return m.api.MonitorEvaluation(evalID, context.Background()) +} + +func createJob( + defaultJob nomadApi.Job, + id string, + prewarmingPoolSize, cpuLimit, memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16) *nomadApi.Job { + + job := defaultJob + job.ID = &id + job.Name = &id + + var taskGroup = createTaskGroup(&job, fmt.Sprintf(nomad.TaskGroupNameFormat, id), prewarmingPoolSize) + configureTask(taskGroup, fmt.Sprintf(TaskNameFormat, id), cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + + return &job +} + func parseJob(jobHCL string) *nomadApi.Job { config := jobspec2.ParseConfig{ Body: []byte(jobHCL), @@ -25,7 +63,7 @@ func parseJob(jobHCL string) *nomadApi.Job { } job, err := jobspec2.ParseWithConfig(&config) if err != nil { - log.WithError(err).Fatal("Error parsing default Nomad job") + log.WithError(err).Fatal("Error parsing Nomad job") return nil } @@ -48,7 +86,7 @@ func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *n func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposedPorts []uint16) { if len(taskGroup.Tasks) == 0 { - // This function is only used internally and must be called after configuring the task. + // This function is only used internally and must be called as last step when configuring the task. // This error is not recoverable. log.Fatal("Can't configure network before task has been configured!") } @@ -62,8 +100,8 @@ func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposed } else { networkResource = taskGroup.Networks[0] } - // prefer "bridge" network over "host" to have an isolated network namespace with bridged interface - // instead of joining the host network namespace + // Prefer "bridge" network over "host" to have an isolated network namespace with bridged interface + // instead of joining the host network namespace. networkResource.Mode = "bridge" for _, portNumber := range exposedPorts { port := nomadApi.Port{ @@ -73,8 +111,8 @@ func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposed networkResource.DynamicPorts = append(networkResource.DynamicPorts, port) } } else { - // somehow, we can't set the network mode to none in the NetworkResource on task group level - // see https://github.com/hashicorp/nomad/issues/10540 + // Somehow, we can't set the network mode to none in the NetworkResource on task group level. + // See https://github.com/hashicorp/nomad/issues/10540 if task.Config == nil { task.Config = make(map[string]interface{}) } @@ -97,11 +135,11 @@ func configureTask( task = taskGroup.Tasks[0] task.Name = name } - iCpuLimit := int(cpuLimit) - iMemoryLimit := int(memoryLimit) + integerCpuLimit := int(cpuLimit) + integerMemoryLimit := int(memoryLimit) task.Resources = &nomadApi.Resources{ - CPU: &iCpuLimit, - MemoryMB: &iMemoryLimit, + CPU: &integerCpuLimit, + MemoryMB: &integerMemoryLimit, } if task.Config == nil { @@ -111,20 +149,3 @@ func configureTask( configureNetwork(taskGroup, networkAccess, exposedPorts) } - -func (m *NomadEnvironmentManager) createJob( - id string, - prewarmingPoolSize, cpuLimit, memoryLimit uint, - image string, - networkAccess bool, - exposedPorts []uint16) *nomadApi.Job { - - job := m.defaultJob - job.ID = &id - job.Name = &id - - var taskGroup = createTaskGroup(&job, fmt.Sprintf(nomad.TaskGroupNameFormat, id), prewarmingPoolSize) - configureTask(taskGroup, fmt.Sprintf(TaskNameFormat, id), cpuLimit, memoryLimit, image, networkAccess, exposedPorts) - - return &job -} diff --git a/environment/job_test.go b/environment/job_test.go index af949df..d1f8d8f 100644 --- a/environment/job_test.go +++ b/environment/job_test.go @@ -1,11 +1,13 @@ package environment import ( + "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" @@ -240,8 +242,9 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) { func TestCreateJobSetsAllGivenArguments(t *testing.T) { testJob, base := createTestJob() - apiClient := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.ApiClient{}, *base} - job := apiClient.createJob( + manager := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.ApiClient{}, *base} + job := createJob( + manager.defaultJob, *testJob.ID, uint(*testJob.TaskGroups[0].Count), uint(*testJob.TaskGroups[0].Tasks[0].Resources.CPU), @@ -252,3 +255,55 @@ func TestCreateJobSetsAllGivenArguments(t *testing.T) { ) assert.Equal(t, *testJob, *job) } + +func TestRegisterJobWhenNomadJobRegistrationFails(t *testing.T) { + apiMock := nomad.ExecutorApiMock{} + expectedErr := errors.New("test error") + + apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", expectedErr) + + m := NomadEnvironmentManager{ + runnerManager: nil, + api: &apiMock, + defaultJob: nomadApi.Job{}, + } + + err := m.registerJob("id", 1, 2, 3, "image", false, []uint16{}) + assert.Equal(t, expectedErr, err) + apiMock.AssertNotCalled(t, "EvaluationStream") +} + +func TestRegisterJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) { + apiMock := nomad.ExecutorApiMock{} + evaluationID := "id" + + apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil) + apiMock.On("MonitorEvaluation", evaluationID, mock.AnythingOfType("*context.emptyCtx")).Return(nil) + + m := NomadEnvironmentManager{ + runnerManager: nil, + api: &apiMock, + defaultJob: nomadApi.Job{}, + } + + err := m.registerJob("id", 1, 2, 3, "image", false, []uint16{}) + assert.NoError(t, err) +} + +func TestRegisterJobReturnsErrorWhenMonitoringEvaluationFails(t *testing.T) { + apiMock := nomad.ExecutorApiMock{} + evaluationID := "id" + expectedErr := errors.New("test error") + + apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil) + apiMock.On("MonitorEvaluation", evaluationID, mock.AnythingOfType("*context.emptyCtx")).Return(expectedErr) + + m := NomadEnvironmentManager{ + runnerManager: nil, + api: &apiMock, + defaultJob: nomadApi.Job{}, + } + + err := m.registerJob("id", 1, 2, 3, "image", false, []uint16{}) + assert.Equal(t, expectedErr, err) +} diff --git a/nomad/api_querier.go b/nomad/api_querier.go index f8772ec..a07d78f 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -1,6 +1,7 @@ package nomad import ( + "context" nomadApi "github.com/hashicorp/nomad/api" "net/url" ) @@ -24,11 +25,20 @@ type apiQuerier interface { // loadRunners loads all allocations of the specified job. loadRunners(jobId string) (allocationListStub []*nomadApi.AllocationListStub, err error) + + // RegisterNomadJob registers a job with Nomad. + // It returns the evaluation ID that can be used when listening to the Nomad event stream. + RegisterNomadJob(job *nomadApi.Job) (string, error) + + // EvaluationStream returns a Nomad event stream filtered to return only events belonging to the + // given evaluation ID. + EvaluationStream(evalID string, ctx context.Context) (<-chan *nomadApi.Events, error) } // nomadApiClient implements the nomadApiQuerier interface and provides access to a real Nomad API. type nomadApiClient struct { - client *nomadApi.Client + client *nomadApi.Client + namespace string } func (nc *nomadApiClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { @@ -37,6 +47,7 @@ func (nc *nomadApiClient) init(nomadURL *url.URL, nomadNamespace string) (err er TLSConfig: &nomadApi.TLSConfig{}, Namespace: nomadNamespace, }) + nc.namespace = nomadNamespace return err } @@ -53,3 +64,29 @@ func (nc *nomadApiClient) loadRunners(jobId string) (allocationListStub []*nomad allocationListStub, _, err = nc.client.Jobs().Allocations(jobId, true, nil) return } + +func (nc *nomadApiClient) RegisterNomadJob(job *nomadApi.Job) (string, error) { + job.Namespace = &nc.namespace + resp, _, err := nc.client.Jobs().Register(job, nil) + if err != nil { + return "", err + } + if resp.Warnings != "" { + log. + WithField("job", job). + WithField("warnings", resp.Warnings). + Warn("Received warnings when registering job") + } + return resp.EvalID, nil +} + +func (nc *nomadApiClient) EvaluationStream(evalID string, ctx context.Context) (stream <-chan *nomadApi.Events, err error) { + stream, err = nc.client.EventStream().Stream( + ctx, + map[nomadApi.Topic][]string{ + nomadApi.TopicEvaluation: {evalID}, + }, + 0, + nil) + return +} diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index f9b8ccb..d471cc0 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -3,7 +3,10 @@ package nomad import ( + context "context" + api "github.com/hashicorp/nomad/api" + mock "github.com/stretchr/testify/mock" url "net/url" @@ -28,6 +31,29 @@ func (_m *apiQuerierMock) DeleteRunner(runnerId string) error { return r0 } +// EvaluationStream provides a mock function with given fields: evalID, ctx +func (_m *apiQuerierMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) { + ret := _m.Called(evalID, ctx) + + var r0 <-chan *api.Events + if rf, ok := ret.Get(0).(func(string, context.Context) <-chan *api.Events); ok { + r0 = rf(evalID, ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *api.Events) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, context.Context) error); ok { + r1 = rf(evalID, ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // JobScale provides a mock function with given fields: jobId func (_m *apiQuerierMock) JobScale(jobId string) (int, error) { ret := _m.Called(jobId) @@ -72,6 +98,27 @@ func (_m *apiQuerierMock) LoadJobList() ([]*api.JobListStub, error) { return r0, r1 } +// RegisterNomadJob provides a mock function with given fields: job +func (_m *apiQuerierMock) RegisterNomadJob(job *api.Job) (string, error) { + ret := _m.Called(job) + + var r0 string + if rf, ok := ret.Get(0).(func(*api.Job) string); ok { + r0 = rf(job) + } else { + r0 = ret.Get(0).(string) + } + + var r1 error + if rf, ok := ret.Get(1).(func(*api.Job) error); ok { + r1 = rf(job) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // SetJobScale provides a mock function with given fields: jobId, count, reason func (_m *apiQuerierMock) SetJobScale(jobId string, count int, reason string) error { ret := _m.Called(jobId, count, reason) diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 522f2d5..dcbce12 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -1,9 +1,12 @@ -// Code generated by mockery v2.7.5. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package nomad import ( + context "context" + api "github.com/hashicorp/nomad/api" + mock "github.com/stretchr/testify/mock" url "net/url" @@ -28,6 +31,29 @@ func (_m *ExecutorApiMock) DeleteRunner(runnerId string) error { return r0 } +// EvaluationStream provides a mock function with given fields: evalID, ctx +func (_m *ExecutorApiMock) EvaluationStream(evalID string, ctx context.Context) (<-chan *api.Events, error) { + ret := _m.Called(evalID, ctx) + + var r0 <-chan *api.Events + if rf, ok := ret.Get(0).(func(string, context.Context) <-chan *api.Events); ok { + r0 = rf(evalID, ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *api.Events) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, context.Context) error); ok { + r1 = rf(evalID, ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // JobScale provides a mock function with given fields: jobId func (_m *ExecutorApiMock) JobScale(jobId string) (int, error) { ret := _m.Called(jobId) @@ -95,6 +121,41 @@ func (_m *ExecutorApiMock) LoadRunners(jobId string) ([]string, error) { return r0, r1 } +// MonitorEvaluation provides a mock function with given fields: evalID, ctx +func (_m *ExecutorApiMock) MonitorEvaluation(evalID string, ctx context.Context) error { + ret := _m.Called(evalID, ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(string, context.Context) error); ok { + r0 = rf(evalID, ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RegisterNomadJob provides a mock function with given fields: job +func (_m *ExecutorApiMock) RegisterNomadJob(job *api.Job) (string, error) { + ret := _m.Called(job) + + var r0 string + if rf, ok := ret.Get(0).(func(*api.Job) string); ok { + r0 = rf(job) + } else { + r0 = ret.Get(0).(string) + } + + var r1 error + if rf, ok := ret.Get(1).(func(*api.Job) error); ok { + r1 = rf(job) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // SetJobScale provides a mock function with given fields: jobId, count, reason func (_m *ExecutorApiMock) SetJobScale(jobId string, count int, reason string) error { ret := _m.Called(jobId, count, reason) diff --git a/nomad/nomad.go b/nomad/nomad.go index cc5831a..6003163 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -1,16 +1,30 @@ package nomad import ( + "context" + "errors" + "fmt" nomadApi "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/structs" + "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "net/url" + "strings" ) +var log = logging.GetLogger("nomad") + // ExecutorApi provides access to an container orchestration solution type ExecutorApi interface { apiQuerier // LoadRunners loads all allocations of the specified job which are running and not about to get stopped. LoadRunners(jobId string) (runnerIds []string, err error) + + // MonitorEvaluation monitors the given evaluation ID. + // It waits until the evaluation reaches one of the states complete, cancelled or failed. + // If the evaluation was not successful, an error containing the failures is returned. + // See also https://github.com/hashicorp/nomad/blob/7d5a9ecde95c18da94c9b6ace2565afbfdd6a40d/command/monitor.go#L175 + MonitorEvaluation(evalID string, ctx context.Context) error } // ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values. @@ -27,8 +41,8 @@ func NewExecutorApi(nomadURL *url.URL, nomadNamespace string) (ExecutorApi, erro } // init prepares an apiClient to be able to communicate to a provided Nomad API. -func (apiClient *ApiClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { - err = apiClient.apiQuerier.init(nomadURL, nomadNamespace) +func (a *ApiClient) init(nomadURL *url.URL, nomadNamespace string) (err error) { + err = a.apiQuerier.init(nomadURL, nomadNamespace) if err != nil { return err } @@ -36,9 +50,9 @@ func (apiClient *ApiClient) init(nomadURL *url.URL, nomadNamespace string) (err } // LoadRunners loads the allocations of the specified job. -func (apiClient *ApiClient) LoadRunners(jobId string) (runnerIds []string, err error) { +func (a *ApiClient) LoadRunners(jobId string) (runnerIds []string, err error) { //list, _, err := apiClient.client.Jobs().Allocations(jobId, true, nil) - list, err := apiClient.loadRunners(jobId) + list, err := a.loadRunners(jobId) if err != nil { return nil, err } @@ -50,3 +64,59 @@ func (apiClient *ApiClient) LoadRunners(jobId string) (runnerIds []string, err e } return } + +func (a *ApiClient) MonitorEvaluation(evalID string, ctx context.Context) error { + var events *nomadApi.Events + stream, err := a.EvaluationStream(evalID, ctx) + if err != nil { + return err + } + for { + select { + case events = <-stream: + case <-ctx.Done(): + return nil + } + if events.IsHeartbeat() { + continue + } + if err := events.Err; err != nil { + log.WithError(err).Warn("Error monitoring evaluation") + return err + } + for _, event := range events.Events { + eval, err := event.Evaluation() + if err != nil { + log.WithError(err).Warn("Error retrieving evaluation from streamed event") + return err + } + switch eval.Status { + case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed: + return checkEvaluation(eval) + default: + } + } + } +} + +// checkEvaluation checks whether the given evaluation failed. +// If the evaluation failed, it returns an error with a message containing the failure information. +func checkEvaluation(eval *nomadApi.Evaluation) error { + if len(eval.FailedTGAllocs) == 0 { + if eval.Status == structs.EvalStatusComplete { + return nil + } + return fmt.Errorf("evaluation could not complete: %q", eval.Status) + } else { + messages := []string{ + fmt.Sprintf("Evaluation %q finished with status %q but failed to place all allocations.", eval.ID, eval.Status), + } + for tg, metrics := range eval.FailedTGAllocs { + messages = append(messages, fmt.Sprintf("%s: %#v", tg, metrics)) + } + if eval.BlockedEval != "" { + messages = append(messages, fmt.Sprintf("Evaluation %q waiting for additional capacity to place remainder", eval.BlockedEval)) + } + return errors.New(strings.Join(messages, "\n")) + } +} diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index a1200f2..b29fea6 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -1,14 +1,19 @@ package nomad import ( + "context" "errors" + "fmt" nomadApi "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mitchellh/mapstructure" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "net/url" "testing" + "time" ) func TestLoadRunnersTestSuite(t *testing.T) { @@ -148,3 +153,239 @@ func TestNewExecutorApiCanBeCreatedWithoutError(t *testing.T) { _, err = NewExecutorApi(&TestURL, TestNamespace) require.Nil(t, err) } + +func TestApiClient_MonitorEvaluationReturnsNilWhenContextCancelled(t *testing.T) { + stream := make(<-chan *nomadApi.Events) + ctx, cancel := context.WithCancel(context.Background()) + + apiMock := &apiQuerierMock{} + apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(stream, nil) + apiClient := &ApiClient{apiMock} + + var err error + errChan := make(chan error) + go func() { + errChan <- apiClient.MonitorEvaluation("id", ctx) + }() + cancel() + // If cancel doesn't terminate MonitorEvaluation, this test won't complete without a timeout. + select { + case err = <-errChan: + case <-time.After(time.Millisecond * 10): + t.Fatal("MonitorEvaluation didn't finish as expected") + } + assert.Nil(t, err) +} + +func TestApiClient_MonitorEvaluationReturnsErrorWhenStreamReturnsError(t *testing.T) { + apiMock := &apiQuerierMock{} + expectedErr := errors.New("test error") + apiMock.On("EvaluationStream", mock.AnythingOfType("string"), mock.AnythingOfType("*context.emptyCtx")). + Return(nil, expectedErr) + apiClient := &ApiClient{apiMock} + err := apiClient.MonitorEvaluation("id", context.Background()) + assert.Equal(t, expectedErr, err) +} + +type eventPayload struct { + Evaluation *nomadApi.Evaluation +} + +// eventForEvaluation takes an evaluation and creates an Event with the given evaluation +// as its payload. Nomad uses the mapstructure library to decode the payload, which we +// simply reverse here. +func eventForEvaluation(t *testing.T, eval nomadApi.Evaluation) nomadApi.Event { + payload := make(map[string]interface{}) + + err := mapstructure.Decode(eventPayload{&eval}, &payload) + if err != nil { + t.Fatalf("Couldn't encode evaluation %v", eval) + return nomadApi.Event{} + } + event := nomadApi.Event{Topic: nomadApi.TopicEvaluation, Payload: payload} + return event +} + +func runEvaluationMonitoring(t *testing.T, events []*nomadApi.Events) (eventsProcessed int, err error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream := make(chan *nomadApi.Events) + + apiMock := &apiQuerierMock{} + + // Yes it is hacky. However, we can only get a read-only channel once we return it from a function. + readOnlyStream := func() <-chan *nomadApi.Events { return stream }() + + apiMock.On("EvaluationStream", mock.AnythingOfType("string"), ctx).Return(readOnlyStream, nil) + apiClient := &ApiClient{apiMock} + + errChan := make(chan error) + go func() { + errChan <- apiClient.MonitorEvaluation("id", ctx) + }() + + var e *nomadApi.Events + for eventsProcessed, e = range events { + select { + case err = <-errChan: + return + case stream <- e: + } + } + // wait for error after streaming final event + select { + case err = <-errChan: + case <-time.After(time.Millisecond * 10): + t.Fatal("MonitorEvaluation didn't finish as expected") + } + // Increment once as range starts at 0 + eventsProcessed++ + return +} + +func TestApiClient_MonitorEvaluationWithSuccessfulEvent(t *testing.T) { + eval := nomadApi.Evaluation{Status: structs.EvalStatusComplete} + pendingEval := nomadApi.Evaluation{Status: structs.EvalStatusPending} + + // make sure that the tested function can complete + require.Nil(t, checkEvaluation(&eval)) + + events := nomadApi.Events{Events: []nomadApi.Event{eventForEvaluation(t, eval)}} + pendingEvaluationEvents := nomadApi.Events{Events: []nomadApi.Event{eventForEvaluation(t, pendingEval)}} + multipleEventsWithPending := nomadApi.Events{Events: []nomadApi.Event{ + eventForEvaluation(t, pendingEval), eventForEvaluation(t, eval), + }} + + var cases = []struct { + streamedEvents []*nomadApi.Events + expectedEventsProcessed int + name string + }{ + {[]*nomadApi.Events{&events}, 1, + "it completes with successful event"}, + {[]*nomadApi.Events{&events, &events}, 1, + "it completes at first successful event"}, + {[]*nomadApi.Events{{}, &events}, 2, + "it skips heartbeat and completes"}, + {[]*nomadApi.Events{&pendingEvaluationEvents, &events}, 2, + "it skips pending evaluation and completes"}, + {[]*nomadApi.Events{&multipleEventsWithPending}, 1, + "it handles multiple events per received event"}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + eventsProcessed, err := runEvaluationMonitoring(t, c.streamedEvents) + assert.Nil(t, err) + assert.Equal(t, c.expectedEventsProcessed, eventsProcessed) + }) + } +} + +func TestApiClient_MonitorEvaluationWithFailingEvent(t *testing.T) { + eval := nomadApi.Evaluation{Status: structs.EvalStatusFailed} + evalErr := checkEvaluation(&eval) + require.NotNil(t, evalErr) + + pendingEval := nomadApi.Evaluation{Status: structs.EvalStatusPending} + eventsErr := errors.New("my events error") + + events := nomadApi.Events{Events: []nomadApi.Event{eventForEvaluation(t, eval)}} + pendingEvaluationEvents := nomadApi.Events{Events: []nomadApi.Event{eventForEvaluation(t, pendingEval)}} + multipleEventsWithPending := nomadApi.Events{Events: []nomadApi.Event{ + eventForEvaluation(t, pendingEval), eventForEvaluation(t, eval), + }} + eventsWithErr := nomadApi.Events{Err: eventsErr, Events: []nomadApi.Event{{}}} + + var cases = []struct { + streamedEvents []*nomadApi.Events + expectedEventsProcessed int + expectedError error + name string + }{ + {[]*nomadApi.Events{&events}, 1, evalErr, + "it fails with failing event"}, + {[]*nomadApi.Events{&events, &events}, 1, evalErr, + "it fails at first failing event"}, + {[]*nomadApi.Events{{}, &events}, 2, evalErr, + "it skips heartbeat and fail"}, + {[]*nomadApi.Events{&pendingEvaluationEvents, &events}, 2, evalErr, + "it skips pending evaluation and fail"}, + {[]*nomadApi.Events{&multipleEventsWithPending}, 1, evalErr, + "it handles multiple events per received event and fails"}, + {[]*nomadApi.Events{&eventsWithErr}, 1, eventsErr, + "it fails with event error when event has error"}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + eventsProcessed, err := runEvaluationMonitoring(t, c.streamedEvents) + assert.Equal(t, c.expectedError, err) + assert.Equal(t, c.expectedEventsProcessed, eventsProcessed) + }) + } +} + +func TestApiClient_MonitorEvaluationFailsWhenFailingToDecodeEvaluation(t *testing.T) { + event := nomadApi.Event{ + Topic: nomadApi.TopicEvaluation, + // This should fail decoding, as Evaluation.Status is expected to be a string, not int + Payload: map[string]interface{}{"Evaluation": map[string]interface{}{"Status": 1}}, + } + _, err := event.Evaluation() + require.NotNil(t, err) + eventsProcessed, err := runEvaluationMonitoring(t, []*nomadApi.Events{{Events: []nomadApi.Event{event}}}) + assert.Equal(t, err, err) + assert.Equal(t, 1, eventsProcessed) +} + +func TestCheckEvaluationWithFailedAllocations(t *testing.T) { + testKey := "test1" + failedAllocs := map[string]*nomadApi.AllocationMetric{ + testKey: {NodesExhausted: 1}, + } + evaluation := nomadApi.Evaluation{FailedTGAllocs: failedAllocs, Status: structs.EvalStatusFailed} + + assertMessageContainsCorrectStrings := func(msg string) { + assert.Contains(t, msg, evaluation.Status, "error should contain the evaluation status") + assert.Contains(t, msg, fmt.Sprintf("%s: %#v", testKey, failedAllocs[testKey]), + "error should contain the failed allocations metric") + } + + var msgWithoutBlockedEval, msgWithBlockedEval string + t.Run("without blocked eval", func(t *testing.T) { + err := checkEvaluation(&evaluation) + require.NotNil(t, err) + msgWithoutBlockedEval = err.Error() + assertMessageContainsCorrectStrings(msgWithoutBlockedEval) + }) + + t.Run("with blocked eval", func(t *testing.T) { + evaluation.BlockedEval = "blocking-eval" + err := checkEvaluation(&evaluation) + require.NotNil(t, err) + msgWithBlockedEval = err.Error() + assertMessageContainsCorrectStrings(msgWithBlockedEval) + }) + + assert.NotEqual(t, msgWithBlockedEval, msgWithoutBlockedEval) +} + +func TestCheckEvaluationWithoutFailedAllocations(t *testing.T) { + evaluation := nomadApi.Evaluation{FailedTGAllocs: make(map[string]*nomadApi.AllocationMetric)} + + t.Run("when evaluation status complete", func(t *testing.T) { + evaluation.Status = structs.EvalStatusComplete + err := checkEvaluation(&evaluation) + assert.Nil(t, err) + }) + + t.Run("when evaluation status not complete", func(t *testing.T) { + for _, status := range []string{structs.EvalStatusFailed, structs.EvalStatusCancelled, structs.EvalStatusBlocked, structs.EvalStatusPending} { + evaluation.Status = status + err := checkEvaluation(&evaluation) + require.NotNil(t, err) + assert.Contains(t, err.Error(), status, "error should contain the evaluation status") + } + }) +}