Fix goroutine leak in the nullio reader

This commit is contained in:
Maximilian Paß
2021-12-11 22:04:20 +01:00
parent c565ca217e
commit 9f0b04660f
3 changed files with 30 additions and 8 deletions

View File

@@ -374,9 +374,12 @@ func (a *APIClient) executeCommandInteractivelyWithStderr(allocationID string, c
stderrExitChan := make(chan int) stderrExitChan := make(chan int)
go func() { go func() {
readingContext, cancel := context.WithCancel(ctx)
defer cancel()
// Catch stderr in separate execution. // Catch stderr in separate execution.
exit, err := a.Execute(allocationID, ctx, stderrFifoCommand(currentNanoTime), true, exit, err := a.Execute(allocationID, ctx, stderrFifoCommand(currentNanoTime), true,
nullio.Reader{}, stderr, io.Discard) nullio.Reader{Ctx: readingContext}, stderr, io.Discard)
if err != nil { if err != nil {
log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error") log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error")
} }

View File

@@ -1,16 +1,31 @@
package nullio package nullio
import ( import (
"context"
"fmt" "fmt"
"io" "io"
) )
// Reader is a struct that implements the io.Reader interface. Read does not return when called. // Reader is a struct that implements the io.Reader interface.
type Reader struct{} // Read does not return when called until the context is done. It is used to avoid reading anything and returning io.EOF
// before the context has finished.
// For example the reader is used by the execution that fetches the stderr stream from Nomad. We do not have a stdin
// that we want to send to Nomad. But we still have to pass Nomad a reader.
// Normally readers send an io.EOF as soon as they have nothing more to read. But we want to avoid this, because in that
// case Nomad will abort (not the execution but) the transmission.
// Why should the reader not just always return 0, nil? Because Nomad reads in an endless loop and thus a busy waiting
// is avoided.
type Reader struct {
Ctx context.Context
}
func (r Reader) Read(_ []byte) (int, error) { func (r Reader) Read(_ []byte) (int, error) {
// An empty select blocks forever. if r.Ctx.Err() != nil {
select {} return 0, io.EOF
}
<-r.Ctx.Done()
return 0, io.EOF
} }
// ReadWriter implements io.ReadWriter. It does not return from Read and discards everything on Write. // ReadWriter implements io.ReadWriter. It does not return from Read and discards everything on Write.

View File

@@ -1,8 +1,9 @@
package nullio package nullio
import ( import (
"context"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "io"
"testing" "testing"
"time" "time"
) )
@@ -10,12 +11,15 @@ import (
const shortTimeout = 100 * time.Millisecond const shortTimeout = 100 * time.Millisecond
func TestReaderDoesNotReturnImmediately(t *testing.T) { func TestReaderDoesNotReturnImmediately(t *testing.T) {
reader := &Reader{} readingContext, cancel := context.WithCancel(context.Background())
defer cancel()
reader := &Reader{readingContext}
readerReturned := make(chan bool) readerReturned := make(chan bool)
go func() { go func() {
p := make([]byte, 0, 5) p := make([]byte, 0, 5)
_, err := reader.Read(p) _, err := reader.Read(p)
require.NoError(t, err) assert.ErrorIs(t, io.EOF, err)
close(readerReturned) close(readerReturned)
}() }()