package environment

import (
	"context"
	"fmt"
	"strconv"
	"time"

	poseidonK8s "github.com/openHPI/poseidon/internal/kubernetes"
	"github.com/openHPI/poseidon/internal/nomad"
	"github.com/openHPI/poseidon/internal/runner"
	"github.com/openHPI/poseidon/pkg/dto"
	"github.com/openHPI/poseidon/pkg/logging"
	"github.com/openHPI/poseidon/pkg/monitoring"
	"github.com/openHPI/poseidon/pkg/storage"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// KubernetesEnvironmentManager manages execution environments that are backed
// by Kubernetes deployments. It mirrors the Nomad-based manager and reuses the
// HCL template job file as the environment specification source.
type KubernetesEnvironmentManager struct {
	*AbstractManager
	api                    poseidonK8s.ExecutorAPI
	templateEnvironmentHCL string
}

// NewKubernetesEnvironmentManager loads the template environment job from
// templateJobFile and returns a manager that uses the given runner manager
// and executor API client.
func NewKubernetesEnvironmentManager(
	runnerManager runner.Manager,
	apiClient *poseidonK8s.ExecutorAPI,
	templateJobFile string,
) (*KubernetesEnvironmentManager, error) {
	if err := loadTemplateEnvironmentJobHCL(templateJobFile); err != nil {
		return nil, err
	}

	m := &KubernetesEnvironmentManager{
		AbstractManager:        &AbstractManager{nil, runnerManager},
		api:                    *apiClient,
		templateEnvironmentHCL: templateEnvironmentJobHCL,
	}
	return m, nil
}

// SetNextHandler sets the next manager in the chain of responsibility.
func (k *KubernetesEnvironmentManager) SetNextHandler(next ManagerHandler) {
	k.nextHandler = next
}

// NextHandler returns the next manager in the chain, or an empty
// AbstractManager when none is configured.
func (k *KubernetesEnvironmentManager) NextHandler() ManagerHandler {
	if k.HasNextHandler() {
		return k.nextHandler
	}
	return &AbstractManager{}
}

// HasNextHandler reports whether a next manager is configured.
func (k *KubernetesEnvironmentManager) HasNextHandler() bool {
	return k.nextHandler != nil
}

// List returns all Kubernetes-based environments, optionally synchronizing
// the local state with the cluster first.
func (k *KubernetesEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
	if fetch {
		if err := k.fetchEnvironments(); err != nil {
			return nil, err
		}
	}
	return k.runnerManager.ListEnvironments(), nil
}

// fetchEnvironments synchronizes the locally stored environments with the
// deployments currently present in the cluster: remote-only deployments are
// stored, deployments that exist locally have their configuration updated,
// and local environments with no remote counterpart are removed.
func (k *KubernetesEnvironmentManager) fetchEnvironments() error {
	remoteDeploymentResponse, err := k.api.LoadEnvironmentJobs()
	if err != nil {
		return fmt.Errorf("failed fetching environments: %w", err)
	}

	remoteDeployments := make(map[string]appsv1.Deployment, len(remoteDeploymentResponse))

	// Update local environments from remote environments.
	for _, deployment := range remoteDeploymentResponse {
		remoteDeployments[deployment.Name] = *deployment

		// The deployment name encodes the environment id as a plain integer.
		// NOTE(review): fetchK8sEnvironment parses the name via
		// nomad.EnvironmentIDFromTemplateJobID instead — confirm both naming
		// schemes agree.
		intIdentifier, err := strconv.Atoi(deployment.Name)
		if err != nil {
			log.WithError(err).Warn("Failed to convert job name to int")
			continue
		}
		id := dto.EnvironmentID(intIdentifier)

		if localEnvironment, ok := k.runnerManager.GetEnvironment(id); ok {
			fetchedEnvironment := newKubernetesEnvironmentFromJob(deployment, &k.api)
			localEnvironment.SetConfigFrom(fetchedEnvironment)
			// We destroy only this (second) local reference to the environment.
			if err = fetchedEnvironment.Delete(runner.ErrDestroyedAndReplaced); err != nil {
				log.WithError(err).Warn("Failed to remove environment locally")
			}
		} else {
			k.runnerManager.StoreEnvironment(newKubernetesEnvironmentFromJob(deployment, &k.api))
		}
	}

	// Remove local environments that are not remote environments.
	for _, localEnvironment := range k.runnerManager.ListEnvironments() {
		if _, ok := remoteDeployments[localEnvironment.ID().ToString()]; !ok {
			// Fix: only warn when the deletion actually failed. The previous
			// code logged a warning unconditionally, even on success.
			if err := localEnvironment.Delete(runner.ErrLocalDestruction); err != nil {
				log.WithError(err).Warn("Failed to remove environment locally")
			}
		}
	}
	return nil
}
func newKubernetesEnvironmentFromJob(deployment *appsv1.Deployment, apiClient *poseidonK8s.ExecutorAPI) *KubernetesEnvironment { ctx, cancel := context.WithCancel(context.Background()) e := &KubernetesEnvironment{ apiClient: apiClient, jobHCL: templateEnvironmentJobHCL, deployment: deployment, ctx: ctx, cancel: cancel, } e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad, runner.MonitorEnvironmentID[runner.Runner](e.ID()), time.Minute, ctx) return e } // Get retrieves a specific Kubernetes environment func (k *KubernetesEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (executionEnvironment runner.ExecutionEnvironment, err error) { executionEnvironment, ok := k.runnerManager.GetEnvironment(id) if fetch { fetchedEnvironment, err := fetchK8sEnvironment(id, k.api) switch { case err != nil: return nil, err case fetchedEnvironment == nil: _, err = k.Delete(id) if err != nil { return nil, err } ok = false case !ok: k.runnerManager.StoreEnvironment(fetchedEnvironment) executionEnvironment = fetchedEnvironment ok = true default: executionEnvironment.SetConfigFrom(fetchedEnvironment) // We destroy only this (second) local reference to the environment. err = fetchedEnvironment.Delete(runner.ErrDestroyedAndReplaced) if err != nil { log.WithError(err).Warn("Failed to remove environment locally") } } } if !ok { err = runner.ErrUnknownExecutionEnvironment } return executionEnvironment, err } // CreateOrUpdate creates or updates an environment in Kubernetes func (k *KubernetesEnvironmentManager) CreateOrUpdate( id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest, ctx context.Context) (created bool, err error) { // Check if execution environment is already existing (in the local memory). environment, isExistingEnvironment := k.runnerManager.GetEnvironment(id) if isExistingEnvironment { // Remove existing environment to force downloading the newest Docker image. 
// See https://github.com/openHPI/poseidon/issues/69 err = environment.Delete(runner.ErrEnvironmentUpdated) if err != nil { return false, fmt.Errorf("failed to remove the environment: %w", err) } } // Create a new environment with the given request options. environment, err = NewKubernetesEnvironmentFromRequest(k.api, k.templateEnvironmentHCL, id, request) if err != nil { return false, fmt.Errorf("error creating Nomad environment: %w", err) } // Keep a copy of environment specification in memory. k.runnerManager.StoreEnvironment(environment) // Register template Job with Nomad. logging.StartSpan("env.update.register", "Register Environment", ctx, func(_ context.Context) { err = environment.Register() }) if err != nil { return false, fmt.Errorf("error registering template job in API: %w", err) } // Launch idle runners based on the template job. logging.StartSpan("env.update.poolsize", "Apply Prewarming Pool Size", ctx, func(_ context.Context) { err = environment.ApplyPrewarmingPoolSize() }) if err != nil { return false, fmt.Errorf("error scaling template job in API: %w", err) } return !isExistingEnvironment, nil } // Statistics fetches statistics from Kubernetes func (k *KubernetesEnvironmentManager) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData { // Collect and return statistics for Kubernetes environments return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{} } // MapExecutionEnvironmentRequestToDeployment maps ExecutionEnvironmentRequest to a Kubernetes Deployment func MapExecutionEnvironmentRequestToDeployment(req dto.ExecutionEnvironmentRequest, environmentID string) *appsv1.Deployment { // Create the Deployment object deployment := &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ Kind: "Deployment", APIVersion: "apps/v1", }, ObjectMeta: metav1.ObjectMeta{ Name: environmentID, // Set the environment ID as the name of the deployment Labels: map[string]string{ "environment-id": environmentID, }, }, Spec: 
appsv1.DeploymentSpec{ Replicas: int32Ptr(int32(req.PrewarmingPoolSize)), // Use PrewarmingPoolSize to set the number of replicas Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "environment-id": environmentID, }, }, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "environment-id": environmentID, }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ { Name: "runner-container", Image: req.Image, // Map the image to the container Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ "cpu": resource.MustParse(strconv.Itoa(int(req.CPULimit))), // Map CPU request "memory": resource.MustParse(strconv.Itoa(int(req.MemoryLimit)) + "Mi"), // Map Memory request }, Limits: v1.ResourceList{ "cpu": resource.MustParse(strconv.Itoa(int(req.CPULimit))), // Map CPU limit "memory": resource.MustParse(strconv.Itoa(int(req.MemoryLimit)) + "Mi"), // Map Memory limit }, }, }, }, }, }, }, } // Handle network access and exposed ports if req.NetworkAccess { var containerPorts []v1.ContainerPort for _, port := range req.ExposedPorts { containerPorts = append(containerPorts, v1.ContainerPort{ ContainerPort: int32(port), }) } deployment.Spec.Template.Spec.Containers[0].Ports = containerPorts } return deployment } // Helper function to return a pointer to an int32 func int32Ptr(i int32) *int32 { return &i } func fetchK8sEnvironment(id dto.EnvironmentID, apiClient poseidonK8s.ExecutorAPI) (runner.ExecutionEnvironment, error) { environments, err := apiClient.LoadEnvironmentJobs() if err != nil { return nil, fmt.Errorf("error fetching the environment jobs: %w", err) } var fetchedEnvironment runner.ExecutionEnvironment for _, deployment := range environments { environmentID, err := nomad.EnvironmentIDFromTemplateJobID(deployment.Name) if err != nil { log.WithError(err).Warn("Cannot parse environment id of loaded environment") continue } if id == environmentID { fetchedEnvironment = newKubernetesEnvironmentFromJob(deployment, 
&apiClient) } } return fetchedEnvironment, nil }