Monitor file download.
This commit is contained in:

committed by
Sebastian Serth

parent
939f36dac6
commit
5e5e13806e
@ -9,6 +9,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
nomadApi "github.com/hashicorp/nomad/api"
|
nomadApi "github.com/hashicorp/nomad/api"
|
||||||
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||||
"github.com/openHPI/poseidon/internal/nomad"
|
"github.com/openHPI/poseidon/internal/nomad"
|
||||||
"github.com/openHPI/poseidon/pkg/dto"
|
"github.com/openHPI/poseidon/pkg/dto"
|
||||||
"github.com/openHPI/poseidon/pkg/monitoring"
|
"github.com/openHPI/poseidon/pkg/monitoring"
|
||||||
@ -190,6 +191,15 @@ func (r *NomadJob) GetFileContent(
|
|||||||
r.ResetTimeout()
|
r.ResetTimeout()
|
||||||
|
|
||||||
contentLengthWriter := &nullio.ContentLengthWriter{Target: content}
|
contentLengthWriter := &nullio.ContentLengthWriter{Target: content}
|
||||||
|
p := influxdb2.NewPointWithMeasurement(monitoring.MeasurementFileDownload)
|
||||||
|
p.AddTag(monitoring.InfluxKeyRunnerID, r.ID())
|
||||||
|
environmentID, err := nomad.EnvironmentIDFromRunnerID(r.ID())
|
||||||
|
if err != nil {
|
||||||
|
log.WithField("runnerID", r.ID()).WithError(err).Warn("can not parse environment id")
|
||||||
|
}
|
||||||
|
p.AddTag(monitoring.InfluxKeyEnvironmentID, environmentID.ToString())
|
||||||
|
defer contentLengthWriter.SendMonitoringData(p)
|
||||||
|
|
||||||
retrieveCommand := (&dto.ExecutionRequest{
|
retrieveCommand := (&dto.ExecutionRequest{
|
||||||
Command: fmt.Sprintf("%s %q && cat %q", lsCommand, path, path),
|
Command: fmt.Sprintf("%s %q && cat %q", lsCommand, path, path),
|
||||||
}).FullCommand()
|
}).FullCommand()
|
||||||
|
@ -28,11 +28,14 @@ const (
|
|||||||
MeasurementExecutionsNomad = measurementPrefix + "nomad_executions"
|
MeasurementExecutionsNomad = measurementPrefix + "nomad_executions"
|
||||||
MeasurementEnvironments = measurementPrefix + "environments"
|
MeasurementEnvironments = measurementPrefix + "environments"
|
||||||
MeasurementUsedRunner = measurementPrefix + "used_runners"
|
MeasurementUsedRunner = measurementPrefix + "used_runners"
|
||||||
|
MeasurementFileDownload = measurementPrefix + "file_download"
|
||||||
|
|
||||||
// The keys for the monitored tags and fields.
|
// The keys for the monitored tags and fields.
|
||||||
|
|
||||||
InfluxKeyRunnerID = "runner_id"
|
InfluxKeyRunnerID = "runner_id"
|
||||||
InfluxKeyEnvironmentID = "environment_id"
|
InfluxKeyEnvironmentID = "environment_id"
|
||||||
|
InfluxKeyActualContentLength = "actual_length"
|
||||||
|
InfluxKeyExpectedContentLength = "expected_length"
|
||||||
InfluxKeyDuration = "duration"
|
InfluxKeyDuration = "duration"
|
||||||
InfluxKeyStartupDuration = "startup_" + InfluxKeyDuration
|
InfluxKeyStartupDuration = "startup_" + InfluxKeyDuration
|
||||||
influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size"
|
influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size"
|
||||||
|
@ -3,7 +3,10 @@ package nullio
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
||||||
|
"github.com/openHPI/poseidon/pkg/monitoring"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrRegexMatching = errors.New("could not match content length")
|
var ErrRegexMatching = errors.New("could not match content length")
|
||||||
@ -12,20 +15,31 @@ var ErrRegexMatching = errors.New("could not match content length")
|
|||||||
// It parses the size from the first line as Content Length Header and streams the following data to the Target.
|
// It parses the size from the first line as Content Length Header and streams the following data to the Target.
|
||||||
// The first line is expected to follow the format headerLineRegex.
|
// The first line is expected to follow the format headerLineRegex.
|
||||||
type ContentLengthWriter struct {
|
type ContentLengthWriter struct {
|
||||||
Target http.ResponseWriter
|
Target http.ResponseWriter
|
||||||
contentLengthSet bool
|
contentLengthSet bool
|
||||||
firstLine []byte
|
firstLine []byte
|
||||||
|
expectedContentLength int
|
||||||
|
actualContentLength int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *ContentLengthWriter) Write(p []byte) (count int, err error) {
|
func (w *ContentLengthWriter) Write(p []byte) (count int, err error) {
|
||||||
if w.contentLengthSet {
|
if w.contentLengthSet {
|
||||||
count, err = w.Target.Write(p)
|
return w.handleDataForwarding(p)
|
||||||
if err != nil {
|
} else {
|
||||||
err = fmt.Errorf("could not write to target: %w", err)
|
return w.handleContentLengthParsing(p)
|
||||||
}
|
|
||||||
return count, err
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *ContentLengthWriter) handleDataForwarding(p []byte) (int, error) {
|
||||||
|
count, err := w.Target.Write(p)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("could not write to target: %w", err)
|
||||||
|
}
|
||||||
|
w.actualContentLength += count
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *ContentLengthWriter) handleContentLengthParsing(p []byte) (count int, err error) {
|
||||||
for i, char := range p {
|
for i, char := range p {
|
||||||
if char != '\n' {
|
if char != '\n' {
|
||||||
continue
|
continue
|
||||||
@ -38,6 +52,10 @@ func (w *ContentLengthWriter) Write(p []byte) (count int, err error) {
|
|||||||
return 0, ErrRegexMatching
|
return 0, ErrRegexMatching
|
||||||
}
|
}
|
||||||
size := string(matches[headerLineGroupSize])
|
size := string(matches[headerLineGroupSize])
|
||||||
|
w.expectedContentLength, err = strconv.Atoi(size)
|
||||||
|
if err != nil {
|
||||||
|
log.WithField("size", size).Warn("could not parse content length")
|
||||||
|
}
|
||||||
w.Target.Header().Set("Content-Length", size)
|
w.Target.Header().Set("Content-Length", size)
|
||||||
w.contentLengthSet = true
|
w.contentLengthSet = true
|
||||||
|
|
||||||
@ -57,3 +75,10 @@ func (w *ContentLengthWriter) Write(p []byte) (count int, err error) {
|
|||||||
|
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SendMonitoringData will send a monitoring event of the content length read and written.
|
||||||
|
func (w *ContentLengthWriter) SendMonitoringData(p *write.Point) {
|
||||||
|
p.AddField(monitoring.InfluxKeyExpectedContentLength, w.expectedContentLength)
|
||||||
|
p.AddField(monitoring.InfluxKeyActualContentLength, w.actualContentLength)
|
||||||
|
monitoring.WriteInfluxPoint(p)
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user