diff --git a/environment/execution_environment.go b/environment/execution_environment.go index 68c4081..d24dccb 100644 --- a/environment/execution_environment.go +++ b/environment/execution_environment.go @@ -66,7 +66,7 @@ func (environment *NomadExecutionEnvironment) NextRunner() (r runner.Runner, err // Refresh Big ToDo: Improve this function!! State out that it also rescales the job; Provide context to be terminable... func (environment *NomadExecutionEnvironment) Refresh() { for { - runners, err := environment.nomadApiClient.LoadAvailableRunners(environment.jobId) + runners, err := environment.nomadApiClient.LoadRunners(environment.jobId) if err != nil { log.WithError(err).Printf("Failed fetching runners") break @@ -77,7 +77,7 @@ func (environment *NomadExecutionEnvironment) Refresh() { environment.allRunners.Add(r) environment.availableRunners <- r } - jobScale, err := environment.nomadApiClient.GetJobScale(environment.jobId) + jobScale, err := environment.nomadApiClient.JobScale(environment.jobId) if err != nil { log.WithError(err).Printf("Failed get allocation count") break @@ -86,7 +86,7 @@ func (environment *NomadExecutionEnvironment) Refresh() { runnerCount := jobScale + neededRunners time.Sleep(50 * time.Millisecond) log.Printf("Set job scaling %d", runnerCount) - err = environment.nomadApiClient.SetJobScaling(environment.jobId, runnerCount, "Runner Requested") + err = environment.nomadApiClient.SetJobScale(environment.jobId, runnerCount, "Runner Requested") if err != nil { log.WithError(err).Printf("Failed set allocation scaling") continue diff --git a/environment/execution_environment_test.go b/environment/execution_environment_test.go index 6bd6a84..3c28043 100644 --- a/environment/execution_environment_test.go +++ b/environment/execution_environment_test.go @@ -66,7 +66,7 @@ func TestRefreshFetchRunners(t *testing.T) { // ToDo: Terminate Refresh when test finished (also in other tests) go environment.Refresh() _, _ = environment.NextRunner() - apiMock.AssertCalled(t, "LoadAvailableRunners", jobId) + apiMock.AssertCalled(t, "LoadRunners", jobId) } func TestRefreshFetchesRunnersIntoChannel(t *testing.T) { @@ -81,7 +81,7 @@ func TestRefreshScalesJob(t *testing.T) { go environment.Refresh() _, _ = environment.NextRunner() time.Sleep(100 * time.Millisecond) // ToDo: Be safe this test is not flaky - apiMock.AssertCalled(t, "SetJobScaling", jobId, 52, "Runner Requested") + apiMock.AssertCalled(t, "SetJobScale", jobId, 52, "Runner Requested") } func TestRefreshAddsRunnerToPool(t *testing.T) { @@ -96,9 +96,9 @@ func TestRefreshAddsRunnerToPool(t *testing.T) { func newRefreshMock(returnedRunnerIds []string, allRunners RunnerPool) (apiClient *nomad.ExecutorApiMock, environment *NomadExecutionEnvironment) { apiClient = &nomad.ExecutorApiMock{} - apiClient.On("LoadAvailableRunners", jobId).Return(returnedRunnerIds, nil) - apiClient.On("GetJobScale", jobId).Return(len(returnedRunnerIds), nil) - apiClient.On("SetJobScaling", jobId, mock.AnythingOfType("int"), "Runner Requested").Return(nil) + apiClient.On("LoadRunners", jobId).Return(returnedRunnerIds, nil) + apiClient.On("JobScale", jobId).Return(len(returnedRunnerIds), nil) + apiClient.On("SetJobScale", jobId, mock.AnythingOfType("int"), "Runner Requested").Return(nil) environment = &NomadExecutionEnvironment{ jobId: jobId, availableRunners: make(chan runner.Runner, 50), diff --git a/go.mod b/go.mod index ab33fcd..8698162 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.16 require ( github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/nomad v1.0.4 github.com/hashicorp/nomad/api v0.0.0-20210505182403-7d5a9ecde95c diff --git a/nomad/api_querier.go b/nomad/api_querier.go index c545c96..205252b 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -13,11 +13,11 @@ type apiQuerier interface { // LoadJobList loads the list of jobs from the Nomad API. LoadJobList() (list []*nomadApi.JobListStub, err error) - // GetJobScale returns the scale of the passed job. - GetJobScale(jobId string) (jobScale int, err error) + // JobScale returns the scale of the passed job. + JobScale(jobId string) (jobScale int, err error) // SetJobScaling sets the scaling count of the passed job to Nomad. - SetJobScaling(jobId string, count int, reason string) (err error) + SetJobScale(jobId string, count int, reason string) (err error) // DeleteRunner deletes the runner with the given Id. DeleteRunner(runnerId string) (err error) @@ -44,7 +44,7 @@ func (nc *nomadApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error return } -func (nc *nomadApiClient) GetJobScale(jobId string) (jobScale int, err error) { +func (nc *nomadApiClient) JobScale(jobId string) (jobScale int, err error) { status, _, err := nc.client.Jobs().ScaleStatus(jobId, nil) if err != nil { return @@ -54,7 +54,7 @@ func (nc *nomadApiClient) GetJobScale(jobId string) (jobScale int, err error) { return } -func (nc *nomadApiClient) SetJobScaling(jobId string, count int, reason string) (err error) { +func (nc *nomadApiClient) SetJobScale(jobId string, count int, reason string) (err error) { _, _, err = nc.client.Jobs().Scale(jobId, jobId, &count, reason, false, nil, nil) return } diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index 2e7e4e3..4668bd5 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -28,8 +28,8 @@ func (_m *apiQuerierMock) DeleteRunner(runnerId string) error { return r0 } -// GetJobScale provides a mock function with given fields: jobId -func (_m *apiQuerierMock) GetJobScale(jobId string) (int, error) { +// JobScale provides a mock function with given fields: jobId +func (_m *apiQuerierMock) JobScale(jobId string) (int, error) { ret := _m.Called(jobId) var r0 int @@ -122,3 +122,17 @@ func (_m *apiQuerierMock) loadRunners(jobId string) ([]*api.AllocationListStub, return r0, r1 } + +// SetJobScale provides a mock function with given fields: jobId, count, reason +func (_m *apiQuerierMock) SetJobScale(jobId string, count int, reason string) error { + ret := _m.Called(jobId, count, reason) + + var r0 error + if rf, ok := ret.Get(0).(func(string, int, string) error); ok { + r0 = rf(jobId, count, reason) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index 6bd6974..4f81de1 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.7.5. DO NOT EDIT. package nomad @@ -28,8 +28,8 @@ func (_m *ExecutorApiMock) DeleteRunner(runnerId string) error { return r0 } -// GetJobScale provides a mock function with given fields: jobId -func (_m *ExecutorApiMock) GetJobScale(jobId string) (int, error) { +// JobScale provides a mock function with given fields: jobId +func (_m *ExecutorApiMock) JobScale(jobId string) (int, error) { ret := _m.Called(jobId) var r0 int @@ -49,29 +49,6 @@ func (_m *ExecutorApiMock) GetJobScale(jobId string) (int, error) { return r0, r1 } -// LoadAvailableRunners provides a mock function with given fields: jobId -func (_m *ExecutorApiMock) LoadAvailableRunners(jobId string) ([]string, error) { - ret := _m.Called(jobId) - - var r0 []string - if rf, ok := ret.Get(0).(func(string) []string); ok { - r0 = rf(jobId) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]string) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(jobId) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // LoadJobList provides a mock function with given fields: func (_m *ExecutorApiMock) LoadJobList() ([]*api.JobListStub, error) { ret := _m.Called() @@ -95,8 +72,31 @@ func (_m *ExecutorApiMock) LoadJobList() ([]*api.JobListStub, error) { return r0, r1 } -// SetJobScaling provides a mock function with given fields: jobId, count, reason -func (_m *ExecutorApiMock) SetJobScaling(jobId string, count int, reason string) error { +// LoadRunners provides a mock function with given fields: jobId +func (_m *ExecutorApiMock) LoadRunners(jobId string) ([]string, error) { + ret := _m.Called(jobId) + + var r0 []string + if rf, ok := ret.Get(0).(func(string) []string); ok { + r0 = rf(jobId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(jobId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SetJobScale provides a mock function with given fields: jobId, count, reason +func (_m *ExecutorApiMock) SetJobScale(jobId string, count int, reason string) error { ret := _m.Called(jobId, count, reason) var r0 error diff --git a/nomad/job.go b/nomad/job.go index 472f58a..9055776 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -135,8 +135,8 @@ func (apiClient *ApiClient) LoadJobList() (list []*nomadApi.JobListStub, err err return } -// GetJobScale returns the scale of the passed job. -func (apiClient *ApiClient) GetJobScale(jobId string) (jobScale int, err error) { +// JobScale returns the scale of the passed job. +func (apiClient *ApiClient) JobScale(jobId string) (jobScale int, err error) { status, _, err := apiClient.client.Jobs().ScaleStatus(jobId, nil) if err != nil { return @@ -146,8 +146,8 @@ func (apiClient *ApiClient) GetJobScale(jobId string) (jobScale int, err error) return } -// SetJobScaling sets the scaling count of the passed job to Nomad. -func (apiClient *ApiClient) SetJobScaling(jobId string, count int, reason string) (err error) { +// SetJobScale sets the scaling count of the passed job to Nomad. +func (apiClient *ApiClient) SetJobScale(jobId string, count int, reason string) (err error) { _, _, err = apiClient.client.Jobs().Scale(jobId, fmt.Sprintf(TaskGroupNameFormat, jobId), &count, reason, false, nil, nil) return } diff --git a/nomad/nomad.go b/nomad/nomad.go index 2af2cfc..b75e006 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -12,8 +12,8 @@ var log = logging.GetLogger("nomad") type ExecutorApi interface { apiQuerier - // LoadAvailableRunners loads all allocations of the specified job which are running and not about to get stopped. - LoadAvailableRunners(jobId string) (runnerIds []string, err error) + // LoadRunners loads all allocations of the specified job which are running and not about to get stopped. + LoadRunners(jobId string) (runnerIds []string, err error) } // ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values. @@ -26,7 +26,7 @@ type ApiClient struct { // NewExecutorApi creates a new api client. // One client is usually sufficient for the complete runtime of the API. func NewExecutorApi(nomadURL *url.URL) (ExecutorApi, error) { - client := &ApiClient{apiQuerier: &nomadApiClient{}} + client := &ApiClient{apiQuerier: &ApiClient{}, client: &nomadApi.Client{}} err := client.init(nomadURL) return client, err } @@ -44,8 +44,9 @@ func (apiClient *ApiClient) init(nomadURL *url.URL) (err error) { return nil } -// LoadAvailableRunners loads the allocations of the specified job. -func (apiClient *ApiClient) LoadAvailableRunners(jobId string) (runnerIds []string, err error) { +// LoadRunners loads the allocations of the specified job. +func (apiClient *ApiClient) LoadRunners(jobId string) (runnerIds []string, err error) { + //list, _, err := apiClient.client.Jobs().Allocations(jobId, true, nil) list, err := apiClient.loadRunners(jobId) if err != nil { return nil, err diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index de7fd42..15f669a 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -11,11 +11,11 @@ import ( "testing" ) -func TestLoadAvailableRunnersTestSuite(t *testing.T) { - suite.Run(t, new(LoadAvailableRunnersTestSuite)) +func TestLoadRunnersTestSuite(t *testing.T) { + suite.Run(t, new(LoadRunnersTestSuite)) } -type LoadAvailableRunnersTestSuite struct { +type LoadRunnersTestSuite struct { suite.Suite jobId string mock *apiQuerierMock @@ -26,7 +26,7 @@ type LoadAvailableRunnersTestSuite struct { stoppingRunner *nomadApi.AllocationListStub } -func (suite *LoadAvailableRunnersTestSuite) SetupTest() { +func (suite *LoadRunnersTestSuite) SetupTest() { suite.jobId = "1d-0f-v3ry-sp3c14l-j0b" suite.mock = &apiQuerierMock{} @@ -57,50 +57,50 @@ func (suite *LoadAvailableRunnersTestSuite) SetupTest() { } } -func (suite *LoadAvailableRunnersTestSuite) TestErrorOfUnderlyingApiCallIsPropagated() { +func (suite *LoadRunnersTestSuite) TestErrorOfUnderlyingApiCallIsPropagated() { errorString := "api errored" suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return(nil, errors.New(errorString)) - returnedIds, err := suite.nomadApiClient.LoadAvailableRunners(suite.jobId) + returnedIds, err := suite.nomadApiClient.LoadRunners(suite.jobId) suite.Nil(returnedIds) suite.Error(err) } -func (suite *LoadAvailableRunnersTestSuite) TestThrowsNoErrorWhenUnderlyingApiCallDoesNot() { +func (suite *LoadRunnersTestSuite) TestThrowsNoErrorWhenUnderlyingApiCallDoesNot() { suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return([]*nomadApi.AllocationListStub{}, nil) - _, err := suite.nomadApiClient.LoadAvailableRunners(suite.jobId) + _, err := suite.nomadApiClient.LoadRunners(suite.jobId) suite.NoError(err) } -func (suite *LoadAvailableRunnersTestSuite) TestAvailableRunnerIsReturned() { +func (suite *LoadRunnersTestSuite) TestAvailableRunnerIsReturned() { suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return([]*nomadApi.AllocationListStub{suite.availableRunner}, nil) - returnedIds, _ := suite.nomadApiClient.LoadAvailableRunners(suite.jobId) + returnedIds, _ := suite.nomadApiClient.LoadRunners(suite.jobId) suite.Len(returnedIds, 1) suite.Equal(suite.availableRunner.ID, returnedIds[0]) } -func (suite *LoadAvailableRunnersTestSuite) TestStoppedRunnerIsNotReturned() { +func (suite *LoadRunnersTestSuite) TestStoppedRunnerIsNotReturned() { suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return([]*nomadApi.AllocationListStub{suite.stoppedRunner}, nil) - returnedIds, _ := suite.nomadApiClient.LoadAvailableRunners(suite.jobId) + returnedIds, _ := suite.nomadApiClient.LoadRunners(suite.jobId) suite.Empty(returnedIds) } -func (suite *LoadAvailableRunnersTestSuite) TestStoppingRunnerIsNotReturned() { +func (suite *LoadRunnersTestSuite) TestStoppingRunnerIsNotReturned() { suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return([]*nomadApi.AllocationListStub{suite.stoppingRunner}, nil) - returnedIds, _ := suite.nomadApiClient.LoadAvailableRunners(suite.jobId) + returnedIds, _ := suite.nomadApiClient.LoadRunners(suite.jobId) suite.Empty(returnedIds) } -func (suite *LoadAvailableRunnersTestSuite) TestReturnsAllAvailableRunners() { +func (suite *LoadRunnersTestSuite) TestReturnsAllAvailableRunners() { runnersList := []*nomadApi.AllocationListStub{ suite.availableRunner, suite.anotherAvailableRunner, @@ -110,7 +110,7 @@ func (suite *LoadAvailableRunnersTestSuite) TestReturnsAllAvailableRunners() { suite.mock.On("loadRunners", mock.AnythingOfType("string")). Return(runnersList, nil) - returnedIds, _ := suite.nomadApiClient.LoadAvailableRunners(suite.jobId) + returnedIds, _ := suite.nomadApiClient.LoadRunners(suite.jobId) suite.Len(returnedIds, 2) suite.Contains(returnedIds, suite.availableRunner.ID) suite.Contains(returnedIds, suite.anotherAvailableRunner.ID)