Implement merge request comments.
This commit is contained in:

committed by
Sebastian Serth

parent
cc0c425197
commit
4fb6ab980b
@ -89,6 +89,7 @@ func (nc *nomadAPIClient) Execute(runnerID string,
|
|||||||
ctx context.Context, command []string, tty bool,
|
ctx context.Context, command []string, tty bool,
|
||||||
stdin io.Reader, stdout, stderr io.Writer,
|
stdin io.Reader, stdout, stderr io.Writer,
|
||||||
) (int, error) {
|
) (int, error) {
|
||||||
|
log.WithField("command", command).Trace("Requesting Nomad Exec")
|
||||||
var allocations []*nomadApi.AllocationListStub
|
var allocations []*nomadApi.AllocationListStub
|
||||||
var err error
|
var err error
|
||||||
logging.StartSpan("nomad.execute.list", "List Allocations for id", ctx, func(_ context.Context) {
|
logging.StartSpan("nomad.execute.list", "List Allocations for id", ctx, func(_ context.Context) {
|
||||||
@ -111,7 +112,10 @@ func (nc *nomadAPIClient) Execute(runnerID string,
|
|||||||
|
|
||||||
var exitCode int
|
var exitCode int
|
||||||
logging.StartSpan("nomad.execute.exec", "Execute Command in Allocation", ctx, func(ctx context.Context) {
|
logging.StartSpan("nomad.execute.exec", "Execute Command in Allocation", ctx, func(ctx context.Context) {
|
||||||
exitCode, err = nc.client.Allocations().Exec(ctx, allocation, TaskName, tty, command, stdin, stdout, stderr, nil, nil)
|
debugWriter := NewSentryDebugWriter(stdout, ctx)
|
||||||
|
exitCode, err = nc.client.Allocations().
|
||||||
|
Exec(ctx, allocation, TaskName, tty, command, stdin, debugWriter, stderr, nil, nil)
|
||||||
|
debugWriter.Close(exitCode)
|
||||||
})
|
})
|
||||||
switch {
|
switch {
|
||||||
case err == nil:
|
case err == nil:
|
||||||
|
@ -398,8 +398,7 @@ func (a *APIClient) ExecuteCommand(jobID string,
|
|||||||
return a.executeCommandInteractivelyWithStderr(jobID, ctx, command, privilegedExecution, stdin, stdout, stderr)
|
return a.executeCommandInteractivelyWithStderr(jobID, ctx, command, privilegedExecution, stdin, stdout, stderr)
|
||||||
}
|
}
|
||||||
command = prepareCommandWithoutTTY(command, privilegedExecution)
|
command = prepareCommandWithoutTTY(command, privilegedExecution)
|
||||||
debugWriter := &SentryDebugWriter{Target: stdout, Ctx: ctx}
|
exitCode, err := a.apiQuerier.Execute(jobID, ctx, command, tty, stdin, stdout, stderr)
|
||||||
exitCode, err := a.apiQuerier.Execute(jobID, ctx, command, tty, stdin, debugWriter, stderr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 1, fmt.Errorf("error executing command in job %s: %w", jobID, err)
|
return 1, fmt.Errorf("error executing command in job %s: %w", jobID, err)
|
||||||
}
|
}
|
||||||
@ -423,9 +422,8 @@ func (a *APIClient) executeCommandInteractivelyWithStderr(allocationID string, c
|
|||||||
|
|
||||||
// Catch stderr in separate execution.
|
// Catch stderr in separate execution.
|
||||||
logging.StartSpan("nomad.execute.stderr", "Execution for separate StdErr", ctx, func(ctx context.Context) {
|
logging.StartSpan("nomad.execute.stderr", "Execution for separate StdErr", ctx, func(ctx context.Context) {
|
||||||
debugWriterStderr := &SentryDebugWriter{Target: stderr, Ctx: ctx}
|
|
||||||
exit, err := a.Execute(allocationID, ctx, prepareCommandTTYStdErr(currentNanoTime, privilegedExecution), true,
|
exit, err := a.Execute(allocationID, ctx, prepareCommandTTYStdErr(currentNanoTime, privilegedExecution), true,
|
||||||
nullio.Reader{Ctx: readingContext}, debugWriterStderr, io.Discard)
|
nullio.Reader{Ctx: readingContext}, stderr, io.Discard)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error")
|
log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error")
|
||||||
}
|
}
|
||||||
@ -437,8 +435,7 @@ func (a *APIClient) executeCommandInteractivelyWithStderr(allocationID string, c
|
|||||||
var exit int
|
var exit int
|
||||||
var err error
|
var err error
|
||||||
logging.StartSpan("nomad.execute.tty", "Interactive Execution", ctx, func(ctx context.Context) {
|
logging.StartSpan("nomad.execute.tty", "Interactive Execution", ctx, func(ctx context.Context) {
|
||||||
debugWriter := &SentryDebugWriter{Target: stdout, Ctx: ctx}
|
exit, err = a.Execute(allocationID, ctx, command, true, stdin, stdout, io.Discard)
|
||||||
exit, err = a.Execute(allocationID, ctx, command, true, stdin, debugWriter, io.Discard)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// Wait until the stderr catch command finished to make sure we receive all output.
|
// Wait until the stderr catch command finished to make sure we receive all output.
|
||||||
@ -481,7 +478,7 @@ func prepareCommandWithoutTTY(srcCommands []string, privilegedExecution bool) []
|
|||||||
commands := make([]string, len(srcCommands)) // nozero The size is required for the copy.
|
commands := make([]string, len(srcCommands)) // nozero The size is required for the copy.
|
||||||
copy(commands, srcCommands)
|
copy(commands, srcCommands)
|
||||||
|
|
||||||
commands[len(commands)-1] = setInnerDebugMessages(commands[len(commands)-1])
|
commands[len(commands)-1] = setInnerDebugMessages(commands[len(commands)-1], false)
|
||||||
commands = setUserCommand(commands, privilegedExecution)
|
commands = setUserCommand(commands, privilegedExecution)
|
||||||
commands[len(commands)-1] = fmt.Sprintf("'%s'", commands[len(commands)-1])
|
commands[len(commands)-1] = fmt.Sprintf("'%s'", commands[len(commands)-1])
|
||||||
cmd := strings.Join(commands, " ")
|
cmd := strings.Join(commands, " ")
|
||||||
@ -495,7 +492,7 @@ func prepareCommandTTY(srcCommands []string, currentNanoTime int64, privilegedEx
|
|||||||
commands := make([]string, len(srcCommands)) // nozero The size is required for the copy.
|
commands := make([]string, len(srcCommands)) // nozero The size is required for the copy.
|
||||||
copy(commands, srcCommands)
|
copy(commands, srcCommands)
|
||||||
|
|
||||||
commands[len(commands)-1] = setInnerDebugMessages(commands[len(commands)-1])
|
commands[len(commands)-1] = setInnerDebugMessages(commands[len(commands)-1], true)
|
||||||
commands = setUserCommand(commands, privilegedExecution)
|
commands = setUserCommand(commands, privilegedExecution)
|
||||||
// Take the last command which is the one to be executed and wrap it to redirect stderr.
|
// Take the last command which is the one to be executed and wrap it to redirect stderr.
|
||||||
stderrFifoPath := stderrFifo(currentNanoTime)
|
stderrFifoPath := stderrFifo(currentNanoTime)
|
||||||
@ -511,7 +508,7 @@ func prepareCommandTTY(srcCommands []string, currentNanoTime int64, privilegedEx
|
|||||||
func prepareCommandTTYStdErr(currentNanoTime int64, privilegedExecution bool) []string {
|
func prepareCommandTTYStdErr(currentNanoTime int64, privilegedExecution bool) []string {
|
||||||
stderrFifoPath := stderrFifo(currentNanoTime)
|
stderrFifoPath := stderrFifo(currentNanoTime)
|
||||||
cmd := fmt.Sprintf(stderrFifoCommandFormat, stderrFifoPath, stderrFifoPath, stderrFifoPath)
|
cmd := fmt.Sprintf(stderrFifoCommandFormat, stderrFifoPath, stderrFifoPath, stderrFifoPath)
|
||||||
cmd = setInnerDebugMessages(cmd)
|
cmd = setInnerDebugMessages(cmd, false)
|
||||||
return setUserCommand([]string{"bash", "-c", cmd}, privilegedExecution)
|
return setUserCommand([]string{"bash", "-c", cmd}, privilegedExecution)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -529,8 +526,14 @@ func stderrFifo(id int64) string {
|
|||||||
return fmt.Sprintf(stderrFifoFormat, id)
|
return fmt.Sprintf(stderrFifoFormat, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func setInnerDebugMessages(command string) (result string) {
|
// setInnerDebugMessages injects debug commands into the bash command.
|
||||||
result = fmt.Sprintf(timeDebugMessageFormatStart, "innerCommand", command)
|
// The debug messages are parsed by the SentryDebugWriter.
|
||||||
|
func setInnerDebugMessages(command string, includeCommandInDescription bool) (result string) {
|
||||||
|
description := "innerCommand"
|
||||||
|
if includeCommandInDescription {
|
||||||
|
description += fmt.Sprintf(" %s", command)
|
||||||
|
}
|
||||||
|
result = fmt.Sprintf(timeDebugMessageFormatStart, description, command)
|
||||||
result = strings.TrimSuffix(result, ";")
|
result = strings.TrimSuffix(result, ";")
|
||||||
return fmt.Sprintf(timeDebugMessageFormatEnd, result, "End")
|
return fmt.Sprintf(timeDebugMessageFormatEnd, result, "End")
|
||||||
}
|
}
|
||||||
|
@ -17,13 +17,11 @@ const (
|
|||||||
timeDebugMessageFormatStart = timeDebugMessageFormat + "; %s"
|
timeDebugMessageFormatStart = timeDebugMessageFormat + "; %s"
|
||||||
// Format Parameters: 1. command, 2. Debug Comment.
|
// Format Parameters: 1. command, 2. Debug Comment.
|
||||||
timeDebugMessageFormatEnd = "%s && " + timeDebugMessageFormat
|
timeDebugMessageFormatEnd = "%s && " + timeDebugMessageFormat
|
||||||
|
|
||||||
timeDebugMessagePatternGroupText = 1
|
|
||||||
timeDebugMessagePatternGroupTime = 2
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
timeDebugMessagePattern = regexp.MustCompile(`\x1EPoseidon (?P<text>\w+) (?P<time>\d+)\x1E`)
|
timeDebugMessagePattern = regexp.
|
||||||
|
MustCompile(`(?P<before>.*)\x1EPoseidon (?P<text>.+) (?P<time>\d{13})\x1E(?P<after>.*)`)
|
||||||
timeDebugMessagePatternStart = regexp.MustCompile(`\x1EPoseidon`)
|
timeDebugMessagePatternStart = regexp.MustCompile(`\x1EPoseidon`)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -36,54 +34,81 @@ type SentryDebugWriter struct {
|
|||||||
lastSpan *sentry.Span
|
lastSpan *sentry.Span
|
||||||
}
|
}
|
||||||
|
|
||||||
// Improve: Handling of a split debug messages (usually p is exactly one debug message, not less and not more).
|
func NewSentryDebugWriter(target io.Writer, ctx context.Context) *SentryDebugWriter {
|
||||||
|
span := sentry.StartSpan(ctx, "nomad.execute.connect")
|
||||||
|
return &SentryDebugWriter{
|
||||||
|
Target: target,
|
||||||
|
Ctx: ctx,
|
||||||
|
lastSpan: span,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Improve: Handling of a split debug messages (usually, p is exactly one debug message, not less and not more).
|
||||||
func (s *SentryDebugWriter) Write(p []byte) (n int, err error) {
|
func (s *SentryDebugWriter) Write(p []byte) (n int, err error) {
|
||||||
if !timeDebugMessagePatternStart.Match(p) {
|
if !timeDebugMessagePatternStart.Match(p) {
|
||||||
count, err := s.Target.Write(p)
|
count, err := s.Target.Write(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("SentryDebugWriter Forwarded: %w", err)
|
err = fmt.Errorf("SentryDebugWriter Forwarded Error: %w", err)
|
||||||
}
|
}
|
||||||
return count, err
|
return count, err
|
||||||
}
|
}
|
||||||
|
|
||||||
loc := timeDebugMessagePattern.FindIndex(p)
|
match := matchAndMapTimeDebugMessage(p)
|
||||||
if loc == nil {
|
if match == nil {
|
||||||
log.WithField("data", p).Warn("Exec debug message could not be read completely")
|
log.WithField("data", p).Warn("Exec debug message could not be read completely")
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
go s.handleTimeDebugMessage(p[loc[0]:loc[1]])
|
go s.handleTimeDebugMessage(match)
|
||||||
|
|
||||||
debugMessageLength := loc[1] - loc[0]
|
lengthRemainingData := len(match["before"]) + len(match["after"])
|
||||||
if debugMessageLength < len(p) {
|
if lengthRemainingData > 0 {
|
||||||
count, err := s.Write(append(p[0:loc[0]], p[loc[1]:]...))
|
count, err := s.Write(append(match["before"], match["after"]...))
|
||||||
return debugMessageLength + count, err
|
return len(p) - (lengthRemainingData - count), err
|
||||||
} else {
|
} else {
|
||||||
return debugMessageLength, nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SentryDebugWriter) handleTimeDebugMessage(message []byte) {
|
func (s *SentryDebugWriter) Close(exitCode int) {
|
||||||
if s.lastSpan != nil {
|
if s.lastSpan != nil {
|
||||||
|
s.lastSpan.Op = "nomad.execute.disconnect"
|
||||||
|
s.lastSpan.SetTag("exit_code", strconv.Itoa(exitCode))
|
||||||
|
s.lastSpan.Finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleTimeDebugMessage transforms one time debug message into a Sentry span.
|
||||||
|
// It requires match to contain the keys `time` and `text`.
|
||||||
|
func (s *SentryDebugWriter) handleTimeDebugMessage(match map[string][]byte) {
|
||||||
|
timestamp, err := strconv.ParseInt(string(match["time"]), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.WithField("match", match).Warn("Could not parse Unix timestamp")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.lastSpan != nil {
|
||||||
|
s.lastSpan.EndTime = time.UnixMilli(timestamp)
|
||||||
|
s.lastSpan.SetData("latency", time.Since(time.UnixMilli(timestamp)).String())
|
||||||
s.lastSpan.Finish()
|
s.lastSpan.Finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
matches := timeDebugMessagePattern.FindSubmatch(message)
|
|
||||||
if matches == nil {
|
|
||||||
log.WithField("msg", message).Error("Cannot parse passed time debug message")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
timestamp, err := strconv.ParseInt(string(matches[timeDebugMessagePatternGroupTime]), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
log.WithField("matches", matches).Warn("Could not parse Unix timestamp")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.lastSpan = sentry.StartSpan(s.Ctx, "nomad.execute.pipe")
|
|
||||||
s.lastSpan.Description = string(matches[timeDebugMessagePatternGroupText])
|
|
||||||
s.lastSpan.StartTime = time.UnixMilli(timestamp)
|
|
||||||
s.lastSpan.Finish()
|
|
||||||
|
|
||||||
s.lastSpan = sentry.StartSpan(s.Ctx, "nomad.execute.bash")
|
s.lastSpan = sentry.StartSpan(s.Ctx, "nomad.execute.bash")
|
||||||
s.lastSpan.Description = string(matches[timeDebugMessagePatternGroupText])
|
s.lastSpan.Description = string(match["text"])
|
||||||
|
s.lastSpan.StartTime = time.UnixMilli(timestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func matchAndMapTimeDebugMessage(p []byte) map[string][]byte {
|
||||||
|
match := timeDebugMessagePattern.FindSubmatch(p)
|
||||||
|
if match == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
labelMap := make(map[string][]byte)
|
||||||
|
for i, name := range timeDebugMessagePattern.SubexpNames() {
|
||||||
|
if i != 0 && name != "" {
|
||||||
|
labelMap[name] = match[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return labelMap
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package nomad
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"testing"
|
"testing"
|
||||||
@ -9,7 +10,7 @@ import (
|
|||||||
|
|
||||||
func TestSentryDebugWriter_Write(t *testing.T) {
|
func TestSentryDebugWriter_Write(t *testing.T) {
|
||||||
buf := &bytes.Buffer{}
|
buf := &bytes.Buffer{}
|
||||||
w := SentryDebugWriter{Target: buf}
|
w := SentryDebugWriter{Target: buf, Ctx: context.Background()}
|
||||||
|
|
||||||
description := "TestDebugMessageDescription"
|
description := "TestDebugMessageDescription"
|
||||||
data := "\x1EPoseidon " + description + " 1676646791482\x1E"
|
data := "\x1EPoseidon " + description + " 1676646791482\x1E"
|
||||||
|
Reference in New Issue
Block a user