Move Nomad job creation logic to environment manager
This commit is contained in:

committed by
Tobias Kantusch

parent
619cd40fb6
commit
4c3cc0cc4c
130
environment/job.go
Normal file
130
environment/job.go
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
package environment
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "embed"
|
||||||
|
"fmt"
|
||||||
|
nomadApi "github.com/hashicorp/nomad/api"
|
||||||
|
"github.com/hashicorp/nomad/jobspec2"
|
||||||
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultTaskDriver = "docker"
|
||||||
|
TaskNameFormat = "%s-task"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed default-job.hcl
|
||||||
|
var defaultJobHCL string
|
||||||
|
|
||||||
|
func parseJob(jobHCL string) *nomadApi.Job {
|
||||||
|
config := jobspec2.ParseConfig{
|
||||||
|
Body: []byte(jobHCL),
|
||||||
|
AllowFS: false,
|
||||||
|
Strict: true,
|
||||||
|
}
|
||||||
|
job, err := jobspec2.ParseWithConfig(&config)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Fatal("Error parsing default Nomad job")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return job
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *nomadApi.TaskGroup {
|
||||||
|
var taskGroup *nomadApi.TaskGroup
|
||||||
|
if len(job.TaskGroups) == 0 {
|
||||||
|
taskGroup = nomadApi.NewTaskGroup(name, int(prewarmingPoolSize))
|
||||||
|
job.TaskGroups = []*nomadApi.TaskGroup{taskGroup}
|
||||||
|
} else {
|
||||||
|
taskGroup = job.TaskGroups[0]
|
||||||
|
taskGroup.Name = &name
|
||||||
|
count := int(prewarmingPoolSize)
|
||||||
|
taskGroup.Count = &count
|
||||||
|
}
|
||||||
|
return taskGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposedPorts []uint16) {
|
||||||
|
if len(taskGroup.Tasks) == 0 {
|
||||||
|
// This function is only used internally and must be called after configuring the task.
|
||||||
|
// This error is not recoverable.
|
||||||
|
log.Fatal("Can't configure network before task has been configured!")
|
||||||
|
}
|
||||||
|
task := taskGroup.Tasks[0]
|
||||||
|
|
||||||
|
if networkAccess {
|
||||||
|
var networkResource *nomadApi.NetworkResource
|
||||||
|
if len(taskGroup.Networks) == 0 {
|
||||||
|
networkResource = &nomadApi.NetworkResource{}
|
||||||
|
taskGroup.Networks = []*nomadApi.NetworkResource{networkResource}
|
||||||
|
} else {
|
||||||
|
networkResource = taskGroup.Networks[0]
|
||||||
|
}
|
||||||
|
// prefer "bridge" network over "host" to have an isolated network namespace with bridged interface
|
||||||
|
// instead of joining the host network namespace
|
||||||
|
networkResource.Mode = "bridge"
|
||||||
|
for _, portNumber := range exposedPorts {
|
||||||
|
port := nomadApi.Port{
|
||||||
|
Label: strconv.FormatUint(uint64(portNumber), 10),
|
||||||
|
To: int(portNumber),
|
||||||
|
}
|
||||||
|
networkResource.DynamicPorts = append(networkResource.DynamicPorts, port)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// somehow, we can't set the network mode to none in the NetworkResource on task group level
|
||||||
|
// see https://github.com/hashicorp/nomad/issues/10540
|
||||||
|
if task.Config == nil {
|
||||||
|
task.Config = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
task.Config["network_mode"] = "none"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func configureTask(
|
||||||
|
taskGroup *nomadApi.TaskGroup,
|
||||||
|
name string,
|
||||||
|
cpuLimit, memoryLimit uint,
|
||||||
|
image string,
|
||||||
|
networkAccess bool,
|
||||||
|
exposedPorts []uint16) {
|
||||||
|
var task *nomadApi.Task
|
||||||
|
if len(taskGroup.Tasks) == 0 {
|
||||||
|
task = nomadApi.NewTask(name, DefaultTaskDriver)
|
||||||
|
taskGroup.Tasks = []*nomadApi.Task{task}
|
||||||
|
} else {
|
||||||
|
task = taskGroup.Tasks[0]
|
||||||
|
task.Name = name
|
||||||
|
}
|
||||||
|
iCpuLimit := int(cpuLimit)
|
||||||
|
iMemoryLimit := int(memoryLimit)
|
||||||
|
task.Resources = &nomadApi.Resources{
|
||||||
|
CPU: &iCpuLimit,
|
||||||
|
MemoryMB: &iMemoryLimit,
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.Config == nil {
|
||||||
|
task.Config = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
task.Config["image"] = image
|
||||||
|
|
||||||
|
configureNetwork(taskGroup, networkAccess, exposedPorts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *NomadEnvironmentManager) createJob(
|
||||||
|
id string,
|
||||||
|
prewarmingPoolSize, cpuLimit, memoryLimit uint,
|
||||||
|
image string,
|
||||||
|
networkAccess bool,
|
||||||
|
exposedPorts []uint16) *nomadApi.Job {
|
||||||
|
|
||||||
|
job := m.defaultJob
|
||||||
|
job.ID = &id
|
||||||
|
job.Name = &id
|
||||||
|
|
||||||
|
var taskGroup = createTaskGroup(&job, fmt.Sprintf(nomad.TaskGroupNameFormat, id), prewarmingPoolSize)
|
||||||
|
configureTask(taskGroup, fmt.Sprintf(TaskNameFormat, id), cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
|
||||||
|
|
||||||
|
return &job
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package nomad
|
package environment
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -7,6 +7,8 @@ import (
|
|||||||
"github.com/sirupsen/logrus/hooks/test"
|
"github.com/sirupsen/logrus/hooks/test"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
||||||
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -57,7 +59,7 @@ func createTestJob() (*nomadApi.Job, *nomadApi.Job) {
|
|||||||
task.Config["network_mode"] = "none"
|
task.Config["network_mode"] = "none"
|
||||||
task.Resources = createTestResources()
|
task.Resources = createTestResources()
|
||||||
taskGroup := createTestTaskGroup()
|
taskGroup := createTestTaskGroup()
|
||||||
taskGroupName := fmt.Sprintf(TaskGroupNameFormat, *job.ID)
|
taskGroupName := fmt.Sprintf(nomad.TaskGroupNameFormat, *job.ID)
|
||||||
taskGroup.Name = &taskGroupName
|
taskGroup.Name = &taskGroupName
|
||||||
taskGroup.Tasks = []*nomadApi.Task{task}
|
taskGroup.Tasks = []*nomadApi.Task{task}
|
||||||
job.TaskGroups = []*nomadApi.TaskGroup{taskGroup}
|
job.TaskGroups = []*nomadApi.TaskGroup{taskGroup}
|
||||||
@ -238,7 +240,7 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) {
|
|||||||
|
|
||||||
func TestCreateJobSetsAllGivenArguments(t *testing.T) {
|
func TestCreateJobSetsAllGivenArguments(t *testing.T) {
|
||||||
testJob, base := createTestJob()
|
testJob, base := createTestJob()
|
||||||
apiClient := ApiClient{&nomadApiClient{}, *base}
|
apiClient := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.ApiClient{}, *base}
|
||||||
job := apiClient.createJob(
|
job := apiClient.createJob(
|
||||||
*testJob.ID,
|
*testJob.ID,
|
||||||
uint(*testJob.TaskGroups[0].Count),
|
uint(*testJob.TaskGroups[0].Count),
|
@ -1,11 +1,15 @@
|
|||||||
package environment
|
package environment
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
nomadApi "github.com/hashicorp/nomad/api"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
||||||
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var log = logging.GetLogger("environment")
|
||||||
|
|
||||||
// Manager encapsulates API calls to the executor API for creation and deletion of execution environments.
|
// Manager encapsulates API calls to the executor API for creation and deletion of execution environments.
|
||||||
type Manager interface {
|
type Manager interface {
|
||||||
// Load fetches all already created execution environments from the executor and registers them at the runner manager.
|
// Load fetches all already created execution environments from the executor and registers them at the runner manager.
|
||||||
@ -23,7 +27,7 @@ type Manager interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewNomadEnvironmentManager(runnerManager runner.Manager, apiClient nomad.ExecutorApi) *NomadEnvironmentManager {
|
func NewNomadEnvironmentManager(runnerManager runner.Manager, apiClient nomad.ExecutorApi) *NomadEnvironmentManager {
|
||||||
environmentManager := &NomadEnvironmentManager{runnerManager, apiClient}
|
environmentManager := &NomadEnvironmentManager{runnerManager, apiClient, *parseJob(defaultJobHCL)}
|
||||||
environmentManager.Load()
|
environmentManager.Load()
|
||||||
return environmentManager
|
return environmentManager
|
||||||
}
|
}
|
||||||
@ -31,6 +35,7 @@ func NewNomadEnvironmentManager(runnerManager runner.Manager, apiClient nomad.Ex
|
|||||||
type NomadEnvironmentManager struct {
|
type NomadEnvironmentManager struct {
|
||||||
runnerManager runner.Manager
|
runnerManager runner.Manager
|
||||||
api nomad.ExecutorApi
|
api nomad.ExecutorApi
|
||||||
|
defaultJob nomadApi.Job
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *NomadEnvironmentManager) Create(
|
func (m *NomadEnvironmentManager) Create(
|
||||||
|
120
nomad/job.go
120
nomad/job.go
@ -1,134 +1,14 @@
|
|||||||
package nomad
|
package nomad
|
||||||
|
|
||||||
import (
|
import (
|
||||||
_ "embed"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
nomadApi "github.com/hashicorp/nomad/api"
|
nomadApi "github.com/hashicorp/nomad/api"
|
||||||
"github.com/hashicorp/nomad/jobspec2"
|
|
||||||
"strconv"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:embed default-job.hcl
|
|
||||||
var defaultJobHCL string
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultTaskDriver = "docker"
|
|
||||||
TaskGroupNameFormat = "%s-group"
|
TaskGroupNameFormat = "%s-group"
|
||||||
TaskNameFormat = "%s-task"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func parseJob(jobHCL string) *nomadApi.Job {
|
|
||||||
config := jobspec2.ParseConfig{
|
|
||||||
Body: []byte(jobHCL),
|
|
||||||
AllowFS: false,
|
|
||||||
Strict: true,
|
|
||||||
}
|
|
||||||
job, err := jobspec2.ParseWithConfig(&config)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Fatal("Error parsing default Nomad job")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return job
|
|
||||||
}
|
|
||||||
|
|
||||||
func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *nomadApi.TaskGroup {
|
|
||||||
var taskGroup *nomadApi.TaskGroup
|
|
||||||
if len(job.TaskGroups) == 0 {
|
|
||||||
taskGroup = nomadApi.NewTaskGroup(name, int(prewarmingPoolSize))
|
|
||||||
job.TaskGroups = []*nomadApi.TaskGroup{taskGroup}
|
|
||||||
} else {
|
|
||||||
taskGroup = job.TaskGroups[0]
|
|
||||||
taskGroup.Name = &name
|
|
||||||
count := int(prewarmingPoolSize)
|
|
||||||
taskGroup.Count = &count
|
|
||||||
}
|
|
||||||
return taskGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposedPorts []uint16) {
|
|
||||||
if len(taskGroup.Tasks) == 0 {
|
|
||||||
// This function is only used internally and must be called after configuring the task.
|
|
||||||
// This error is not recoverable.
|
|
||||||
log.Fatal("Can't configure network before task has been configured!")
|
|
||||||
}
|
|
||||||
task := taskGroup.Tasks[0]
|
|
||||||
|
|
||||||
if networkAccess {
|
|
||||||
var networkResource *nomadApi.NetworkResource
|
|
||||||
if len(taskGroup.Networks) == 0 {
|
|
||||||
networkResource = &nomadApi.NetworkResource{}
|
|
||||||
taskGroup.Networks = []*nomadApi.NetworkResource{networkResource}
|
|
||||||
} else {
|
|
||||||
networkResource = taskGroup.Networks[0]
|
|
||||||
}
|
|
||||||
// prefer "bridge" network over "host" to have an isolated network namespace with bridged interface
|
|
||||||
// instead of joining the host network namespace
|
|
||||||
networkResource.Mode = "bridge"
|
|
||||||
for _, portNumber := range exposedPorts {
|
|
||||||
port := nomadApi.Port{
|
|
||||||
Label: strconv.FormatUint(uint64(portNumber), 10),
|
|
||||||
To: int(portNumber),
|
|
||||||
}
|
|
||||||
networkResource.DynamicPorts = append(networkResource.DynamicPorts, port)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// somehow, we can't set the network mode to none in the NetworkResource on task group level
|
|
||||||
// see https://github.com/hashicorp/nomad/issues/10540
|
|
||||||
if task.Config == nil {
|
|
||||||
task.Config = make(map[string]interface{})
|
|
||||||
}
|
|
||||||
task.Config["network_mode"] = "none"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func configureTask(
|
|
||||||
taskGroup *nomadApi.TaskGroup,
|
|
||||||
name string,
|
|
||||||
cpuLimit, memoryLimit uint,
|
|
||||||
image string,
|
|
||||||
networkAccess bool,
|
|
||||||
exposedPorts []uint16) {
|
|
||||||
var task *nomadApi.Task
|
|
||||||
if len(taskGroup.Tasks) == 0 {
|
|
||||||
task = nomadApi.NewTask(name, DefaultTaskDriver)
|
|
||||||
taskGroup.Tasks = []*nomadApi.Task{task}
|
|
||||||
} else {
|
|
||||||
task = taskGroup.Tasks[0]
|
|
||||||
task.Name = name
|
|
||||||
}
|
|
||||||
iCpuLimit := int(cpuLimit)
|
|
||||||
iMemoryLimit := int(memoryLimit)
|
|
||||||
task.Resources = &nomadApi.Resources{
|
|
||||||
CPU: &iCpuLimit,
|
|
||||||
MemoryMB: &iMemoryLimit,
|
|
||||||
}
|
|
||||||
|
|
||||||
if task.Config == nil {
|
|
||||||
task.Config = make(map[string]interface{})
|
|
||||||
}
|
|
||||||
task.Config["image"] = image
|
|
||||||
|
|
||||||
configureNetwork(taskGroup, networkAccess, exposedPorts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (apiClient *ApiClient) createJob(
|
|
||||||
id string,
|
|
||||||
prewarmingPoolSize, cpuLimit, memoryLimit uint,
|
|
||||||
image string,
|
|
||||||
networkAccess bool,
|
|
||||||
exposedPorts []uint16) *nomadApi.Job {
|
|
||||||
|
|
||||||
job := apiClient.defaultJob
|
|
||||||
job.ID = &id
|
|
||||||
job.Name = &id
|
|
||||||
|
|
||||||
var taskGroup = createTaskGroup(&job, fmt.Sprintf(TaskGroupNameFormat, id), prewarmingPoolSize)
|
|
||||||
configureTask(taskGroup, fmt.Sprintf(TaskNameFormat, id), cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
|
|
||||||
|
|
||||||
return &job
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadJobList loads the list of jobs from the Nomad api.
|
// LoadJobList loads the list of jobs from the Nomad api.
|
||||||
func (nc *nomadApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) {
|
func (nc *nomadApiClient) LoadJobList() (list []*nomadApi.JobListStub, err error) {
|
||||||
list, _, err = nc.client.Jobs().List(nil)
|
list, _, err = nc.client.Jobs().List(nil)
|
||||||
|
@ -2,12 +2,9 @@ package nomad
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
nomadApi "github.com/hashicorp/nomad/api"
|
nomadApi "github.com/hashicorp/nomad/api"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.GetLogger("nomad")
|
|
||||||
|
|
||||||
// ExecutorApi provides access to an container orchestration solution
|
// ExecutorApi provides access to an container orchestration solution
|
||||||
type ExecutorApi interface {
|
type ExecutorApi interface {
|
||||||
apiQuerier
|
apiQuerier
|
||||||
@ -19,7 +16,6 @@ type ExecutorApi interface {
|
|||||||
// ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values.
|
// ApiClient implements the ExecutorApi interface and can be used to perform different operations on the real Executor API and its return values.
|
||||||
type ApiClient struct {
|
type ApiClient struct {
|
||||||
apiQuerier
|
apiQuerier
|
||||||
defaultJob nomadApi.Job
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExecutorApi creates a new api client.
|
// NewExecutorApi creates a new api client.
|
||||||
@ -36,7 +32,6 @@ func (apiClient *ApiClient) init(nomadURL *url.URL, nomadNamespace string) (err
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
apiClient.defaultJob = *parseJob(defaultJobHCL)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,10 +127,8 @@ const TestNamespace = "unit-tests"
|
|||||||
|
|
||||||
func TestApiClient_init(t *testing.T) {
|
func TestApiClient_init(t *testing.T) {
|
||||||
client := &ApiClient{apiQuerier: &nomadApiClient{}}
|
client := &ApiClient{apiQuerier: &nomadApiClient{}}
|
||||||
defaultJob := parseJob(defaultJobHCL)
|
|
||||||
err := client.init(&TestURL, TestNamespace)
|
err := client.init(&TestURL, TestNamespace)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
assert.Equal(t, *defaultJob, client.defaultJob)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) {
|
func TestApiClientCanNotBeInitializedWithInvalidUrl(t *testing.T) {
|
||||||
|
Reference in New Issue
Block a user