298 lines
9.7 KiB
Go
298 lines
9.7 KiB
Go
package environment
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
poseidonK8s "github.com/openHPI/poseidon/internal/kubernetes"
|
|
"github.com/openHPI/poseidon/internal/nomad"
|
|
"github.com/openHPI/poseidon/internal/runner"
|
|
"github.com/openHPI/poseidon/pkg/dto"
|
|
"github.com/openHPI/poseidon/pkg/logging"
|
|
"github.com/openHPI/poseidon/pkg/monitoring"
|
|
"github.com/openHPI/poseidon/pkg/storage"
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// KubernetesEnvironmentManager manages execution environments that are backed
// by Kubernetes deployments. It is one link in a chain of environment managers:
// the embedded AbstractManager carries the next handler of that chain and the
// runner manager that is used as the local store of environments.
type KubernetesEnvironmentManager struct {
	*AbstractManager
	// api is the Kubernetes executor API client. It is stored by value; the
	// constructor dereferences the pointer it receives.
	api poseidonK8s.ExecutorAPI
	// templateEnvironmentHCL is the environment template loaded by the
	// constructor; it is handed to NewKubernetesEnvironmentFromRequest.
	templateEnvironmentHCL string
}
|
|
|
|
func NewKubernetesEnvironmentManager(
|
|
runnerManager runner.Manager,
|
|
apiClient *poseidonK8s.ExecutorAPI,
|
|
templateJobFile string,
|
|
) (*KubernetesEnvironmentManager, error) {
|
|
if err := loadTemplateEnvironmentJobHCL(templateJobFile); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m := &KubernetesEnvironmentManager{
|
|
AbstractManager: &AbstractManager{nil, runnerManager},
|
|
api: *apiClient,
|
|
templateEnvironmentHCL: templateEnvironmentJobHCL,
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
func (k *KubernetesEnvironmentManager) SetNextHandler(next ManagerHandler) {
|
|
k.nextHandler = next
|
|
}
|
|
|
|
func (k *KubernetesEnvironmentManager) NextHandler() ManagerHandler {
|
|
if k.HasNextHandler() {
|
|
return k.nextHandler
|
|
} else {
|
|
return &AbstractManager{}
|
|
}
|
|
}
|
|
|
|
func (k *KubernetesEnvironmentManager) HasNextHandler() bool {
|
|
return k.nextHandler != nil
|
|
}
|
|
|
|
// List all Kubernetes-based environments
|
|
func (k *KubernetesEnvironmentManager) List(fetch bool) ([]runner.ExecutionEnvironment, error) {
|
|
if fetch {
|
|
if err := k.fetchEnvironments(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return k.runnerManager.ListEnvironments(), nil
|
|
}
|
|
|
|
func (k *KubernetesEnvironmentManager) fetchEnvironments() error {
|
|
remoteDeploymentResponse, err := k.api.LoadEnvironmentJobs()
|
|
if err != nil {
|
|
return fmt.Errorf("failed fetching environments: %w", err)
|
|
}
|
|
|
|
remoteDeployments := make(map[string]appsv1.Deployment)
|
|
|
|
// Update local environments from remote environments.
|
|
for _, deployment := range remoteDeploymentResponse {
|
|
remoteDeployments[deployment.Name] = *deployment
|
|
|
|
// Job Id to Environment Id Integer
|
|
intIdentifier, err := strconv.Atoi(deployment.Name)
|
|
if err != nil {
|
|
log.WithError(err).Warn("Failed to convert job name to int")
|
|
continue
|
|
}
|
|
id := dto.EnvironmentID(intIdentifier)
|
|
|
|
if localEnvironment, ok := k.runnerManager.GetEnvironment(id); ok {
|
|
fetchedEnvironment := newKubernetesEnvironmentFromJob(deployment, &k.api)
|
|
localEnvironment.SetConfigFrom(fetchedEnvironment)
|
|
// We destroy only this (second) local reference to the environment.
|
|
if err = fetchedEnvironment.Delete(runner.ErrDestroyedAndReplaced); err != nil {
|
|
log.WithError(err).Warn("Failed to remove environment locally")
|
|
}
|
|
} else {
|
|
k.runnerManager.StoreEnvironment(newKubernetesEnvironmentFromJob(deployment, &k.api))
|
|
}
|
|
}
|
|
|
|
// Remove local environments that are not remote environments.
|
|
for _, localEnvironment := range k.runnerManager.ListEnvironments() {
|
|
if _, ok := remoteDeployments[localEnvironment.ID().ToString()]; !ok {
|
|
err := localEnvironment.Delete(runner.ErrLocalDestruction)
|
|
log.WithError(err).Warn("Failed to remove environment locally")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// newNomadEnvironmentFromJob creates a Nomad environment from the passed Nomad job definition.
|
|
func newKubernetesEnvironmentFromJob(deployment *appsv1.Deployment, apiClient *poseidonK8s.ExecutorAPI) *KubernetesEnvironment {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
e := &KubernetesEnvironment{
|
|
apiClient: apiClient,
|
|
jobHCL: templateEnvironmentJobHCL,
|
|
deployment: deployment,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
e.idleRunners = storage.NewMonitoredLocalStorage[runner.Runner](monitoring.MeasurementIdleRunnerNomad,
|
|
runner.MonitorEnvironmentID[runner.Runner](e.ID()), time.Minute, ctx)
|
|
return e
|
|
}
|
|
|
|
// Get retrieves a specific Kubernetes environment
|
|
func (k *KubernetesEnvironmentManager) Get(id dto.EnvironmentID, fetch bool) (executionEnvironment runner.ExecutionEnvironment, err error) {
|
|
executionEnvironment, ok := k.runnerManager.GetEnvironment(id)
|
|
|
|
if fetch {
|
|
fetchedEnvironment, err := fetchK8sEnvironment(id, k.api)
|
|
switch {
|
|
case err != nil:
|
|
return nil, err
|
|
case fetchedEnvironment == nil:
|
|
_, err = k.Delete(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ok = false
|
|
case !ok:
|
|
k.runnerManager.StoreEnvironment(fetchedEnvironment)
|
|
executionEnvironment = fetchedEnvironment
|
|
ok = true
|
|
default:
|
|
executionEnvironment.SetConfigFrom(fetchedEnvironment)
|
|
// We destroy only this (second) local reference to the environment.
|
|
err = fetchedEnvironment.Delete(runner.ErrDestroyedAndReplaced)
|
|
if err != nil {
|
|
log.WithError(err).Warn("Failed to remove environment locally")
|
|
}
|
|
}
|
|
}
|
|
|
|
if !ok {
|
|
err = runner.ErrUnknownExecutionEnvironment
|
|
}
|
|
return executionEnvironment, err
|
|
}
|
|
|
|
// CreateOrUpdate creates or updates an environment in Kubernetes
|
|
func (k *KubernetesEnvironmentManager) CreateOrUpdate(
|
|
id dto.EnvironmentID, request dto.ExecutionEnvironmentRequest, ctx context.Context) (created bool, err error) {
|
|
|
|
// Check if execution environment is already existing (in the local memory).
|
|
environment, isExistingEnvironment := k.runnerManager.GetEnvironment(id)
|
|
if isExistingEnvironment {
|
|
// Remove existing environment to force downloading the newest Docker image.
|
|
// See https://github.com/openHPI/poseidon/issues/69
|
|
err = environment.Delete(runner.ErrEnvironmentUpdated)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to remove the environment: %w", err)
|
|
}
|
|
}
|
|
|
|
// Create a new environment with the given request options.
|
|
environment, err = NewKubernetesEnvironmentFromRequest(k.api, k.templateEnvironmentHCL, id, request)
|
|
if err != nil {
|
|
return false, fmt.Errorf("error creating Nomad environment: %w", err)
|
|
}
|
|
|
|
// Keep a copy of environment specification in memory.
|
|
k.runnerManager.StoreEnvironment(environment)
|
|
|
|
// Register template Job with Nomad.
|
|
logging.StartSpan("env.update.register", "Register Environment", ctx, func(_ context.Context) {
|
|
err = environment.Register()
|
|
})
|
|
if err != nil {
|
|
return false, fmt.Errorf("error registering template job in API: %w", err)
|
|
}
|
|
|
|
// Launch idle runners based on the template job.
|
|
logging.StartSpan("env.update.poolsize", "Apply Prewarming Pool Size", ctx, func(_ context.Context) {
|
|
err = environment.ApplyPrewarmingPoolSize()
|
|
})
|
|
if err != nil {
|
|
return false, fmt.Errorf("error scaling template job in API: %w", err)
|
|
}
|
|
|
|
return !isExistingEnvironment, nil
|
|
|
|
}
|
|
|
|
// Statistics fetches statistics from Kubernetes
|
|
func (k *KubernetesEnvironmentManager) Statistics() map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData {
|
|
// Collect and return statistics for Kubernetes environments
|
|
return map[dto.EnvironmentID]*dto.StatisticalExecutionEnvironmentData{}
|
|
}
|
|
|
|
// MapExecutionEnvironmentRequestToDeployment maps ExecutionEnvironmentRequest to a Kubernetes Deployment
|
|
func MapExecutionEnvironmentRequestToDeployment(req dto.ExecutionEnvironmentRequest, environmentID string) *appsv1.Deployment {
|
|
// Create the Deployment object
|
|
deployment := &appsv1.Deployment{
|
|
TypeMeta: metav1.TypeMeta{
|
|
Kind: "Deployment",
|
|
APIVersion: "apps/v1",
|
|
},
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: environmentID, // Set the environment ID as the name of the deployment
|
|
Labels: map[string]string{
|
|
"environment-id": environmentID,
|
|
},
|
|
},
|
|
Spec: appsv1.DeploymentSpec{
|
|
Replicas: int32Ptr(int32(req.PrewarmingPoolSize)), // Use PrewarmingPoolSize to set the number of replicas
|
|
Selector: &metav1.LabelSelector{
|
|
MatchLabels: map[string]string{
|
|
"environment-id": environmentID,
|
|
},
|
|
},
|
|
Template: v1.PodTemplateSpec{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Labels: map[string]string{
|
|
"environment-id": environmentID,
|
|
},
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{
|
|
{
|
|
Name: "runner-container",
|
|
Image: req.Image, // Map the image to the container
|
|
Resources: v1.ResourceRequirements{
|
|
Requests: v1.ResourceList{
|
|
"cpu": resource.MustParse(strconv.Itoa(int(req.CPULimit))), // Map CPU request
|
|
"memory": resource.MustParse(strconv.Itoa(int(req.MemoryLimit)) + "Mi"), // Map Memory request
|
|
},
|
|
Limits: v1.ResourceList{
|
|
"cpu": resource.MustParse(strconv.Itoa(int(req.CPULimit))), // Map CPU limit
|
|
"memory": resource.MustParse(strconv.Itoa(int(req.MemoryLimit)) + "Mi"), // Map Memory limit
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Handle network access and exposed ports
|
|
if req.NetworkAccess {
|
|
var containerPorts []v1.ContainerPort
|
|
for _, port := range req.ExposedPorts {
|
|
containerPorts = append(containerPorts, v1.ContainerPort{
|
|
ContainerPort: int32(port),
|
|
})
|
|
}
|
|
deployment.Spec.Template.Spec.Containers[0].Ports = containerPorts
|
|
}
|
|
|
|
return deployment
|
|
}
|
|
|
|
// int32Ptr returns a pointer to a freshly allocated int32 holding i.
// Each call yields a distinct pointer.
func int32Ptr(i int32) *int32 {
	p := new(int32)
	*p = i
	return p
}
|
|
|
|
func fetchK8sEnvironment(id dto.EnvironmentID, apiClient poseidonK8s.ExecutorAPI) (runner.ExecutionEnvironment, error) {
|
|
environments, err := apiClient.LoadEnvironmentJobs()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error fetching the environment jobs: %w", err)
|
|
}
|
|
var fetchedEnvironment runner.ExecutionEnvironment
|
|
for _, deployment := range environments {
|
|
environmentID, err := nomad.EnvironmentIDFromTemplateJobID(deployment.Name)
|
|
if err != nil {
|
|
log.WithError(err).Warn("Cannot parse environment id of loaded environment")
|
|
continue
|
|
}
|
|
if id == environmentID {
|
|
fetchedEnvironment = newKubernetesEnvironmentFromJob(deployment, &apiClient)
|
|
}
|
|
}
|
|
return fetchedEnvironment, nil
|
|
}
|