Implement merge request comments

This commit is contained in:
Maximilian Paß
2021-06-10 19:08:14 +02:00
parent 25d78df557
commit 87f823756b
26 changed files with 482 additions and 383 deletions

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"github.com/google/uuid"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
"strconv"
@@ -23,6 +22,11 @@ var (
// EnvironmentID is the numeric identifier of an execution environment.
type EnvironmentID int

// NewEnvironmentID parses the given string into an EnvironmentID.
// It returns a wrapped error if the string is not a valid integer.
func NewEnvironmentID(id string) (EnvironmentID, error) {
	environment, err := strconv.Atoi(id)
	if err != nil {
		return 0, fmt.Errorf("invalid environment id %q: %w", id, err)
	}
	return EnvironmentID(environment), nil
}

// toString returns the decimal string representation of the id.
func (e EnvironmentID) toString() string {
	return strconv.Itoa(int(e))
}
@@ -33,8 +37,8 @@ type NomadJobID string
// runners to new clients and ensure no runner is used twice.
type Manager interface {
// CreateOrUpdateEnvironment creates the given environment if it does not exist. Otherwise, it updates
// the existing environment and all runners.
CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) (bool, error)
// the existing environment and all runners. Iff a new Environment has been created, it returns true.
CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) (bool, error)
// Claim returns a new runner.
// It makes sure that the runner is not in use yet and returns an error if no runner could be provided.
@@ -47,6 +51,17 @@ type Manager interface {
// Return signals that the runner is no longer used by the caller and can be claimed by someone else.
// The runner is deleted or cleaned up for reuse depending on the used executor.
Return(r Runner) error
// ScaleAllEnvironments checks for all environments if enough runners are created.
ScaleAllEnvironments() error
// RecoverEnvironment adds a recovered Environment to the internal structure.
// This is intended to recover environments after a restart.
RecoverEnvironment(id EnvironmentID, templateJob *nomadApi.Job, desiredIdleRunnersCount uint)
// RecoverRunner adds a recovered runner to the internal structure.
// This is intended to recover runners after a restart.
RecoverRunner(id EnvironmentID, job *nomadApi.Job, isUsed bool)
}
type NomadRunnerManager struct {
@@ -58,18 +73,14 @@ type NomadRunnerManager struct {
// NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
// It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad.
// If you cancel the context the background synchronization will be stopped.
func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) (*NomadRunnerManager, error) {
func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
m := &NomadRunnerManager{
apiClient,
NewLocalNomadJobStorage(),
NewLocalNomadEnvironmentStorage(),
NewLocalRunnerStorage(),
}
err := m.loadExistingEnvironments()
if err != nil {
return nil, err
}
go m.updateRunners(ctx)
return m, nil
return m
}
type NomadEnvironment struct {
@@ -83,47 +94,52 @@ func (j *NomadEnvironment) ID() EnvironmentID {
return j.environmentID
}
func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) (bool, error) {
func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) (bool, error) {
_, ok := m.environments.Get(id)
if !ok {
return true, m.registerEnvironment(id, desiredIdleRunnersCount)
return true, m.registerEnvironment(id, desiredIdleRunnersCount, templateJob)
}
return false, m.updateEnvironment(id, desiredIdleRunnersCount)
return false, m.updateEnvironment(id, desiredIdleRunnersCount, templateJob)
}
func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint) error {
templateJob, err := m.apiClient.LoadTemplateJob(environmentID.toString())
if err != nil {
return fmt.Errorf("couldn't register environment: %w", err)
}
func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job) error {
m.environments.Add(&NomadEnvironment{
environmentID,
NewLocalRunnerStorage(),
desiredIdleRunnersCount,
templateJob,
})
err = m.scaleEnvironment(environmentID)
err := m.scaleEnvironment(environmentID)
if err != nil {
return fmt.Errorf("couldn't upscale environment: %w", err)
}
return nil
}
func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) error {
// updateEnvironment updates all runners of the specified environment. This is required as attributes like the
// CPULimit or MemoryMB could be changed in the new template job.
func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, newTemplateJob *nomadApi.Job) error {
environment, ok := m.environments.Get(id)
if !ok {
return ErrUnknownExecutionEnvironment
}
environment.desiredIdleRunnersCount = desiredIdleRunnersCount
templateJob, err := m.apiClient.LoadTemplateJob(id.toString())
environment.templateJob = newTemplateJob
err := nomad.SetMetaConfigValue(newTemplateJob, nomad.ConfigMetaPoolSizeKey, strconv.Itoa(int(desiredIdleRunnersCount)))
if err != nil {
return fmt.Errorf("update environment couldn't load template job: %w", err)
return fmt.Errorf("update environment couldn't update template environment: %w", err)
}
environment.templateJob = templateJob
runners, err := m.apiClient.LoadRunners(id.toString())
err = m.updateRunnerSpecs(id, newTemplateJob)
if err != nil {
return err
}
return m.scaleEnvironment(id)
}
func (m *NomadRunnerManager) updateRunnerSpecs(environmentID EnvironmentID, templateJob *nomadApi.Job) error {
runners, err := m.apiClient.LoadRunners(environmentID.toString())
if err != nil {
return fmt.Errorf("update environment couldn't load runners: %w", err)
}
@@ -131,7 +147,7 @@ func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunn
for _, id := range runners {
// avoid taking the address of the loop variable
runnerID := id
updatedRunnerJob := *environment.templateJob
updatedRunnerJob := *templateJob
updatedRunnerJob.ID = &runnerID
updatedRunnerJob.Name = &runnerID
_, err := m.apiClient.RegisterNomadJob(&updatedRunnerJob)
@@ -143,7 +159,7 @@ func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunn
errorResult := strings.Join(occurredErrors, "\n")
return fmt.Errorf("%d errors occurred when updating environment: %s", len(occurredErrors), errorResult)
}
return m.scaleEnvironment(id)
return nil
}
func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) {
@@ -186,6 +202,52 @@ func (m *NomadRunnerManager) Return(r Runner) (err error) {
return
}
// ScaleAllEnvironments checks every known environment and creates runners
// until each one has its desired number of idle runners.
func (m *NomadRunnerManager) ScaleAllEnvironments() error {
	for _, id := range m.environments.List() {
		if err := m.scaleEnvironment(id); err != nil {
			return fmt.Errorf("can not scale up: %w", err)
		}
	}
	return nil
}
// RecoverEnvironment adds a recovered Environment to the internal structure.
// This is intended to recover environments after a restart. If the environment
// already exists, an error is logged and nothing is changed.
func (m *NomadRunnerManager) RecoverEnvironment(id EnvironmentID, templateJob *nomadApi.Job,
	desiredIdleRunnersCount uint) {
	if _, exists := m.environments.Get(id); exists {
		log.Error("Recovering existing environment.")
		return
	}
	// Build the environment completely before registering it.
	recovered := &NomadEnvironment{
		environmentID:           id,
		idleRunners:             NewLocalRunnerStorage(),
		desiredIdleRunnersCount: desiredIdleRunnersCount,
		templateJob:             templateJob,
	}
	m.environments.Add(recovered)
	log.WithField("environmentID", recovered.environmentID).Info("Added recovered environment")
}
// RecoverRunner adds a recovered runner to the internal structure.
// This is intended to recover runners after a restart. Used runners go to the
// manager's usedRunners storage, all others to the environment's idle runners.
func (m *NomadRunnerManager) RecoverRunner(id EnvironmentID, job *nomadApi.Job, isUsed bool) {
	environment, ok := m.environments.Get(id)
	if !ok {
		log.Error("Environment missing. Can not recover runner")
		return
	}
	newJob := NewNomadJob(*job.ID, m.apiClient)
	// Bug fix: previously "Added idle runner" was logged even for used runners.
	if isUsed {
		m.usedRunners.Add(newJob)
		log.WithField("jobID", *job.ID).
			WithField("environmentID", environment.environmentID).
			Info("Added used runner")
	} else {
		environment.idleRunners.Add(newJob)
		log.WithField("jobID", *job.ID).
			WithField("environmentID", environment.environmentID).
			Info("Added idle runner")
	}
}
func (m *NomadRunnerManager) updateRunners(ctx context.Context) {
retries := 0
for ctx.Err() == nil {
@@ -199,17 +261,17 @@ func (m *NomadRunnerManager) updateRunners(ctx context.Context) {
func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
log.WithField("id", alloc.JobID).Debug("Runner started")
if nomad.IsDefaultJobID(alloc.JobID) {
if nomad.IsEnvironmentTemplateID(alloc.JobID) {
return
}
environmentID := nomad.EnvironmentIDFromJobID(alloc.JobID)
intEnvironmentID, err := strconv.Atoi(environmentID)
environmentID, err := nomad.EnvironmentIDFromJobID(alloc.JobID)
if err != nil {
log.WithError(err).Warn("Allocation could not be added")
return
}
job, ok := m.environments.Get(EnvironmentID(intEnvironmentID))
job, ok := m.environments.Get(EnvironmentID(environmentID))
if ok {
job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient))
}
@@ -218,14 +280,14 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) {
log.WithField("id", alloc.JobID).Debug("Runner stopped")
environmentID := nomad.EnvironmentIDFromJobID(alloc.JobID)
intEnvironmentID, err := strconv.Atoi(environmentID)
environmentID, err := nomad.EnvironmentIDFromJobID(alloc.JobID)
if err != nil {
log.WithError(err).Warn("Stopped allocation can not be handled")
return
}
m.usedRunners.Delete(alloc.JobID)
job, ok := m.environments.Get(EnvironmentID(intEnvironmentID))
job, ok := m.environments.Get(EnvironmentID(environmentID))
if ok {
job.idleRunners.Delete(alloc.JobID)
}
@@ -240,7 +302,7 @@ func (m *NomadRunnerManager) scaleEnvironment(id EnvironmentID) error {
required := int(environment.desiredIdleRunnersCount) - environment.idleRunners.Length()
log.WithField("required", required).Debug("Scaling environment")
log.WithField("runnersRequired", required).WithField("id", id).Debug("Scaling environment")
for i := 0; i < required; i++ {
err := m.createRunner(environment)
@@ -264,7 +326,7 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error {
evalID, err := m.apiClient.RegisterNomadJob(&template)
if err != nil {
return fmt.Errorf("couldn't register Nomad job: %w", err)
return fmt.Errorf("couldn't register Nomad environment: %w", err)
}
err = m.apiClient.MonitorEvaluation(evalID, context.Background())
if err != nil {
@@ -272,98 +334,3 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error {
}
return nil
}
// unusedRunners returns all runners from fetchedRunnerIds that are tracked
// neither as used nor as idle in the given environment.
func (m *NomadRunnerManager) unusedRunners(
	environmentID EnvironmentID, fetchedRunnerIds []string) (newRunners []Runner) {
	newRunners = make([]Runner, 0)
	environment, ok := m.environments.Get(environmentID)
	if !ok {
		// the environment does not exist, so it won't have any unused runners
		return
	}
	for _, runnerID := range fetchedRunnerIds {
		// Skip runners that are already known in either storage.
		if _, used := m.usedRunners.Get(runnerID); used {
			continue
		}
		if _, idle := environment.idleRunners.Get(runnerID); idle {
			continue
		}
		newRunners = append(newRunners, NewNomadJob(runnerID, m.apiClient))
	}
	return newRunners
}
// loadExistingEnvironments recovers environments and runners from the jobs
// currently known to Nomad and scales every recovered environment afterwards.
func (m *NomadRunnerManager) loadExistingEnvironments() error {
	jobs, err := m.apiClient.LoadAllJobs()
	if err != nil {
		// Bug fix: LoadAllJobs loads all jobs, not only template jobs, so the
		// previous message "can't load template jobs" was misleading.
		return fmt.Errorf("can't load jobs: %w", err)
	}
	for _, job := range jobs {
		m.loadExistingJob(job)
	}
	// Make sure every recovered environment reaches its desired idle runner count.
	for _, environmentID := range m.environments.List() {
		err := m.scaleEnvironment(environmentID)
		if err != nil {
			return fmt.Errorf("can not scale up: %w", err)
		}
	}
	return nil
}
// loadExistingJob recovers a single Nomad job into the manager's internal
// structures. Running template (default) jobs become an environment's
// templateJob; other running jobs are registered as used or idle runners
// based on the meta data of their config task group.
func (m *NomadRunnerManager) loadExistingJob(job *nomadApi.Job) {
// Only running jobs can serve as runners or templates; ignore the rest.
if *job.Status != structs.JobStatusRunning {
return
}
jobLogger := log.WithField("jobID", *job.ID)
// The config task group carries the recovery meta data (environment id,
// pool size, used flag); without it the job can not be classified.
configTaskGroup := nomad.FindConfigTaskGroup(job)
if configTaskGroup == nil {
jobLogger.Info("Couldn't find config task group in job, skipping ...")
return
}
// Jobs flagged as used belong to a client and go straight to usedRunners.
if configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue {
m.usedRunners.Add(NewNomadJob(*job.ID, m.apiClient))
jobLogger.Info("Added job to usedRunners")
return
}
environmentID := configTaskGroup.Meta[nomad.ConfigMetaEnvironmentKey]
environmentIDInt, err := strconv.Atoi(environmentID)
if err != nil {
jobLogger.WithField("environmentID", environmentID).
WithError(err).
Error("Couldn't convert environment id of template job to int")
return
}
// Lazily create the environment the first time one of its jobs is seen;
// the desired pool size is taken from the job's meta data.
environment, ok := m.environments.Get(EnvironmentID(environmentIDInt))
if !ok {
desiredIdleRunnersCount, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaPoolSizeKey])
if err != nil {
jobLogger.WithError(err).Error("Couldn't convert pool size to int")
return
}
environment = &NomadEnvironment{
environmentID: EnvironmentID(environmentIDInt),
idleRunners: NewLocalRunnerStorage(),
desiredIdleRunnersCount: uint(desiredIdleRunnersCount),
}
m.environments.Add(environment)
log.WithField("environmentID", environment.environmentID).Info("Added existing environment")
}
// Default (template) jobs define the environment; all other running jobs
// of this environment are treated as idle runners.
if nomad.IsDefaultJobID(*job.ID) {
environment.templateJob = job
} else {
log.WithField("jobID", *job.ID).
WithField("environmentID", environment.environmentID).
Info("Added idle runner")
environment.idleRunners.Add(NewNomadJob(*job.ID, m.apiClient))
}
}