Files
poseidon/internal/environment/kubernetes_manager.go
2024-09-18 10:43:38 +02:00

298 lines
9.7 KiB
Go

package environment
import (
"context"
"fmt"
poseidonK8s "github.com/openHPI/poseidon/internal/kubernetes"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/internal/runner"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/storage"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strconv"
"time"
)
// KubernetesEnvironmentManager manages execution environments that are backed
// by Kubernetes deployments. It embeds AbstractManager and takes part in the
// manager chain of responsibility (see SetNextHandler / NextHandler).
type KubernetesEnvironmentManager struct {
	*AbstractManager
	// api is the Kubernetes executor API used to load and store
	// environment deployments.
	api poseidonK8s.ExecutorAPI
	// templateEnvironmentHCL is the template job definition loaded from the
	// template job file passed to NewKubernetesEnvironmentManager.
	templateEnvironmentHCL string
}
// NewKubernetesEnvironmentManager creates a new manager for Kubernetes-backed
// execution environments. It loads the template environment job definition
// from templateJobFile before constructing the manager and returns an error
// if that file cannot be loaded.
func NewKubernetesEnvironmentManager(
	runnerManager runner.Manager,
	apiClient *poseidonK8s.ExecutorAPI,
	templateJobFile string,
) (*KubernetesEnvironmentManager, error) {
	if err := loadTemplateEnvironmentJobHCL(templateJobFile); err != nil {
		// Wrap with context so callers can tell which startup step failed.
		return nil, fmt.Errorf("loading template environment job file: %w", err)
	}

	m := &KubernetesEnvironmentManager{
		AbstractManager:        &AbstractManager{nil, runnerManager},
		api:                    *apiClient,
		templateEnvironmentHCL: templateEnvironmentJobHCL,
	}
	return m, nil
}
// SetNextHandler sets the next manager in the chain of responsibility.
func (k *KubernetesEnvironmentManager) SetNextHandler(next ManagerHandler) {
	k.nextHandler = next
}
// NextHandler returns the successor manager in the chain. When no successor
// has been configured, a fresh no-op AbstractManager is returned instead of
// nil so callers can invoke it unconditionally.
func (k *KubernetesEnvironmentManager) NextHandler() ManagerHandler {
	if !k.HasNextHandler() {
		return &AbstractManager{}
	}
	return k.nextHandler
}
// HasNextHandler reports whether a successor manager has been configured.
func (k *KubernetesEnvironmentManager) HasNextHandler() bool {
	return k.nextHandler != nil
}
// List returns all known Kubernetes-based execution environments. When fetch
// is set, the local store is first synchronized with the deployments present
// in Kubernetes.
func (k *KubernetesEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
	if !fetch {
		return k.runnerManager.ListEnvironments(), nil
	}
	if err := k.fetchEnvironments(); err != nil {
		return nil, err
	}
	return k.runnerManager.ListEnvironments(), nil
}
// fetchEnvironments synchronizes the local environment store with the
// deployments currently present in Kubernetes: remote deployments update or
// create local environments, and local environments without a remote
// counterpart are removed.
func (k *KubernetesEnvironmentManager) fetchEnvironments() error {
	remoteDeploymentResponse, err := k.api.LoadEnvironmentJobs()
	if err != nil {
		return fmt.Errorf("failed fetching environments: %w", err)
	}

	remoteDeployments := make(map[string]appsv1.Deployment)

	// Update local environments from remote environments.
	for _, deployment := range remoteDeploymentResponse {
		remoteDeployments[deployment.Name] = *deployment

		// The deployment name carries the integer environment id
		// (see MapExecutionEnvironmentRequestToDeployment).
		intIdentifier, err := strconv.Atoi(deployment.Name)
		if err != nil {
			log.WithError(err).Warn("Failed to convert job name to int")
			continue
		}
		id := dto.EnvironmentID(intIdentifier)

		if localEnvironment, ok := k.runnerManager.GetEnvironment(id); ok {
			fetchedEnvironment := newKubernetesEnvironmentFromJob(deployment, &k.api)
			localEnvironment.SetConfigFrom(fetchedEnvironment)
			// We destroy only this (second) local reference to the environment.
			if err = fetchedEnvironment.Delete(runner.ErrDestroyedAndReplaced); err != nil {
				log.WithError(err).Warn("Failed to remove environment locally")
			}
		} else {
			k.runnerManager.StoreEnvironment(newKubernetesEnvironmentFromJob(deployment, &k.api))
		}
	}

	// Remove local environments that are not remote environments.
	for _, localEnvironment := range k.runnerManager.ListEnvironments() {
		if _, ok := remoteDeployments[localEnvironment.ID().ToString()]; !ok {
			// Fix: previously the warning was logged unconditionally, even
			// when Delete succeeded (err == nil). Warn only on failure.
			if err := localEnvironment.Delete(runner.ErrLocalDestruction); err != nil {
				log.WithError(err).Warn("Failed to remove environment locally")
			}
		}
	}
	return nil
}
// newKubernetesEnvironmentFromJob creates a Kubernetes environment from the
// passed deployment definition. The returned environment owns a cancellable
// context that also bounds the lifetime of its monitored idle-runner storage.
func newKubernetesEnvironmentFromJob(deployment *appsv1.Deployment, apiClient *poseidonK8s.ExecutorAPI) *KubernetesEnvironment {
	ctx, cancel := context.WithCancel(context.Background())
	e := &KubernetesEnvironment{
		apiClient:  apiClient,
		jobHCL:     templateEnvironmentJobHCL,
		deployment: deployment,
		ctx:        ctx,
		cancel:     cancel,
	}
	// NOTE(review): the measurement name still references Nomad
	// (MeasurementIdleRunnerNomad) — confirm whether a Kubernetes-specific
	// measurement constant should be used instead.
	e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad,
		runner.MonitorEnvironmentID[runner.Runner](e.ID()), time.Minute, ctx)
	return e
}
// Get retrieves a specific Kubernetes environment. When fetch is set, the
// remote state in Kubernetes is consulted first and the local store is
// updated accordingly (environment added, refreshed, or deleted) before the
// result is returned. runner.ErrUnknownExecutionEnvironment is returned when
// the environment exists neither locally nor (if fetched) remotely.
func (k *KubernetesEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (executionEnvironment runner.ExecutionEnvironment, err error) {
	executionEnvironment, ok := k.runnerManager.GetEnvironment(id)
	if fetch {
		// Note: this err shadows the named return value inside the switch.
		fetchedEnvironment, err := fetchK8sEnvironment(id, k.api)
		switch {
		case err != nil:
			return nil, err
		case fetchedEnvironment == nil:
			// The environment no longer exists remotely; drop the local copy.
			_, err = k.Delete(id)
			if err != nil {
				return nil, err
			}
			ok = false
		case !ok:
			// Remote-only environment: store it locally and return it.
			k.runnerManager.StoreEnvironment(fetchedEnvironment)
			executionEnvironment = fetchedEnvironment
			ok = true
		default:
			// Exists both locally and remotely: refresh the local configuration.
			executionEnvironment.SetConfigFrom(fetchedEnvironment)
			// We destroy only this (second) local reference to the environment.
			err = fetchedEnvironment.Delete(runner.ErrDestroyedAndReplaced)
			if err != nil {
				log.WithError(err).Warn("Failed to remove environment locally")
			}
		}
	}
	if !ok {
		err = runner.ErrUnknownExecutionEnvironment
	}
	return executionEnvironment, err
}
// CreateOrUpdate creates or updates an execution environment in Kubernetes.
// An existing environment is deleted and recreated from the request. It
// returns true when a new environment was created and false when an existing
// one was replaced.
func (k *KubernetesEnvironmentManager) CreateOrUpdate(
	id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest, ctx context.Context) (created bool, err error) {
	// Check if execution environment is already existing (in the local memory).
	environment, isExistingEnvironment := k.runnerManager.GetEnvironment(id)
	if isExistingEnvironment {
		// Remove existing environment to force downloading the newest Docker image.
		// See https://github.com/openHPI/poseidon/issues/69
		err = environment.Delete(runner.ErrEnvironmentUpdated)
		if err != nil {
			return false, fmt.Errorf("failed to remove the environment: %w", err)
		}
	}

	// Create a new environment with the given request options.
	environment, err = NewKubernetesEnvironmentFromRequest(k.api, k.templateEnvironmentHCL, id, request)
	if err != nil {
		// Fix: the message previously said "Nomad environment" although this
		// is the Kubernetes manager.
		return false, fmt.Errorf("error creating Kubernetes environment: %w", err)
	}

	// Keep a copy of environment specification in memory.
	k.runnerManager.StoreEnvironment(environment)

	// Register the template deployment with the cluster.
	logging.StartSpan("env.update.register", "Register Environment", ctx, func(_ context.Context) {
		err = environment.Register()
	})
	if err != nil {
		return false, fmt.Errorf("error registering template job in API: %w", err)
	}

	// Launch idle runners based on the template job.
	logging.StartSpan("env.update.poolsize", "Apply Prewarming Pool Size", ctx, func(_ context.Context) {
		err = environment.ApplyPrewarmingPoolSize()
	})
	if err != nil {
		return false, fmt.Errorf("error scaling template job in API: %w", err)
	}

	return !isExistingEnvironment, nil
}
// Statistics returns per-environment statistical data. Statistics collection
// for Kubernetes environments is not implemented yet, so the returned map is
// always empty (but non-nil).
func (k *KubernetesEnvironmentManager) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
	statistics := make(map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData)
	return statistics
}
// MapExecutionEnvironmentRequestToDeployment maps an ExecutionEnvironmentRequest
// to a Kubernetes Deployment: the environment id becomes the deployment name
// and its selector label, the prewarming pool size the replica count, and the
// CPU/memory limits both the resource requests and limits of the container.
func MapExecutionEnvironmentRequestToDeployment(req dto.ExecutionEnvironmentRequest, environmentID string) *appsv1.Deployment {
	// Build a fresh label map per call site so ObjectMeta, Selector, and
	// Template do not alias the same map.
	environmentLabels := func() map[string]string {
		return map[string]string{"environment-id": environmentID}
	}
	// Requests and limits carry identical values; build a fresh list for each
	// so the two maps stay independent.
	resourceList := func() v1.ResourceList {
		return v1.ResourceList{
			"cpu":    resource.MustParse(strconv.Itoa(int(req.CPULimit))),
			"memory": resource.MustParse(strconv.Itoa(int(req.MemoryLimit)) + "Mi"),
		}
	}

	runnerContainer := v1.Container{
		Name:  "runner-container",
		Image: req.Image,
		Resources: v1.ResourceRequirements{
			Requests: resourceList(),
			Limits:   resourceList(),
		},
	}

	// Expose the requested ports only when network access is enabled.
	if req.NetworkAccess {
		var containerPorts []v1.ContainerPort
		for _, port := range req.ExposedPorts {
			containerPorts = append(containerPorts, v1.ContainerPort{ContainerPort: int32(port)})
		}
		runnerContainer.Ports = containerPorts
	}

	return &appsv1.Deployment{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Deployment",
			APIVersion: "apps/v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			// The environment id doubles as the deployment name.
			Name:   environmentID,
			Labels: environmentLabels(),
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: int32Ptr(int32(req.PrewarmingPoolSize)),
			Selector: &metav1.LabelSelector{
				MatchLabels: environmentLabels(),
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: environmentLabels()},
				Spec: v1.PodSpec{
					Containers: []v1.Container{runnerContainer},
				},
			},
		},
	}
}
// int32Ptr returns a pointer to a copy of the given int32 value.
func int32Ptr(i int32) *int32 {
	value := i
	return &value
}
// fetchK8sEnvironment loads the deployment with the given environment id from
// Kubernetes. It returns nil (and no error) when no matching deployment exists.
func fetchK8sEnvironment(id dto.EnvironmentID, apiClient poseidonK8s.ExecutorAPI) (runner.ExecutionEnvironment, error) {
	environments, err := apiClient.LoadEnvironmentJobs()
	if err != nil {
		return nil, fmt.Errorf("error fetching the environment jobs: %w", err)
	}

	var fetchedEnvironment runner.ExecutionEnvironment
	for _, deployment := range environments {
		// Fix: deployments are named with the plain integer environment id
		// (see MapExecutionEnvironmentRequestToDeployment), and
		// fetchEnvironments parses them with strconv.Atoi — using only the
		// Nomad template-job parser here could never match. Parse the plain
		// id first and keep the Nomad scheme as a fallback for deployments
		// created under the old naming convention.
		var environmentID dto.EnvironmentID
		if intIdentifier, parseErr := strconv.Atoi(deployment.Name); parseErr == nil {
			environmentID = dto.EnvironmentID(intIdentifier)
		} else {
			environmentID, parseErr = nomad.EnvironmentIDFromTemplateJobID(deployment.Name)
			if parseErr != nil {
				log.WithError(parseErr).Warn("Cannot parse environment id of loaded environment")
				continue
			}
		}
		if id == environmentID {
			fetchedEnvironment = newKubernetesEnvironmentFromJob(deployment, &apiClient)
		}
	}
	return fetchedEnvironment, nil
}