Move Nomad job creation to Nomad package

Previously, low level Nomad job creation was done in the environment manager.
It used many functions of the nomad package so we felt like this logic
better belongs to the nomad package.
This commit is contained in:
sirkrypt0
2021-06-11 11:53:41 +02:00
committed by Maximilian Paß
parent 87f823756b
commit ff582805b4
9 changed files with 389 additions and 343 deletions

View File

@ -1,198 +0,0 @@
package environment
import (
"context"
_ "embed"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/jobspec2"
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
"strconv"
)
// defaultJobHCL holds our default job in HCL format.
// The default job is used when creating new job and provides
// common settings that all the jobs share.
//go:embed default-job.hcl
var defaultJobHCL string
// registerTemplateJob creates a Nomad job based on the default job configuration and the given parameters.
// It registers the job with Nomad and waits until the registration completes.
func (m *NomadEnvironmentManager) registerTemplateJob(
environmentID runner.EnvironmentID,
prewarmingPoolSize, cpuLimit, memoryLimit uint,
image string,
networkAccess bool,
exposedPorts []uint16) (*nomadApi.Job, error) {
job := createTemplateJob(m.defaultJob, environmentID, prewarmingPoolSize,
cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
evalID, err := m.api.RegisterNomadJob(job)
if err != nil {
return nil, err
}
return job, m.api.MonitorEvaluation(evalID, context.Background())
}
func createTemplateJob(
defaultJob nomadApi.Job,
environmentID runner.EnvironmentID,
prewarmingPoolSize, cpuLimit, memoryLimit uint,
image string,
networkAccess bool,
exposedPorts []uint16) *nomadApi.Job {
job := defaultJob
templateJobID := nomad.TemplateJobID(strconv.Itoa(int(environmentID)))
job.ID = &templateJobID
job.Name = &templateJobID
var taskGroup = createTaskGroup(&job, nomad.TaskGroupName, prewarmingPoolSize)
configureTask(taskGroup, nomad.TaskName, cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
storeConfiguration(&job, environmentID, prewarmingPoolSize)
return &job
}
func parseJob(jobHCL string) *nomadApi.Job {
config := jobspec2.ParseConfig{
Body: []byte(jobHCL),
AllowFS: false,
Strict: true,
}
job, err := jobspec2.ParseWithConfig(&config)
if err != nil {
log.WithError(err).Fatal("Error parsing Nomad job")
return nil
}
return job
}
func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *nomadApi.TaskGroup {
var taskGroup *nomadApi.TaskGroup
if len(job.TaskGroups) == 0 {
taskGroup = nomadApi.NewTaskGroup(name, int(prewarmingPoolSize))
job.TaskGroups = []*nomadApi.TaskGroup{taskGroup}
} else {
taskGroup = job.TaskGroups[0]
taskGroup.Name = &name
count := 1
taskGroup.Count = &count
}
return taskGroup
}
func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposedPorts []uint16) {
if len(taskGroup.Tasks) == 0 {
// This function is only used internally and must be called as last step when configuring the task.
// This error is not recoverable.
log.Fatal("Can't configure network before task has been configured!")
}
task := taskGroup.Tasks[0]
if task.Config == nil {
task.Config = make(map[string]interface{})
}
if networkAccess {
var networkResource *nomadApi.NetworkResource
if len(taskGroup.Networks) == 0 {
networkResource = &nomadApi.NetworkResource{}
taskGroup.Networks = []*nomadApi.NetworkResource{networkResource}
} else {
networkResource = taskGroup.Networks[0]
}
// Prefer "bridge" network over "host" to have an isolated network namespace with bridged interface
// instead of joining the host network namespace.
networkResource.Mode = "bridge"
for _, portNumber := range exposedPorts {
port := nomadApi.Port{
Label: strconv.FormatUint(uint64(portNumber), 10),
To: int(portNumber),
}
networkResource.DynamicPorts = append(networkResource.DynamicPorts, port)
}
// Explicitly set mode to override existing settings when updating job from without to with network.
// Don't use bridge as it collides with the bridge mode above. This results in Docker using 'bridge'
// mode, meaning all allocations will be attached to the `docker0` adapter and could reach other
// non-Nomad containers attached to it. This is avoided when using Nomads bridge network mode.
task.Config["network_mode"] = ""
} else {
// Somehow, we can't set the network mode to none in the NetworkResource on task group level.
// See https://github.com/hashicorp/nomad/issues/10540
task.Config["network_mode"] = "none"
// Explicitly set Networks to signal Nomad to remove the possibly existing networkResource
taskGroup.Networks = []*nomadApi.NetworkResource{}
}
}
func configureTask(
taskGroup *nomadApi.TaskGroup,
name string,
cpuLimit, memoryLimit uint,
image string,
networkAccess bool,
exposedPorts []uint16) {
var task *nomadApi.Task
if len(taskGroup.Tasks) == 0 {
task = nomadApi.NewTask(name, nomad.DefaultTaskDriver)
taskGroup.Tasks = []*nomadApi.Task{task}
} else {
task = taskGroup.Tasks[0]
task.Name = name
}
integerCPULimit := int(cpuLimit)
integerMemoryLimit := int(memoryLimit)
task.Resources = &nomadApi.Resources{
CPU: &integerCPULimit,
MemoryMB: &integerMemoryLimit,
}
if task.Config == nil {
task.Config = make(map[string]interface{})
}
task.Config["image"] = image
configureNetwork(taskGroup, networkAccess, exposedPorts)
}
func storeConfiguration(job *nomadApi.Job, id runner.EnvironmentID, prewarmingPoolSize uint) {
taskGroup := findOrCreateConfigTaskGroup(job)
if taskGroup.Meta == nil {
taskGroup.Meta = make(map[string]string)
}
taskGroup.Meta[nomad.ConfigMetaEnvironmentKey] = strconv.Itoa(int(id))
taskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUnusedValue
taskGroup.Meta[nomad.ConfigMetaPoolSizeKey] = strconv.Itoa(int(prewarmingPoolSize))
}
func findOrCreateConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup {
taskGroup := nomad.FindConfigTaskGroup(job)
if taskGroup == nil {
taskGroup = nomadApi.NewTaskGroup(nomad.ConfigTaskGroupName, 0)
}
createDummyTaskIfNotPresent(taskGroup)
return taskGroup
}
// createDummyTaskIfNotPresent ensures that a dummy task is in the task group so that the group is accepted by Nomad.
func createDummyTaskIfNotPresent(taskGroup *nomadApi.TaskGroup) {
var task *nomadApi.Task
for _, t := range taskGroup.Tasks {
if t.Name == nomad.DummyTaskName {
task = t
break
}
}
if task == nil {
task = nomadApi.NewTask(nomad.DummyTaskName, nomad.DefaultDummyTaskDriver)
taskGroup.Tasks = append(taskGroup.Tasks, task)
}
if task.Config == nil {
task.Config = make(map[string]interface{})
}
task.Config["command"] = nomad.DefaultTaskCommand
}

View File

@ -1,8 +1,10 @@
package environment
import (
_ "embed"
"fmt"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/jobspec2"
"github.com/hashicorp/nomad/nomad/structs"
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
@ -11,6 +13,12 @@ import (
"strconv"
)
// defaultJobHCL holds our default job in HCL format.
// The default job is used when creating new job and provides
// common settings that all the jobs share.
//go:embed default-job.hcl
var defaultJobHCL string
var log = logging.GetLogger("environment")
// Manager encapsulates API calls to the executor API for creation and deletion of execution environments.
@ -43,7 +51,7 @@ func (m *NomadEnvironmentManager) CreateOrUpdate(
id runner.EnvironmentID,
request dto.ExecutionEnvironmentRequest,
) (bool, error) {
templateJob, err := m.registerTemplateJob(id,
templateJob, err := m.api.RegisterTemplateJob(&m.defaultJob, int(id),
request.PrewarmingPoolSize, request.CPULimit, request.MemoryLimit,
request.Image, request.NetworkAccess, request.ExposedPorts)
@ -132,3 +140,18 @@ func (m *NomadEnvironmentManager) recoverJobs(jobs []*nomadApi.Job, onJob jobAdd
}
}
}
func parseJob(jobHCL string) *nomadApi.Job {
config := jobspec2.ParseConfig{
Body: []byte(jobHCL),
AllowFS: false,
Strict: true,
}
job, err := jobspec2.ParseWithConfig(&config)
if err != nil {
log.WithError(err).Fatal("Error parsing Nomad job")
return nil
}
return job
}

View File

@ -2,6 +2,9 @@ package environment
import (
nomadApi "github.com/hashicorp/nomad/api"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
@ -13,11 +16,11 @@ import (
type CreateOrUpdateTestSuite struct {
suite.Suite
runnerManagerMock runner.ManagerMock
apiMock nomad.ExecutorAPIMock
registerNomadJobMockCall *mock.Call
request dto.ExecutionEnvironmentRequest
manager *NomadEnvironmentManager
runnerManagerMock runner.ManagerMock
apiMock nomad.ExecutorAPIMock
request dto.ExecutionEnvironmentRequest
manager *NomadEnvironmentManager
environmentID runner.EnvironmentID
}
func TestCreateOrUpdateTestSuite(t *testing.T) {
@ -37,82 +40,107 @@ func (s *CreateOrUpdateTestSuite) SetupTest() {
ExposedPorts: nil,
}
s.registerNomadJobMockCall = s.apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("eval-id", nil)
s.apiMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.AnythingOfType("*context.emptyCtx")).Return(nil)
s.manager = &NomadEnvironmentManager{
runnerManager: &s.runnerManagerMock,
api: &s.apiMock,
}
s.environmentID = runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger)
}
func (s *CreateOrUpdateTestSuite) mockCreateOrUpdateEnvironment(exists bool) *mock.Call {
return s.runnerManagerMock.On("CreateOrUpdateEnvironment",
func (s *CreateOrUpdateTestSuite) mockRegisterTemplateJob(job *nomadApi.Job, err error) {
s.apiMock.On("RegisterTemplateJob",
mock.AnythingOfType("*api.Job"), mock.AnythingOfType("int"),
mock.AnythingOfType("uint"), mock.AnythingOfType("uint"), mock.AnythingOfType("uint"),
mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("[]uint16")).
Return(job, err)
}
func (s *CreateOrUpdateTestSuite) mockCreateOrUpdateEnvironment(created bool, err error) {
s.runnerManagerMock.On("CreateOrUpdateEnvironment",
mock.AnythingOfType("EnvironmentID"), mock.AnythingOfType("uint"), mock.AnythingOfType("*api.Job")).
Return(!exists, nil)
Return(created, err)
}
func (s *CreateOrUpdateTestSuite) createJobForRequest() *nomadApi.Job {
return createTemplateJob(s.manager.defaultJob, tests.DefaultEnvironmentIDAsInteger,
return nomad.CreateTemplateJob(&s.manager.defaultJob, tests.DefaultEnvironmentIDAsInteger,
s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit,
s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts)
}
func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsRegistersCorrectJob() {
s.mockCreateOrUpdateEnvironment(true)
expectedJob := s.createJobForRequest()
func (s *CreateOrUpdateTestSuite) TestRegistersCorrectTemplateJob() {
s.mockRegisterTemplateJob(&nomadApi.Job{}, nil)
s.mockCreateOrUpdateEnvironment(true, nil)
created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request)
_, err := s.manager.CreateOrUpdate(s.environmentID, s.request)
s.NoError(err)
s.False(created)
s.apiMock.AssertCalled(s.T(), "RegisterNomadJob", expectedJob)
s.apiMock.AssertCalled(s.T(), "RegisterTemplateJob",
&s.manager.defaultJob, int(s.environmentID),
s.request.PrewarmingPoolSize, s.request.CPULimit, s.request.MemoryLimit,
s.request.Image, s.request.NetworkAccess, s.request.ExposedPorts)
}
func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsOccurredErrorIsPassed() {
s.mockCreateOrUpdateEnvironment(true)
func (s *CreateOrUpdateTestSuite) TestReturnsErrorWhenRegisterTemplateJobReturnsError() {
s.mockRegisterTemplateJob(nil, tests.ErrDefault)
s.registerNomadJobMockCall.Return("", tests.ErrDefault)
created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request)
s.False(created)
created, err := s.manager.CreateOrUpdate(s.environmentID, s.request)
s.Equal(tests.ErrDefault, err)
}
func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentExistsReturnsFalse() {
s.mockCreateOrUpdateEnvironment(true)
created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request)
s.NoError(err)
s.False(created)
}
func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrectJob() {
s.mockCreateOrUpdateEnvironment(false)
func (s *CreateOrUpdateTestSuite) TestCreatesOrUpdatesCorrectEnvironment() {
templateJobID := tests.DefaultJobID
templateJob := &nomadApi.Job{ID: &templateJobID}
s.mockRegisterTemplateJob(templateJob, nil)
s.mockCreateOrUpdateEnvironment(true, nil)
expectedJob := s.createJobForRequest()
created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request)
s.NoError(err)
s.True(created)
s.apiMock.AssertCalled(s.T(), "RegisterNomadJob", expectedJob)
}
func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistRegistersCorrectEnvironment() {
s.mockCreateOrUpdateEnvironment(false)
expectedJob := s.createJobForRequest()
created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request)
s.True(created)
_, err := s.manager.CreateOrUpdate(s.environmentID, s.request)
s.NoError(err)
s.runnerManagerMock.AssertCalled(s.T(), "CreateOrUpdateEnvironment",
runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request.PrewarmingPoolSize, expectedJob)
s.environmentID, s.request.PrewarmingPoolSize, templateJob)
}
func (s *CreateOrUpdateTestSuite) TestWhenEnvironmentDoesNotExistOccurredErrorIsPassedAndNoEnvironmentRegistered() {
s.mockCreateOrUpdateEnvironment(false)
s.registerNomadJobMockCall.Return("", tests.ErrDefault)
created, err := s.manager.CreateOrUpdate(tests.DefaultEnvironmentIDAsInteger, s.request)
s.False(created)
func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentReturnsError() {
s.mockRegisterTemplateJob(&nomadApi.Job{}, nil)
s.mockCreateOrUpdateEnvironment(false, tests.ErrDefault)
_, err := s.manager.CreateOrUpdate(runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request)
s.Equal(tests.ErrDefault, err)
s.runnerManagerMock.AssertNotCalled(s.T(), "CreateOrUpdateEnvironment")
}
func (s *CreateOrUpdateTestSuite) TestReturnsTrueIfCreatesOrUpdateEnvironmentReturnsTrue() {
s.mockRegisterTemplateJob(&nomadApi.Job{}, nil)
s.mockCreateOrUpdateEnvironment(true, nil)
created, _ := s.manager.CreateOrUpdate(runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request)
s.True(created)
}
func (s *CreateOrUpdateTestSuite) TestReturnsFalseIfCreatesOrUpdateEnvironmentReturnsFalse() {
s.mockRegisterTemplateJob(&nomadApi.Job{}, nil)
s.mockCreateOrUpdateEnvironment(false, nil)
created, _ := s.manager.CreateOrUpdate(runner.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request)
s.False(created)
}
func TestParseJob(t *testing.T) {
exited := false
logger, hook := test.NewNullLogger()
logger.ExitFunc = func(i int) {
exited = true
}
log = logger.WithField("pkg", "nomad")
t.Run("parses the given default job", func(t *testing.T) {
job := parseJob(defaultJobHCL)
assert.False(t, exited)
assert.NotNil(t, job)
})
t.Run("fatals when given wrong job", func(t *testing.T) {
job := parseJob("")
assert.True(t, exited)
assert.Nil(t, job)
assert.Equal(t, logrus.FatalLevel, hook.LastEntry().Level)
})
}

View File

@ -143,3 +143,32 @@ func (nc *nomadAPIClient) writeOptions() *nomadApi.WriteOptions {
Namespace: nc.namespace,
}
}
// LoadJobList loads the list of jobs from the Nomad api.
func (nc *nomadAPIClient) LoadJobList() (list []*nomadApi.JobListStub, err error) {
list, _, err = nc.client.Jobs().List(nc.queryOptions())
return
}
// JobScale returns the scale of the passed job.
func (nc *nomadAPIClient) JobScale(jobID string) (jobScale uint, err error) {
status, _, err := nc.client.Jobs().ScaleStatus(jobID, nc.queryOptions())
if err != nil {
return
}
// ToDo: Consider counting also the placed and desired allocations
jobScale = uint(status.TaskGroups[TaskGroupName].Running)
return
}
// SetJobScale sets the scaling count of the passed job to Nomad.
func (nc *nomadAPIClient) SetJobScale(jobID string, count uint, reason string) (err error) {
intCount := int(count)
_, _, err = nc.client.Jobs().Scale(jobID, TaskGroupName, &intCount, reason, false, nil, nil)
return
}
func (nc *nomadAPIClient) job(jobID string) (job *nomadApi.Job, err error) {
job, _, err = nc.client.Jobs().Info(jobID, nil)
return
}

View File

@ -193,7 +193,7 @@ func (_m *apiQuerierMock) init(nomadURL *url.URL, nomadNamespace string) error {
return r0
}
// jobInfo provides a mock function with given fields: jobID
// job provides a mock function with given fields: jobID
func (_m *apiQuerierMock) job(jobID string) (*api.Job, error) {
ret := _m.Called(jobID)

View File

@ -165,6 +165,29 @@ func (_m *ExecutorAPIMock) LoadAllJobs() ([]*api.Job, error) {
return r0, r1
}
// LoadEnvironmentTemplate provides a mock function with given fields: environmentID
func (_m *ExecutorAPIMock) LoadEnvironmentTemplate(environmentID string) (*api.Job, error) {
ret := _m.Called(environmentID)
var r0 *api.Job
if rf, ok := ret.Get(0).(func(string) *api.Job); ok {
r0 = rf(environmentID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*api.Job)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(environmentID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// LoadJobList provides a mock function with given fields:
func (_m *ExecutorAPIMock) LoadJobList() ([]*api.JobListStub, error) {
ret := _m.Called()
@ -283,6 +306,29 @@ func (_m *ExecutorAPIMock) RegisterNomadJob(job *api.Job) (string, error) {
return r0, r1
}
// RegisterTemplateJob provides a mock function with given fields: defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts
func (_m *ExecutorAPIMock) RegisterTemplateJob(defaultJob *api.Job, environmentID int, prewarmingPoolSize uint, cpuLimit uint, memoryLimit uint, image string, networkAccess bool, exposedPorts []uint16) (*api.Job, error) {
ret := _m.Called(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
var r0 *api.Job
if rf, ok := ret.Get(0).(func(*api.Job, int, uint, uint, uint, string, bool, []uint16) *api.Job); ok {
r0 = rf(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*api.Job)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*api.Job, int, uint, uint, uint, string, bool, []uint16) error); ok {
r1 = rf(defaultJob, environmentID, prewarmingPoolSize, cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SetJobScale provides a mock function with given fields: jobId, count, reason
func (_m *ExecutorAPIMock) SetJobScale(jobId string, count uint, reason string) error {
ret := _m.Called(jobId, count, reason)
@ -325,7 +371,7 @@ func (_m *ExecutorAPIMock) init(nomadURL *url.URL, nomadNamespace string) error
return r0
}
// jobInfo provides a mock function with given fields: jobID
// job provides a mock function with given fields: jobID
func (_m *ExecutorAPIMock) job(jobID string) (*api.Job, error) {
ret := _m.Called(jobID)

View File

@ -1,6 +1,7 @@
package nomad
import (
"context"
"errors"
"fmt"
nomadApi "github.com/hashicorp/nomad/api"
@ -78,31 +79,171 @@ func SetMetaConfigValue(job *nomadApi.Job, key, value string) error {
return nil
}
// LoadJobList loads the list of jobs from the Nomad api.
func (nc *nomadAPIClient) LoadJobList() (list []*nomadApi.JobListStub, err error) {
list, _, err = nc.client.Jobs().List(nc.queryOptions())
return
}
// JobScale returns the scale of the passed job.
func (nc *nomadAPIClient) JobScale(jobID string) (jobScale uint, err error) {
status, _, err := nc.client.Jobs().ScaleStatus(jobID, nc.queryOptions())
// RegisterTemplateJob creates a Nomad job based on the default job configuration and the given parameters.
// It registers the job with Nomad and waits until the registration completes.
func (a *APIClient) RegisterTemplateJob(
defaultJob *nomadApi.Job,
environmentID int,
prewarmingPoolSize, cpuLimit, memoryLimit uint,
image string,
networkAccess bool,
exposedPorts []uint16) (*nomadApi.Job, error) {
job := CreateTemplateJob(defaultJob, environmentID, prewarmingPoolSize,
cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
evalID, err := a.apiQuerier.RegisterNomadJob(job)
if err != nil {
return
return nil, fmt.Errorf("couldn't register template job: %w", err)
}
// ToDo: Consider counting also the placed and desired allocations
jobScale = uint(status.TaskGroups[TaskGroupName].Running)
return
return job, a.MonitorEvaluation(evalID, context.Background())
}
// SetJobScale sets the scaling count of the passed job to Nomad.
func (nc *nomadAPIClient) SetJobScale(jobID string, count uint, reason string) (err error) {
intCount := int(count)
_, _, err = nc.client.Jobs().Scale(jobID, TaskGroupName, &intCount, reason, false, nil, nil)
return
// CreateTemplateJob creates a Nomad job based on the default job configuration and the given parameters.
// It registers the job with Nomad and waits until the registration completes.
func CreateTemplateJob(
defaultJob *nomadApi.Job,
environmentID int,
prewarmingPoolSize, cpuLimit, memoryLimit uint,
image string,
networkAccess bool,
exposedPorts []uint16) *nomadApi.Job {
job := *defaultJob
templateJobID := TemplateJobID(strconv.Itoa(environmentID))
job.ID = &templateJobID
job.Name = &templateJobID
var taskGroup = createTaskGroup(&job, TaskGroupName, prewarmingPoolSize)
configureTask(taskGroup, TaskName, cpuLimit, memoryLimit, image, networkAccess, exposedPorts)
storeConfiguration(&job, environmentID, prewarmingPoolSize)
return &job
}
func (nc *nomadAPIClient) job(jobID string) (job *nomadApi.Job, err error) {
job, _, err = nc.client.Jobs().Info(jobID, nil)
return
func createTaskGroup(job *nomadApi.Job, name string, prewarmingPoolSize uint) *nomadApi.TaskGroup {
var taskGroup *nomadApi.TaskGroup
if len(job.TaskGroups) == 0 {
taskGroup = nomadApi.NewTaskGroup(name, int(prewarmingPoolSize))
job.TaskGroups = []*nomadApi.TaskGroup{taskGroup}
} else {
taskGroup = job.TaskGroups[0]
taskGroup.Name = &name
count := 1
taskGroup.Count = &count
}
return taskGroup
}
func configureNetwork(taskGroup *nomadApi.TaskGroup, networkAccess bool, exposedPorts []uint16) {
if len(taskGroup.Tasks) == 0 {
// This function is only used internally and must be called as last step when configuring the task.
// This error is not recoverable.
log.Fatal("Can't configure network before task has been configured!")
}
task := taskGroup.Tasks[0]
if task.Config == nil {
task.Config = make(map[string]interface{})
}
if networkAccess {
var networkResource *nomadApi.NetworkResource
if len(taskGroup.Networks) == 0 {
networkResource = &nomadApi.NetworkResource{}
taskGroup.Networks = []*nomadApi.NetworkResource{networkResource}
} else {
networkResource = taskGroup.Networks[0]
}
// Prefer "bridge" network over "host" to have an isolated network namespace with bridged interface
// instead of joining the host network namespace.
networkResource.Mode = "bridge"
for _, portNumber := range exposedPorts {
port := nomadApi.Port{
Label: strconv.FormatUint(uint64(portNumber), 10),
To: int(portNumber),
}
networkResource.DynamicPorts = append(networkResource.DynamicPorts, port)
}
// Explicitly set mode to override existing settings when updating job from without to with network.
// Don't use bridge as it collides with the bridge mode above. This results in Docker using 'bridge'
// mode, meaning all allocations will be attached to the `docker0` adapter and could reach other
// non-Nomad containers attached to it. This is avoided when using Nomads bridge network mode.
task.Config["network_mode"] = ""
} else {
// Somehow, we can't set the network mode to none in the NetworkResource on task group level.
// See https://github.com/hashicorp/nomad/issues/10540
task.Config["network_mode"] = "none"
// Explicitly set Networks to signal Nomad to remove the possibly existing networkResource
taskGroup.Networks = []*nomadApi.NetworkResource{}
}
}
func configureTask(
taskGroup *nomadApi.TaskGroup,
name string,
cpuLimit, memoryLimit uint,
image string,
networkAccess bool,
exposedPorts []uint16) {
var task *nomadApi.Task
if len(taskGroup.Tasks) == 0 {
task = nomadApi.NewTask(name, DefaultTaskDriver)
taskGroup.Tasks = []*nomadApi.Task{task}
} else {
task = taskGroup.Tasks[0]
task.Name = name
}
integerCPULimit := int(cpuLimit)
integerMemoryLimit := int(memoryLimit)
task.Resources = &nomadApi.Resources{
CPU: &integerCPULimit,
MemoryMB: &integerMemoryLimit,
}
if task.Config == nil {
task.Config = make(map[string]interface{})
}
task.Config["image"] = image
configureNetwork(taskGroup, networkAccess, exposedPorts)
}
func storeConfiguration(job *nomadApi.Job, id int, prewarmingPoolSize uint) {
taskGroup := findOrCreateConfigTaskGroup(job)
if taskGroup.Meta == nil {
taskGroup.Meta = make(map[string]string)
}
taskGroup.Meta[ConfigMetaEnvironmentKey] = strconv.Itoa(id)
taskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUnusedValue
taskGroup.Meta[ConfigMetaPoolSizeKey] = strconv.Itoa(int(prewarmingPoolSize))
}
func findOrCreateConfigTaskGroup(job *nomadApi.Job) *nomadApi.TaskGroup {
taskGroup := FindConfigTaskGroup(job)
if taskGroup == nil {
taskGroup = nomadApi.NewTaskGroup(ConfigTaskGroupName, 0)
}
createDummyTaskIfNotPresent(taskGroup)
return taskGroup
}
// createDummyTaskIfNotPresent ensures that a dummy task is in the task group so that the group is accepted by Nomad.
func createDummyTaskIfNotPresent(taskGroup *nomadApi.TaskGroup) {
var task *nomadApi.Task
for _, t := range taskGroup.Tasks {
if t.Name == DummyTaskName {
task = t
break
}
}
if task == nil {
task = nomadApi.NewTask(DummyTaskName, DefaultDummyTaskDriver)
taskGroup.Tasks = append(taskGroup.Tasks, task)
}
if task.Config == nil {
task.Config = make(map[string]interface{})
}
task.Config["command"] = DefaultTaskCommand
}

View File

@ -1,7 +1,6 @@
package environment
package nomad
import (
"errors"
"fmt"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/sirupsen/logrus"
@ -9,35 +8,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
"gitlab.hpi.de/codeocean/codemoon/poseidon/tests"
"testing"
)
func TestParseJob(t *testing.T) {
exited := false
logger, hook := test.NewNullLogger()
logger.ExitFunc = func(i int) {
exited = true
}
log = logger.WithField("pkg", "nomad")
t.Run("parses the given default job", func(t *testing.T) {
job := parseJob(defaultJobHCL)
assert.False(t, exited)
assert.NotNil(t, job)
})
t.Run("fatals when given wrong job", func(t *testing.T) {
job := parseJob("")
assert.True(t, exited)
assert.Nil(t, job)
assert.Equal(t, logrus.FatalLevel, hook.LastEntry().Level)
})
}
func createTestTaskGroup() *nomadApi.TaskGroup {
return nomadApi.NewTaskGroup("taskGroup", 42)
}
@ -52,18 +26,18 @@ func createTestResources() *nomadApi.Resources {
return &nomadApi.Resources{CPU: &expectedCPULimit, MemoryMB: &expectedMemoryLimit}
}
func createTestJob() (*nomadApi.Job, *nomadApi.Job) {
jobID := nomad.TemplateJobID(tests.DefaultEnvironmentIDAsString)
base := nomadApi.NewBatchJob(jobID, jobID, "region-name", 100)
job := nomadApi.NewBatchJob(jobID, jobID, "region-name", 100)
func createTestJob() (job, base *nomadApi.Job) {
jobID := TemplateJobID(tests.DefaultEnvironmentIDAsString)
base = nomadApi.NewBatchJob(jobID, jobID, "region-name", 100)
job = nomadApi.NewBatchJob(jobID, jobID, "region-name", 100)
task := createTestTask()
task.Name = nomad.TaskName
task.Name = TaskName
image := "python:latest"
task.Config = map[string]interface{}{"image": image}
task.Config["network_mode"] = "none"
task.Resources = createTestResources()
taskGroup := createTestTaskGroup()
taskGroupName := nomad.TaskGroupName
taskGroupName := TaskGroupName
taskGroup.Name = &taskGroupName
taskGroup.Tasks = []*nomadApi.Task{task}
taskGroup.Networks = []*nomadApi.NetworkResource{}
@ -206,7 +180,7 @@ func TestConfigureTaskWhenNoTaskExists(t *testing.T) {
expectedResources := createTestResources()
expectedTaskGroup := *taskGroup
expectedTask := nomadApi.NewTask("task", nomad.DefaultTaskDriver)
expectedTask := nomadApi.NewTask("task", DefaultTaskDriver)
expectedTask.Resources = expectedResources
expectedImage := "python:latest"
expectedTask.Config = map[string]interface{}{"image": expectedImage, "network_mode": "none"}
@ -247,12 +221,11 @@ func TestConfigureTaskWhenTaskExists(t *testing.T) {
func TestCreateTemplateJobSetsAllGivenArguments(t *testing.T) {
testJob, base := createTestJob()
manager := NomadEnvironmentManager{&runner.NomadRunnerManager{}, &nomad.APIClient{}, *base}
testJobEnvironmentID, err := nomad.EnvironmentIDFromJobID(*testJob.ID)
testJobEnvironmentID, err := EnvironmentIDFromJobID(*testJob.ID)
assert.NoError(t, err)
job := createTemplateJob(
manager.defaultJob,
runner.EnvironmentID(testJobEnvironmentID),
job := CreateTemplateJob(
base,
testJobEnvironmentID,
uint(*testJob.TaskGroups[0].Count),
uint(*testJob.TaskGroups[0].Tasks[0].Resources.CPU),
uint(*testJob.TaskGroups[0].Tasks[0].Resources.MemoryMB),
@ -264,53 +237,51 @@ func TestCreateTemplateJobSetsAllGivenArguments(t *testing.T) {
}
func TestRegisterTemplateJobFailsWhenNomadJobRegistrationFails(t *testing.T) {
apiMock := nomad.ExecutorAPIMock{}
expectedErr := errors.New("test error")
apiMock := apiQuerierMock{}
expectedErr := tests.ErrDefault
apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", expectedErr)
m := NomadEnvironmentManager{
runnerManager: nil,
api: &apiMock,
defaultJob: nomadApi.Job{},
}
apiClient := &APIClient{&apiMock}
_, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{})
assert.Equal(t, expectedErr, err)
_, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger,
1, 2, 3, "image", false, []uint16{})
assert.ErrorIs(t, err, expectedErr)
apiMock.AssertNotCalled(t, "EvaluationStream")
}
func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.T) {
apiMock := nomad.ExecutorAPIMock{}
apiMock := apiQuerierMock{}
evaluationID := "id"
stream := make(chan *nomadApi.Events)
readonlyStream := func() <-chan *nomadApi.Events {
return stream
}()
// Immediately close stream to avoid any reading from it resulting in endless wait
close(stream)
apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil)
apiMock.On("MonitorEvaluation", evaluationID, mock.AnythingOfType("*context.emptyCtx")).Return(nil)
apiMock.On("EvaluationStream", evaluationID, mock.AnythingOfType("*context.emptyCtx")).
Return(readonlyStream, nil)
m := NomadEnvironmentManager{
runnerManager: nil,
api: &apiMock,
defaultJob: nomadApi.Job{},
}
apiClient := &APIClient{&apiMock}
_, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{})
_, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger,
1, 2, 3, "image", false, []uint16{})
assert.NoError(t, err)
}
func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing.T) {
apiMock := nomad.ExecutorAPIMock{}
apiMock := apiQuerierMock{}
evaluationID := "id"
expectedErr := errors.New("test error")
apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil)
apiMock.On("MonitorEvaluation", evaluationID, mock.AnythingOfType("*context.emptyCtx")).Return(expectedErr)
apiMock.On("EvaluationStream", evaluationID, mock.AnythingOfType("*context.emptyCtx")).Return(nil, tests.ErrDefault)
m := NomadEnvironmentManager{
runnerManager: nil,
api: &apiMock,
defaultJob: nomadApi.Job{},
}
apiClient := &APIClient{&apiMock}
_, err := m.registerTemplateJob(tests.DefaultEnvironmentIDAsInteger, 1, 2, 3, "image", false, []uint16{})
assert.Equal(t, expectedErr, err)
_, err := apiClient.RegisterTemplateJob(&nomadApi.Job{}, tests.DefaultEnvironmentIDAsInteger,
1, 2, 3, "image", false, []uint16{})
assert.ErrorIs(t, err, tests.ErrDefault)
}

View File

@ -32,6 +32,12 @@ type ExecutorAPI interface {
// LoadRunners loads all runners of the specified environment which are running and not about to get stopped.
LoadRunners(environmentID string) (runnerIds []string, err error)
// RegisterTemplateJob creates a template job based on the default job configuration and the given parameters.
// It registers the job and waits until the registration completes.
RegisterTemplateJob(defaultJob *nomadApi.Job, environmentID int,
prewarmingPoolSize, cpuLimit, memoryLimit uint,
image string, networkAccess bool, exposedPorts []uint16) (*nomadApi.Job, error)
// LoadEnvironmentTemplate loads the template job of the specified environment.
// Based on the template job new runners can be created.
LoadEnvironmentTemplate(environmentID string) (*nomadApi.Job, error)
@ -98,7 +104,7 @@ func (a *APIClient) LoadEnvironmentTemplate(environmentID string) (*nomadApi.Job
}
func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) error {
stream, err := a.EvaluationStream(evaluationID, ctx)
stream, err := a.apiQuerier.EvaluationStream(evaluationID, ctx)
if err != nil {
return fmt.Errorf("failed retrieving evaluation stream: %w", err)
}