Files
poseidon/internal/kubernetes/kubernetes.go
2024-09-18 10:43:38 +02:00

222 lines
9.1 KiB
Go

package kubernetes
import (
"context"
"errors"
"fmt"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/storage"
"io"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"strings"
"time"
)
// Package-level logger and the sentinel errors/reasons used by the executor API.
var (
// log is the logger of this package, registered under the name "kubernetes".
log = logging.GetLogger("kubernetes")
// ErrorExecutorCommunicationFailed signals that the communication with the executor failed.
ErrorExecutorCommunicationFailed = errors.New("communication with executor failed")
// ErrorEvaluation signals that an evaluation could not complete.
ErrorEvaluation = errors.New("evaluation could not complete")
// ErrorPlacingAllocations signals that not all allocations could be placed.
ErrorPlacingAllocations = errors.New("failed to place all allocations")
// ErrorLoadingJob signals that a job could not be loaded.
ErrorLoadingJob = errors.New("failed to load job")
// ErrorNoAllocatedResourcesFound signals that no allocated resources were found.
ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found")
// ErrorLocalDestruction is the base reason wrapped by ErrorOOMKilled and ErrorAllocationRescheduled.
// Its message states that the destruction should not cause external changes.
ErrorLocalDestruction RunnerDeletedReason = errors.New("the destruction should not cause external changes")
// ErrorOOMKilled wraps ErrorLocalDestruction. Only the text of dto.ErrOOMKilled is included (via %s),
// so errors.Is matches ErrorLocalDestruction but not dto.ErrOOMKilled.
ErrorOOMKilled RunnerDeletedReason = fmt.Errorf("%s: %w", dto.ErrOOMKilled.Error(), ErrorLocalDestruction)
// ErrorAllocationRescheduled wraps ErrorLocalDestruction for a rescheduled allocation.
ErrorAllocationRescheduled RunnerDeletedReason = fmt.Errorf("the allocation was rescheduled: %w", ErrorLocalDestruction)
// ErrorAllocationStopped is the base reason for a stopped allocation.
ErrorAllocationStopped RunnerDeletedReason = errors.New("the allocation was stopped")
// ErrorAllocationStoppedUnexpectedly wraps ErrorAllocationStopped ("... unexpectedly").
ErrorAllocationStoppedUnexpectedly RunnerDeletedReason = fmt.Errorf("%w unexpectedly", ErrorAllocationStopped)
// ErrorAllocationRescheduledUnexpectedly wraps ErrorAllocationStopped as well; the resulting message reads
// "the allocation was stopped correctly but rescheduled".
ErrorAllocationRescheduledUnexpectedly RunnerDeletedReason = fmt.Errorf(
"%w correctly but rescheduled", ErrorAllocationStopped)
// ErrorAllocationCompleted is for reporting the reason for the stopped allocation.
// We do not consider it as an error but add it anyway for a complete reporting.
ErrorAllocationCompleted RunnerDeletedReason = errors.New("the allocation completed")
)
// allocationData is the management record kept per allocation in APIClient.allocations.
type allocationData struct {
// allocClientStatus defines the state defined by Nomad.
allocClientStatus string
// allocDesiredStatus defines if the allocation wants to be running or being stopped.
allocDesiredStatus string
// jobID is the ID of the job the allocation belongs to.
jobID string
// start is the start time of the allocation; for recovered allocations it is taken from the
// allocation's create time.
start time.Time
// stopExpected is used to suppress warnings that could be triggered by a race condition
// between the Inactivity timer and an external event leading to allocation rescheduling.
stopExpected bool
// Just debugging information
allocNomadNode string
}
// resultChannelWriteTimeout is used to detect the error case in which more elements are written
// into a channel than expected.
const resultChannelWriteTimeout = 10 * time.Millisecond
// DeletedAllocationProcessor is the callback type of AllocationProcessing.OnDeleted.
// It receives a job ID together with an error value and reports via removedByPoseidon
// whether the removal was caused by Poseidon itself.
// NOTE(review): the second parameter is *named* RunnerDeletedReason but typed as plain error;
// presumably `reason RunnerDeletedReason` was intended — confirm before changing the type.
type DeletedAllocationProcessor func(jobID string, RunnerDeletedReason error) (removedByPoseidon bool)
// NewAllocationProcessor is the callback type of AllocationProcessing.OnNew.
// It receives the allocation and a duration (presumably the allocation's startup duration — TODO confirm).
type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration)
// AllocationProcessing includes the callbacks to interact with allocation events.
type AllocationProcessing struct {
// OnNew is the callback for new allocations.
OnNew NewAllocationProcessor
// OnDeleted is the callback for deleted allocations.
OnDeleted DeletedAllocationProcessor
}
// RunnerDeletedReason is the error type of the sentinel reasons declared in this package
// (e.g. ErrorLocalDestruction, ErrorAllocationStopped, ErrorAllocationCompleted).
type RunnerDeletedReason error
// ExecutorAPI provides access to a container orchestration solution.
// NOTE(review): several method comments below still use Nomad terminology (evaluations, allocations,
// Nomad event stream) although the signatures use Kubernetes types — presumably carried over from the
// Nomad implementation; confirm and update once the methods are implemented.
type ExecutorAPI interface {
// apiQuerier is embedded, so all of its methods are part of ExecutorAPI as well.
apiQuerier
// LoadEnvironmentJobs loads all environment jobs.
LoadEnvironmentJobs() ([]*appsv1.Deployment, error)
// LoadRunnerJobs loads all runner jobs specific for the environment.
LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*appsv1.Deployment, error)
// LoadRunnerIDs returns the IDs of all runners with the specified id prefix which are not about to
// get stopped.
LoadRunnerIDs(prefix string) (runnerIds []string, err error)
// LoadRunnerPortMappings returns the mapped ports of the runner.
LoadRunnerPortMappings(runnerID string) ([]v1.ContainerPort, error)
// RegisterRunnerJob creates a runner job based on the template job.
// It registers the job and waits until the registration completes.
RegisterRunnerJob(template *appsv1.Deployment) error
// MonitorEvaluation monitors the given evaluation ID.
// It waits until the evaluation reaches one of the states complete, canceled or failed.
// If the evaluation was not successful, an error containing the failures is returned.
// See also https://github.com/hashicorp/nomad/blob/7d5a9ecde95c18da94c9b6ace2565afbfdd6a40d/command/monitor.go#L175
MonitorEvaluation(evaluationID string, ctx context.Context) error
// WatchEventStream listens on the Nomad event stream for allocation and evaluation events.
// Depending on the incoming event, one of the given callbacks is executed.
// Do not run multiple times simultaneously.
WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error
// ExecuteCommand executes the given command in the job/runner with the given id.
// It writes the output of the command to stdout/stderr and reads input from stdin.
// If tty is true, the command will run with a tty.
// Iff privilegedExecution is true, the command will be executed privileged.
// The command is passed in the shell form (not the exec array form) and will be executed in a shell.
// The returned int is presumably the exit code of the command — TODO confirm.
ExecuteCommand(jobID string, ctx context.Context, command string, tty bool, privilegedExecution bool,
stdin io.Reader, stdout, stderr io.Writer) (int, error)
// MarkRunnerAsUsed marks the runner with the given ID as used. It also stores the timeout duration in the metadata.
MarkRunnerAsUsed(runnerID string, duration int) error
}
// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
// Executor API and its return values.
type APIClient struct {
// apiQuerier is the embedded low-level client; NewExecutorAPI sets it to a *kubernetesAPIClient.
apiQuerier
// evaluations stores error channels (presumably keyed by evaluation ID — TODO confirm).
evaluations storage.Storage[chan error]
// allocations contain management data for all pending and running allocations.
allocations storage.Storage[*allocationData]
// NOTE(review): isListening is neither read nor written in the visible code; presumably it guards
// against running WatchEventStream multiple times simultaneously — confirm.
isListening bool
}
// LoadEnvironmentJobs is meant to load all environment jobs.
// It is not implemented yet: every call panics with "implement me".
func (a APIClient) LoadEnvironmentJobs() ([]*appsv1.Deployment, error) {
	// TODO: implement me
	panic("implement me")
}
// LoadRunnerJobs returns the deployments of all runners that belong to the given environment.
//
// As a side effect it starts initializeAllocations for the environment in a separate goroutine.
// The runner IDs are looked up via LoadRunnerIDs using the prefix RunnerJobID(environmentID, "");
// if that lookup fails, an empty slice and the wrapped error are returned.
// A deployment that cannot be queried is skipped. All such failures are accumulated into one error
// wrapping ErrorLoadingJob, which is returned together with the deployments that could be loaded.
func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*appsv1.Deployment, error) {
	go a.initializeAllocations(environmentID)

	ids, err := a.LoadRunnerIDs(RunnerJobID(environmentID, ""))
	if err != nil {
		return []*appsv1.Deployment{}, fmt.Errorf("couldn't load jobs: %w", err)
	}

	var (
		loadErr    error
		runnerJobs = make([]*appsv1.Deployment, 0, len(ids))
	)
	for _, runnerID := range ids {
		runnerJob, queryErr := a.apiQuerier.deployment(runnerID)
		if queryErr == nil {
			runnerJobs = append(runnerJobs, &runnerJob)
			continue
		}

		// The first failure seeds the chain with the sentinel, every failure appends its details.
		if loadErr == nil {
			loadErr = ErrorLoadingJob
		}
		loadErr = fmt.Errorf("%w: couldn't load job info for runner %s - %v", loadErr, runnerID, queryErr)
	}
	return runnerJobs, loadErr
}
// LoadRunnerIDs is meant to return the IDs of all runners with the specified prefix.
// It is not implemented yet: every call panics with "implement me".
func (a APIClient) LoadRunnerIDs(prefix string) (runnerIds []string, err error) {
	// TODO: implement me
	panic("implement me")
}
// LoadRunnerPortMappings is meant to return the mapped ports of the runner.
// It is not implemented yet: every call panics with "implement me".
func (a APIClient) LoadRunnerPortMappings(runnerID string) ([]v1.ContainerPort, error) {
	// TODO: implement me
	panic("implement me")
}
// MonitorEvaluation is meant to monitor the evaluation with the given ID.
// It is not implemented yet: every call panics with "implement me".
func (a APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) error {
	// TODO: implement me
	panic("implement me")
}
// WatchEventStream is meant to listen for events and dispatch them to the given callbacks.
// It is not implemented yet: every call panics with "implement me".
func (a APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error {
	// TODO: implement me
	panic("implement me")
}
// ExecuteCommand is meant to execute the given command in the job/runner with the given ID.
// It is not implemented yet: every call panics with "implement me".
func (a APIClient) ExecuteCommand(jobID string, ctx context.Context, command string, tty bool, privilegedExecution bool,
	stdin io.Reader, stdout, stderr io.Writer) (int, error) {
	// TODO: implement me
	panic("implement me")
}
// MarkRunnerAsUsed is meant to mark the runner with the given ID as used.
// It is not implemented yet: every call panics with "implement me".
func (a APIClient) MarkRunnerAsUsed(runnerID string, duration int) error {
	// TODO: implement me
	panic("implement me")
}
// NewExecutorAPI builds an APIClient backed by a kubernetesAPIClient and initializes it
// with the given Kubernetes configuration.
// A single client is usually enough for the whole lifetime of the API.
// The client is returned even if its initialization failed; callers must check the error.
func NewExecutorAPI(kubernetesConfig *rest.Config) (ExecutorAPI, error) {
	// tagAllocation attaches the job ID, client status and node of an allocation to its monitoring point.
	tagAllocation := func(point *write.Point, data *allocationData, _ storage.EventType) {
		point.AddTag(monitoring.InfluxKeyJobID, data.jobID)
		point.AddTag(monitoring.InfluxKeyClientStatus, data.allocClientStatus)
		point.AddTag(monitoring.InfluxKeyNomadNode, data.allocNomadNode)
	}

	apiClient := &APIClient{
		apiQuerier:  &kubernetesAPIClient{},
		evaluations: storage.NewLocalStorage[chan error](),
		allocations: storage.NewMonitoredLocalStorage[*allocationData](
			monitoring.MeasurementNomadAllocations, tagAllocation, 0, nil),
	}

	initErr := apiClient.init(kubernetesConfig)
	return apiClient, initErr
}
// initializeAllocations recovers the allocations of the given environment into a.allocations.
//
// It lists all allocations and stores an allocationData entry, keyed by the allocation ID, for every
// allocation that belongs to a runner job of the environment and is currently pending or running.
// Allocations of environment template jobs and of other environments are ignored.
// If the allocations cannot be listed, only a warning is logged.
func (a *APIClient) initializeAllocations(environmentID dto.EnvironmentID) {
	stubs, err := a.listAllocations()
	if err != nil {
		log.WithError(err).Warn("Could not initialize allocations")
		return
	}

	for _, alloc := range stubs {
		if IsEnvironmentTemplateID(alloc.JobID) {
			continue
		}
		if !strings.HasPrefix(alloc.JobID, RunnerJobID(environmentID, "")) {
			continue
		}
		// Only allocations that are still alive (pending or running) are recovered.
		if alloc.ClientStatus != structs.AllocClientStatusPending &&
			alloc.ClientStatus != structs.AllocClientStatusRunning {
			continue
		}

		log.WithField("jobID", alloc.JobID).WithField("status", alloc.ClientStatus).Debug("Recovered Allocation")
		recovered := &allocationData{
			allocClientStatus:  alloc.ClientStatus,
			allocDesiredStatus: alloc.DesiredStatus,
			jobID:              alloc.JobID,
			start:              time.Unix(0, alloc.CreateTime),
			allocNomadNode:     alloc.NodeName,
		}
		a.allocations.Add(alloc.ID, recovered)
	}
}