Add context to log statements.
This commit is contained in:
@ -42,7 +42,7 @@ func NewRouter(runnerManager runner.Manager, environmentManager environment.Mana
|
||||
func configureV1Router(router *mux.Router,
|
||||
runnerManager runner.Manager, environmentManager environment.ManagerHandler) {
|
||||
router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
log.WithField("request", r).Debug("Not Found Handler")
|
||||
log.WithContext(r.Context()).WithField("request", r).Debug("Not Found Handler")
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
})
|
||||
v1 := router.PathPrefix(BasePath).Subrouter()
|
||||
@ -75,10 +75,10 @@ func configureV1Router(router *mux.Router,
|
||||
|
||||
// Version handles the version route.
|
||||
// It responds the release information stored in the configuration.
|
||||
func Version(writer http.ResponseWriter, _ *http.Request) {
|
||||
func Version(writer http.ResponseWriter, request *http.Request) {
|
||||
release := config.Config.Sentry.Release
|
||||
if len(release) > 0 {
|
||||
sendJSON(writer, release, http.StatusOK)
|
||||
sendJSON(writer, release, http.StatusOK, request.Context())
|
||||
} else {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
@ -87,12 +87,12 @@ func Version(writer http.ResponseWriter, _ *http.Request) {
|
||||
// StatisticsExecutionEnvironments handles the route for statistics about execution environments.
|
||||
// It responds the prewarming pool size and the number of idle runners and used runners.
|
||||
func StatisticsExecutionEnvironments(manager environment.Manager) http.HandlerFunc {
|
||||
return func(writer http.ResponseWriter, _ *http.Request) {
|
||||
return func(writer http.ResponseWriter, request *http.Request) {
|
||||
result := make(map[string]*dto.StatisticalExecutionEnvironmentData)
|
||||
environmentsData := manager.Statistics()
|
||||
for id, data := range environmentsData {
|
||||
result[id.ToString()] = data
|
||||
}
|
||||
sendJSON(writer, result, http.StatusOK)
|
||||
sendJSON(writer, result, http.StatusOK, request.Context())
|
||||
}
|
||||
}
|
||||
|
@ -27,7 +27,8 @@ func HTTPAuthenticationMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
token := r.Header.Get(TokenHeader)
|
||||
if subtle.ConstantTimeCompare([]byte(token), correctAuthenticationToken) == 0 {
|
||||
log.WithField("token", logging.RemoveNewlineSymbol(token)).
|
||||
log.WithContext(r.Context()).
|
||||
WithField("token", logging.RemoveNewlineSymbol(token)).
|
||||
Warn("Incorrect token")
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
return
|
||||
|
@ -47,17 +47,17 @@ func (e *EnvironmentController) ConfigureRoutes(router *mux.Router) {
|
||||
func (e *EnvironmentController) list(writer http.ResponseWriter, request *http.Request) {
|
||||
fetch, err := parseFetchParameter(request)
|
||||
if err != nil {
|
||||
writeClientError(writer, err, http.StatusBadRequest)
|
||||
writeClientError(writer, err, http.StatusBadRequest, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
environments, err := e.manager.List(fetch)
|
||||
if err != nil {
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
sendJSON(writer, ExecutionEnvironmentsResponse{environments}, http.StatusOK)
|
||||
sendJSON(writer, ExecutionEnvironmentsResponse{environments}, http.StatusOK, request.Context())
|
||||
}
|
||||
|
||||
// get returns all information about the requested execution environment.
|
||||
@ -65,12 +65,12 @@ func (e *EnvironmentController) get(writer http.ResponseWriter, request *http.Re
|
||||
environmentID, err := parseEnvironmentID(request)
|
||||
if err != nil {
|
||||
// This case is never used as the router validates the id format
|
||||
writeClientError(writer, err, http.StatusBadRequest)
|
||||
writeClientError(writer, err, http.StatusBadRequest, request.Context())
|
||||
return
|
||||
}
|
||||
fetch, err := parseFetchParameter(request)
|
||||
if err != nil {
|
||||
writeClientError(writer, err, http.StatusBadRequest)
|
||||
writeClientError(writer, err, http.StatusBadRequest, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
@ -79,11 +79,11 @@ func (e *EnvironmentController) get(writer http.ResponseWriter, request *http.Re
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
} else if err != nil {
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
sendJSON(writer, executionEnvironment, http.StatusOK)
|
||||
sendJSON(writer, executionEnvironment, http.StatusOK, request.Context())
|
||||
}
|
||||
|
||||
// delete removes the specified execution environment.
|
||||
@ -91,13 +91,13 @@ func (e *EnvironmentController) delete(writer http.ResponseWriter, request *http
|
||||
environmentID, err := parseEnvironmentID(request)
|
||||
if err != nil {
|
||||
// This case is never used as the router validates the id format
|
||||
writeClientError(writer, err, http.StatusBadRequest)
|
||||
writeClientError(writer, err, http.StatusBadRequest, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
found, err := e.manager.Delete(environmentID)
|
||||
if err != nil {
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
} else if !found {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
@ -111,12 +111,12 @@ func (e *EnvironmentController) delete(writer http.ResponseWriter, request *http
|
||||
func (e *EnvironmentController) createOrUpdate(writer http.ResponseWriter, request *http.Request) {
|
||||
req := new(dto.ExecutionEnvironmentRequest)
|
||||
if err := json.NewDecoder(request.Body).Decode(req); err != nil {
|
||||
writeClientError(writer, err, http.StatusBadRequest)
|
||||
writeClientError(writer, err, http.StatusBadRequest, request.Context())
|
||||
return
|
||||
}
|
||||
environmentID, err := parseEnvironmentID(request)
|
||||
if err != nil {
|
||||
writeClientError(writer, err, http.StatusBadRequest)
|
||||
writeClientError(writer, err, http.StatusBadRequest, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
@ -125,7 +125,7 @@ func (e *EnvironmentController) createOrUpdate(writer http.ResponseWriter, reque
|
||||
created, err = e.manager.CreateOrUpdate(environmentID, *req, ctx)
|
||||
})
|
||||
if err != nil {
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
}
|
||||
|
||||
if created {
|
||||
|
@ -1,38 +1,40 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/openHPI/poseidon/pkg/dto"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func writeInternalServerError(writer http.ResponseWriter, err error, errorCode dto.ErrorCode) {
|
||||
sendJSON(writer, &dto.InternalServerError{Message: err.Error(), ErrorCode: errorCode}, http.StatusInternalServerError)
|
||||
func writeInternalServerError(writer http.ResponseWriter, err error, errorCode dto.ErrorCode, ctx context.Context) {
|
||||
sendJSON(writer, &dto.InternalServerError{Message: err.Error(), ErrorCode: errorCode},
|
||||
http.StatusInternalServerError, ctx)
|
||||
}
|
||||
|
||||
func writeClientError(writer http.ResponseWriter, err error, status uint16) {
|
||||
sendJSON(writer, &dto.ClientError{Message: err.Error()}, int(status))
|
||||
func writeClientError(writer http.ResponseWriter, err error, status uint16, ctx context.Context) {
|
||||
sendJSON(writer, &dto.ClientError{Message: err.Error()}, int(status), ctx)
|
||||
}
|
||||
|
||||
func sendJSON(writer http.ResponseWriter, content interface{}, httpStatusCode int) {
|
||||
func sendJSON(writer http.ResponseWriter, content interface{}, httpStatusCode int, ctx context.Context) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
writer.WriteHeader(httpStatusCode)
|
||||
response, err := json.Marshal(content)
|
||||
if err != nil {
|
||||
// cannot produce infinite recursive loop, since json.Marshal of dto.InternalServerError won't return an error
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, ctx)
|
||||
return
|
||||
}
|
||||
if _, err = writer.Write(response); err != nil {
|
||||
log.WithError(err).Error("Could not write JSON response")
|
||||
log.WithError(err).WithContext(ctx).Error("Could not write JSON response")
|
||||
http.Error(writer, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
func parseJSONRequestBody(writer http.ResponseWriter, request *http.Request, structure interface{}) error {
|
||||
if err := json.NewDecoder(request.Body).Decode(structure); err != nil {
|
||||
writeClientError(writer, err, http.StatusBadRequest)
|
||||
writeClientError(writer, err, http.StatusBadRequest, request.Context())
|
||||
return fmt.Errorf("error parsing JSON request body: %w", err)
|
||||
}
|
||||
return nil
|
||||
|
@ -74,18 +74,20 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, runner.ErrUnknownExecutionEnvironment):
|
||||
writeClientError(writer, err, http.StatusNotFound)
|
||||
writeClientError(writer, err, http.StatusNotFound, request.Context())
|
||||
case errors.Is(err, runner.ErrNoRunnersAvailable):
|
||||
log.WithField("environment", logging.RemoveNewlineSymbol(strconv.Itoa(int(environmentID)))).
|
||||
log.WithContext(request.Context()).
|
||||
WithField("environment", logging.RemoveNewlineSymbol(strconv.Itoa(int(environmentID)))).
|
||||
Warn("No runners available")
|
||||
writeInternalServerError(writer, err, dto.ErrorNomadOverload)
|
||||
writeInternalServerError(writer, err, dto.ErrorNomadOverload, request.Context())
|
||||
default:
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
}
|
||||
return
|
||||
}
|
||||
monitoring.AddRunnerMonitoringData(request, nextRunner.ID(), nextRunner.Environment())
|
||||
sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK)
|
||||
sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()},
|
||||
http.StatusOK, request.Context())
|
||||
}
|
||||
|
||||
// listFileSystem handles the files API route with the method GET.
|
||||
@ -112,11 +114,11 @@ func (r *RunnerController) listFileSystem(writer http.ResponseWriter, request *h
|
||||
err = targetRunner.ListFileSystem(path, recursive, writer, privilegedExecution, ctx)
|
||||
})
|
||||
if errors.Is(err, runner.ErrFileNotFound) {
|
||||
writeClientError(writer, err, http.StatusFailedDependency)
|
||||
writeClientError(writer, err, http.StatusFailedDependency, request.Context())
|
||||
return
|
||||
} else if err != nil {
|
||||
log.WithError(err).Error("Could not perform the requested listFileSystem.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
log.WithContext(request.Context()).WithError(err).Error("Could not perform the requested listFileSystem.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -138,8 +140,8 @@ func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request
|
||||
err = targetRunner.UpdateFileSystem(fileCopyRequest, ctx)
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not perform the requested updateFileSystem.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
log.WithContext(request.Context()).WithError(err).Error("Could not perform the requested updateFileSystem.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
@ -160,11 +162,11 @@ func (r *RunnerController) fileContent(writer http.ResponseWriter, request *http
|
||||
err = targetRunner.GetFileContent(path, writer, privilegedExecution, ctx)
|
||||
})
|
||||
if errors.Is(err, runner.ErrFileNotFound) {
|
||||
writeClientError(writer, err, http.StatusFailedDependency)
|
||||
writeClientError(writer, err, http.StatusFailedDependency, request.Context())
|
||||
return
|
||||
} else if err != nil {
|
||||
log.WithError(err).Error("Could not retrieve the requested file.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
log.WithContext(request.Context()).WithError(err).Error("Could not retrieve the requested file.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -179,7 +181,7 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req
|
||||
}
|
||||
forbiddenCharacters := "'"
|
||||
if strings.ContainsAny(executionRequest.Command, forbiddenCharacters) {
|
||||
writeClientError(writer, ErrForbiddenCharacter, http.StatusBadRequest)
|
||||
writeClientError(writer, ErrForbiddenCharacter, http.StatusBadRequest, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
@ -194,14 +196,14 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req
|
||||
|
||||
path, err := r.runnerRouter.Get(WebsocketPath).URL(RunnerIDKey, targetRunner.ID())
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not create runner websocket URL.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
log.WithContext(request.Context()).WithError(err).Error("Could not create runner websocket URL.")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
}
|
||||
newUUID, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not create execution id")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
log.WithContext(request.Context()).WithError(err).Error("Could not create execution id")
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
}
|
||||
id := newUUID.String()
|
||||
@ -216,7 +218,7 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req
|
||||
RawQuery: fmt.Sprintf("%s=%s", ExecutionIDKey, id),
|
||||
}
|
||||
|
||||
sendJSON(writer, &dto.ExecutionResponse{WebSocketURL: webSocketURL.String()}, http.StatusOK)
|
||||
sendJSON(writer, &dto.ExecutionResponse{WebSocketURL: webSocketURL.String()}, http.StatusOK, request.Context())
|
||||
}
|
||||
|
||||
// The findRunnerMiddleware looks up the runnerId for routes containing it
|
||||
@ -230,9 +232,9 @@ func (r *RunnerController) findRunnerMiddleware(next http.Handler) http.Handler
|
||||
// See https://github.com/openHPI/poseidon/issues/54
|
||||
_, readErr := io.ReadAll(request.Body)
|
||||
if readErr != nil {
|
||||
log.WithError(readErr).Warn("Failed to discard the request body")
|
||||
log.WithContext(request.Context()).WithError(readErr).Warn("Failed to discard the request body")
|
||||
}
|
||||
writeClientError(writer, err, http.StatusGone)
|
||||
writeClientError(writer, err, http.StatusGone, request.Context())
|
||||
return
|
||||
}
|
||||
ctx := runner.NewContext(request.Context(), targetRunner)
|
||||
@ -252,7 +254,7 @@ func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Requ
|
||||
err = r.manager.Return(targetRunner)
|
||||
})
|
||||
if err != nil {
|
||||
writeInternalServerError(writer, err, dto.ErrorNomadInternalServerError)
|
||||
writeInternalServerError(writer, err, dto.ErrorNomadInternalServerError, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/openHPI/poseidon/internal/api/ws"
|
||||
"github.com/openHPI/poseidon/internal/runner"
|
||||
@ -28,7 +29,7 @@ func upgradeConnection(writer http.ResponseWriter, request *http.Request) (ws.Co
|
||||
connUpgrader := websocket.Upgrader{}
|
||||
connection, err := connUpgrader.Upgrade(writer, request, nil)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Connection upgrade failed")
|
||||
log.WithContext(request.Context()).WithError(err).Warn("Connection upgrade failed")
|
||||
return nil, fmt.Errorf("error upgrading the connection: %w", err)
|
||||
}
|
||||
return connection, nil
|
||||
@ -61,13 +62,13 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
|
||||
var exitInfo runner.ExitInfo
|
||||
select {
|
||||
case <-wp.ctx.Done():
|
||||
log.Info("Client closed the connection")
|
||||
log.WithContext(wp.ctx).Info("Client closed the connection")
|
||||
wp.Input.Stop()
|
||||
cancelExecution()
|
||||
<-exit // /internal/runner/runner.go handleExitOrContextDone does not require client connection anymore.
|
||||
<-exit // The goroutine closes this channel indicating that it does not use the connection to the executor anymore.
|
||||
case exitInfo = <-exit:
|
||||
log.Info("Execution returned")
|
||||
log.WithContext(wp.ctx).Info("Execution returned")
|
||||
wp.Input.Stop()
|
||||
wp.Output.SendExitInfo(&exitInfo)
|
||||
}
|
||||
@ -80,27 +81,30 @@ func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *
|
||||
|
||||
executionID := request.URL.Query().Get(ExecutionIDKey)
|
||||
if !targetRunner.ExecutionExists(executionID) {
|
||||
writeClientError(writer, ErrUnknownExecutionID, http.StatusNotFound)
|
||||
writeClientError(writer, ErrUnknownExecutionID, http.StatusNotFound, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
connection, err := upgradeConnection(writer, request)
|
||||
if err != nil {
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||
writeInternalServerError(writer, err, dto.ErrorUnknown, request.Context())
|
||||
return
|
||||
}
|
||||
|
||||
// ToDo: Why can we not inherit from request.Context() here?
|
||||
proxyCtx, cancelProxy := context.WithCancel(context.Background())
|
||||
proxyCtx = sentry.SetHubOnContext(proxyCtx, sentry.GetHubFromContext(request.Context()))
|
||||
defer cancelProxy()
|
||||
proxy := newWebSocketProxy(connection, proxyCtx)
|
||||
|
||||
log.WithField("runnerId", targetRunner.ID()).
|
||||
log.WithContext(proxyCtx).WithField("runnerId", targetRunner.ID()).
|
||||
WithField("executionID", logging.RemoveNewlineSymbol(executionID)).
|
||||
Info("Running execution")
|
||||
logging.StartSpan("api.runner.connect", "Execute Interactively", request.Context(), func(ctx context.Context) {
|
||||
exit, cancel, err := targetRunner.ExecuteInteractively(executionID,
|
||||
proxy.Input, proxy.Output.StdOut(), proxy.Output.StdErr(), ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Cannot execute request.")
|
||||
log.WithContext(ctx).WithError(err).Warn("Cannot execute request.")
|
||||
return // The proxy is stopped by the deferred cancel.
|
||||
}
|
||||
|
||||
|
@ -80,7 +80,7 @@ func (cr *codeOceanToRawReader) readInputLoop(ctx context.Context) {
|
||||
case <-readMessage:
|
||||
}
|
||||
|
||||
if inputContainsError(messageType, err) {
|
||||
if inputContainsError(messageType, err, loopContext) {
|
||||
return
|
||||
}
|
||||
if handleInput(reader, cr.buffer, loopContext) {
|
||||
@ -93,11 +93,11 @@ func (cr *codeOceanToRawReader) readInputLoop(ctx context.Context) {
|
||||
func handleInput(reader io.Reader, buffer chan byte, ctx context.Context) (done bool) {
|
||||
message, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("error while reading WebSocket message")
|
||||
log.WithContext(ctx).WithError(err).Warn("error while reading WebSocket message")
|
||||
return true
|
||||
}
|
||||
|
||||
log.WithField("message", string(message)).Trace("Received message from client")
|
||||
log.WithContext(ctx).WithField("message", string(message)).Trace("Received message from client")
|
||||
for _, character := range message {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -108,17 +108,17 @@ func handleInput(reader io.Reader, buffer chan byte, ctx context.Context) (done
|
||||
return false
|
||||
}
|
||||
|
||||
func inputContainsError(messageType int, err error) (done bool) {
|
||||
func inputContainsError(messageType int, err error, ctx context.Context) (done bool) {
|
||||
if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||
log.Debug("ReadInputLoop: The client closed the connection!")
|
||||
log.WithContext(ctx).Debug("ReadInputLoop: The client closed the connection!")
|
||||
// The close handler will do something soon.
|
||||
return true
|
||||
} else if err != nil {
|
||||
log.WithError(err).Warn("Error reading client message")
|
||||
log.WithContext(ctx).WithError(err).Warn("Error reading client message")
|
||||
return true
|
||||
}
|
||||
if messageType != websocket.TextMessage {
|
||||
log.WithField("messageType", messageType).Warn("Received message of wrong type")
|
||||
log.WithContext(ctx).WithField("messageType", messageType).Warn("Received message of wrong type")
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
@ -94,7 +94,7 @@ func (nc *nomadAPIClient) Execute(runnerID string,
|
||||
ctx context.Context, cmd string, tty bool,
|
||||
stdin io.Reader, stdout, stderr io.Writer,
|
||||
) (int, error) {
|
||||
log.WithField("command", strings.ReplaceAll(cmd, "\n", "")).Trace("Requesting Nomad Exec")
|
||||
log.WithContext(ctx).WithField("command", strings.ReplaceAll(cmd, "\n", "")).Trace("Requesting Nomad Exec")
|
||||
var allocations []*nomadApi.AllocationListStub
|
||||
var err error
|
||||
logging.StartSpan("nomad.execute.list", "List Allocations for id", ctx, func(_ context.Context) {
|
||||
@ -132,10 +132,10 @@ func (nc *nomadAPIClient) Execute(runnerID string,
|
||||
case err == nil:
|
||||
return exitCode, nil
|
||||
case websocket.IsCloseError(errors.Unwrap(err), websocket.CloseNormalClosure):
|
||||
log.WithField("runnerID", runnerID).WithError(err).Info("The exit code could not be received.")
|
||||
log.WithContext(ctx).WithField("runnerID", runnerID).WithError(err).Info("The exit code could not be received.")
|
||||
return 0, nil
|
||||
case errors.Is(err, context.Canceled):
|
||||
log.WithField("runnerID", runnerID).Debug("Execution canceled by context")
|
||||
log.WithContext(ctx).WithField("runnerID", runnerID).Debug("Execution canceled by context")
|
||||
return 0, nil
|
||||
default:
|
||||
return 1, fmt.Errorf("error executing command in allocation: %w", err)
|
||||
|
@ -482,7 +482,7 @@ func (a *APIClient) executeCommandInteractivelyWithStderr(allocationID string, c
|
||||
exit, err := a.Execute(allocationID, ctx, prepareCommandTTYStdErr(currentNanoTime, privilegedExecution), true,
|
||||
nullio.Reader{Ctx: readingContext}, stderr, io.Discard)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error")
|
||||
log.WithContext(ctx).WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error")
|
||||
}
|
||||
stderrExitChan <- exit
|
||||
})
|
||||
|
@ -56,7 +56,7 @@ func (s *SentryDebugWriter) Write(p []byte) (n int, err error) {
|
||||
|
||||
match := matchAndMapTimeDebugMessage(p)
|
||||
if match == nil {
|
||||
log.WithField("data", p).Warn("Exec debug message could not be read completely")
|
||||
log.WithContext(s.Ctx).WithField("data", p).Warn("Exec debug message could not be read completely")
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
@ -93,7 +93,7 @@ func (s *SentryDebugWriter) Close(exitCode int) {
|
||||
func (s *SentryDebugWriter) handleTimeDebugMessage(match map[string][]byte) {
|
||||
timestamp, err := strconv.ParseInt(string(match["time"]), 10, 64)
|
||||
if err != nil {
|
||||
log.WithField("match", match).Warn("Could not parse Unix timestamp")
|
||||
log.WithContext(s.Ctx).WithField("match", match).Warn("Could not parse Unix timestamp")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -153,7 +153,7 @@ func (w *AWSFunctionWorkload) executeCommand(ctx context.Context, command []stri
|
||||
Cmd: command,
|
||||
Files: w.fs,
|
||||
}
|
||||
log.WithField("request", data).Trace("Sending request to AWS")
|
||||
log.WithContext(ctx).WithField("request", data).Trace("Sending request to AWS")
|
||||
rawData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
exit <- ExitInfo{uint8(1), fmt.Errorf("cannot stingify aws function request: %w", err)}
|
||||
@ -202,7 +202,7 @@ func (w *AWSFunctionWorkload) receiveOutput(
|
||||
|
||||
switch wsMessage.Type {
|
||||
default:
|
||||
log.WithField("data", wsMessage).Warn("unexpected message from aws function")
|
||||
log.WithContext(ctx).WithField("data", wsMessage).Warn("unexpected message from aws function")
|
||||
case dto.WebSocketExit:
|
||||
return wsMessage.ExitCode, nil
|
||||
case dto.WebSocketOutputStdout:
|
||||
|
@ -134,7 +134,7 @@ func (m *NomadRunnerManager) keepRunnersSynced(ctx context.Context) {
|
||||
err := m.apiClient.WatchEventStream(ctx,
|
||||
&nomad.AllocationProcessoring{OnNew: m.onAllocationAdded, OnDeleted: m.onAllocationStopped})
|
||||
retries += 1
|
||||
log.WithError(err).Errorf("Stopped updating the runners! Retry %v", retries)
|
||||
log.WithContext(ctx).WithError(err).Errorf("Stopped updating the runners! Retry %v", retries)
|
||||
<-time.After(time.Second)
|
||||
}
|
||||
}
|
||||
|
@ -164,7 +164,7 @@ func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest, ct
|
||||
r.ResetTimeout()
|
||||
|
||||
var tarBuffer bytes.Buffer
|
||||
if err := createTarArchiveForFiles(copyRequest.Copy, &tarBuffer); err != nil {
|
||||
if err := createTarArchiveForFiles(copyRequest.Copy, &tarBuffer, ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -201,7 +201,7 @@ func (r *NomadJob) GetFileContent(
|
||||
p.AddTag(monitoring.InfluxKeyRunnerID, r.ID())
|
||||
environmentID, err := nomad.EnvironmentIDFromRunnerID(r.ID())
|
||||
if err != nil {
|
||||
log.WithField("runnerID", r.ID()).WithError(err).Warn("can not parse environment id")
|
||||
log.WithContext(ctx).WithField("runnerID", r.ID()).WithError(err).Warn("can not parse environment id")
|
||||
}
|
||||
p.AddTag(monitoring.InfluxKeyEnvironmentID, environmentID.ToString())
|
||||
defer contentLengthWriter.SendMonitoringData(p)
|
||||
@ -283,26 +283,26 @@ func (r *NomadJob) handleExitOrContextDone(ctx context.Context, cancelExecute co
|
||||
// log.WithField("runner", r.id).Warn("Could not send SIGQUIT because nothing was written")
|
||||
// }
|
||||
if err != nil {
|
||||
log.WithField("runner", r.id).WithError(err).Warn("Could not send SIGQUIT due to error")
|
||||
log.WithContext(ctx).WithField("runner", r.id).WithError(err).Warn("Could not send SIGQUIT due to error")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-exitInternal:
|
||||
log.WithField("runner", r.id).Debug("Execution terminated after SIGQUIT")
|
||||
log.WithContext(ctx).WithField("runner", r.id).Debug("Execution terminated after SIGQUIT")
|
||||
case <-time.After(executionTimeoutGracePeriod):
|
||||
log.WithField("runner", r.id).Info("Execution did not quit after SIGQUIT")
|
||||
log.WithContext(ctx).WithField("runner", r.id).Info("Execution did not quit after SIGQUIT")
|
||||
if err := r.Destroy(); err != nil {
|
||||
log.WithField("runner", r.id).Error("Error when destroying runner")
|
||||
log.WithContext(ctx).WithField("runner", r.id).Error("Error when destroying runner")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createTarArchiveForFiles(filesToCopy []dto.File, w io.Writer) error {
|
||||
func createTarArchiveForFiles(filesToCopy []dto.File, w io.Writer, ctx context.Context) error {
|
||||
tarWriter := tar.NewWriter(w)
|
||||
for _, file := range filesToCopy {
|
||||
if err := tarWriter.WriteHeader(tarHeader(file)); err != nil {
|
||||
err := fmt.Errorf("error writing tar file header: %w", err)
|
||||
log.
|
||||
log.WithContext(ctx).
|
||||
WithField("path", base64.StdEncoding.EncodeToString([]byte(file.Path))).
|
||||
WithField("content", base64.StdEncoding.EncodeToString(file.Content)).
|
||||
Error(err)
|
||||
@ -310,7 +310,7 @@ func createTarArchiveForFiles(filesToCopy []dto.File, w io.Writer) error {
|
||||
}
|
||||
if _, err := tarWriter.Write(file.ByteContent()); err != nil {
|
||||
err := fmt.Errorf("error writing tar file content: %w", err)
|
||||
log.
|
||||
log.WithContext(ctx).
|
||||
WithField("path", base64.StdEncoding.EncodeToString([]byte(file.Path))).
|
||||
WithField("content", base64.StdEncoding.EncodeToString(file.Content)).
|
||||
Error(err)
|
||||
|
@ -77,7 +77,7 @@ func HTTPLoggingMiddleware(next http.Handler) http.Handler {
|
||||
next.ServeHTTP(lrw, r)
|
||||
|
||||
latency := time.Now().UTC().Sub(start)
|
||||
logEntry := log.WithFields(logrus.Fields{
|
||||
logEntry := log.WithContext(r.Context()).WithFields(logrus.Fields{
|
||||
"code": lrw.StatusCode,
|
||||
"method": r.Method,
|
||||
"path": path,
|
||||
|
@ -130,12 +130,12 @@ func addEnvironmentID(r *http.Request, id dto.EnvironmentID) {
|
||||
func AddRequestSize(r *http.Request) {
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Failed to read request body")
|
||||
log.WithContext(r.Context()).WithError(err).Warn("Failed to read request body")
|
||||
}
|
||||
|
||||
err = r.Body.Close()
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Failed to close request body")
|
||||
log.WithContext(r.Context()).WithError(err).Warn("Failed to close request body")
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(body))
|
||||
|
||||
@ -185,7 +185,7 @@ func addInfluxDBField(r *http.Request, key string, value interface{}) {
|
||||
func dataPointFromRequest(r *http.Request) *write.Point {
|
||||
p, ok := r.Context().Value(influxdbContextKey).(*write.Point)
|
||||
if !ok {
|
||||
log.Error("All http request must contain an influxdb data point!")
|
||||
log.WithContext(r.Context()).Error("All http request must contain an influxdb data point!")
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ func (w *Ls2JsonWriter) Write(p []byte) (int, error) {
|
||||
if len(line) != 0 {
|
||||
count, err := w.writeLine(line)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Could not write line to Target")
|
||||
log.WithContext(w.Ctx).WithError(err).Warn("Could not write line to Target")
|
||||
return count, err
|
||||
}
|
||||
}
|
||||
@ -87,7 +87,7 @@ func (w *Ls2JsonWriter) initializeJSONObject() (count int, err error) {
|
||||
if !w.jsonStartSent {
|
||||
count, err = w.Target.Write([]byte("{\"files\": ["))
|
||||
if count == 0 || err != nil {
|
||||
log.WithError(err).Warn("Could not write to target")
|
||||
log.WithContext(w.Ctx).WithError(err).Warn("Could not write to target")
|
||||
err = fmt.Errorf("could not write to target: %w", err)
|
||||
} else {
|
||||
w.jsonStartSent = true
|
||||
@ -102,7 +102,7 @@ func (w *Ls2JsonWriter) Close() {
|
||||
if w.jsonStartSent {
|
||||
count, err := w.Target.Write([]byte("]}"))
|
||||
if count == 0 || err != nil {
|
||||
log.WithError(err).Warn("Could not Close ls2json writer")
|
||||
log.WithContext(w.Ctx).WithError(err).Warn("Could not Close ls2json writer")
|
||||
}
|
||||
w.sentrySpan.Finish()
|
||||
}
|
||||
@ -163,7 +163,7 @@ func (w *Ls2JsonWriter) parseFileHeader(matches [][]byte) ([]byte, error) {
|
||||
name = dto.FilePath(parts[0])
|
||||
linkTarget = dto.FilePath(parts[1])
|
||||
} else {
|
||||
log.Error("could not split link into name and target")
|
||||
log.WithContext(w.Ctx).Error("could not split link into name and target")
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user