poseidon/internal/nomad/nomad.go
Maximilian Paß 19e0ae1583 Fix concurrent map write in the Nomad `evaluations` map by replacing the simple map with our concurrency-ready storage object.
2024-04-17 13:19:49 +02:00

package nomad
import (
"context"
"errors"
"fmt"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openHPI/poseidon/internal/config"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/logging"
"github.com/openHPI/poseidon/pkg/monitoring"
"github.com/openHPI/poseidon/pkg/nullio"
"github.com/openHPI/poseidon/pkg/storage"
"io"
"strconv"
"strings"
"time"
)
var (
log = logging.GetLogger("nomad")
ErrorExecutorCommunicationFailed = errors.New("communication with executor failed")
ErrorEvaluation = errors.New("evaluation could not complete")
ErrorPlacingAllocations = errors.New("failed to place all allocations")
ErrorLoadingJob = errors.New("failed to load job")
ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found")
ErrorLocalDestruction RunnerDeletedReason = errors.New("the destruction should not cause external changes")
ErrorOOMKilled RunnerDeletedReason = fmt.Errorf("%s: %w", dto.ErrOOMKilled.Error(), ErrorLocalDestruction)
ErrorAllocationRescheduled RunnerDeletedReason = fmt.Errorf("the allocation was rescheduled: %w", ErrorLocalDestruction)
ErrorAllocationStopped RunnerDeletedReason = errors.New("the allocation was stopped")
ErrorAllocationStoppedUnexpectedly RunnerDeletedReason = fmt.Errorf("%w unexpectedly", ErrorAllocationStopped)
ErrorAllocationRescheduledUnexpectedly RunnerDeletedReason = fmt.Errorf(
"%w correctly but rescheduled", ErrorAllocationStopped)
// ErrorAllocationCompleted is for reporting the reason for the stopped allocation.
// We do not consider it an error but add it anyway for complete reporting.
ErrorAllocationCompleted RunnerDeletedReason = errors.New("the allocation completed")
)
// resultChannelWriteTimeout is used to detect the error when more elements are written into a channel than expected.
const resultChannelWriteTimeout = 10 * time.Millisecond
// AllocationProcessing includes the callbacks to interact with allocation events.
type AllocationProcessing struct {
OnNew NewAllocationProcessor
OnDeleted DeletedAllocationProcessor
}
type RunnerDeletedReason error
// DeletedAllocationProcessor is a handler that will be called for each deleted allocation.
// removedByPoseidon should be true iff the Nomad Manager has removed the runner before.
type DeletedAllocationProcessor func(jobID string, RunnerDeletedReason error) (removedByPoseidon bool)
type NewAllocationProcessor func(*nomadApi.Allocation, time.Duration)
type allocationData struct {
// allocClientStatus is the allocation state as defined by Nomad.
allocClientStatus string
// allocDesiredStatus defines whether the allocation wants to be running or to be stopped.
allocDesiredStatus string
jobID string
start time.Time
// stopExpected is used to suppress warnings that could be triggered by a race condition
// between the Inactivity timer and an external event leading to allocation rescheduling.
stopExpected bool
// Just debugging information
allocNomadNode string
}
// ExecutorAPI provides access to a container orchestration solution.
type ExecutorAPI interface {
apiQuerier
// LoadEnvironmentJobs loads all environment jobs.
LoadEnvironmentJobs() ([]*nomadApi.Job, error)
// LoadRunnerJobs loads all runner jobs specific for the environment.
LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*nomadApi.Job, error)
// LoadRunnerIDs returns the IDs of all runners with the specified ID prefix that are not about to
// be stopped.
LoadRunnerIDs(prefix string) (runnerIds []string, err error)
// LoadRunnerPortMappings returns the mapped ports of the runner.
LoadRunnerPortMappings(runnerID string) ([]nomadApi.PortMapping, error)
// RegisterRunnerJob creates a runner job based on the template job.
// It registers the job and waits until the registration completes.
RegisterRunnerJob(template *nomadApi.Job) error
// MonitorEvaluation monitors the given evaluation ID.
// It waits until the evaluation reaches one of the states complete, canceled or failed.
// If the evaluation was not successful, an error containing the failures is returned.
// See also https://github.com/hashicorp/nomad/blob/7d5a9ecde95c18da94c9b6ace2565afbfdd6a40d/command/monitor.go#L175
MonitorEvaluation(evaluationID string, ctx context.Context) error
// WatchEventStream listens on the Nomad event stream for allocation and evaluation events.
// Depending on the incoming event, the corresponding callback is executed.
// Do not run multiple times simultaneously.
WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error
// ExecuteCommand executes the given command in the job/runner with the given id.
// It writes the output of the command to stdout/stderr and reads input from stdin.
// If tty is true, the command will run with a tty.
// Iff privilegedExecution is true, the command will be executed privileged.
// The command is passed in the shell form (not the exec array form) and will be executed in a shell.
ExecuteCommand(jobID string, ctx context.Context, command string, tty bool, privilegedExecution bool,
stdin io.Reader, stdout, stderr io.Writer) (int, error)
// MarkRunnerAsUsed marks the runner with the given ID as used. It also stores the timeout duration in the metadata.
MarkRunnerAsUsed(runnerID string, duration int) error
}
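// A minimal usage sketch of the interface above (hypothetical wiring; the
// nomadConfig and ctx values are assumed to be provided by the caller):
//
//	api, err := NewExecutorAPI(nomadConfig)
//	if err != nil {
//		return err
//	}
//	go func() {
//		_ = api.WatchEventStream(ctx, &AllocationProcessing{
//			OnNew:     func(alloc *nomadApi.Allocation, startup time.Duration) { /* track the new runner */ },
//			OnDeleted: func(jobID string, reason error) bool { return false },
//		})
//	}()
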
// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
// Executor API and its return values.
type APIClient struct {
apiQuerier
evaluations storage.Storage[chan error]
// allocations contains management data for all pending and running allocations.
allocations storage.Storage[*allocationData]
isListening bool
}
// NewExecutorAPI creates a new api client.
// One client is usually sufficient for the complete runtime of the API.
func NewExecutorAPI(nomadConfig *config.Nomad) (ExecutorAPI, error) {
client := &APIClient{
apiQuerier: &nomadAPIClient{},
evaluations: storage.NewLocalStorage[chan error](),
allocations: storage.NewMonitoredLocalStorage[*allocationData](monitoring.MeasurementNomadAllocations,
func(p *write.Point, object *allocationData, _ storage.EventType) {
p.AddTag(monitoring.InfluxKeyJobID, object.jobID)
p.AddTag(monitoring.InfluxKeyClientStatus, object.allocClientStatus)
p.AddTag(monitoring.InfluxKeyNomadNode, object.allocNomadNode)
}, 0, nil),
}
err := client.init(nomadConfig)
return client, err
}
// init prepares an apiClient to be able to communicate with a provided Nomad API.
func (a *APIClient) init(nomadConfig *config.Nomad) error {
if err := a.apiQuerier.init(nomadConfig); err != nil {
return fmt.Errorf("error initializing API querier: %w", err)
}
return nil
}
func (a *APIClient) LoadRunnerIDs(prefix string) (runnerIDs []string, err error) {
list, err := a.listJobs(prefix)
if err != nil {
return nil, err
}
for _, jobListStub := range list {
// Filter out dead ("complete", "failed" or "lost") jobs
if jobListStub.Status != structs.JobStatusDead {
runnerIDs = append(runnerIDs, jobListStub.ID)
}
}
return runnerIDs, nil
}
func (a *APIClient) LoadRunnerPortMappings(runnerID string) ([]nomadApi.PortMapping, error) {
alloc, err := a.apiQuerier.allocation(runnerID)
if err != nil {
return nil, fmt.Errorf("error querying allocation for runner %s: %w", runnerID, err)
}
if alloc.AllocatedResources == nil {
return nil, ErrorNoAllocatedResourcesFound
}
return alloc.AllocatedResources.Shared.Ports, nil
}
func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*nomadApi.Job, error) {
go a.initializeAllocations(environmentID)
runnerIDs, err := a.LoadRunnerIDs(RunnerJobID(environmentID, ""))
if err != nil {
return []*nomadApi.Job{}, fmt.Errorf("couldn't load jobs: %w", err)
}
var occurredError error
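	// Collect job-loading errors into one wrapped error instead of aborting,
	// so a single broken job does not prevent loading the remaining ones.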
jobs := make([]*nomadApi.Job, 0, len(runnerIDs))
for _, id := range runnerIDs {
job, err := a.apiQuerier.job(id)
if err != nil {
if occurredError == nil {
occurredError = ErrorLoadingJob
}
occurredError = fmt.Errorf("%w: couldn't load job info for runner %s - %v", occurredError, id, err)
continue
}
jobs = append(jobs, job)
}
return jobs, occurredError
}
func (a *APIClient) MonitorEvaluation(evaluationID string, ctx context.Context) (err error) {
evaluationErrorChannel := make(chan error, 1)
a.evaluations.Add(evaluationID, evaluationErrorChannel)
defer a.evaluations.Delete(evaluationID)
if !a.isListening {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel() // cancel the WatchEventStream when the evaluation result was read.
go func() {
err = a.WatchEventStream(ctx, &AllocationProcessing{
OnNew: func(_ *nomadApi.Allocation, _ time.Duration) {},
OnDeleted: func(_ string, _ error) bool { return false },
})
cancel() // cancel the waiting for an evaluation result if watching the event stream ends.
}()
}
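	// Wait for the evaluation result or for the context to end, either because
	// the caller canceled it or because watching the event stream stopped.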
select {
case <-ctx.Done():
return err
case err := <-evaluationErrorChannel:
// At the moment we expect only one error to be sent via this channel.
return err
}
}
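// A minimal usage sketch (hypothetical; the evaluation ID would typically come
// from a previous job registration):
//
//	if err := api.MonitorEvaluation(evalID, ctx); err != nil {
//		log.WithError(err).Warn("Evaluation could not complete")
//	}
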
func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationProcessing) error {
startTime := time.Now().UnixNano()
stream, err := a.EventStream(ctx)
if err != nil {
return fmt.Errorf("failed retrieving allocation stream: %w", err)
}
handler := func(event *nomadApi.Event) (bool, error) {
dumpNomadEventToInflux(event)
switch event.Topic {
case nomadApi.TopicEvaluation:
return false, handleEvaluationEvent(a.evaluations, event)
case nomadApi.TopicAllocation:
return false, handleAllocationEvent(startTime, a.allocations, event, callbacks)
default:
return false, nil
}
}
a.isListening = true
err = receiveAndHandleNomadAPIEvents(stream, handler)
a.isListening = false
return err
}
func dumpNomadEventToInflux(event *nomadApi.Event) {
p := influxdb2.NewPointWithMeasurement(monitoring.MeasurementNomadEvents)
p.AddTag("topic", event.Topic.String())
p.AddTag("type", event.Type)
p.AddTag("key", event.Key)
p.AddField("payload", event.Payload)
p.AddTag("timestamp", time.Now().Format("03:04:05.000000000"))
monitoring.WriteInfluxPoint(p)
}
func (a *APIClient) initializeAllocations(environmentID dto.EnvironmentID) {
allocationStubs, err := a.listAllocations()
if err != nil {
log.WithError(err).Warn("Could not initialize allocations")
} else {
for _, stub := range allocationStubs {
switch {
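			// Skip template jobs and runners of other environments; recover only
			// allocations that are still pending or running.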
case IsEnvironmentTemplateID(stub.JobID):
continue
case !strings.HasPrefix(stub.JobID, RunnerJobID(environmentID, "")):
continue
case stub.ClientStatus == structs.AllocClientStatusPending || stub.ClientStatus == structs.AllocClientStatusRunning:
log.WithField("jobID", stub.JobID).WithField("status", stub.ClientStatus).Debug("Recovered Allocation")
a.allocations.Add(stub.ID, &allocationData{
allocClientStatus: stub.ClientStatus,
allocDesiredStatus: stub.DesiredStatus,
jobID: stub.JobID,
start: time.Unix(0, stub.CreateTime),
allocNomadNode: stub.NodeName,
})
}
}
}
}
// nomadAPIEventHandler is a function that receives a nomadApi.Event and processes it.
// It is called by an event listening loop for each received event.
// If done is true, the calling function knows that it should break out of the event listening
// loop.
type nomadAPIEventHandler func(event *nomadApi.Event) (done bool, err error)
// receiveAndHandleNomadAPIEvents receives events from the Nomad event stream and calls the handler function for
// each received event. It skips heartbeat events and returns an error if the received events contain an error.
func receiveAndHandleNomadAPIEvents(stream <-chan *nomadApi.Events, handler nomadAPIEventHandler) error {
// If the original context is canceled, the stream will be closed by Nomad and we exit the for loop.
for events := range stream {
if err := events.Err; err != nil {
return fmt.Errorf("error receiving events: %w", err)
} else if events.IsHeartbeat() {
continue
}
for _, event := range events.Events {
// Don't take the address of the loop variable as the underlying value might change
eventCopy := event
done, err := handler(&eventCopy)
if err != nil || done {
return err
}
}
}
return nil
}
// handleEvaluationEvent is an event handler that checks whether the evaluation described by the event
// was successful and reports the result via the corresponding evaluation channel.
func handleEvaluationEvent(evaluations storage.Storage[chan error], event *nomadApi.Event) error {
eval, err := event.Evaluation()
if err != nil {
return fmt.Errorf("failed to monitor evaluation: %w", err)
}
switch eval.Status {
case structs.EvalStatusComplete, structs.EvalStatusCancelled, structs.EvalStatusFailed:
resultChannel, ok := evaluations.Get(eval.ID)
if ok {
evalErr := checkEvaluation(eval)
select {
case resultChannel <- evalErr:
close(resultChannel)
case <-time.After(resultChannelWriteTimeout):
log.WithField("length", len(resultChannel)).
WithField("eval", eval).
WithError(evalErr).
Error("Sending to the evaluation channel timed out")
}
}
}
return nil
}
// handleAllocationEvent is an event handler that processes allocation events.
// If a new allocation is received, the OnNew callback is called. If an allocation is deleted,
// the OnDeleted callback is called. The allocations storage is used to track pending and running
// allocations. Using the storage, the state is persisted across multiple calls of this function.
func handleAllocationEvent(startTime int64, allocations storage.Storage[*allocationData],
event *nomadApi.Event, callbacks *AllocationProcessing) error {
alloc, err := event.Allocation()
if err != nil {
return fmt.Errorf("failed to retrieve allocation from event: %w", err)
} else if alloc == nil {
return nil
}
// When starting the API and listening on the Nomad event stream we might get events that already
// happened from Nomad as it seems to buffer them for a certain duration.
// Ignore old events here.
if alloc.ModifyTime < startTime {
return nil
}
if valid := filterDuplicateEvents(alloc, allocations); !valid {
return nil
}
log.WithField("alloc_id", alloc.ID).
WithField("ClientStatus", alloc.ClientStatus).
WithField("DesiredStatus", alloc.DesiredStatus).
WithField("PrevAllocation", alloc.PreviousAllocation).
WithField("NextAllocation", alloc.NextAllocation).
Debug("Handle Allocation Event")
allocData := updateAllocationData(alloc, allocations)
switch alloc.ClientStatus {
case structs.AllocClientStatusPending:
handlePendingAllocationEvent(alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusRunning:
handleRunningAllocationEvent(alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusComplete:
handleCompleteAllocationEvent(alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusFailed:
handleFailedAllocationEvent(alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusLost:
handleLostAllocationEvent(alloc, allocData, allocations, callbacks)
default:
log.WithField("alloc", alloc).Warn("Other Client Status")
}
return nil
}
// filterDuplicateEvents identifies duplicate events or events of unknown allocations.
func filterDuplicateEvents(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData]) (valid bool) {
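	// A not-yet-tracked allocation is expected only while it is pending and desires to run.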
newAllocationExpected := alloc.ClientStatus == structs.AllocClientStatusPending &&
alloc.DesiredStatus == structs.AllocDesiredStatusRun
allocData, ok := allocations.Get(alloc.ID)
switch {
case !ok && newAllocationExpected:
return true
case !ok:
// This case happens after an error, or when an event that led to the deletion of the alloc data is duplicated.
log.WithField("allocID", alloc.ID).Debug("Ignoring unknown allocation")
return false
case alloc.ClientStatus == allocData.allocClientStatus && alloc.DesiredStatus == allocData.allocDesiredStatus:
log.WithField("allocID", alloc.ID).Debug("Ignoring duplicate event")
return false
default:
return true
}
}
// updateAllocationData updates the allocation tracking data according to the passed alloc.
// The allocation data before this allocation update is returned.
func updateAllocationData(
alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData]) (previous *allocationData) {
allocData, ok := allocations.Get(alloc.ID)
if ok {
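		// Copy the stored data before mutating it so that the returned
		// "previous" snapshot is not affected by the update below.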
data := *allocData
previous = &data
allocData.allocClientStatus = alloc.ClientStatus
allocData.allocDesiredStatus = alloc.DesiredStatus
allocations.Add(alloc.ID, allocData)
}
return previous
}
// handlePendingAllocationEvent manages allocations that are currently pending.
// This allows the handling of startups and re-placements of allocations.
func handlePendingAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
var stopExpected bool
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusRun:
if allocData != nil {
// Handle Allocation restart.
var reason error
if isOOMKilled(alloc) {
reason = ErrorOOMKilled
}
callbacks.OnDeleted(alloc.JobID, reason)
} else if alloc.PreviousAllocation != "" {
// Handle Runner (/Container) re-allocations.
if prevData, ok := allocations.Get(alloc.PreviousAllocation); ok {
stopExpected = callbacks.OnDeleted(prevData.jobID, ErrorAllocationRescheduled)
allocations.Delete(alloc.PreviousAllocation)
} else {
log.WithField("alloc", alloc).Warn("Previous Allocation not found")
}
}
// Store Pending Allocation - Allocation gets started, wait until it runs.
allocations.Add(alloc.ID, &allocationData{
allocClientStatus: alloc.ClientStatus,
allocDesiredStatus: alloc.DesiredStatus,
jobID: alloc.JobID,
start: time.Now(),
allocNomadNode: alloc.NodeName,
stopExpected: stopExpected,
})
case structs.AllocDesiredStatusStop:
// As this allocation was still pending, we don't have to propagate its deletion.
allocations.Delete(alloc.ID)
// Still, we want to monitor such occurrences.
if !allocData.stopExpected {
log.WithField("alloc", alloc).Warn("Pending allocation was stopped unexpectedly")
} else {
// ToDo: This log statement is just for measuring how common the mentioned race condition is. Remove it soon.
log.WithField("alloc", alloc).Warn("Pending allocation was stopped expectedly")
}
default:
log.WithField("alloc", alloc).Warn("Other Desired Status")
}
}
// handleRunningAllocationEvent calls the OnNew callback of the passed AllocationProcessing, filtering similar events.
func handleRunningAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
_ storage.Storage[*allocationData], callbacks *AllocationProcessing) {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusRun:
startupDuration := time.Since(allocData.start)
callbacks.OnNew(alloc, startupDuration)
case structs.AllocDesiredStatusStop:
// It is normal that running allocations will stop. We will handle it when it is stopped.
default:
log.WithField("alloc", alloc).Warn("Other Desired Status")
}
}
// handleCompleteAllocationEvent handles allocations that stopped.
func handleCompleteAllocationEvent(alloc *nomadApi.Allocation, _ *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusRun:
log.WithField("alloc", alloc).Warn("Complete allocation desires to run")
case structs.AllocDesiredStatusStop:
callbacks.OnDeleted(alloc.JobID, ErrorAllocationCompleted)
allocations.Delete(alloc.ID)
default:
log.WithField("alloc", alloc).Warn("Other Desired Status")
}
}
// handleFailedAllocationEvent logs only the last of the multiple failure events.
func handleFailedAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
// The stop is expected when the allocation desired to stop even before it failed.
reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop
handleStoppingAllocationEvent(alloc, allocations, callbacks, reschedulingExpected)
}
// handleLostAllocationEvent logs only the last of the multiple lost events.
func handleLostAllocationEvent(alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing) {
// The stop is expected when the allocation desired to stop even before it got lost.
reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop
handleStoppingAllocationEvent(alloc, allocations, callbacks, reschedulingExpected)
}
func handleStoppingAllocationEvent(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData],
callbacks *AllocationProcessing, reschedulingExpected bool) {
replacementAllocationScheduled := alloc.NextAllocation != ""
correctRescheduling := reschedulingExpected == replacementAllocationScheduled
removedByPoseidon := false
if !replacementAllocationScheduled {
var reason error
if correctRescheduling {
reason = ErrorAllocationStoppedUnexpectedly
} else {
reason = ErrorAllocationRescheduledUnexpectedly
}
removedByPoseidon = callbacks.OnDeleted(alloc.JobID, reason)
allocations.Delete(alloc.ID)
}
entry := log.WithField("job", alloc.JobID)
if !removedByPoseidon && !correctRescheduling {
entry.WithField("alloc", alloc).Warn("Unexpected Allocation Stopping / Restarting")
} else {
entry.Trace("Expected Allocation Stopping / Restarting")
}
}
// checkEvaluation checks whether the given evaluation failed.
// If the evaluation failed, it returns an error with a message containing the failure information.
func checkEvaluation(eval *nomadApi.Evaluation) (err error) {
if len(eval.FailedTGAllocs) == 0 {
if eval.Status != structs.EvalStatusComplete {
err = fmt.Errorf("%w: %q", ErrorEvaluation, eval.Status)
}
} else {
err = fmt.Errorf("evaluation %q finished with status %q but %w", eval.ID, eval.Status, ErrorPlacingAllocations)
for taskGroup, metrics := range eval.FailedTGAllocs {
err = fmt.Errorf("%w\n%s: %#v", err, taskGroup, metrics)
}
if eval.BlockedEval != "" {
err = fmt.Errorf("%w\nEvaluation %q waiting for additional capacity to place remainder", err, eval.BlockedEval)
}
}
return err
}
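// A sketch of the error shape this produces for a failed placement (illustrative
// values; the %#v metrics dump depends on Nomad's allocation metric struct):
//
//	evaluation "eval-123" finished with status "complete" but failed to place all allocations
//	my-task-group: &api.AllocationMetric{...}
//	Evaluation "blocked-456" waiting for additional capacity to place remainder
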
func (a *APIClient) MarkRunnerAsUsed(runnerID string, duration int) error {
job, err := a.job(runnerID)
if err != nil {
return fmt.Errorf("couldn't retrieve job info: %w", err)
}
configTaskGroup := FindAndValidateConfigTaskGroup(job)
configTaskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUsedValue
configTaskGroup.Meta[ConfigMetaTimeoutKey] = strconv.Itoa(duration)
_, err = a.RegisterNomadJob(job)
if err != nil {
return fmt.Errorf("couldn't update runner config: %w", err)
}
return nil
}
func (a *APIClient) LoadEnvironmentJobs() ([]*nomadApi.Job, error) {
jobStubs, err := a.listJobs(TemplateJobPrefix)
if err != nil {
return []*nomadApi.Job{}, fmt.Errorf("couldn't load jobs: %w", err)
}
jobs := make([]*nomadApi.Job, 0, len(jobStubs))
for _, jobStub := range jobStubs {
job, err := a.apiQuerier.job(jobStub.ID)
if err != nil {
return []*nomadApi.Job{}, fmt.Errorf("couldn't load job info for job %v: %w", jobStub.ID, err)
}
jobs = append(jobs, job)
}
return jobs, nil
}
// ExecuteCommand executes the given command in the given job.
// If tty is true, Nomad would normally write both stdout and stderr of the command
// to the stdout stream. However, if the InteractiveStderr server config option is true,
// we make sure that stdout and stderr are split correctly.
func (a *APIClient) ExecuteCommand(jobID string,
ctx context.Context, command string, tty bool, privilegedExecution bool,
stdin io.Reader, stdout, stderr io.Writer) (int, error) {
if tty && config.Config.Server.InteractiveStderr {
return a.executeCommandInteractivelyWithStderr(jobID, ctx, command, privilegedExecution, stdin, stdout, stderr)
}
command = prepareCommandWithoutTTY(command, privilegedExecution)
exitCode, err := a.apiQuerier.Execute(jobID, ctx, command, tty, stdin, stdout, stderr)
if err != nil {
return 1, fmt.Errorf("error executing command in job %s: %w", jobID, err)
}
return exitCode, nil
}
// executeCommandInteractivelyWithStderr executes the given command interactively and splits stdout
// and stderr correctly. Normally, using Nomad to execute a command with tty=true (in order to have
// an interactive connection and possibly a fully working shell) would result in stdout and stderr
// both being served over stdout. This function circumvents this by creating a fifo for the stderr
// of the command and starting a second execution that reads the stderr from that fifo.
func (a *APIClient) executeCommandInteractivelyWithStderr(allocationID string, ctx context.Context,
command string, privilegedExecution bool, stdin io.Reader, stdout, stderr io.Writer) (int, error) {
// Use current nano time to make the stderr fifo kind of unique.
currentNanoTime := time.Now().UnixNano()
stderrExitChan := make(chan int)
go func() {
readingContext, cancel := context.WithCancel(ctx)
defer cancel()
// Catch stderr in separate execution.
logging.StartSpan("nomad.execute.stderr", "Execution for separate StdErr", ctx, func(ctx context.Context) {
exit, err := a.Execute(allocationID, ctx, prepareCommandTTYStdErr(currentNanoTime, privilegedExecution), true,
nullio.Reader{Ctx: readingContext}, stderr, io.Discard)
if err != nil {
log.WithContext(ctx).WithError(err).Warn("Stderr task finished with error")
}
stderrExitChan <- exit
})
}()
command = prepareCommandTTY(command, currentNanoTime, privilegedExecution)
var exit int
var err error
logging.StartSpan("nomad.execute.tty", "Interactive Execution", ctx, func(ctx context.Context) {
exit, err = a.Execute(allocationID, ctx, command, true, stdin, stdout, io.Discard)
})
// Wait until the stderr catch command finished to make sure we receive all output.
<-stderrExitChan
return exit, err
}
const (
// unsetEnvironmentVariablesFormat prepends the call to unset the passed variables before the actual command.
unsetEnvironmentVariablesFormat = "unset %s && %s"
// unsetEnvironmentVariablesPrefix is the prefix of all environment variables that will be filtered.
unsetEnvironmentVariablesPrefix = "NOMAD_"
// unsetEnvironmentVariablesShell is the shell functionality to get all environment variables starting with the prefix.
unsetEnvironmentVariablesShell = "${!" + unsetEnvironmentVariablesPrefix + "@}"
// stderrFifoFormat represents the format we use for our stderr fifos. The %d should be unique for the execution
// as otherwise multiple executions are not possible.
// Example: "/tmp/stderr_1623330777825234133.fifo".
stderrFifoFormat = "/tmp/stderr_%d.fifo"
// stderrFifoCommandFormat, if executed, is supposed to create a fifo, read from it and remove it in the end.
// Example: "mkfifo my.fifo && (cat my.fifo; rm my.fifo)".
stderrFifoCommandFormat = "mkfifo %s && (cat %s; rm %s)"
// stderrWrapperCommandFormat, if executed, is supposed to wait until a fifo exists (it sleeps 10ms to reduce the load
// caused by busy waiting on the system). Once the fifo exists, the given command is executed and its stderr
// redirected to the fifo.
// Example: "until [ -e my.fifo ]; do sleep 0.01; done; (echo \"my.fifo exists\") 2> my.fifo".
stderrWrapperCommandFormat = "until [ -e %s ]; do sleep 0.01; done; (%s) 2> %s"
// setUserBinaryPath is the path of the setuser script that Poseidon requires for Nomad environments.
setUserBinaryPath = "/sbin/setuser"
// setUserBinaryUser is the user that is used and required by Poseidon for Nomad environments.
setUserBinaryUser = "user"
// PrivilegedExecution indicates the privileged execution of the passed command.
PrivilegedExecution = true
// UnprivilegedExecution indicates the unprivileged execution of the passed command.
UnprivilegedExecution = false
)
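// A rough sketch of how the two executions cooperate for a tty command (the
// fifo path is illustrative; the real commands are additionally wrapped with
// debug messages, setuser, and the unset prefix by the functions below):
//
//	execution 1 (stderr catcher):
//	  mkfifo /tmp/stderr_123.fifo && (cat /tmp/stderr_123.fifo; rm /tmp/stderr_123.fifo)
//	execution 2 (main command, stderr redirected into the fifo):
//	  until [ -e /tmp/stderr_123.fifo ]; do sleep 0.01; done; (my-command) 2> /tmp/stderr_123.fifo
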
func prepareCommandWithoutTTY(command string, privilegedExecution bool) string {
const commandFieldAfterEnv = 4 // instead of "env CODEOCEAN=true /bin/bash -c sleep infinity" just "sleep infinity".
command = setInnerDebugMessages(command, commandFieldAfterEnv, -1)
command = setUserCommand(command, privilegedExecution)
command = unsetEnvironmentVariables(command)
return command
}
func prepareCommandTTY(command string, currentNanoTime int64, privilegedExecution bool) string {
const commandFieldAfterSettingEnvVariables = 4
command = setInnerDebugMessages(command, commandFieldAfterSettingEnvVariables, -1)
// Take the command to be executed and wrap it to redirect stderr.
stderrFifoPath := stderrFifo(currentNanoTime)
command = fmt.Sprintf(stderrWrapperCommandFormat, stderrFifoPath, command, stderrFifoPath)
command = injectStartDebugMessage(command, 0, 1)
command = setUserCommand(command, privilegedExecution)
command = unsetEnvironmentVariables(command)
return command
}
func prepareCommandTTYStdErr(currentNanoTime int64, privilegedExecution bool) string {
stderrFifoPath := stderrFifo(currentNanoTime)
command := fmt.Sprintf(stderrFifoCommandFormat, stderrFifoPath, stderrFifoPath, stderrFifoPath)
command = setInnerDebugMessages(command, 0, 1)
command = setUserCommand(command, privilegedExecution)
return command
}
func stderrFifo(id int64) string {
return fmt.Sprintf(stderrFifoFormat, id)
}
func unsetEnvironmentVariables(command string) string {
command = dto.WrapBashCommand(command)
command = fmt.Sprintf(unsetEnvironmentVariablesFormat, unsetEnvironmentVariablesShell, command)
// Debug Message
const commandFieldBeforeBash = 2 // e.g. instead of "unset ${!NOMAD_@} && /bin/bash -c [...]" just "unset ${!NOMAD_@}".
command = injectStartDebugMessage(command, 0, commandFieldBeforeBash)
return command
}
// setUserCommand prefixes the passed command with the setUser command.
func setUserCommand(command string, privilegedExecution bool) string {
// Wrap the inner command first so that the setUserBinary applies to the whole inner command.
command = dto.WrapBashCommand(command)
if !privilegedExecution {
command = fmt.Sprintf("%s %s %s", setUserBinaryPath, setUserBinaryUser, command)
}
// Debug Message
const commandFieldBeforeBash = 2 // e.g. instead of "/sbin/setuser user /bin/bash -c [...]" just "/sbin/setuser user".
command = injectStartDebugMessage(command, 0, commandFieldBeforeBash)
return command
}
func injectStartDebugMessage(command string, start uint, end int) string {
commandFields := strings.Fields(command)
if start < uint(len(commandFields)) {
commandFields = commandFields[start:]
end -= int(start)
}
if end >= 0 && end < len(commandFields) {
commandFields = commandFields[:end]
}
description := strings.Join(commandFields, " ")
if strings.HasPrefix(description, "\"") && strings.HasSuffix(description, "\"") {
description = description[1 : len(description)-1]
}
description = dto.BashEscapeCommand(description)
description = description[1 : len(description)-1] // The outermost quotes are not escaped!
return fmt.Sprintf(timeDebugMessageFormatStart, description, command)
}
// setInnerDebugMessages injects debug commands into the bash command.
// The debug messages are parsed by the SentryDebugWriter.
func setInnerDebugMessages(command string, descriptionStart uint, descriptionEnd int) (result string) {
result = injectStartDebugMessage(command, descriptionStart, descriptionEnd)
result = strings.TrimSuffix(result, ";")
return fmt.Sprintf(timeDebugMessageFormatEnd, result, "exit $ec")
}