From 5e5e13806e410e702c3aee2f1b85b6bc2ff1a0dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Pa=C3=9F?= <22845248+mpass99@users.noreply.github.com> Date: Tue, 25 Oct 2022 15:49:33 +0100 Subject: [PATCH] Monitor file download. --- internal/runner/nomad_runner.go | 10 +++++++ pkg/monitoring/influxdb2_middleware.go | 3 ++ pkg/nullio/content_length.go | 41 +++++++++++++++++++++----- 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index faf8309..d569395 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" nomadApi "github.com/hashicorp/nomad/api" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/pkg/dto" "github.com/openHPI/poseidon/pkg/monitoring" @@ -190,6 +191,15 @@ func (r *NomadJob) GetFileContent( r.ResetTimeout() contentLengthWriter := &nullio.ContentLengthWriter{Target: content} + p := influxdb2.NewPointWithMeasurement(monitoring.MeasurementFileDownload) + p.AddTag(monitoring.InfluxKeyRunnerID, r.ID()) + environmentID, err := nomad.EnvironmentIDFromRunnerID(r.ID()) + if err != nil { + log.WithField("runnerID", r.ID()).WithError(err).Warn("can not parse environment id") + } + p.AddTag(monitoring.InfluxKeyEnvironmentID, environmentID.ToString()) + defer contentLengthWriter.SendMonitoringData(p) + retrieveCommand := (&dto.ExecutionRequest{ Command: fmt.Sprintf("%s %q && cat %q", lsCommand, path, path), }).FullCommand() diff --git a/pkg/monitoring/influxdb2_middleware.go b/pkg/monitoring/influxdb2_middleware.go index cbf7a9c..248f7b3 100644 --- a/pkg/monitoring/influxdb2_middleware.go +++ b/pkg/monitoring/influxdb2_middleware.go @@ -28,11 +28,14 @@ const ( MeasurementExecutionsNomad = measurementPrefix + "nomad_executions" MeasurementEnvironments = measurementPrefix + "environments" MeasurementUsedRunner = measurementPrefix + "used_runners" + MeasurementFileDownload = measurementPrefix + "file_download" // The keys for the monitored tags and fields. InfluxKeyRunnerID = "runner_id" InfluxKeyEnvironmentID = "environment_id" + InfluxKeyActualContentLength = "actual_length" + InfluxKeyExpectedContentLength = "expected_length" InfluxKeyDuration = "duration" InfluxKeyStartupDuration = "startup_" + InfluxKeyDuration influxKeyEnvironmentPrewarmingPoolSize = "prewarming_pool_size" diff --git a/pkg/nullio/content_length.go b/pkg/nullio/content_length.go index 2877c60..93ec5ca 100644 --- a/pkg/nullio/content_length.go +++ b/pkg/nullio/content_length.go @@ -3,7 +3,10 @@ package nullio import ( "errors" "fmt" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/openHPI/poseidon/pkg/monitoring" "net/http" + "strconv" ) var ErrRegexMatching = errors.New("could not match content length") @@ -12,20 +15,31 @@ var ErrRegexMatching = errors.New("could not match content length") // It parses the size from the first line as Content Length Header and streams the following data to the Target. // The first line is expected to follow the format headerLineRegex. type ContentLengthWriter struct { - Target http.ResponseWriter - contentLengthSet bool - firstLine []byte + Target http.ResponseWriter + contentLengthSet bool + firstLine []byte + expectedContentLength int + actualContentLength int } func (w *ContentLengthWriter) Write(p []byte) (count int, err error) { if w.contentLengthSet { - count, err = w.Target.Write(p) - if err != nil { - err = fmt.Errorf("could not write to target: %w", err) - } - return count, err + return w.handleDataForwarding(p) + } else { + return w.handleContentLengthParsing(p) } +} +func (w *ContentLengthWriter) handleDataForwarding(p []byte) (int, error) { + count, err := w.Target.Write(p) + if err != nil { + err = fmt.Errorf("could not write to target: %w", err) + } + w.actualContentLength += count + return count, err +} + +func (w *ContentLengthWriter) handleContentLengthParsing(p []byte) (count int, err error) { for i, char := range p { if char != '\n' { continue @@ -38,6 +52,10 @@ func (w *ContentLengthWriter) Write(p []byte) (count int, err error) { return 0, ErrRegexMatching } size := string(matches[headerLineGroupSize]) + w.expectedContentLength, err = strconv.Atoi(size) + if err != nil { + log.WithField("size", size).Warn("could not parse content length") + } w.Target.Header().Set("Content-Length", size) w.contentLengthSet = true @@ -57,3 +75,10 @@ func (w *ContentLengthWriter) Write(p []byte) (count int, err error) { return len(p), nil } + +// SendMonitoringData will send a monitoring event of the content length read and written. +func (w *ContentLengthWriter) SendMonitoringData(p *write.Point) { + p.AddField(monitoring.InfluxKeyExpectedContentLength, w.expectedContentLength) + p.AddField(monitoring.InfluxKeyActualContentLength, w.actualContentLength) + monitoring.WriteInfluxPoint(p) +}