Files
poseidon/internal/runner/nomad_manager.go
Maximilian Paß a7d27e8f65 Add missing error log statements.
Previously, when "markRunnerAsUsed" failed, we silently ignored the error; it was only surfaced when returning the runner additionally failed.

When a Runner is destroyed, we are only notified that Nomad removed the allocation, but cannot tell about the reason.

For "the execution did not stop after SIGQUIT" we did not log the corresponding runner id.
2023-08-21 22:40:37 +02:00

218 lines
7.3 KiB
Go

package runner
import (
"context"
"errors"
"fmt"
nomadApi "github.com/hashicorp/nomad/api"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/util"
"github.com/sirupsen/logrus"
"strconv"
"time"
)
var (
	// log is the package-wide logger; entries are enriched with runner/environment ids via WithField.
	log = logging.GetLogger("runner")
	// ErrUnknownExecutionEnvironment is returned when the requested environment id is not registered.
	ErrUnknownExecutionEnvironment = errors.New("execution environment not found")
	// ErrNoRunnersAvailable is returned when the environment's prewarming pool is empty.
	ErrNoRunnersAvailable = errors.New("no runners available for this execution environment")
	// ErrRunnerNotFound is returned when no runner matches the given id.
	ErrRunnerNotFound = errors.New("no runner found with this id")
)
// NomadRunnerManager is a runner manager backed by Nomad jobs.
// It embeds AbstractManager for the shared environment/runner bookkeeping.
// NOTE: field order matters — NewNomadRunnerManager constructs this with a positional composite literal.
type NomadRunnerManager struct {
	*AbstractManager
	// apiClient performs all Nomad requests (claiming, loading, and watching runners).
	apiClient nomad.ExecutorAPI
}
// NewNomadRunnerManager creates a new runner manager that keeps track of all runners.
// It uses the apiClient for all requests and runs a background task to keep the runners in sync with Nomad.
// If you cancel the context the background synchronization will be stopped.
func NewNomadRunnerManager(apiClient nomad.ExecutorAPI, ctx context.Context) *NomadRunnerManager {
	manager := &NomadRunnerManager{NewAbstractManager(), apiClient}
	// The goroutine terminates once ctx is canceled.
	go manager.keepRunnersSynced(ctx)
	return manager
}
// Claim reserves an idle runner of the given environment, registers it as used,
// and arms its inactivity timeout with the given duration (in seconds).
// It returns ErrUnknownExecutionEnvironment for an unregistered environment and
// ErrNoRunnersAvailable when the prewarming pool is empty.
func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int) (Runner, error) {
	environment, found := m.environments.Get(environmentID.ToString())
	if !found {
		return nil, ErrUnknownExecutionEnvironment
	}

	claimedRunner, found := environment.Sample()
	if !found {
		return nil, ErrNoRunnersAvailable
	}

	m.usedRunners.Add(claimedRunner.ID(), claimedRunner)
	// Marking the runner as used in Nomad happens asynchronously so the claim returns fast.
	go m.markRunnerAsUsed(claimedRunner, duration)

	claimedRunner.SetupTimeout(time.Duration(duration) * time.Second)
	return claimedRunner, nil
}
// markRunnerAsUsed persists the used-flag and timeout of the runner in Nomad,
// retrying with exponential backoff. If all retries fail, the runner is
// returned (destroyed) because its state in Nomad is inconsistent.
func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) {
	markErr := util.RetryExponential(func() error {
		if err := m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration); err != nil {
			return fmt.Errorf("cannot mark runner as used: %w", err)
		}
		return nil
	})
	if markErr == nil {
		return
	}

	log.WithError(markErr).WithField(dto.KeyRunnerID, runner.ID()).Error("cannot mark runner as used")
	if returnErr := m.Return(runner); returnErr != nil {
		log.WithError(returnErr).WithField(dto.KeyRunnerID, runner.ID()).Error("can't mark runner as used and can't return runner")
	}
}
// Return removes the runner from the used runners and destroys it.
// The runner is removed from the bookkeeping even if the destruction fails.
func (m *NomadRunnerManager) Return(r Runner) error {
	m.usedRunners.Delete(r.ID())
	if err := r.Destroy(ErrDestroyedByAPIRequest); err != nil {
		return fmt.Errorf("cannot return runner: %w", err)
	}
	return nil
}
// Load recovers the runner jobs of every registered execution environment from
// Nomad and afterwards rescales each environment's prewarming pool.
// Errors are logged per environment; recovery continues with the next one.
func (m *NomadRunnerManager) Load() {
	for _, environment := range m.environments.List() {
		envLogger := log.WithField(dto.KeyEnvironmentID, environment.ID().ToString())

		jobs, err := m.apiClient.LoadRunnerJobs(environment.ID())
		if err != nil {
			envLogger.WithError(err).Warn("Error fetching the runner jobs")
		}
		for _, job := range jobs {
			m.loadSingleJob(job, envLogger, environment)
		}

		if err := environment.ApplyPrewarmingPoolSize(); err != nil {
			envLogger.WithError(err).Error("Couldn't scale environment")
		}
	}
}
// loadSingleJob recovers a single runner job from Nomad. A job marked as used
// (via its config task group meta) is re-added to the used runners and its
// timeout restored; an idle job is put back into the environment's pool.
func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger *logrus.Entry,
	environment ExecutionEnvironment) {
	configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
	if configTaskGroup == nil {
		environmentLogger.Infof("Couldn't find config task group in job %s, skipping ...", *job.ID)
		return
	}
	isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue
	portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID)
	if err != nil {
		environmentLogger.WithError(err).Warn("Error loading runner portMappings")
		return
	}
	newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m.onRunnerDestroyed)
	// Use the environment-scoped logger (not the package logger) so the environment id
	// is attached, consistent with all other log statements in this function.
	environmentLogger.WithField("isUsed", isUsed).WithField(dto.KeyRunnerID, newJob.ID()).Debug("Recovered Runner")
	if isUsed {
		m.usedRunners.Add(newJob.ID(), newJob)
		timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey])
		if err != nil {
			environmentLogger.WithError(err).Warn("Error loading timeout from meta values")
		} else {
			newJob.SetupTimeout(time.Duration(timeout) * time.Second)
		}
	} else {
		environment.AddRunner(newJob)
	}
}
// keepRunnersSynced watches the Nomad event stream to keep the runner
// bookkeeping in sync. When the stream fails it retries after one second
// until ctx is canceled, logging the retry count.
func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
	retries := 0
	for ctx.Err() == nil {
		err := m.apiClient.WatchEventStream(ctx,
			&nomad.AllocationProcessing{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped})
		retries++
		// Error (not Errorf): the message contains no format verbs; go vet flags Errorf here.
		log.WithContext(ctx).WithError(err).WithField("count", retries).Error("Nomad Event Stream failed! Retrying...")
		<-time.After(time.Second)
	}
}
// onAllocationAdded is the callback for when Nomad started an allocation.
// Template-environment allocations are ignored; a freshly started runner is
// added to its environment's idle pool and its startup duration is monitored.
func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation, startup time.Duration) {
	log.WithField(dto.KeyRunnerID, alloc.JobID).WithField("startupDuration", startup).Debug("Runner started")

	if nomad.IsEnvironmentTemplateID(alloc.JobID) {
		return
	}

	if _, inUse := m.usedRunners.Get(alloc.JobID); inUse {
		log.WithField(dto.KeyRunnerID, alloc.JobID).WithField("states", alloc.TaskStates).
			Error("Started Runner is already in use")
		return
	}

	environmentID, err := nomad.EnvironmentIDFromRunnerID(alloc.JobID)
	if err != nil {
		log.WithError(err).Warn("Allocation could not be added")
		return
	}

	environment, found := m.environments.Get(environmentID.ToString())
	if !found {
		return
	}

	// AllocatedResources may be nil when Nomad does not report resource data.
	var mappedPorts []nomadApi.PortMapping
	if alloc.AllocatedResources != nil {
		mappedPorts = alloc.AllocatedResources.Shared.Ports
	}
	environment.AddRunner(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m.onRunnerDestroyed))
	monitorAllocationStartupDuration(startup, alloc.JobID, environmentID)
}
// monitorAllocationStartupDuration writes the startup duration of an idle
// runner as an InfluxDB point, tagged with its environment and runner id.
func monitorAllocationStartupDuration(startup time.Duration, runnerID string, environmentID dto.EnvironmentID) {
	point := influxdb2.NewPointWithMeasurement(monitoring.MeasurementIdleRunnerNomad)
	point.AddField(monitoring.InfluxKeyStartupDuration, startup.Nanoseconds())
	point.AddTag(monitoring.InfluxKeyEnvironmentID, environmentID.ToString())
	point.AddTag(monitoring.InfluxKeyRunnerID, runnerID)
	monitoring.WriteInfluxPoint(point)
}
// onAllocationStopped is the callback for when Nomad stopped an allocation.
// It destroys the runner (if still tracked as used) and removes it from its
// environment. The return value reports whether the runner had already been
// removed from the used runners before this call.
func (m *NomadRunnerManager) onAllocationStopped(runnerID string, reason error) (alreadyRemoved bool) {
	log.WithField(dto.KeyRunnerID, runnerID).Debug("Runner stopped")

	if nomad.IsEnvironmentTemplateID(runnerID) {
		return false
	}

	environmentID, err := nomad.EnvironmentIDFromRunnerID(runnerID)
	if err != nil {
		log.WithError(err).Warn("Stopped allocation can not be handled")
		return false
	}

	runner, wasUsed := m.usedRunners.Get(runnerID)
	if wasUsed {
		// Delete first so the destruction callback does not find the runner anymore.
		m.usedRunners.Delete(runnerID)
		if destroyErr := runner.Destroy(reason); destroyErr != nil {
			log.WithError(destroyErr).Warn("Runner of stopped allocation cannot be destroyed")
		}
	}

	if environment, found := m.environments.Get(environmentID.ToString()); found {
		environment.DeleteRunner(runnerID)
	}
	return !wasUsed
}
// onRunnerDestroyed is the callback when the runner destroys itself.
// The main use of this callback is to remove the runner from the used runners, when its timeout exceeds.
func (m *NomadRunnerManager) onRunnerDestroyed(r Runner) error {
	m.usedRunners.Delete(r.ID())
	if environment, found := m.environments.Get(r.Environment().ToString()); found {
		environment.DeleteRunner(r.ID())
	}
	return nil
}