diff --git a/nomad/default-job.hcl b/nomad/default-job.hcl new file mode 100644 index 0000000..454760a --- /dev/null +++ b/nomad/default-job.hcl @@ -0,0 +1,52 @@ +// This is the default job configuration that is used when no path to another default configuration is given + +job "default-poseidon-job" { + datacenters = ["dc1"] + type = "batch" + + group "default-poseidon-group" { + ephemeral_disk { + migrate = false + size = 10 + sticky = false + } + count = 1 + scaling { + enabled = true + max = 300 + } + spread { + // see https://www.nomadproject.io/docs/job-specification/spread#even-spread-across-data-center + // This spreads the load evenly amongst our nodes + attribute = "${node.unique.name}" + weight = 100 + } + + task "default-poseidon-task" { + driver = "docker" + kill_timeout = "0s" + kill_signal = "SIGKILL" + + config { + image = "python:latest" + command = "sleep" + args = ["infinity"] + network_mode = "none" + } + + logs { + max_files = 1 + max_file_size = 1 + } + + resources { + cpu = 40 + memory = 40 + } + + restart { + delay = "0s" + } + } + } +} diff --git a/nomad/job.go b/nomad/job.go new file mode 100644 index 0000000..07debce --- /dev/null +++ b/nomad/job.go @@ -0,0 +1,147 @@ +package nomad + +import ( + _ "embed" + "fmt" + nomadApi "github.com/hashicorp/nomad/api" + "strconv" +) + +//go:embed default-job.hcl +var defaultJobHCL string + +const ( + DefaultTaskDriver = "docker" + TaskGroupNameFormat = "%s-group" + TaskNameFormat = "%s-task" +) + +func (apiClient *ApiClient) defaultJob() *nomadApi.Job { + job, err := apiClient.client.Jobs().ParseHCL(defaultJobHCL, true) + if err != nil { + log.WithError(err).Fatal("Error parsing default Nomad job") + return nil + } + + return job +} + +func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *nomadApi.TaskGroup { + var taskGroup *nomadApi.TaskGroup + if len(job.TaskGroups) == 0 { + taskGroup = nomadApi.NewTaskGroup(name, int(prewarmingPoolSize)) + job.TaskGroups = []*nomadApi.TaskGroup{taskGroup} + } else { + taskGroup = job.TaskGroups[0] + taskGroup.Name = &name + count := int(prewarmingPoolSize) + taskGroup.Count = &count + } + return taskGroup +} + +func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposedPorts []uint16) { + if len(taskGroup.Tasks) == 0 { + // This function is only used internally and must be called after configuring the task. + // This error is not recoverable. + log.Fatal("Can't configure network before task has been configured!") + } + task := taskGroup.Tasks[0] + + if networkAccess { + var networkResource *nomadApi.NetworkResource + if len(taskGroup.Networks) == 0 { + networkResource = &nomadApi.NetworkResource{} + taskGroup.Networks = []*nomadApi.NetworkResource{networkResource} + } else { + networkResource = taskGroup.Networks[0] + } + // prefer "bridge" network over "host" to have an isolated network namespace with bridged interface + // instead of joining the host network namespace + networkResource.Mode = "bridge" + for _, portNumber := range exposedPorts { + port := nomadApi.Port{ + Label: strconv.FormatUint(uint64(portNumber), 10), + To: int(portNumber), + } + networkResource.DynamicPorts = append(networkResource.DynamicPorts, port) + } + } else { + // somehow, we can't set the network mode to none in the NetworkResource on task group level + // see https://github.com/hashicorp/nomad/issues/10540 + if task.Config == nil { + task.Config = make(map[string]interface{}) + } + task.Config["network_mode"] = "none" + } +} + +func configureTask( + taskGroup *nomadApi.TaskGroup, + name string, + cpuLimit, memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16) { + var task *nomadApi.Task + if len(taskGroup.Tasks) == 0 { + task = nomadApi.NewTask(name, DefaultTaskDriver) + taskGroup.Tasks = []*nomadApi.Task{task} + } else { + task = taskGroup.Tasks[0] + task.Name = name + } + iCpuLimit := int(cpuLimit) + iMemoryLimit := int(memoryLimit) + task.Resources = &nomadApi.Resources{ + CPU: &iCpuLimit, + MemoryMB: &iMemoryLimit, + } + + if task.Config == nil { + task.Config = make(map[string]interface{}) + } + task.Config["image"] = image + + configureNetwork(taskGroup, networkAccess, exposedPorts) +} + +func (apiClient *ApiClient) createJob( + id string, + prewarmingPoolSize, cpuLimit, memoryLimit uint, + image string, + networkAccess bool, + exposedPorts []uint16) *nomadApi.Job { + + job := apiClient.defaultJob() + job.ID = &id + job.Name = &id + + var taskGroup = createTaskGroup(&job, fmt.Sprintf(TaskGroupNameFormat, id), prewarmingPoolSize) + configureTask(taskGroup, fmt.Sprintf(TaskNameFormat, id), cpuLimit, memoryLimit, image, networkAccess, exposedPorts) + + return job +} + +// LoadJobList loads the list of jobs from the Nomad api. +func (apiClient *ApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) { + list, _, err = apiClient.client.Jobs().List(nil) + return +} + +// GetJobScale returns the scale of the passed job. +func (apiClient *ApiClient) GetJobScale(jobId string) (jobScale int, err error) { + status, _, err := apiClient.client.Jobs().ScaleStatus(jobId, nil) + if err != nil { + return + } + // ToDo: Consider counting also the placed and desired allocations + jobScale = status.TaskGroups[fmt.Sprintf(TaskGroupNameFormat, jobId)].Running + return +} + +// SetJobScaling sets the scaling count of the passed job to Nomad. +func (apiClient *ApiClient) SetJobScaling(jobId string, count int, reason string) (err error) { + _, _, err = apiClient.client.Jobs().Scale(jobId, fmt.Sprintf(TaskGroupNameFormat, jobId), &count, reason, false, nil, nil) + return +} diff --git a/nomad/nomad.go b/nomad/nomad.go index ee09d5f..300278a 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -2,10 +2,13 @@ package nomad import ( nomadApi "github.com/hashicorp/nomad/api" + "gitlab.hpi.de/codeocean/codemoon/poseidon/logging" "net/url" ) -// ExecutorApi provides access to a container orchestration solution. +var log = logging.GetLogger("nomad") + +// ExecutorApi provides access to an container orchestration solution type ExecutorApi interface { apiQuerier @@ -26,8 +29,18 @@ func NewExecutorApi(nomadURL *url.URL) (ExecutorApi, error) { return client, err } -func (c *ApiClient) LoadAvailableRunners(jobId string) (runnerIds []string, err error) { - list, err := c.loadRunners(jobId) +// init prepares an apiClient to be able to communicate to a provided Nomad API. +func (apiClient *ApiClient) init(nomadURL *url.URL) (err error) { + apiClient.client, err = nomadApi.NewClient(&nomadApi.Config{ + Address: nomadURL.String(), + TLSConfig: &nomadApi.TLSConfig{}, + }) + return err +} + +// LoadRunners loads the allocations of the specified job. +func (apiClient *ApiClient) LoadRunners(jobId string) (runnerIds []string, err error) { + list, _, err := apiClient.client.Jobs().Allocations(jobId, true, nil) if err != nil { return nil, err }