Implement even more merge request comments
@@ -14,10 +14,12 @@ import (
 )
 
 var (
-	log                            = logging.GetLogger("runner")
-	ErrUnknownExecutionEnvironment = errors.New("execution environment not found")
-	ErrNoRunnersAvailable          = errors.New("no runners available for this execution environment")
-	ErrRunnerNotFound              = errors.New("no runner found with this id")
+	log                               = logging.GetLogger("runner")
+	ErrUnknownExecutionEnvironment    = errors.New("execution environment not found")
+	ErrNoRunnersAvailable             = errors.New("no runners available for this execution environment")
+	ErrRunnerNotFound                 = errors.New("no runner found with this id")
+	ErrorUpdatingExecutionEnvironment = errors.New("errors occurred when updating environment")
+	ErrorInvalidJobID                 = errors.New("invalid job id")
 )
 
 type EnvironmentID int
@@ -38,7 +40,9 @@ type NomadJobID string
 type Manager interface {
 	// CreateOrUpdateEnvironment creates the given environment if it does not exist. Otherwise, it updates
 	// the existing environment and all runners. Iff a new Environment has been created, it returns true.
-	CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, teplateJob *nomadApi.Job) (bool, error)
+	// Iff scale is true, runners are created until the desiredIdleRunnersCount is reached.
+	CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job,
+		scale bool) (bool, error)
 
 	// Claim returns a new runner.
 	// It makes sure that the runner is not in use yet and returns an error if no runner could be provided.
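
The scale flag lets the caller decide whether idle runners are created right away. A minimal sketch of the new call shape (the helper and its arguments are hypothetical, not part of this change):

	// startEnvironment is a hypothetical caller of the extended interface.
	func startEnvironment(m Manager, templateJob *nomadApi.Job) error {
		created, err := m.CreateOrUpdateEnvironment(EnvironmentID(42), 5, templateJob, true)
		if err != nil {
			return fmt.Errorf("couldn't create or update environment: %w", err)
		}
		if created {
			log.Info("Created new environment")
		}
		return nil
	}
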
@@ -55,13 +59,9 @@ type Manager interface {
 	// ScaleAllEnvironments checks for all environments if enough runners are created.
 	ScaleAllEnvironments() error
 
-	// RecoverEnvironment adds a recovered Environment to the internal structure.
-	// This is intended to recover environments after a restart.
-	RecoverEnvironment(id EnvironmentID, templateJob *nomadApi.Job, desiredIdleRunnersCount uint)
-
-	// RecoverRunner adds a recovered runner to the internal structure.
-	// This is intended to recover runners after a restart.
-	RecoverRunner(id EnvironmentID, job *nomadApi.Job, isUsed bool)
+	// Load fetches all already created runners from the executor and registers them.
+	// It should be called during the startup process (e.g. on creation of the Manager).
+	Load()
 }
 
 type NomadRunnerManager struct {
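
Load replaces the removed RecoverEnvironment and RecoverRunner methods: instead of being fed recovered state, the manager now fetches it from the executor itself. A plausible startup wiring, assuming apiClient and ctx already exist (the helper is hypothetical):

	func newManagerAtStartup(apiClient nomad.ExecutorAPI, ctx context.Context) Manager {
		m := NewNomadRunnerManager(apiClient, ctx)
		m.Load() // re-register runners that survived a restart before serving requests
		return m
	}
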
@@ -79,7 +79,7 @@ func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *No
 		NewLocalNomadEnvironmentStorage(),
 		NewLocalRunnerStorage(),
 	}
-	go m.updateRunners(ctx)
+	go m.keepRunnersSynced(ctx)
 	return m
 }
 
@@ -94,31 +94,36 @@ func (j *NomadEnvironment) ID() EnvironmentID {
 	return j.environmentID
 }
 
-func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) (bool, error) {
+func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint,
+	templateJob *nomadApi.Job, scale bool) (bool, error) {
 	_, ok := m.environments.Get(id)
 	if !ok {
-		return true, m.registerEnvironment(id, desiredIdleRunnersCount, templateJob)
+		return true, m.registerEnvironment(id, desiredIdleRunnersCount, templateJob, scale)
 	}
-	return false, m.updateEnvironment(id, desiredIdleRunnersCount, templateJob)
+	return false, m.updateEnvironment(id, desiredIdleRunnersCount, templateJob, scale)
 }
 
-func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) error {
+func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint,
+	templateJob *nomadApi.Job, scale bool) error {
 	m.environments.Add(&NomadEnvironment{
 		environmentID,
 		NewLocalRunnerStorage(),
 		desiredIdleRunnersCount,
 		templateJob,
 	})
-	err := m.scaleEnvironment(environmentID)
-	if err != nil {
-		return fmt.Errorf("couldn't upscale environment %w", err)
+	if scale {
+		err := m.scaleEnvironment(environmentID)
+		if err != nil {
+			return fmt.Errorf("couldn't upscale environment %w", err)
+		}
 	}
 	return nil
 }
 
 // updateEnvironment updates all runners of the specified environment. This is required as attributes like the
 // CPULimit or MemoryMB could be changed in the new template job.
-func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, newTemplateJob *nomadApi.Job) error {
+func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint,
+	newTemplateJob *nomadApi.Job, scale bool) error {
 	environment, ok := m.environments.Get(id)
 	if !ok {
 		return ErrUnknownExecutionEnvironment
@@ -135,31 +140,34 @@ func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunn
 		return err
 	}
 
-	return m.scaleEnvironment(id)
+	if scale {
+		err = m.scaleEnvironment(id)
+	}
+	return err
 }
 
 func (m *NomadRunnerManager) updateRunnerSpecs(environmentID EnvironmentID, templateJob *nomadApi.Job) error {
-	runners, err := m.apiClient.LoadRunners(environmentID.toString())
+	runners, err := m.apiClient.LoadRunnerIDs(environmentID.toString())
 	if err != nil {
 		return fmt.Errorf("update environment couldn't load runners: %w", err)
 	}
-	var occurredErrors []string
+
+	var occurredError error
 	for _, id := range runners {
 		// avoid taking the address of the loop variable
 		runnerID := id
 		updatedRunnerJob := *templateJob
 		updatedRunnerJob.ID = &runnerID
 		updatedRunnerJob.Name = &runnerID
-		_, err := m.apiClient.RegisterNomadJob(&updatedRunnerJob)
+		err := m.apiClient.RegisterRunnerJob(&updatedRunnerJob)
 		if err != nil {
-			occurredErrors = append(occurredErrors, err.Error())
+			if occurredError == nil {
+				occurredError = ErrorUpdatingExecutionEnvironment
+			}
+			occurredError = fmt.Errorf("%w; new api error for runner %s - %v", occurredError, id, err)
 		}
 	}
-	if len(occurredErrors) > 0 {
-		errorResult := strings.Join(occurredErrors, "\n")
-		return fmt.Errorf("%d errors occurred when updating environment: %s", len(occurredErrors), errorResult)
-	}
-	return nil
+	return occurredError
 }
 
 func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) {
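
The rewritten updateRunnerSpecs no longer joins error strings; it folds every per-runner failure into a single error chain rooted in the ErrorUpdatingExecutionEnvironment sentinel, so callers can still match the failure class with errors.Is. A self-contained sketch of the pattern with simulated failures (not project code):

	package main

	import (
		"errors"
		"fmt"
	)

	var errUpdating = errors.New("errors occurred when updating environment")

	func main() {
		var occurredError error
		for _, id := range []string{"42-a", "42-b"} {
			err := fmt.Errorf("runner %s unreachable", id) // simulated api error
			if occurredError == nil {
				occurredError = errUpdating
			}
			occurredError = fmt.Errorf("%w; new api error for runner %s - %v", occurredError, id, err)
		}
		fmt.Println(errors.Is(occurredError, errUpdating)) // true: %w keeps the sentinel reachable
	}
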
@@ -203,8 +211,8 @@ func (m *NomadRunnerManager) Return(r Runner) (err error) {
 }
 
 func (m *NomadRunnerManager) ScaleAllEnvironments() error {
-	for _, environmentID := range m.environments.List() {
-		err := m.scaleEnvironment(environmentID)
+	for _, environment := range m.environments.List() {
+		err := m.scaleEnvironment(environment.ID())
 		if err != nil {
 			return fmt.Errorf("can not scale up: %w", err)
 		}
@@ -212,43 +220,35 @@ func (m *NomadRunnerManager) ScaleAllEnvironments() error {
 	return nil
 }
 
-func (m *NomadRunnerManager) RecoverEnvironment(id EnvironmentID, templateJob *nomadApi.Job,
-	desiredIdleRunnersCount uint) {
-	_, ok := m.environments.Get(id)
-	if ok {
-		log.Error("Recovering existing environment.")
-		return
-	}
-	environment := &NomadEnvironment{
-		environmentID: id,
-		idleRunners:   NewLocalRunnerStorage(),
-	}
-	m.environments.Add(environment)
-	log.WithField("environmentID", environment.environmentID).Info("Added recovered environment")
-	environment.desiredIdleRunnersCount = desiredIdleRunnersCount
-	environment.templateJob = templateJob
-}
-
-func (m *NomadRunnerManager) RecoverRunner(id EnvironmentID, job *nomadApi.Job, isUsed bool) {
-	environment, ok := m.environments.Get(id)
-	if !ok {
-		log.Error("Environment missing. Can not recover runner")
-		return
-	}
-
-	log.WithField("jobID", *job.ID).
-		WithField("environmentID", environment.environmentID).
-		Info("Added idle runner")
-
-	newJob := NewNomadJob(*job.ID, m.apiClient)
-	if isUsed {
-		m.usedRunners.Add(newJob)
-	} else {
-		environment.idleRunners.Add(newJob)
+func (m *NomadRunnerManager) Load() {
+	for _, environment := range m.environments.List() {
+		environmentLogger := log.WithField("environmentID", environment.ID())
+		runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID().toString())
+		if err != nil {
+			environmentLogger.WithError(err).Warn("Error fetching the runner jobs")
+		}
+		for _, job := range runnerJobs {
+			configTaskGroup := nomad.FindConfigTaskGroup(job)
+			if configTaskGroup == nil {
+				environmentLogger.Infof("Couldn't find config task group in job %s, skipping ...", *job.ID)
+				continue
+			}
+			isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
+			newJob := NewNomadJob(*job.ID, m.apiClient)
+			if isUsed {
+				m.usedRunners.Add(newJob)
+			} else {
+				environment.idleRunners.Add(newJob)
+			}
+		}
+		err = m.scaleEnvironment(environment.ID())
+		if err != nil {
+			environmentLogger.Error("Couldn't scale environment")
+		}
 	}
 }
 
-func (m *NomadRunnerManager) updateRunners(ctx context.Context) {
+func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
 	retries := 0
 	for ctx.Err() == nil {
 		err := m.apiClient.WatchAllocations(ctx, m.onAllocationAdded, m.onAllocationStopped)
@@ -261,17 +261,17 @@ func (m *NomadRunnerManager) updateRunners(ctx context.Context) {
 func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
 	log.WithField("id", alloc.JobID).Debug("Runner started")
 
-	if nomad.IsEnvironmentTemplateID(alloc.JobID) {
+	if IsEnvironmentTemplateID(alloc.JobID) {
 		return
 	}
 
-	environmentID, err := nomad.EnvironmentIDFromJobID(alloc.JobID)
+	environmentID, err := EnvironmentIDFromJobID(alloc.JobID)
 	if err != nil {
 		log.WithError(err).Warn("Allocation could not be added")
 		return
 	}
 
-	job, ok := m.environments.Get(EnvironmentID(environmentID))
+	job, ok := m.environments.Get(environmentID)
 	if ok {
 		job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient))
 	}
@@ -280,14 +280,14 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
 func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) {
 	log.WithField("id", alloc.JobID).Debug("Runner stopped")
 
-	environmentID, err := nomad.EnvironmentIDFromJobID(alloc.JobID)
+	environmentID, err := EnvironmentIDFromJobID(alloc.JobID)
 	if err != nil {
 		log.WithError(err).Warn("Stopped allocation can not be handled")
 		return
 	}
 
 	m.usedRunners.Delete(alloc.JobID)
-	job, ok := m.environments.Get(EnvironmentID(environmentID))
+	job, ok := m.environments.Get(environmentID)
 	if ok {
 		job.idleRunners.Delete(alloc.JobID)
 	}
@@ -318,19 +318,40 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error {
 	if err != nil {
 		return fmt.Errorf("failed generating runner id")
 	}
-	newRunnerID := nomad.RunnerJobID(environment.ID().toString(), newUUID.String())
+	newRunnerID := RunnerJobID(environment.ID(), newUUID.String())
 
 	template := *environment.templateJob
 	template.ID = &newRunnerID
 	template.Name = &newRunnerID
 
-	evalID, err := m.apiClient.RegisterNomadJob(&template)
-	if err != nil {
-		return fmt.Errorf("couldn't register Nomad environment: %w", err)
-	}
-	err = m.apiClient.MonitorEvaluation(evalID, context.Background())
-	if err != nil {
-		return fmt.Errorf("couldn't monitor evaluation: %w", err)
-	}
-	return nil
+	return m.apiClient.RegisterRunnerJob(&template)
 }
+
+// RunnerJobID returns the nomad job id of the runner with the given environment id and uuid.
+func RunnerJobID(environmentID EnvironmentID, uuid string) string {
+	return fmt.Sprintf("%d-%s", environmentID, uuid)
+}
+
+// EnvironmentIDFromJobID returns the environment id that is part of the passed job id.
+func EnvironmentIDFromJobID(jobID string) (EnvironmentID, error) {
+	parts := strings.Split(jobID, "-")
+	if len(parts) == 0 {
+		return 0, fmt.Errorf("empty job id: %w", ErrorInvalidJobID)
+	}
+	environmentID, err := strconv.Atoi(parts[0])
+	if err != nil {
+		return 0, fmt.Errorf("invalid environment id part %v: %w", err, ErrorInvalidJobID)
+	}
+	return EnvironmentID(environmentID), nil
+}
+
+// TemplateJobID returns the id of the template job for the environment with the given id.
+func TemplateJobID(id EnvironmentID) string {
+	return fmt.Sprintf("%s-%d", nomad.TemplateJobPrefix, id)
+}
+
+// IsEnvironmentTemplateID checks if the passed job id belongs to a template job.
+func IsEnvironmentTemplateID(jobID string) bool {
+	parts := strings.Split(jobID, "-")
+	return len(parts) == 2 && parts[0] == nomad.TemplateJobPrefix
+}
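
The id helpers round-trip cleanly even though the UUID part itself contains dashes, because EnvironmentIDFromJobID only parses the segment before the first dash, and template job ids consist of exactly two segments. A reduced standalone sketch ("template" stands in for nomad.TemplateJobPrefix, whose actual value is an assumption here):

	package main

	import (
		"fmt"
		"strconv"
		"strings"
	)

	func main() {
		// Build a runner job id the way RunnerJobID does.
		jobID := fmt.Sprintf("%d-%s", 42, "c3b1e7f0-1a2b-4c3d-8e9f-0a1b2c3d4e5f")

		// Recover the environment id the way EnvironmentIDFromJobID does:
		// only the segment before the first dash is parsed.
		envID, err := strconv.Atoi(strings.Split(jobID, "-")[0])
		fmt.Println(envID, err) // 42 <nil>

		// Template job ids have exactly two segments, which is what
		// IsEnvironmentTemplateID checks.
		parts := strings.Split("template-42", "-")
		fmt.Println(len(parts) == 2 && parts[0] == "template") // true
	}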