Restore existing jobs and fix rebase (7c99eff3) issues
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"github.com/google/uuid"
 	nomadApi "github.com/hashicorp/nomad/api"
+	"github.com/hashicorp/nomad/nomad/structs"
 	"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
 	"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
 	"strconv"
@@ -20,8 +21,6 @@ var (
 	ErrRunnerNotFound = errors.New("no runner found with this id")
 )
 
-const runnerNameFormat = "%s-%s"
-
 type EnvironmentID int
 
 func (e EnvironmentID) toString() string {
@@ -35,7 +34,7 @@ type NomadJobID string
 type Manager interface {
 	// CreateOrUpdateEnvironment creates the given environment if it does not exist. Otherwise, it updates
 	// the existing environment and all runners.
-	CreateOrUpdateEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint) (bool, error)
+	CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint) (bool, error)
 
 	// Claim returns a new runner.
 	// It makes sure that the runner is not in use yet and returns an error if no runner could be provided.
@@ -59,14 +58,18 @@ type NomadRunnerManager struct
 // NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
 // It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad.
 // If you cancel the context the background synchronization will be stopped.
-func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
+func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) (*NomadRunnerManager, error) {
 	m := &NomadRunnerManager{
 		apiClient,
 		NewLocalNomadJobStorage(),
 		NewLocalRunnerStorage(),
 	}
+	err := m.loadExistingEnvironments()
+	if err != nil {
+		return nil, err
+	}
 	go m.updateRunners(ctx)
-	return m
+	return m, nil
 }
 
 type NomadEnvironment struct {
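Note: since the constructor can now fail while restoring state from Nomad, callers have to handle the returned error. A minimal call-site sketch (hypothetical; apiClient, ctx, and log are assumed to exist as in the surrounding codebase):

	manager, err := NewNomadRunnerManager(apiClient, ctx)
	if err != nil {
		log.WithError(err).Fatal("Couldn't restore existing jobs from Nomad")
	}
	// manager already tracks the environments and runners found in Nomad.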
@@ -140,7 +143,7 @@ func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunn
 		errorResult := strings.Join(occurredErrors, "\n")
 		return fmt.Errorf("%d errors occurred when updating environment: %s", len(occurredErrors), errorResult)
 	}
-	return nil
+	return m.scaleEnvironment(id)
 }
 
 func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error) {
@@ -153,10 +156,16 @@ func (m *NomadRunnerManager) Claim(environmentID EnvironmentID) (Runner, error)
 		return nil, ErrNoRunnersAvailable
 	}
 	m.usedRunners.Add(runner)
-	err := m.scaleEnvironment(environmentID)
+	err := m.apiClient.MarkRunnerAsUsed(runner.Id())
 	if err != nil {
-		return nil, fmt.Errorf("can not scale up: %w", err)
+		return nil, fmt.Errorf("can't mark runner as used: %w", err)
 	}
+
+	err = m.scaleEnvironment(environmentID)
+	if err != nil {
+		log.WithError(err).WithField("environmentID", environmentID).Error("Couldn't scale environment")
+	}
+
 	return runner, nil
 }
 
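Note: Claim now records the used state in Nomad before handing out the runner, and a failed scale-up is only logged instead of failing the claim. A hypothetical caller (the environment id 10 is a made-up example):

	r, err := manager.Claim(EnvironmentID(10))
	if err != nil {
		return err // no idle runner available, or marking it as used in Nomad failed
	}
	// r is exclusively ours; the idle pool is refilled best-effort in the background.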
@@ -188,31 +197,37 @@ func (m *NomadRunnerManager) updateRunners(ctx context.Context) {
 }
 
 func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
-	log.WithField("id", alloc.ID).Debug("Allocation started")
+	log.WithField("id", alloc.JobID).Debug("Runner started")
 
-	intJobID, err := strconv.Atoi(alloc.JobID)
+	if nomad.IsDefaultJobID(alloc.JobID) {
+		return
+	}
+
+	environmentID := nomad.EnvironmentIDFromJobID(alloc.JobID)
+	intEnvironmentID, err := strconv.Atoi(environmentID)
 	if err != nil {
 		return
 	}
 
-	job, ok := m.environments.Get(EnvironmentID(intJobID))
+	job, ok := m.environments.Get(EnvironmentID(intEnvironmentID))
 	if ok {
-		job.idleRunners.Add(NewNomadAllocation(alloc.ID, m.apiClient))
+		job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient))
 	}
 }
 
 func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) {
-	log.WithField("id", alloc.ID).Debug("Allocation stopped")
+	log.WithField("id", alloc.JobID).Debug("Runner stopped")
 
-	intJobID, err := strconv.Atoi(alloc.JobID)
+	environmentID := nomad.EnvironmentIDFromJobID(alloc.JobID)
+	intEnvironmentID, err := strconv.Atoi(environmentID)
 	if err != nil {
 		return
 	}
 
-	m.usedRunners.Delete(alloc.ID)
-	job, ok := m.environments.Get(EnvironmentID(intJobID))
+	m.usedRunners.Delete(alloc.JobID)
+	job, ok := m.environments.Get(EnvironmentID(intEnvironmentID))
 	if ok {
-		job.idleRunners.Delete(alloc.ID)
+		job.idleRunners.Delete(alloc.JobID)
 	}
 }
 
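Note: both handlers now key runners by Nomad job ID instead of allocation ID. Assuming runner job IDs have the shape <environmentID>-<UUID> (consistent with the removed runnerNameFormat and with nomad.RunnerJobID below), the helpers used here could look roughly like this sketch; the real implementations live in the nomad package and may differ:

	package nomad

	import (
		"fmt"
		"strings"
	)

	// RunnerJobID joins environment id and runner UUID, "%s-%s" style.
	func RunnerJobID(environmentID, uuid string) string {
		return fmt.Sprintf("%s-%s", environmentID, uuid)
	}

	// EnvironmentIDFromJobID returns the leading segment before the first dash;
	// callers parse it with strconv.Atoi.
	func EnvironmentIDFromJobID(jobID string) string {
		return strings.SplitN(jobID, "-", 2)[0]
	}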
@@ -224,6 +239,9 @@ func (m *NomadRunnerManager) scaleEnvironment(id EnvironmentID) error {
 	}
 
 	required := int(environment.desiredIdleRunnersCount) - environment.idleRunners.Length()
+
+	log.WithField("required", required).Debug("Scaling environment")
+
 	for i := 0; i < required; i++ {
 		err := m.createRunner(environment)
 		if err != nil {
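Note: a quick worked example of the scaling arithmetic above: with desiredIdleRunnersCount = 5 and 2 idle runners, required = 3, so three runners are created. If more runners are idle than desired, required is negative and the creation loop simply doesn't run.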
@@ -238,7 +256,7 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error {
 	if err != nil {
 		return fmt.Errorf("failed generating runner id")
 	}
-	newRunnerID := fmt.Sprintf(runnerNameFormat, environment.ID().toString(), newUUID.String())
+	newRunnerID := nomad.RunnerJobID(environment.ID().toString(), newUUID.String())
 
 	template := *environment.templateJob
 	template.ID = &newRunnerID
@@ -252,11 +270,11 @@ func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error {
 	if err != nil {
 		return fmt.Errorf("couldn't monitor evaluation: %w", err)
 	}
 	environment.idleRunners.Add(NewNomadJob(newRunnerID, m.apiClient))
 	return nil
 }
 
-func (m *NomadRunnerManager) unusedRunners(environmentID EnvironmentID, fetchedRunnerIds []string) (newRunners []Runner) {
+func (m *NomadRunnerManager) unusedRunners(
+	environmentID EnvironmentID, fetchedRunnerIds []string) (newRunners []Runner) {
 	newRunners = make([]Runner, 0)
 	job, ok := m.environments.Get(environmentID)
 	if !ok {
@@ -272,5 +290,80 @@ func (m *NomadRunnerManager) unusedRunners(environmentID EnvironmentID, fetchedR
 			}
 		}
 	}
-	return
+	return newRunners
 }
+
+func (m *NomadRunnerManager) loadExistingEnvironments() error {
+	jobs, err := m.apiClient.LoadAllJobs()
+	if err != nil {
+		return fmt.Errorf("can't load template jobs: %w", err)
+	}
+
+	for _, job := range jobs {
+		m.loadExistingJob(job)
+	}
+
+	for _, environmentID := range m.environments.List() {
+		err := m.scaleEnvironment(environmentID)
+		if err != nil {
+			return fmt.Errorf("can not scale up: %w", err)
+		}
+	}
+
+	return nil
+}
+
+func (m *NomadRunnerManager) loadExistingJob(job *nomadApi.Job) {
+	if *job.Status != structs.JobStatusRunning {
+		return
+	}
+
+	jobLogger := log.WithField("jobID", *job.ID)
+
+	configTaskGroup := nomad.FindConfigTaskGroup(job)
+	if configTaskGroup == nil {
+		jobLogger.Info("Couldn't find config task group in job, skipping ...")
+		return
+	}
+
+	if configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue {
+		m.usedRunners.Add(NewNomadJob(*job.ID, m.apiClient))
+		jobLogger.Info("Added job to usedRunners")
+		return
+	}
+
+	environmentID := configTaskGroup.Meta[nomad.ConfigMetaEnvironmentKey]
+	environmentIDInt, err := strconv.Atoi(environmentID)
+	if err != nil {
+		jobLogger.WithField("environmentID", environmentID).
+			WithError(err).
+			Error("Couldn't convert environment id of template job to int")
+		return
+	}
+
+	environment, ok := m.environments.Get(EnvironmentID(environmentIDInt))
+	if !ok {
+		desiredIdleRunnersCount, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaPoolSizeKey])
+		if err != nil {
+			jobLogger.WithError(err).Error("Couldn't convert pool size to int")
+			return
+		}
+
+		environment = &NomadEnvironment{
+			environmentID:           EnvironmentID(environmentIDInt),
+			idleRunners:             NewLocalRunnerStorage(),
+			desiredIdleRunnersCount: uint(desiredIdleRunnersCount),
+		}
+		m.environments.Add(environment)
+		log.WithField("environmentID", environment.environmentID).Info("Added existing environment")
+	}
+
+	if nomad.IsDefaultJobID(*job.ID) {
+		environment.templateJob = job
+	} else {
+		log.WithField("jobID", *job.ID).
+			WithField("environmentID", environment.environmentID).
+			Info("Added idle runner")
+		environment.idleRunners.Add(NewNomadJob(*job.ID, m.apiClient))
+	}
+}
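Note: the restore path classifies each running Nomad job solely by the metadata on its config task group. A hypothetical meta block that loadExistingJob would pick up as an idle runner of environment 10 with a pool size of 5 (the key constants come from the nomad package; the concrete values here are invented):

	configTaskGroup.Meta = map[string]string{
		nomad.ConfigMetaEnvironmentKey: "10", // parsed via strconv.Atoi into an EnvironmentID
		nomad.ConfigMetaPoolSizeKey:    "5",  // becomes desiredIdleRunnersCount
		nomad.ConfigMetaUsedKey:        "",   // anything but nomad.ConfigMetaUsedValue counts as idle
	}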