
as a second criterion (next to the maximum number of attempts) for canceling the retrying. This is required because the previous commit started retrying the Nomad environment recovery, which always fails for unit tests (as they are not connected to a Nomad cluster). Before, we ignored the single error, but the retrying leads to unit test timeouts. Additionally, we now stop retrying to create a runner when the environment got deleted.
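
For illustration, a minimal, self-contained sketch of such a context-aware exponential retry follows. The function name, backoff constants, and error combination are assumptions made for this example, not the actual implementation of util.RetryExponentialContext used in the file below:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryExponentialContext retries f with exponential backoff until f succeeds,
// maxAttempts is reached, or ctx ends (the second cancellation criterion).
func retryExponentialContext(ctx context.Context, maxAttempts int, f func() error) error {
	backoff := 100 * time.Millisecond
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			// Stop retrying as soon as the context ends, e.g., in unit tests.
			return errors.Join(err, ctx.Err())
		case <-time.After(backoff):
			backoff *= 2
		}
	}
	return err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	// Fails fast once the context expires instead of exhausting all attempts.
	err := retryExponentialContext(ctx, 10, func() error {
		return errors.New("not connected to a Nomad cluster")
	})
	fmt.Println(err)
}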
package environment

import (
	"context"
	_ "embed"
	"fmt"
	"os"
	"time"

	nomadApi "github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/nomad/structs"

	"github.com/openHPI/poseidon/internal/nomad"
	"github.com/openHPI/poseidon/internal/runner"
	"github.com/openHPI/poseidon/pkg/dto"
	"github.com/openHPI/poseidon/pkg/logging"
	"github.com/openHPI/poseidon/pkg/monitoring"
	"github.com/openHPI/poseidon/pkg/storage"
	"github.com/openHPI/poseidon/pkg/util"
)

// templateEnvironmentJobHCL holds our default job in HCL format.
// The default job is used when creating a new job and provides
// common settings that all the jobs share.
//
//go:embed template-environment-job.hcl
var templateEnvironmentJobHCL string

var log = logging.GetLogger("environment")

type NomadEnvironmentManager struct {
	*AbstractManager
	api                    nomad.ExecutorAPI
	templateEnvironmentHCL string
}

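// NewNomadEnvironmentManager creates a new environment manager for Nomad.
// It loads the template environment job from the given file (falling back to
// the embedded default) and recovers existing environments, retrying with
// exponential backoff until the given context is canceled.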
func NewNomadEnvironmentManager(
	runnerManager runner.Manager,
	apiClient nomad.ExecutorAPI,
	templateJobFile string,
	ctx context.Context,
) (*NomadEnvironmentManager, error) {
	if err := loadTemplateEnvironmentJobHCL(templateJobFile); err != nil {
		return nil, err
	}

	m := &NomadEnvironmentManager{&AbstractManager{nil, runnerManager},
		apiClient, templateEnvironmentJobHCL}
	// The context is the second criterion (next to the maximum number of
	// attempts) for canceling the recovery retries, e.g., in unit tests
	// that are not connected to a Nomad cluster.
	if err := util.RetryExponentialContext(ctx, func() error { return m.Load() }); err != nil {
		log.WithError(err).Error("Error recovering the execution environments")
	}
	runnerManager.Load()
	return m, nil
}

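// Get returns the execution environment with the given id.
// If fetch is set, the environment is also refreshed from Nomad: an
// environment whose template job no longer exists there is deleted locally,
// a previously unknown one is added to the local store, and a known one has
// its configuration updated.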
func (m *NomadEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (
	executionEnvironment runner.ExecutionEnvironment, err error) {
	executionEnvironment, ok := m.runnerManager.GetEnvironment(id)

	if fetch {
		fetchedEnvironment, err := fetchEnvironment(id, m.api)
		switch {
		case err != nil:
			return nil, err
		case fetchedEnvironment == nil:
			_, err = m.Delete(id)
			if err != nil {
				return nil, err
			}
			ok = false
		case !ok:
			m.runnerManager.StoreEnvironment(fetchedEnvironment)
			executionEnvironment = fetchedEnvironment
			ok = true
		default:
			executionEnvironment.SetConfigFrom(fetchedEnvironment)
		}
	}

	if !ok {
		err = runner.ErrUnknownExecutionEnvironment
	}
	return executionEnvironment, err
}

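// List returns all execution environments known to the manager.
// If fetch is set, the environments are first reloaded from Nomad.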
func (m *NomadEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
	if fetch {
		err := m.Load()
		if err != nil {
			return nil, err
		}
	}
	return m.runnerManager.ListEnvironments(), nil
}

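// CreateOrUpdate creates the execution environment with the given id from the
// request or, if it already exists, replaces it. It returns true iff a new
// environment was created.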
func (m *NomadEnvironmentManager) CreateOrUpdate(
	id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest, ctx context.Context) (created bool, err error) {
	// Check if the execution environment already exists (in local memory).
	environment, isExistingEnvironment := m.runnerManager.GetEnvironment(id)
	if isExistingEnvironment {
		// Remove the existing environment to force downloading the newest Docker image.
		// See https://github.com/openHPI/poseidon/issues/69
		err = environment.Delete()
		if err != nil {
			return false, fmt.Errorf("failed to remove the environment: %w", err)
		}
	}

	// Create a new environment with the given request options.
	environment, err = NewNomadEnvironmentFromRequest(m.api, m.templateEnvironmentHCL, id, request)
	if err != nil {
		return false, fmt.Errorf("error creating Nomad environment: %w", err)
	}

	// Keep a copy of the environment specification in memory.
	m.runnerManager.StoreEnvironment(environment)

	// Register the template job with Nomad.
	logging.StartSpan("env.update.register", "Register Environment", ctx, func(_ context.Context) {
		err = environment.Register()
	})
	if err != nil {
		return false, fmt.Errorf("error registering template job in API: %w", err)
	}

	// Launch idle runners based on the template job.
	logging.StartSpan("env.update.poolsize", "Apply Prewarming Pool Size", ctx, func(_ context.Context) {
		err = environment.ApplyPrewarmingPoolSize()
	})
	if err != nil {
		return false, fmt.Errorf("error scaling template job in API: %w", err)
	}

	return !isExistingEnvironment, nil
}

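// Load recovers all running environment template jobs from Nomad and stores
// the corresponding environments in the local runner manager.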
func (m *NomadEnvironmentManager) Load() error {
	templateJobs, err := m.api.LoadEnvironmentJobs()
	if err != nil {
		return fmt.Errorf("couldn't load template jobs: %w", err)
	}

	for _, job := range templateJobs {
		jobLogger := log.WithField("jobID", *job.ID)
		if *job.Status != structs.JobStatusRunning {
			jobLogger.Info("Job not running, skipping ...")
			continue
		}
		configTaskGroup := nomad.FindAndValidateConfigTaskGroup(job)
		if configTaskGroup == nil {
			jobLogger.Error("FindAndValidateConfigTaskGroup is not creating the task group")
			continue
		}
		environment := newNomadEnvironmentFromJob(job, m.api)
		m.runnerManager.StoreEnvironment(environment)
		jobLogger.Info("Successfully recovered environment")
	}
	return nil
}

// newNomadEnvironmentFromJob creates a Nomad environment from the passed Nomad job definition.
func newNomadEnvironmentFromJob(job *nomadApi.Job, apiClient nomad.ExecutorAPI) *NomadEnvironment {
	ctx, cancel := context.WithCancel(context.Background())
	e := &NomadEnvironment{
		apiClient: apiClient,
		jobHCL:    templateEnvironmentJobHCL,
		job:       job,
		ctx:       ctx,
		cancel:    cancel,
	}
	e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad,
		runner.MonitorEnvironmentID[runner.Runner](e.ID()), time.Minute, ctx)
	return e
}

// loadTemplateEnvironmentJobHCL loads the template environment job HCL from the given path.
// If the path is empty, the embedded default file is used.
func loadTemplateEnvironmentJobHCL(path string) error {
	if path == "" {
		return nil
	}
	data, err := os.ReadFile(path)
	if err != nil {
		return fmt.Errorf("error loading template environment job: %w", err)
	}
	templateEnvironmentJobHCL = string(data)
	return nil
}

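// fetchEnvironment loads the template job with the given environment id from
// Nomad and returns the corresponding environment, or nil if no such job exists.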
func fetchEnvironment(id dto.EnvironmentID, apiClient nomad.ExecutorAPI) (runner.ExecutionEnvironment, error) {
	environments, err := apiClient.LoadEnvironmentJobs()
	if err != nil {
		return nil, fmt.Errorf("error fetching the environment jobs: %w", err)
	}
	var fetchedEnvironment runner.ExecutionEnvironment
	for _, job := range environments {
		environmentID, err := nomad.EnvironmentIDFromTemplateJobID(*job.ID)
		if err != nil {
			log.WithError(err).Warn("Cannot parse environment id of loaded environment")
			continue
		}
		if id == environmentID {
			fetchedEnvironment = newNomadEnvironmentFromJob(job, apiClient)
		}
	}
	return fetchedEnvironment, nil
}