Resolve merge conflicts

This commit is contained in:
Konrad Hanff
2021-05-12 11:06:28 +02:00
parent cee5eee622
commit 1f921b2ce2
7 changed files with 20 additions and 44 deletions

View File

@@ -3,7 +3,7 @@ job "python" {
datacenters = ["dc1"] datacenters = ["dc1"]
type = "batch" type = "batch"
group "python" { group "python-group" {
ephemeral_disk { ephemeral_disk {
migrate = false migrate = false
size = 10 size = 10
@@ -21,7 +21,7 @@ job "python" {
weight = 100 weight = 100
} }
task "python" { task "python-task" {
driver = "docker" driver = "docker"
kill_timeout = "0s" kill_timeout = "0s"
kill_signal = "SIGKILL" kill_signal = "SIGKILL"

View File

@@ -68,27 +68,27 @@ func (environment *NomadExecutionEnvironment) Refresh() {
for { for {
runners, err := environment.nomadApiClient.LoadRunners(environment.jobId) runners, err := environment.nomadApiClient.LoadRunners(environment.jobId)
if err != nil { if err != nil {
log.WithError(err).Printf("Failed fetching runners") log.WithError(err).Warn("Failed fetching runners")
break break
} }
for _, r := range environment.unusedRunners(runners) { for _, r := range environment.unusedRunners(runners) {
// ToDo: Listen on Nomad event stream // ToDo: Listen on Nomad event stream
log.Printf("Adding allocation %+v", r) log.WithField("allocation", r).Debug("Adding allocation")
environment.allRunners.Add(r) environment.allRunners.Add(r)
environment.availableRunners <- r environment.availableRunners <- r
} }
jobScale, err := environment.nomadApiClient.JobScale(environment.jobId) jobScale, err := environment.nomadApiClient.JobScale(environment.jobId)
if err != nil { if err != nil {
log.WithError(err).Printf("Failed get allocation count") log.WithError(err).Warn("Failed get allocation count")
break break
} }
neededRunners := cap(environment.availableRunners) - len(environment.availableRunners) + 1 neededRunners := cap(environment.availableRunners) - len(environment.availableRunners) + 1
runnerCount := jobScale + neededRunners runnerCount := jobScale + neededRunners
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
log.Printf("Set job scaling %d", runnerCount) log.WithField("count", runnerCount).Debug("Set job scaling")
err = environment.nomadApiClient.SetJobScale(environment.jobId, runnerCount, "Runner Requested") err = environment.nomadApiClient.SetJobScale(environment.jobId, runnerCount, "Runner Requested")
if err != nil { if err != nil {
log.WithError(err).Printf("Failed set allocation scaling") log.WithError(err).Warn("Failed to set allocation scaling")
continue continue
} }
} }

View File

@@ -16,7 +16,7 @@ type apiQuerier interface {
// JobScale returns the scale of the passed job. // JobScale returns the scale of the passed job.
JobScale(jobId string) (jobScale int, err error) JobScale(jobId string) (jobScale int, err error)
// SetJobScaling sets the scaling count of the passed job to Nomad. // SetJobScale sets the scaling count of the passed job to Nomad.
SetJobScale(jobId string, count int, reason string) (err error) SetJobScale(jobId string, count int, reason string) (err error)
// DeleteRunner deletes the runner with the given Id. // DeleteRunner deletes the runner with the given Id.
@@ -39,26 +39,6 @@ func (nc *nomadApiClient) init(nomadURL *url.URL) (err error) {
return err return err
} }
func (nc *nomadApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) {
list, _, err = nc.client.Jobs().List(nil)
return
}
func (nc *nomadApiClient) JobScale(jobId string) (jobScale int, err error) {
status, _, err := nc.client.Jobs().ScaleStatus(jobId, nil)
if err != nil {
return
}
// ToDo: Consider counting also the placed and desired allocations
jobScale = status.TaskGroups[jobId].Running
return
}
func (nc *nomadApiClient) SetJobScale(jobId string, count int, reason string) (err error) {
_, _, err = nc.client.Jobs().Scale(jobId, jobId, &count, reason, false, nil, nil)
return
}
func (nc *nomadApiClient) DeleteRunner(runnerId string) (err error) { func (nc *nomadApiClient) DeleteRunner(runnerId string) (err error) {
allocation, _, err := nc.client.Allocations().Info(runnerId, nil) allocation, _, err := nc.client.Allocations().Info(runnerId, nil)
if err != nil { if err != nil {

View File

@@ -130,14 +130,14 @@ func (apiClient *ApiClient) createJob(
} }
// LoadJobList loads the list of jobs from the Nomad api. // LoadJobList loads the list of jobs from the Nomad api.
func (apiClient *ApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { func (nc *nomadApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) {
list, _, err = apiClient.client.Jobs().List(nil) list, _, err = nc.client.Jobs().List(nil)
return return
} }
// JobScale returns the scale of the passed job. // JobScale returns the scale of the passed job.
func (apiClient *ApiClient) JobScale(jobId string) (jobScale int, err error) { func (nc *nomadApiClient) JobScale(jobId string) (jobScale int, err error) {
status, _, err := apiClient.client.Jobs().ScaleStatus(jobId, nil) status, _, err := nc.client.Jobs().ScaleStatus(jobId, nil)
if err != nil { if err != nil {
return return
} }
@@ -147,7 +147,7 @@ func (apiClient *ApiClient) JobScale(jobId string) (jobScale int, err error) {
} }
// SetJobScale sets the scaling count of the passed job to Nomad. // SetJobScale sets the scaling count of the passed job to Nomad.
func (apiClient *ApiClient) SetJobScale(jobId string, count int, reason string) (err error) { func (nc *nomadApiClient) SetJobScale(jobId string, count int, reason string) (err error) {
_, _, err = apiClient.client.Jobs().Scale(jobId, fmt.Sprintf(TaskGroupNameFormat, jobId), &count, reason, false, nil, nil) _, _, err = nc.client.Jobs().Scale(jobId, fmt.Sprintf(TaskGroupNameFormat, jobId), &count, reason, false, nil, nil)
return return
} }

View File

@@ -238,7 +238,7 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) {
func TestCreateJobSetsAllGivenArguments(t *testing.T) { func TestCreateJobSetsAllGivenArguments(t *testing.T) {
testJob, base := createTestJob() testJob, base := createTestJob()
apiClient := ApiClient{&nomadApiClient{}, &nomadApi.Client{}, *base} apiClient := ApiClient{&nomadApiClient{}, *base}
job := apiClient.createJob( job := apiClient.createJob(
*testJob.ID, *testJob.ID,
uint(*testJob.TaskGroups[0].Count), uint(*testJob.TaskGroups[0].Count),

View File

@@ -19,24 +19,20 @@ type ExecutorApi interface {
// ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values. // ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values.
type ApiClient struct { type ApiClient struct {
apiQuerier apiQuerier
client *nomadApi.Client
defaultJob nomadApi.Job defaultJob nomadApi.Job
} }
// NewExecutorApi creates a new api client. // NewExecutorApi creates a new api client.
// One client is usually sufficient for the complete runtime of the API. // One client is usually sufficient for the complete runtime of the API.
func NewExecutorApi(nomadURL *url.URL) (ExecutorApi, error) { func NewExecutorApi(nomadURL *url.URL) (ExecutorApi, error) {
client := &ApiClient{apiQuerier: &ApiClient{}, client: &nomadApi.Client{}} client := &ApiClient{apiQuerier: &nomadApiClient{}}
err := client.init(nomadURL) err := client.init(nomadURL)
return client, err return client, err
} }
// init prepares an apiClient to be able to communicate to a provided Nomad API. // init prepares an apiClient to be able to communicate to a provided Nomad API.
func (apiClient *ApiClient) init(nomadURL *url.URL) (err error) { func (apiClient *ApiClient) init(nomadURL *url.URL) (err error) {
apiClient.client, err = nomadApi.NewClient(&nomadApi.Config{ err = apiClient.apiQuerier.init(nomadURL)
Address: nomadURL.String(),
TLSConfig: &nomadApi.TLSConfig{},
})
if err != nil { if err != nil {
return err return err
} }

View File

@@ -122,7 +122,7 @@ var TestURL = url.URL{
} }
func TestApiClient_init(t *testing.T) { func TestApiClient_init(t *testing.T) {
client := &ApiClient{} client := &ApiClient{apiQuerier: &nomadApiClient{}}
defaultJob := parseJob(defaultJobHCL) defaultJob := parseJob(defaultJobHCL)
err := client.init(&TestURL) err := client.init(&TestURL)
require.Nil(t, err) require.Nil(t, err)
@@ -130,7 +130,7 @@ func TestApiClient_init(t *testing.T) {
} }
func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) { func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) {
client := &ApiClient{} client := &ApiClient{apiQuerier: &nomadApiClient{}}
err := client.init(&url.URL{ err := client.init(&url.URL{
Scheme: "http", Scheme: "http",
Host: "http://127.0.0.1:4646", Host: "http://127.0.0.1:4646",
@@ -139,7 +139,7 @@ func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) {
} }
func TestNewExecutorApiCanBeCreatedWithoutError(t *testing.T) { func TestNewExecutorApiCanBeCreatedWithoutError(t *testing.T) {
expectedClient := &ApiClient{} expectedClient := &ApiClient{apiQuerier: &nomadApiClient{}}
err := expectedClient.init(&TestURL) err := expectedClient.init(&TestURL)
require.Nil(t, err) require.Nil(t, err)