252 lines
7.9 KiB
Go
252 lines
7.9 KiB
Go
package runner
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/google/uuid"
|
|
"github.com/openHPI/poseidon/pkg/dto"
|
|
"github.com/openHPI/poseidon/pkg/monitoring"
|
|
"github.com/openHPI/poseidon/pkg/storage"
|
|
"io"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"net/http"
|
|
"time"
|
|
)
|
|
|
|
var ErrPodCreationFailed = errors.New("failed to create pod")
|
|
|
|
var (
|
|
ErrorUnknownExecution = errors.New("unknown execution")
|
|
ErrFileNotFound = errors.New("file not found or insufficient permissions")
|
|
ErrOOMKilled DestroyReason = errors.New("the runner was killed due to out of memory")
|
|
ErrDestroyedByAPIRequest DestroyReason = errors.New("the client wants to stop the runner")
|
|
)
|
|
|
|
// KubernetesPodWorkload is an abstraction to manage a Kubernetes pod.
|
|
// It is not persisted on a Poseidon restart.
|
|
// The InactivityTimer is used actively. It stops and deletes the pod.
|
|
type KubernetesPodWorkload struct {
|
|
InactivityTimer
|
|
id string
|
|
fs map[dto.FilePath][]byte
|
|
executions storage.Storage[*dto.ExecutionRequest]
|
|
runningExecutions map[string]context.CancelFunc
|
|
onDestroy DestroyRunnerHandler
|
|
environment ExecutionEnvironment
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
clientset *kubernetes.Clientset
|
|
podName string
|
|
namespace string
|
|
}
|
|
|
|
// NewKubernetesPodWorkload creates a new KubernetesPodWorkload with the provided id.
|
|
func NewKubernetesPodWorkload(
|
|
environment ExecutionEnvironment, onDestroy DestroyRunnerHandler, clientset *kubernetes.Clientset) (*KubernetesPodWorkload, error) {
|
|
newUUID, err := uuid.NewUUID()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed generating runner id: %w", err)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
workload := &KubernetesPodWorkload{
|
|
id: newUUID.String(),
|
|
fs: make(map[dto.FilePath][]byte),
|
|
runningExecutions: make(map[string]context.CancelFunc),
|
|
onDestroy: onDestroy,
|
|
environment: environment,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
clientset: clientset,
|
|
namespace: "default", // You might want to make this configurable
|
|
podName: fmt.Sprintf("workload-%s", newUUID.String()),
|
|
}
|
|
workload.executions = storage.NewMonitoredLocalStorage[*dto.ExecutionRequest](
|
|
monitoring.MeasurementExecutionsK8s, monitorExecutionsRunnerID(environment.ID(), workload.id), time.Minute, ctx)
|
|
workload.InactivityTimer = NewInactivityTimer(workload, func(_ Runner) error {
|
|
return workload.Destroy(nil)
|
|
})
|
|
return workload, nil
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) ID() string {
|
|
return w.id
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) Environment() dto.EnvironmentID {
|
|
return w.environment.ID()
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) MappedPorts() []*dto.MappedPort {
|
|
// Implement port mapping logic for Kubernetes
|
|
return []*dto.MappedPort{}
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) StoreExecution(id string, request *dto.ExecutionRequest) {
|
|
w.executions.Add(id, request)
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) ExecutionExists(id string) bool {
|
|
_, ok := w.executions.Get(id)
|
|
return ok
|
|
}
|
|
|
|
// ExecuteInteractively runs the execution request in a Kubernetes pod.
|
|
func (w *KubernetesPodWorkload) ExecuteInteractively(
|
|
id string, _ io.ReadWriter, stdout, stderr io.Writer, ctx context.Context) (
|
|
<-chan ExitInfo, context.CancelFunc, error) {
|
|
w.ResetTimeout()
|
|
request, ok := w.executions.Pop(id)
|
|
if !ok {
|
|
return nil, nil, ErrorUnknownExecution
|
|
}
|
|
hideEnvironmentVariablesK8s(request, "K8S")
|
|
command, executionCtx, cancel := prepareExecution(request, w.ctx)
|
|
exitInternal := make(chan ExitInfo)
|
|
exit := make(chan ExitInfo, 1)
|
|
|
|
go w.executeCommand(executionCtx, command, stdout, stderr, exitInternal)
|
|
go w.handleRunnerTimeout(executionCtx, exitInternal, exit, id)
|
|
|
|
return exit, cancel, nil
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) ListFileSystem(path string, recursive bool, writer io.Writer, humanReadable bool, ctx context.Context) error {
|
|
// Implement file system listing for Kubernetes pods
|
|
return dto.ErrNotSupported
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) UpdateFileSystem(request *dto.UpdateFileSystemRequest, ctx context.Context) error {
|
|
// Implement file system update for Kubernetes pods
|
|
return nil
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) GetFileContent(path string, writer http.ResponseWriter, humanReadable bool, ctx context.Context) error {
|
|
// Implement file content retrieval for Kubernetes pods
|
|
return dto.ErrNotSupported
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) Destroy(_ DestroyReason) error {
|
|
w.cancel()
|
|
err := w.clientset.CoreV1().Pods(w.namespace).Delete(context.Background(), w.podName, metav1.DeleteOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("error while destroying kubernetes pod: %w", err)
|
|
}
|
|
if err := w.onDestroy(w); err != nil {
|
|
return fmt.Errorf("error while destroying kubernetes runner: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) executeCommand(ctx context.Context, command string,
|
|
stdout, stderr io.Writer, exit chan<- ExitInfo,
|
|
) {
|
|
defer close(exit)
|
|
|
|
pod := &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: w.podName,
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
RestartPolicy: corev1.RestartPolicyNever,
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: "workload",
|
|
Image: w.environment.Image(),
|
|
Command: []string{"/bin/sh", "-c", command},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
_, err := w.clientset.CoreV1().Pods(w.namespace).Create(ctx, pod, metav1.CreateOptions{})
|
|
if err != nil {
|
|
exit <- ExitInfo{1, fmt.Errorf("%w: %v", ErrPodCreationFailed, err)}
|
|
return
|
|
}
|
|
|
|
req := w.clientset.CoreV1().Pods(w.namespace).GetLogs(w.podName, &corev1.PodLogOptions{
|
|
Follow: true,
|
|
})
|
|
podLogs, err := req.Stream(ctx)
|
|
if err != nil {
|
|
exit <- ExitInfo{1, fmt.Errorf("error in opening stream: %v", err)}
|
|
return
|
|
}
|
|
defer func(podLogs io.ReadCloser) {
|
|
err := podLogs.Close()
|
|
if err != nil {
|
|
exit <- ExitInfo{1, fmt.Errorf("error in closing stream: %v", err)}
|
|
}
|
|
}(podLogs)
|
|
|
|
_, err = io.Copy(stdout, podLogs)
|
|
if err != nil {
|
|
exit <- ExitInfo{1, fmt.Errorf("error in copying logs: %v", err)}
|
|
return
|
|
}
|
|
|
|
// Wait for the pod to complete
|
|
watch, err := w.clientset.CoreV1().Pods(w.namespace).Watch(ctx, metav1.ListOptions{
|
|
FieldSelector: fmt.Sprintf("metadata.name=%s", w.podName),
|
|
})
|
|
if err != nil {
|
|
exit <- ExitInfo{1, fmt.Errorf("error watching pod: %v", err)}
|
|
return
|
|
}
|
|
defer watch.Stop()
|
|
|
|
for event := range watch.ResultChan() {
|
|
pod, ok := event.Object.(*corev1.Pod)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
|
|
exitCode := uint8(0)
|
|
if pod.Status.Phase == corev1.PodFailed {
|
|
exitCode = 1
|
|
}
|
|
exit <- ExitInfo{exitCode, nil}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *KubernetesPodWorkload) handleRunnerTimeout(ctx context.Context,
|
|
exitInternal <-chan ExitInfo, exit chan<- ExitInfo, executionID string) {
|
|
executionCtx, cancelExecution := context.WithCancel(ctx)
|
|
w.runningExecutions[executionID] = cancelExecution
|
|
defer delete(w.runningExecutions, executionID)
|
|
defer close(exit)
|
|
|
|
select {
|
|
case exitInfo := <-exitInternal:
|
|
exit <- exitInfo
|
|
case <-executionCtx.Done():
|
|
exit <- ExitInfo{255, ErrorRunnerInactivityTimeout}
|
|
}
|
|
}
|
|
|
|
// hideEnvironmentVariables sets the CODEOCEAN variable and unsets all variables starting with the passed prefix.
|
|
func hideEnvironmentVariablesK8s(request *dto.ExecutionRequest, unsetPrefix string) {
|
|
if request.Environment == nil {
|
|
request.Environment = make(map[string]string)
|
|
}
|
|
request.Command = "unset \"${!" + unsetPrefix + "@}\" && " + request.Command
|
|
}
|
|
|
|
func prepareExecution(request *dto.ExecutionRequest, environmentCtx context.Context) (
|
|
command string, ctx context.Context, cancel context.CancelFunc,
|
|
) {
|
|
command = request.FullCommand()
|
|
if request.TimeLimit == 0 {
|
|
ctx, cancel = context.WithCancel(environmentCtx)
|
|
} else {
|
|
ctx, cancel = context.WithTimeout(environmentCtx, time.Duration(request.TimeLimit)*time.Second)
|
|
}
|
|
return command, ctx, cancel
|
|
}
|