poseidon/internal/environment/nomad_environment.go
Maximilian Paß 354c16cc37 Fix missing rescheduled idle runners.
During today's unattended upgrade, we observed the prewarming pool size drop to (near) zero. The cause was lost Nomad allocations: the allocations were rescheduled by Nomad, but never added back to Poseidon.

The reason was a miscommunication between the event handling and the Nomad manager: `removedByPoseidon` was true even when the runner had not been removed by the manager but was an idle runner.
2023-09-05 15:15:39 +02:00


package environment

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/google/uuid"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/jobspec2"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/storage"
"github.com/openHPI/poseidon/pkg/util"
"strconv"
"sync"
"time"
)
const portNumberBase = 10

var ErrScaleDown = errors.New("cannot scale down the environment")
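
// NomadEnvironment manages an execution environment as a Nomad template job and keeps track of its idle runners.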
type NomadEnvironment struct {
apiClient nomad.ExecutorAPI
jobHCL string
job *nomadApi.Job
idleRunners storage.Storage[runner.Runner]
ctx context.Context
cancel context.CancelFunc
}
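
// NewNomadEnvironment parses the given job HCL and returns a new environment with a monitored idle runner storage.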
func NewNomadEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI, jobHCL string) (*NomadEnvironment, error) {
job, err := parseJob(jobHCL)
if err != nil {
return nil, fmt.Errorf("error parsing Nomad job: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
e := &NomadEnvironment{apiClient, jobHCL, job, nil, ctx, cancel}
e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad,
runner.MonitorEnvironmentID[runner.Runner](id), time.Minute, ctx)
return e, nil
}
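
// NewNomadEnvironmentFromRequest creates a new environment and applies the settings of the given request to it.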
func NewNomadEnvironmentFromRequest(
apiClient nomad.ExecutorAPI, jobHCL string, id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest) (
*NomadEnvironment, error) {
environment, err := NewNomadEnvironment(id, apiClient, jobHCL)
if err != nil {
return nil, err
}
environment.SetID(id)
// Set options according to request
environment.SetPrewarmingPoolSize(request.PrewarmingPoolSize)
environment.SetCPULimit(request.CPULimit)
environment.SetMemoryLimit(request.MemoryLimit)
environment.SetImage(request.Image)
environment.SetNetworkAccess(request.NetworkAccess, request.ExposedPorts)
return environment, nil
}
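
// ID returns the environment id parsed from the template job id.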
func (n *NomadEnvironment) ID() dto.EnvironmentID {
id, err := nomad.EnvironmentIDFromTemplateJobID(*n.job.ID)
if err != nil {
log.WithError(err).Error("Environment ID can not be parsed from Job")
}
return id
}
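
// SetID sets the id and name of the template job.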
func (n *NomadEnvironment) SetID(id dto.EnvironmentID) {
name := nomad.TemplateJobID(id)
n.job.ID = &name
n.job.Name = &name
}
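
// PrewarmingPoolSize reads the desired number of idle runners from the config task group's meta information.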
func (n *NomadEnvironment) PrewarmingPoolSize() uint {
configTaskGroup := nomad.FindAndValidateConfigTaskGroup(n.job)
count, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaPoolSizeKey])
if err != nil {
log.WithError(err).Error("Prewarming pool size can not be parsed from Job")
}
return uint(count)
}
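
// SetPrewarmingPoolSize stores the desired number of idle runners in the config task group's meta information.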
func (n *NomadEnvironment) SetPrewarmingPoolSize(count uint) {
monitoring.ChangedPrewarmingPoolSize(n.ID(), count)
taskGroup := nomad.FindAndValidateConfigTaskGroup(n.job)
if taskGroup.Meta == nil {
taskGroup.Meta = make(map[string]string)
}
taskGroup.Meta[nomad.ConfigMetaPoolSizeKey] = strconv.Itoa(int(count))
}
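
// CPULimit returns the CPU limit (in MHz) of the default task.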
func (n *NomadEnvironment) CPULimit() uint {
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(n.job)
defaultTask := nomad.FindAndValidateDefaultTask(defaultTaskGroup)
return uint(*defaultTask.Resources.CPU)
}
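
// SetCPULimit sets the CPU limit (in MHz) of the default task.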
func (n *NomadEnvironment) SetCPULimit(limit uint) {
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(n.job)
defaultTask := nomad.FindAndValidateDefaultTask(defaultTaskGroup)
integerCPULimit := int(limit)
defaultTask.Resources.CPU = &integerCPULimit
}
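
// MemoryLimit returns the maximum memory limit (in MB) of the default task, or 0 if no maximum is set.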
func (n *NomadEnvironment) MemoryLimit() uint {
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(n.job)
defaultTask := nomad.FindAndValidateDefaultTask(defaultTaskGroup)
maxMemoryLimit := defaultTask.Resources.MemoryMaxMB
if maxMemoryLimit != nil {
return uint(*maxMemoryLimit)
}
return 0
}
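
// SetMemoryLimit sets the maximum memory limit (in MB) of the default task.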
func (n *NomadEnvironment) SetMemoryLimit(limit uint) {
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(n.job)
defaultTask := nomad.FindAndValidateDefaultTask(defaultTaskGroup)
integerMemoryMaxLimit := int(limit)
defaultTask.Resources.MemoryMaxMB = &integerMemoryMaxLimit
}
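
// Image returns the container image of the default task, or an empty string if none is configured.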
func (n *NomadEnvironment) Image() string {
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(n.job)
defaultTask := nomad.FindAndValidateDefaultTask(defaultTaskGroup)
image, ok := defaultTask.Config["image"].(string)
if !ok {
image = ""
}
return image
}
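
// SetImage sets the container image of the default task.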
func (n *NomadEnvironment) SetImage(image string) {
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(n.job)
defaultTask := nomad.FindAndValidateDefaultTask(defaultTaskGroup)
defaultTask.Config["image"] = image
}
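
// NetworkAccess returns whether network access is allowed and, if so, which ports are exposed.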
func (n *NomadEnvironment) NetworkAccess() (allowed bool, ports []uint16) {
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(n.job)
defaultTask := nomad.FindAndValidateDefaultTask(defaultTaskGroup)
allowed = defaultTask.Config["network_mode"] != "none"
if len(defaultTaskGroup.Networks) > 0 {
networkResource := defaultTaskGroup.Networks[0]
for _, port := range networkResource.DynamicPorts {
ports = append(ports, uint16(port.To))
}
}
return allowed, ports
}
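
// SetNetworkAccess configures the network mode of the default task and the exposed ports of the task group.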
func (n *NomadEnvironment) SetNetworkAccess(allow bool, exposedPorts []uint16) {
defaultTaskGroup := nomad.FindAndValidateDefaultTaskGroup(n.job)
defaultTask := nomad.FindAndValidateDefaultTask(defaultTaskGroup)
if len(defaultTaskGroup.Tasks) == 0 {
// This function is only used internally and must be called as the last step when configuring the task.
// This error is not recoverable.
log.Fatal("Can't configure network before task has been configured!")
}
if allow {
var networkResource *nomadApi.NetworkResource
if len(defaultTaskGroup.Networks) == 0 {
networkResource = &nomadApi.NetworkResource{}
defaultTaskGroup.Networks = []*nomadApi.NetworkResource{networkResource}
} else {
networkResource = defaultTaskGroup.Networks[0]
}
// Prefer the "bridge" network over "host" to get an isolated network namespace with a bridged interface
// instead of joining the host network namespace.
networkResource.Mode = "cni/secure-bridge"
for _, portNumber := range exposedPorts {
port := nomadApi.Port{
Label: strconv.FormatUint(uint64(portNumber), portNumberBase),
To: int(portNumber),
}
networkResource.DynamicPorts = append(networkResource.DynamicPorts, port)
}
// Explicitly set the mode to override existing settings when updating a job from no network access to network access.
// Don't use "bridge" here, as it collides with the bridge mode above: Docker would then use its own 'bridge'
// mode, attaching all allocations to the `docker0` adapter, where they could reach other non-Nomad
// containers attached to it. Nomad's bridge network mode avoids this.
defaultTask.Config["network_mode"] = ""
} else {
// Somehow, we can't set the network mode to none in the NetworkResource on the task group level.
// See https://github.com/hashicorp/nomad/issues/10540
defaultTask.Config["network_mode"] = "none"
// Explicitly set Networks to signal Nomad to remove a possibly existing network resource.
defaultTaskGroup.Networks = []*nomadApi.NetworkResource{}
}
}

// Register creates a Nomad job based on the default job configuration and the given parameters.
// It registers the job with Nomad and waits until the registration completes.
func (n *NomadEnvironment) Register() error {
nomad.SetForcePullFlag(n.job, true) // This must be the default as otherwise new runners could have different images.
evalID, err := n.apiClient.RegisterNomadJob(n.job)
if err != nil {
return fmt.Errorf("couldn't register job: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), nomad.RegisterTimeout)
defer cancel()
err = n.apiClient.MonitorEvaluation(evalID, ctx)
if err != nil {
return fmt.Errorf("error during the monitoring of the environment job: %w", err)
}
return nil
}
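
// Delete stops the environment: it cancels the context, removes all runner jobs, and deletes the environment job itself.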
func (n *NomadEnvironment) Delete() error {
n.cancel()
err := n.removeRunners()
if err != nil {
return err
}
err = n.apiClient.DeleteJob(*n.job.ID)
if err != nil {
return fmt.Errorf("couldn't delete environment job: %w", err)
}
return nil
}
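
// ApplyPrewarmingPoolSize creates idle runners until the configured prewarming pool size is reached.
// It returns an error wrapping ErrScaleDown if more idle runners exist than the pool size allows.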
func (n *NomadEnvironment) ApplyPrewarmingPoolSize() error {
required := int(n.PrewarmingPoolSize()) - int(n.idleRunners.Length())
if required < 0 {
return fmt.Errorf("%w. Runners to remove: %d", ErrScaleDown, -required)
}
return n.createRunners(uint(required), true)
}
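
// Sample returns and removes an arbitrary idle runner.
// If the pool has dropped below the prewarming pool size, a replacement runner is created asynchronously.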
func (n *NomadEnvironment) Sample() (runner.Runner, bool) {
r, ok := n.idleRunners.Sample()
if ok && n.idleRunners.Length() < n.PrewarmingPoolSize() {
go func() {
err := util.RetryExponentialContext(n.ctx, func() error { return n.createRunner(false) })
if err != nil {
log.WithError(err).WithField(dto.KeyEnvironmentID, n.ID().ToString()).
Error("Couldn't create new runner for claimed one")
}
}()
} else if ok {
log.WithField(dto.KeyEnvironmentID, n.ID().ToString()).Warn("Too many idle runners")
}
return r, ok
}
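
// AddRunner adds an existing runner to the idle runners of the environment.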
func (n *NomadEnvironment) AddRunner(r runner.Runner) {
n.idleRunners.Add(r.ID(), r)
}
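
// DeleteRunner removes the runner with the given id from the idle runners and returns whether it was found.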
func (n *NomadEnvironment) DeleteRunner(id string) (ok bool) {
_, ok = n.idleRunners.Get(id)
n.idleRunners.Delete(id)
return ok
}
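
// IdleRunnerCount returns the number of idle runners of the environment.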
func (n *NomadEnvironment) IdleRunnerCount() uint {
return n.idleRunners.Length()
}

// MarshalJSON implements the json.Marshaler interface.
// This converts the NomadEnvironment into the expected schema for dto.ExecutionEnvironmentData.
func (n *NomadEnvironment) MarshalJSON() (res []byte, err error) {
networkAccess, exposedPorts := n.NetworkAccess()
res, err = json.Marshal(dto.ExecutionEnvironmentData{
ID: int(n.ID()),
ExecutionEnvironmentRequest: dto.ExecutionEnvironmentRequest{
PrewarmingPoolSize: n.PrewarmingPoolSize(),
CPULimit: n.CPULimit(),
MemoryLimit: n.MemoryLimit(),
Image: n.Image(),
NetworkAccess: networkAccess,
ExposedPorts: exposedPorts,
},
})
if err != nil {
return res, fmt.Errorf("couldn't marshal execution environment: %w", err)
}
return res, nil
}

// DeepCopyJob clones the native Nomad job so that it can be used as a runner job.
func (n *NomadEnvironment) DeepCopyJob() *nomadApi.Job {
copyJob, err := parseJob(n.jobHCL)
if err != nil {
log.WithError(err).Error("The HCL of an existing environment should throw no error!")
return nil
}
copyEnvironment := &NomadEnvironment{job: copyJob}
copyEnvironment.SetConfigFrom(n)
return copyEnvironment.job
}

// SetConfigFrom copies the configuration from the given environment into this temporary job.
// IMPROVE: The getters use a validation function that theoretically could edit the environment job.
// But this modification might never be saved to Nomad.
func (n *NomadEnvironment) SetConfigFrom(environment runner.ExecutionEnvironment) {
n.SetID(environment.ID())
n.SetPrewarmingPoolSize(environment.PrewarmingPoolSize())
n.SetCPULimit(environment.CPULimit())
n.SetMemoryLimit(environment.MemoryLimit())
n.SetImage(environment.Image())
n.SetNetworkAccess(environment.NetworkAccess())
}
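
// parseJob parses the given HCL string into a Nomad job.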
func parseJob(jobHCL string) (*nomadApi.Job, error) {
config := jobspec2.ParseConfig{
Body: []byte(jobHCL),
AllowFS: false,
Strict: true,
}
job, err := jobspec2.ParseWithConfig(&config)
if err != nil {
return job, fmt.Errorf("couldn't parse job HCL: %w", err)
}
return job, nil
}
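
// The following is a hypothetical, minimal job specification that parseJob would accept. It is not Poseidon's
// real template job, which additionally defines the config and default task groups expected by the getters above.
const exampleJobHCL = `
job "example" {
type = "batch"
group "example-group" {
task "example-task" {
driver = "docker"
config {
image = "alpine:latest"
}
}
}
}`

// createRunners creates the given number of new runner jobs for this environment.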
func (n *NomadEnvironment) createRunners(count uint, forcePull bool) error {
log.WithField("runnersRequired", count).WithField(dto.KeyEnvironmentID, n.ID()).Debug("Creating new runners")
for i := 0; i < int(count); i++ {
err := n.createRunner(forcePull)
if err != nil {
return fmt.Errorf("couldn't create new runner: %w", err)
}
}
return nil
}
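
// createRunner creates a copy of the template job with a random UUID and registers it with Nomad.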
func (n *NomadEnvironment) createRunner(forcePull bool) error {
newUUID, err := uuid.NewUUID()
if err != nil {
return fmt.Errorf("failed generating runner id: %w", err)
}
newRunnerID := nomad.RunnerJobID(n.ID(), newUUID.String())
template := n.DeepCopyJob()
template.ID = &newRunnerID
template.Name = &newRunnerID
nomad.SetForcePullFlag(template, forcePull)
err = n.apiClient.RegisterRunnerJob(template)
if err != nil {
return fmt.Errorf("error registering new runner job: %w", err)
}
return nil
}

// removeRunners removes all (idle and used) runners for the given environment n.
func (n *NomadEnvironment) removeRunners() error {
// This prevents a race condition where the number of required runners is miscalculated in the up-scaling
// process based on the number of allocations that have been stopped at the moment of the scaling.
n.idleRunners.Purge()
ids, err := n.apiClient.LoadRunnerIDs(nomad.RunnerJobID(n.ID(), ""))
if err != nil {
return fmt.Errorf("failed to load runner ids: %w", err)
}
// Block execution until Nomad confirmed all deletion requests.
var wg sync.WaitGroup
var errMut sync.Mutex
for _, id := range ids {
wg.Add(1)
go func(jobID string) {
defer wg.Done()
if deleteErr := n.apiClient.DeleteJob(jobID); deleteErr != nil {
// Synchronize writes to err as multiple deletions may fail concurrently.
errMut.Lock()
err = deleteErr
errMut.Unlock()
}
}(id)
}
wg.Wait()
return err
}
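
// exampleSetup is a hypothetical sketch, not part of Poseidon, showing how the functions of this file fit
// together: it creates an environment from a request, registers it with Nomad, and fills the prewarming pool.
// The function name and the environment id 42 are made up.
func exampleSetup(apiClient nomad.ExecutorAPI, jobHCL string, request dto.ExecutionEnvironmentRequest) error {
environment, err := NewNomadEnvironmentFromRequest(apiClient, jobHCL, dto.EnvironmentID(42), request)
if err != nil {
return err
}
// Register the template job with Nomad and wait for the evaluation to complete.
if err := environment.Register(); err != nil {
return err
}
// Create idle runners until the configured prewarming pool size is reached.
return environment.ApplyPrewarmingPoolSize()
}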