diff --git a/ci/python.nomad b/ci/python.nomad index e169f7b..6bfd788 100644 --- a/ci/python.nomad +++ b/ci/python.nomad @@ -3,7 +3,7 @@ job "python" { datacenters = ["dc1"] type = "batch" - group "python" { + group "python-group" { ephemeral_disk { migrate = false size = 10 @@ -21,7 +21,7 @@ job "python" { weight = 100 } - task "python" { + task "python-task" { driver = "docker" kill_timeout = "0s" kill_signal = "SIGKILL" diff --git a/environment/execution_environment.go b/environment/execution_environment.go index d24dccb..a698184 100644 --- a/environment/execution_environment.go +++ b/environment/execution_environment.go @@ -68,27 +68,27 @@ func (environment *NomadExecutionEnvironment) Refresh() { for { runners, err := environment.nomadApiClient.LoadRunners(environment.jobId) if err != nil { - log.WithError(err).Printf("Failed fetching runners") + log.WithError(err).Warn("Failed fetching runners") break } for _, r := range environment.unusedRunners(runners) { // ToDo: Listen on Nomad event stream - log.Printf("Adding allocation %+v", r) + log.WithField("allocation", r).Debug("Adding allocation") environment.allRunners.Add(r) environment.availableRunners <- r } jobScale, err := environment.nomadApiClient.JobScale(environment.jobId) if err != nil { - log.WithError(err).Printf("Failed get allocation count") + log.WithError(err).Warn("Failed get allocation count") break } neededRunners := cap(environment.availableRunners) - len(environment.availableRunners) + 1 runnerCount := jobScale + neededRunners time.Sleep(50 * time.Millisecond) - log.Printf("Set job scaling %d", runnerCount) + log.WithField("count", runnerCount).Debug("Set job scaling") err = environment.nomadApiClient.SetJobScale(environment.jobId, runnerCount, "Runner Requested") if err != nil { - log.WithError(err).Printf("Failed set allocation scaling") + log.WithError(err).Warn("Failed to set allocation scaling") continue } } diff --git a/nomad/api_querier.go b/nomad/api_querier.go index 205252b..cafc6a8 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -16,7 +16,7 @@ type apiQuerier interface { // JobScale returns the scale of the passed job. JobScale(jobId string) (jobScale int, err error) - // SetJobScaling sets the scaling count of the passed job to Nomad. + // SetJobScale sets the scaling count of the passed job to Nomad. SetJobScale(jobId string, count int, reason string) (err error) // DeleteRunner deletes the runner with the given Id. @@ -39,26 +39,6 @@ func (nc *nomadApiClient) init(nomadURL *url.URL) (err error) { return err } -func (nc *nomadApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { - list, _, err = nc.client.Jobs().List(nil) - return -} - -func (nc *nomadApiClient) JobScale(jobId string) (jobScale int, err error) { - status, _, err := nc.client.Jobs().ScaleStatus(jobId, nil) - if err != nil { - return - } - // ToDo: Consider counting also the placed and desired allocations - jobScale = status.TaskGroups[jobId].Running - return -} - -func (nc *nomadApiClient) SetJobScale(jobId string, count int, reason string) (err error) { - _, _, err = nc.client.Jobs().Scale(jobId, jobId, &count, reason, false, nil, nil) - return -} - func (nc *nomadApiClient) DeleteRunner(runnerId string) (err error) { allocation, _, err := nc.client.Allocations().Info(runnerId, nil) if err != nil { diff --git a/nomad/job.go b/nomad/job.go index 9055776..61187b6 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -130,14 +130,14 @@ func (apiClient *ApiClient) createJob( } // LoadJobList loads the list of jobs from the Nomad api. -func (apiClient *ApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { - list, _, err = apiClient.client.Jobs().List(nil) +func (nc *nomadApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { + list, _, err = nc.client.Jobs().List(nil) return } // JobScale returns the scale of the passed job. -func (apiClient *ApiClient) JobScale(jobId string) (jobScale int, err error) { - status, _, err := apiClient.client.Jobs().ScaleStatus(jobId, nil) +func (nc *nomadApiClient) JobScale(jobId string) (jobScale int, err error) { + status, _, err := nc.client.Jobs().ScaleStatus(jobId, nil) if err != nil { return } @@ -147,7 +147,7 @@ func (apiClient *ApiClient) JobScale(jobId string) (jobScale int, err error) { } // SetJobScale sets the scaling count of the passed job to Nomad. -func (apiClient *ApiClient) SetJobScale(jobId string, count int, reason string) (err error) { - _, _, err = apiClient.client.Jobs().Scale(jobId, fmt.Sprintf(TaskGroupNameFormat, jobId), &count, reason, false, nil, nil) +func (nc *nomadApiClient) SetJobScale(jobId string, count int, reason string) (err error) { + _, _, err = nc.client.Jobs().Scale(jobId, fmt.Sprintf(TaskGroupNameFormat, jobId), &count, reason, false, nil, nil) return } diff --git a/nomad/job_test.go b/nomad/job_test.go index e1447a3..bea1ece 100644 --- a/nomad/job_test.go +++ b/nomad/job_test.go @@ -238,7 +238,7 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) { func TestCreateJobSetsAllGivenArguments(t *testing.T) { testJob, base := createTestJob() - apiClient := ApiClient{&nomadApiClient{}, &nomadApi.Client{}, *base} + apiClient := ApiClient{&nomadApiClient{}, *base} job := apiClient.createJob( *testJob.ID, uint(*testJob.TaskGroups[0].Count), diff --git a/nomad/nomad.go b/nomad/nomad.go index b75e006..b2019c6 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -19,24 +19,20 @@ type ExecutorApi interface { // ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values. type ApiClient struct { apiQuerier - client *nomadApi.Client defaultJob nomadApi.Job } // NewExecutorApi creates a new api client. // One client is usually sufficient for the complete runtime of the API. func NewExecutorApi(nomadURL *url.URL) (ExecutorApi, error) { - client := &ApiClient{apiQuerier: &ApiClient{}, client: &nomadApi.Client{}} + client := &ApiClient{apiQuerier: &nomadApiClient{}} err := client.init(nomadURL) return client, err } // init prepares an apiClient to be able to communicate to a provided Nomad API. func (apiClient *ApiClient) init(nomadURL *url.URL) (err error) { - apiClient.client, err = nomadApi.NewClient(&nomadApi.Config{ - Address: nomadURL.String(), - TLSConfig: &nomadApi.TLSConfig{}, - }) + err = apiClient.apiQuerier.init(nomadURL) if err != nil { return err } diff --git a/nomad/nomad_test.go b/nomad/nomad_test.go index 15f669a..ae5d98c 100644 --- a/nomad/nomad_test.go +++ b/nomad/nomad_test.go @@ -122,7 +122,7 @@ var TestURL = url.URL{ } func TestApiClient_init(t *testing.T) { - client := &ApiClient{} + client := &ApiClient{apiQuerier: &nomadApiClient{}} defaultJob := parseJob(defaultJobHCL) err := client.init(&TestURL) require.Nil(t, err) @@ -130,7 +130,7 @@ func TestApiClient_init(t *testing.T) { } func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) { - client := &ApiClient{} + client := &ApiClient{apiQuerier: &nomadApiClient{}} err := client.init(&url.URL{ Scheme: "http", Host: "http://127.0.0.1:4646", @@ -139,7 +139,7 @@ func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) { } func TestNewExecutorApiCanBeCreatedWithoutError(t *testing.T) { - expectedClient := &ApiClient{} + expectedClient := &ApiClient{apiQuerier: &nomadApiClient{}} err := expectedClient.init(&TestURL) require.Nil(t, err)