Fix incomplete debug message
that is created by sending SIGQUIT to the bash process by not processing output after the the client disconnected / we have sent the SIGQUIT.
This commit is contained in:

committed by
Sebastian Serth

parent
0fd6e42487
commit
90092c48c1
@ -65,6 +65,7 @@ func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecuti
|
|||||||
case <-wp.ctx.Done():
|
case <-wp.ctx.Done():
|
||||||
log.WithContext(wp.ctx).Info("Client closed the connection")
|
log.WithContext(wp.ctx).Info("Client closed the connection")
|
||||||
wp.Input.Stop()
|
wp.Input.Stop()
|
||||||
|
wp.Output.Close(nil)
|
||||||
cancelExecution()
|
cancelExecution()
|
||||||
<-exit // /internal/runner/runner.go handleExitOrContextDone does not require client connection anymore.
|
<-exit // /internal/runner/runner.go handleExitOrContextDone does not require client connection anymore.
|
||||||
<-exit // The goroutine closes this channel indicating that it does not use the connection to the executor anymore.
|
<-exit // The goroutine closes this channel indicating that it does not use the connection to the executor anymore.
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/openHPI/poseidon/internal/nomad"
|
"github.com/openHPI/poseidon/internal/nomad"
|
||||||
"github.com/openHPI/poseidon/internal/runner"
|
"github.com/openHPI/poseidon/internal/runner"
|
||||||
@ -16,14 +17,23 @@ const CodeOceanOutputWriterBufferSize = 64
|
|||||||
|
|
||||||
// rawToCodeOceanWriter is a simple io.Writer implementation that just forwards the call to sendMessage.
|
// rawToCodeOceanWriter is a simple io.Writer implementation that just forwards the call to sendMessage.
|
||||||
type rawToCodeOceanWriter struct {
|
type rawToCodeOceanWriter struct {
|
||||||
sendMessage func(string)
|
outputType dto.WebSocketMessageType
|
||||||
|
sendMessage func(*dto.WebSocketMessage)
|
||||||
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write implements the io.Writer interface.
|
// Write implements the io.Writer interface.
|
||||||
func (rc *rawToCodeOceanWriter) Write(p []byte) (int, error) {
|
func (rc *rawToCodeOceanWriter) Write(p []byte) (int, error) {
|
||||||
rc.sendMessage(string(p))
|
switch {
|
||||||
|
case rc.ctx.Err() != nil:
|
||||||
|
return 0, fmt.Errorf("CodeOceanWriter context done: %w", rc.ctx.Err())
|
||||||
|
case len(p) == 0:
|
||||||
|
return 0, nil
|
||||||
|
default:
|
||||||
|
rc.sendMessage(&dto.WebSocketMessage{Type: rc.outputType, Data: string(p)})
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WebSocketWriter is an interface that defines which data is required and which information can be passed.
|
// WebSocketWriter is an interface that defines which data is required and which information can be passed.
|
||||||
type WebSocketWriter interface {
|
type WebSocketWriter interface {
|
||||||
@ -36,8 +46,8 @@ type WebSocketWriter interface {
|
|||||||
// It forwards the data written to stdOut or stdErr (Nomad, AWS) to the WebSocket connection (CodeOcean).
|
// It forwards the data written to stdOut or stdErr (Nomad, AWS) to the WebSocket connection (CodeOcean).
|
||||||
type codeOceanOutputWriter struct {
|
type codeOceanOutputWriter struct {
|
||||||
connection Connection
|
connection Connection
|
||||||
stdOut io.Writer
|
stdOut *rawToCodeOceanWriter
|
||||||
stdErr io.Writer
|
stdErr *rawToCodeOceanWriter
|
||||||
queue chan *writingLoopMessage
|
queue chan *writingLoopMessage
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
@ -51,18 +61,22 @@ type writingLoopMessage struct {
|
|||||||
// NewCodeOceanOutputWriter provides an codeOceanOutputWriter for the time the context ctx is active.
|
// NewCodeOceanOutputWriter provides an codeOceanOutputWriter for the time the context ctx is active.
|
||||||
// The codeOceanOutputWriter handles all the messages defined in the websocket.schema.json (start, timeout, stdout, ..).
|
// The codeOceanOutputWriter handles all the messages defined in the websocket.schema.json (start, timeout, stdout, ..).
|
||||||
func NewCodeOceanOutputWriter(
|
func NewCodeOceanOutputWriter(
|
||||||
connection Connection, ctx context.Context, done context.CancelFunc) *codeOceanOutputWriter {
|
connection Connection, ctx context.Context, done context.CancelFunc) WebSocketWriter {
|
||||||
cw := &codeOceanOutputWriter{
|
cw := &codeOceanOutputWriter{
|
||||||
connection: connection,
|
connection: connection,
|
||||||
queue: make(chan *writingLoopMessage, CodeOceanOutputWriterBufferSize),
|
queue: make(chan *writingLoopMessage, CodeOceanOutputWriterBufferSize),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
}
|
}
|
||||||
cw.stdOut = &rawToCodeOceanWriter{sendMessage: func(s string) {
|
cw.stdOut = &rawToCodeOceanWriter{
|
||||||
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputStdout, Data: s})
|
outputType: dto.WebSocketOutputStdout,
|
||||||
}}
|
sendMessage: cw.send,
|
||||||
cw.stdErr = &rawToCodeOceanWriter{sendMessage: func(s string) {
|
ctx: ctx,
|
||||||
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputStderr, Data: s})
|
}
|
||||||
}}
|
cw.stdErr = &rawToCodeOceanWriter{
|
||||||
|
outputType: dto.WebSocketOutputStderr,
|
||||||
|
sendMessage: cw.send,
|
||||||
|
ctx: ctx,
|
||||||
|
}
|
||||||
|
|
||||||
go cw.startWritingLoop(done)
|
go cw.startWritingLoop(done)
|
||||||
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketMetaStart})
|
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketMetaStart})
|
||||||
@ -81,9 +95,13 @@ func (cw *codeOceanOutputWriter) StdErr() io.Writer {
|
|||||||
|
|
||||||
// Close forwards the kind of exit (timeout, error, normal) to CodeOcean.
|
// Close forwards the kind of exit (timeout, error, normal) to CodeOcean.
|
||||||
// This results in the closing of the WebSocket connection.
|
// This results in the closing of the WebSocket connection.
|
||||||
|
// The call of Close is mandantory!
|
||||||
func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) {
|
func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) {
|
||||||
|
defer func() { cw.queue <- &writingLoopMessage{done: true} }()
|
||||||
// Mask the internal stop reason before disclosing/forwarding it externally/to CodeOcean.
|
// Mask the internal stop reason before disclosing/forwarding it externally/to CodeOcean.
|
||||||
switch {
|
switch {
|
||||||
|
case info == nil:
|
||||||
|
return
|
||||||
case info.Err == nil:
|
case info.Err == nil:
|
||||||
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: info.Code})
|
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketExit, ExitCode: info.Code})
|
||||||
case errors.Is(info.Err, context.DeadlineExceeded) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout):
|
case errors.Is(info.Err, context.DeadlineExceeded) || errors.Is(info.Err, runner.ErrorRunnerInactivityTimeout):
|
||||||
@ -92,7 +110,7 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) {
|
|||||||
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: runner.ErrOOMKilled.Error()})
|
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: runner.ErrOOMKilled.Error()})
|
||||||
case errors.Is(info.Err, nomad.ErrorAllocationCompleted), errors.Is(info.Err, runner.ErrDestroyedByAPIRequest):
|
case errors.Is(info.Err, nomad.ErrorAllocationCompleted), errors.Is(info.Err, runner.ErrDestroyedByAPIRequest):
|
||||||
message := "the allocation stopped as expected"
|
message := "the allocation stopped as expected"
|
||||||
log.WithContext(cw.ctx).WithError(info.Err).Debug(message)
|
log.WithContext(cw.ctx).WithError(info.Err).Trace(message)
|
||||||
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: message})
|
cw.send(&dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: message})
|
||||||
default:
|
default:
|
||||||
errorMessage := "Error executing the request"
|
errorMessage := "Error executing the request"
|
||||||
@ -103,15 +121,7 @@ func (cw *codeOceanOutputWriter) Close(info *runner.ExitInfo) {
|
|||||||
|
|
||||||
// send forwards the passed dto.WebSocketMessage to the writing loop.
|
// send forwards the passed dto.WebSocketMessage to the writing loop.
|
||||||
func (cw *codeOceanOutputWriter) send(message *dto.WebSocketMessage) {
|
func (cw *codeOceanOutputWriter) send(message *dto.WebSocketMessage) {
|
||||||
select {
|
cw.queue <- &writingLoopMessage{done: false, data: message}
|
||||||
case <-cw.ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
done := message.Type == dto.WebSocketExit ||
|
|
||||||
message.Type == dto.WebSocketMetaTimeout ||
|
|
||||||
message.Type == dto.WebSocketOutputError
|
|
||||||
cw.queue <- &writingLoopMessage{done: done, data: message}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// startWritingLoop enables the writing loop.
|
// startWritingLoop enables the writing loop.
|
||||||
@ -128,18 +138,18 @@ func (cw *codeOceanOutputWriter) startWritingLoop(writingLoopDone context.Cancel
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
message := <-cw.queue
|
||||||
case <-cw.ctx.Done():
|
done := true
|
||||||
return
|
if message.data != nil {
|
||||||
case message := <-cw.queue:
|
done = sendMessage(cw.connection, message.data, cw.ctx)
|
||||||
done := sendMessage(cw.connection, message.data, cw.ctx)
|
}
|
||||||
if done || message.done {
|
if done || message.done {
|
||||||
|
log.Debug("Writing loop done")
|
||||||
writingLoopDone()
|
writingLoopDone()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// sendMessage is a helper function for the writing loop. It must not be called from somewhere else!
|
// sendMessage is a helper function for the writing loop. It must not be called from somewhere else!
|
||||||
func sendMessage(connection Connection, message *dto.WebSocketMessage, ctx context.Context) (done bool) {
|
func sendMessage(connection Connection, message *dto.WebSocketMessage, ctx context.Context) (done bool) {
|
||||||
|
@ -44,6 +44,15 @@ func NewSentryDebugWriter(target io.Writer, ctx context.Context) *SentryDebugWri
|
|||||||
|
|
||||||
// Improve: Handling of a split debug messages (usually, p is exactly one debug message, not less and not more).
|
// Improve: Handling of a split debug messages (usually, p is exactly one debug message, not less and not more).
|
||||||
func (s *SentryDebugWriter) Write(p []byte) (n int, err error) {
|
func (s *SentryDebugWriter) Write(p []byte) (n int, err error) {
|
||||||
|
if s.Ctx.Err() != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
// Peaking if the target is able to write.
|
||||||
|
// If not we should not process the data (see #325).
|
||||||
|
if _, err = s.Target.Write([]byte{}); err != nil {
|
||||||
|
return 0, fmt.Errorf("SentryDebugWriter cannot write to target: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if !timeDebugMessagePatternStart.Match(p) {
|
if !timeDebugMessagePatternStart.Match(p) {
|
||||||
count, err := s.Target.Write(p)
|
count, err := s.Target.Write(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Reference in New Issue
Block a user