407 lines
13 KiB
Go
407 lines
13 KiB
Go
package runner
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/google/uuid"
|
|
nomadApi "github.com/hashicorp/nomad/api"
|
|
"github.com/openHPI/poseidon/internal/nomad"
|
|
"github.com/openHPI/poseidon/pkg/logging"
|
|
"github.com/sirupsen/logrus"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
log = logging.GetLogger("runner")
|
|
ErrUnknownExecutionEnvironment = errors.New("execution environment not found")
|
|
ErrNoRunnersAvailable = errors.New("no runners available for this execution environment")
|
|
ErrRunnerNotFound = errors.New("no runner found with this id")
|
|
ErrorUpdatingExecutionEnvironment = errors.New("errors occurred when updating environment")
|
|
ErrorInvalidJobID = errors.New("invalid job id")
|
|
)
|
|
|
|
type EnvironmentID int
|
|
|
|
func NewEnvironmentID(id string) (EnvironmentID, error) {
|
|
environment, err := strconv.Atoi(id)
|
|
return EnvironmentID(environment), err
|
|
}
|
|
|
|
func (e EnvironmentID) toString() string {
|
|
return strconv.Itoa(int(e))
|
|
}
|
|
|
|
type NomadJobID string
|
|
|
|
// Manager keeps track of the used and unused runners of all execution environments in order to provide unused
|
|
// runners to new clients and ensure no runner is used twice.
|
|
type Manager interface {
|
|
// CreateOrUpdateEnvironment creates the given environment if it does not exist. Otherwise, it updates
|
|
// the existing environment and all runners. Iff a new Environment has been created, it returns true.
|
|
// Iff scale is true, runners are created until the desiredIdleRunnersCount is reached.
|
|
CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint, templateJob *nomadApi.Job,
|
|
scale bool) (bool, error)
|
|
|
|
// Claim returns a new runner. The runner is deleted after duration seconds if duration is not 0.
|
|
// It makes sure that the runner is not in use yet and returns an error if no runner could be provided.
|
|
Claim(id EnvironmentID, duration int) (Runner, error)
|
|
|
|
// Get returns the used runner with the given runnerId.
|
|
// If no runner with the given runnerId is currently used, it returns an error.
|
|
Get(runnerID string) (Runner, error)
|
|
|
|
// Return signals that the runner is no longer used by the caller and can be claimed by someone else.
|
|
// The runner is deleted or cleaned up for reuse depending on the used executor.
|
|
Return(r Runner) error
|
|
|
|
// Load fetches all already created runners from the executor and registers them.
|
|
// It should be called during the startup process (e.g. on creation of the Manager).
|
|
Load()
|
|
}
|
|
|
|
type NomadRunnerManager struct {
|
|
apiClient nomad.ExecutorAPI
|
|
environments NomadEnvironmentStorage
|
|
usedRunners Storage
|
|
}
|
|
|
|
// NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
|
|
// It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad.
|
|
// If you cancel the context the background synchronization will be stopped.
|
|
func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
|
|
m := &NomadRunnerManager{
|
|
apiClient,
|
|
NewLocalNomadEnvironmentStorage(),
|
|
NewLocalRunnerStorage(),
|
|
}
|
|
go m.keepRunnersSynced(ctx)
|
|
return m
|
|
}
|
|
|
|
type NomadEnvironment struct {
|
|
environmentID EnvironmentID
|
|
idleRunners Storage
|
|
desiredIdleRunnersCount uint
|
|
templateJob *nomadApi.Job
|
|
}
|
|
|
|
func (j *NomadEnvironment) ID() EnvironmentID {
|
|
return j.environmentID
|
|
}
|
|
|
|
func (m *NomadRunnerManager) CreateOrUpdateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint,
|
|
templateJob *nomadApi.Job, scale bool) (bool, error) {
|
|
_, ok := m.environments.Get(id)
|
|
if !ok {
|
|
return true, m.registerEnvironment(id, desiredIdleRunnersCount, templateJob, scale)
|
|
}
|
|
return false, m.updateEnvironment(id, desiredIdleRunnersCount, templateJob, scale)
|
|
}
|
|
|
|
func (m *NomadRunnerManager) registerEnvironment(environmentID EnvironmentID, desiredIdleRunnersCount uint,
|
|
templateJob *nomadApi.Job, scale bool) error {
|
|
m.environments.Add(&NomadEnvironment{
|
|
environmentID,
|
|
NewLocalRunnerStorage(),
|
|
desiredIdleRunnersCount,
|
|
templateJob,
|
|
})
|
|
if scale {
|
|
err := m.scaleEnvironment(environmentID)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't upscale environment %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateEnvironment updates all runners of the specified environment. This is required as attributes like the
|
|
// CPULimit or MemoryMB could be changed in the new template job.
|
|
func (m *NomadRunnerManager) updateEnvironment(id EnvironmentID, desiredIdleRunnersCount uint,
|
|
newTemplateJob *nomadApi.Job, scale bool) error {
|
|
environment, ok := m.environments.Get(id)
|
|
if !ok {
|
|
return ErrUnknownExecutionEnvironment
|
|
}
|
|
environment.desiredIdleRunnersCount = desiredIdleRunnersCount
|
|
environment.templateJob = newTemplateJob
|
|
err := nomad.SetMetaConfigValue(newTemplateJob, nomad.ConfigMetaPoolSizeKey,
|
|
strconv.Itoa(int(desiredIdleRunnersCount)))
|
|
if err != nil {
|
|
return fmt.Errorf("update environment couldn't update template environment: %w", err)
|
|
}
|
|
|
|
err = m.updateRunnerSpecs(id, newTemplateJob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if scale {
|
|
err = m.scaleEnvironment(id)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (m *NomadRunnerManager) updateRunnerSpecs(environmentID EnvironmentID, templateJob *nomadApi.Job) error {
|
|
runners, err := m.apiClient.LoadRunnerIDs(environmentID.toString())
|
|
if err != nil {
|
|
return fmt.Errorf("update environment couldn't load runners: %w", err)
|
|
}
|
|
|
|
var occurredError error
|
|
for _, id := range runners {
|
|
// avoid taking the address of the loop variable
|
|
runnerID := id
|
|
updatedRunnerJob := *templateJob
|
|
updatedRunnerJob.ID = &runnerID
|
|
updatedRunnerJob.Name = &runnerID
|
|
err := m.apiClient.RegisterRunnerJob(&updatedRunnerJob)
|
|
if err != nil {
|
|
if occurredError == nil {
|
|
occurredError = ErrorUpdatingExecutionEnvironment
|
|
}
|
|
occurredError = fmt.Errorf("%w; new api error for runner %s - %v", occurredError, id, err)
|
|
}
|
|
}
|
|
return occurredError
|
|
}
|
|
|
|
func (m *NomadRunnerManager) Claim(environmentID EnvironmentID, duration int) (Runner, error) {
|
|
environment, ok := m.environments.Get(environmentID)
|
|
if !ok {
|
|
return nil, ErrUnknownExecutionEnvironment
|
|
}
|
|
runner, ok := environment.idleRunners.Sample()
|
|
if !ok {
|
|
return nil, ErrNoRunnersAvailable
|
|
}
|
|
m.usedRunners.Add(runner)
|
|
err := m.apiClient.MarkRunnerAsUsed(runner.ID(), duration)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't mark runner as used: %w", err)
|
|
}
|
|
|
|
runner.SetupTimeout(time.Duration(duration) * time.Second)
|
|
|
|
err = m.createRunner(environment)
|
|
if err != nil {
|
|
log.WithError(err).WithField("environmentID", environmentID).Error("Couldn't create new runner for claimed one")
|
|
}
|
|
|
|
return runner, nil
|
|
}
|
|
|
|
func (m *NomadRunnerManager) Get(runnerID string) (Runner, error) {
|
|
runner, ok := m.usedRunners.Get(runnerID)
|
|
if !ok {
|
|
return nil, ErrRunnerNotFound
|
|
}
|
|
return runner, nil
|
|
}
|
|
|
|
func (m *NomadRunnerManager) Return(r Runner) error {
|
|
r.StopTimeout()
|
|
err := m.apiClient.DeleteRunner(r.ID())
|
|
if err != nil {
|
|
return fmt.Errorf("error deleting runner in Nomad: %w", err)
|
|
}
|
|
m.usedRunners.Delete(r.ID())
|
|
return nil
|
|
}
|
|
|
|
func (m *NomadRunnerManager) Load() {
|
|
for _, environment := range m.environments.List() {
|
|
environmentLogger := log.WithField("environmentID", environment.ID())
|
|
runnerJobs, err := m.apiClient.LoadRunnerJobs(environment.ID().toString())
|
|
if err != nil {
|
|
environmentLogger.WithError(err).Warn("Error fetching the runner jobs")
|
|
}
|
|
for _, job := range runnerJobs {
|
|
m.loadSingleJob(job, environmentLogger, environment)
|
|
}
|
|
err = m.scaleEnvironment(environment.ID())
|
|
if err != nil {
|
|
environmentLogger.Error("Couldn't scale environment")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger *logrus.Entry,
|
|
environment *NomadEnvironment,
|
|
) {
|
|
configTaskGroup := nomad.FindConfigTaskGroup(job)
|
|
if configTaskGroup == nil {
|
|
environmentLogger.Infof("Couldn't find config task group in job %s, skipping ...", *job.ID)
|
|
return
|
|
}
|
|
isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
|
|
portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID)
|
|
if err != nil {
|
|
environmentLogger.WithError(err).Warn("Error loading runner portMappings")
|
|
return
|
|
}
|
|
newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m)
|
|
if isUsed {
|
|
m.usedRunners.Add(newJob)
|
|
timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
|
|
if err != nil {
|
|
environmentLogger.WithError(err).Warn("Error loading timeout from meta values")
|
|
} else {
|
|
newJob.SetupTimeout(time.Duration(timeout) * time.Second)
|
|
}
|
|
} else {
|
|
environment.idleRunners.Add(newJob)
|
|
}
|
|
}
|
|
|
|
func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
|
|
retries := 0
|
|
for ctx.Err() == nil {
|
|
err := m.apiClient.WatchAllocations(ctx, m.onAllocationAdded, m.onAllocationStopped)
|
|
retries += 1
|
|
log.WithError(err).Errorf("Stopped updating the runners! Retry %v", retries)
|
|
<-time.After(time.Second)
|
|
}
|
|
}
|
|
|
|
func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) {
|
|
log.WithField("id", alloc.JobID).Debug("Runner started")
|
|
|
|
if IsEnvironmentTemplateID(alloc.JobID) {
|
|
return
|
|
}
|
|
|
|
environmentID, err := EnvironmentIDFromJobID(alloc.JobID)
|
|
if err != nil {
|
|
log.WithError(err).Warn("Allocation could not be added")
|
|
return
|
|
}
|
|
|
|
job, ok := m.environments.Get(environmentID)
|
|
if ok {
|
|
var mappedPorts []nomadApi.PortMapping
|
|
if alloc.AllocatedResources != nil {
|
|
mappedPorts = alloc.AllocatedResources.Shared.Ports
|
|
}
|
|
job.idleRunners.Add(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m))
|
|
}
|
|
}
|
|
|
|
func (m *NomadRunnerManager) onAllocationStopped(alloc *nomadApi.Allocation) {
|
|
log.WithField("id", alloc.JobID).Debug("Runner stopped")
|
|
|
|
environmentID, err := EnvironmentIDFromJobID(alloc.JobID)
|
|
if err != nil {
|
|
log.WithError(err).Warn("Stopped allocation can not be handled")
|
|
return
|
|
}
|
|
|
|
m.usedRunners.Delete(alloc.JobID)
|
|
job, ok := m.environments.Get(environmentID)
|
|
if ok {
|
|
job.idleRunners.Delete(alloc.JobID)
|
|
}
|
|
}
|
|
|
|
// scaleEnvironment makes sure that the amount of idle runners is at least the desiredIdleRunnersCount.
|
|
func (m *NomadRunnerManager) scaleEnvironment(id EnvironmentID) error {
|
|
environment, ok := m.environments.Get(id)
|
|
if !ok {
|
|
return ErrUnknownExecutionEnvironment
|
|
}
|
|
|
|
required := int(environment.desiredIdleRunnersCount) - environment.idleRunners.Length()
|
|
|
|
if required > 0 {
|
|
return m.createRunners(environment, uint(required))
|
|
} else {
|
|
return m.removeRunners(environment, uint(-required))
|
|
}
|
|
}
|
|
|
|
func (m *NomadRunnerManager) createRunners(environment *NomadEnvironment, count uint) error {
|
|
log.WithField("runnersRequired", count).WithField("id", environment.ID()).Debug("Creating new runners")
|
|
for i := 0; i < int(count); i++ {
|
|
err := m.createRunner(environment)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't create new runner: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *NomadRunnerManager) createRunner(environment *NomadEnvironment) error {
|
|
newUUID, err := uuid.NewUUID()
|
|
if err != nil {
|
|
return fmt.Errorf("failed generating runner id: %w", err)
|
|
}
|
|
newRunnerID := RunnerJobID(environment.ID(), newUUID.String())
|
|
|
|
template := *environment.templateJob
|
|
template.ID = &newRunnerID
|
|
template.Name = &newRunnerID
|
|
|
|
err = m.apiClient.RegisterRunnerJob(&template)
|
|
if err != nil {
|
|
return fmt.Errorf("error registering new runner job: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *NomadRunnerManager) removeRunners(environment *NomadEnvironment, count uint) error {
|
|
log.WithField("runnersToDelete", count).WithField("id", environment.ID()).Debug("Removing idle runners")
|
|
for i := 0; i < int(count); i++ {
|
|
r, ok := environment.idleRunners.Sample()
|
|
if !ok {
|
|
return fmt.Errorf("could not delete expected idle runner: %w", ErrRunnerNotFound)
|
|
}
|
|
err := m.apiClient.DeleteRunner(r.ID())
|
|
if err != nil {
|
|
return fmt.Errorf("could not delete expected Nomad idle runner: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunnerJobID returns the nomad job id of the runner with the given environmentID and id.
|
|
func RunnerJobID(environmentID EnvironmentID, id string) string {
|
|
return fmt.Sprintf("%d-%s", environmentID, id)
|
|
}
|
|
|
|
// EnvironmentIDFromJobID returns the environment id that is part of the passed job id.
|
|
func EnvironmentIDFromJobID(jobID string) (EnvironmentID, error) {
|
|
parts := strings.Split(jobID, "-")
|
|
if len(parts) == 0 {
|
|
return 0, fmt.Errorf("empty job id: %w", ErrorInvalidJobID)
|
|
}
|
|
environmentID, err := strconv.Atoi(parts[0])
|
|
if err != nil {
|
|
return 0, fmt.Errorf("invalid environment id par %v: %w", err, ErrorInvalidJobID)
|
|
}
|
|
return EnvironmentID(environmentID), nil
|
|
}
|
|
|
|
const templateJobNameParts = 2
|
|
|
|
// TemplateJobID returns the id of the template job for the environment with the given id.
|
|
func TemplateJobID(id EnvironmentID) string {
|
|
return fmt.Sprintf("%s-%d", nomad.TemplateJobPrefix, id)
|
|
}
|
|
|
|
// IsEnvironmentTemplateID checks if the passed job id belongs to a template job.
|
|
func IsEnvironmentTemplateID(jobID string) bool {
|
|
parts := strings.Split(jobID, "-")
|
|
return len(parts) == templateJobNameParts && parts[0] == nomad.TemplateJobPrefix
|
|
}
|
|
|
|
func EnvironmentIDFromTemplateJobID(id string) (string, error) {
|
|
parts := strings.Split(id, "-")
|
|
if len(parts) < templateJobNameParts {
|
|
return "", fmt.Errorf("invalid template job id: %w", ErrorInvalidJobID)
|
|
}
|
|
return parts[1], nil
|
|
}
|