Add the Environment ID to the influxdb data.
Also move the interface of an execution environment into its own file, execution_environment.go.
This commit is contained in:
@ -47,8 +47,8 @@ func configureV1Router(router *mux.Router,
|
|||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
})
|
})
|
||||||
v1 := router.PathPrefix(BasePath).Subrouter()
|
v1 := router.PathPrefix(BasePath).Subrouter()
|
||||||
v1.HandleFunc(HealthPath, Health).Methods(http.MethodGet)
|
v1.HandleFunc(HealthPath, Health).Methods(http.MethodGet).Name(HealthPath)
|
||||||
v1.HandleFunc(VersionPath, Version).Methods(http.MethodGet)
|
v1.HandleFunc(VersionPath, Version).Methods(http.MethodGet).Name(VersionPath)
|
||||||
|
|
||||||
runnerController := &RunnerController{manager: runnerManager}
|
runnerController := &RunnerController{manager: runnerManager}
|
||||||
environmentController := &EnvironmentController{manager: environmentManager}
|
environmentController := &EnvironmentController{manager: environmentManager}
|
||||||
@ -59,7 +59,8 @@ func configureV1Router(router *mux.Router,
|
|||||||
// May add a statistics controller if another route joins
|
// May add a statistics controller if another route joins
|
||||||
statisticsRouter := router.PathPrefix(StatisticsPath).Subrouter()
|
statisticsRouter := router.PathPrefix(StatisticsPath).Subrouter()
|
||||||
statisticsRouter.
|
statisticsRouter.
|
||||||
HandleFunc(EnvironmentsPath, StatisticsExecutionEnvironments(environmentManager)).Methods(http.MethodGet)
|
HandleFunc(EnvironmentsPath, StatisticsExecutionEnvironments(environmentManager)).
|
||||||
|
Methods(http.MethodGet).Name(EnvironmentsPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
if auth.InitializeAuthentication() {
|
if auth.InitializeAuthentication() {
|
||||||
|
@ -51,7 +51,6 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
environmentID := dto.EnvironmentID(runnerRequest.ExecutionEnvironmentID)
|
environmentID := dto.EnvironmentID(runnerRequest.ExecutionEnvironmentID)
|
||||||
logging.AddEnvironmentID(request, environmentID)
|
|
||||||
|
|
||||||
nextRunner, err := r.manager.Claim(environmentID, runnerRequest.InactivityTimeout)
|
nextRunner, err := r.manager.Claim(environmentID, runnerRequest.InactivityTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -66,7 +65,7 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logging.AddRunnerID(request, nextRunner.ID())
|
addMonitoringData(request, nextRunner)
|
||||||
sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK)
|
sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,7 +79,7 @@ func (r *RunnerController) updateFileSystem(writer http.ResponseWriter, request
|
|||||||
}
|
}
|
||||||
|
|
||||||
targetRunner, _ := runner.FromContext(request.Context())
|
targetRunner, _ := runner.FromContext(request.Context())
|
||||||
logging.AddRunnerID(request, targetRunner.ID())
|
addMonitoringData(request, targetRunner)
|
||||||
if err := targetRunner.UpdateFileSystem(fileCopyRequest); err != nil {
|
if err := targetRunner.UpdateFileSystem(fileCopyRequest); err != nil {
|
||||||
log.WithError(err).Error("Could not perform the requested updateFileSystem.")
|
log.WithError(err).Error("Could not perform the requested updateFileSystem.")
|
||||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||||
@ -106,7 +105,7 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req
|
|||||||
scheme = "ws"
|
scheme = "ws"
|
||||||
}
|
}
|
||||||
targetRunner, _ := runner.FromContext(request.Context())
|
targetRunner, _ := runner.FromContext(request.Context())
|
||||||
logging.AddRunnerID(request, targetRunner.ID())
|
addMonitoringData(request, targetRunner)
|
||||||
|
|
||||||
path, err := r.runnerRouter.Get(WebsocketPath).URL(RunnerIDKey, targetRunner.ID())
|
path, err := r.runnerRouter.Get(WebsocketPath).URL(RunnerIDKey, targetRunner.ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -158,7 +157,7 @@ func (r *RunnerController) findRunnerMiddleware(next http.Handler) http.Handler
|
|||||||
// It destroys the given runner on the executor and removes it from the used runners list.
|
// It destroys the given runner on the executor and removes it from the used runners list.
|
||||||
func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Request) {
|
func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Request) {
|
||||||
targetRunner, _ := runner.FromContext(request.Context())
|
targetRunner, _ := runner.FromContext(request.Context())
|
||||||
logging.AddRunnerID(request, targetRunner.ID())
|
addMonitoringData(request, targetRunner)
|
||||||
|
|
||||||
err := r.manager.Return(targetRunner)
|
err := r.manager.Return(targetRunner)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -172,3 +171,9 @@ func (r *RunnerController) delete(writer http.ResponseWriter, request *http.Requ
|
|||||||
|
|
||||||
writer.WriteHeader(http.StatusNoContent)
|
writer.WriteHeader(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// addMonitoringData adds the data of the runner and environment we want to monitor.
|
||||||
|
func addMonitoringData(request *http.Request, r runner.Runner) {
|
||||||
|
logging.AddRunnerID(request, r.ID())
|
||||||
|
logging.AddEnvironmentID(request, r.Environment())
|
||||||
|
}
|
||||||
|
@ -254,6 +254,7 @@ func (s *UpdateFileSystemRouteTestSuite) SetupTest() {
|
|||||||
s.path = routeURL.String()
|
s.path = routeURL.String()
|
||||||
s.runnerMock = &runner.RunnerMock{}
|
s.runnerMock = &runner.RunnerMock{}
|
||||||
s.runnerMock.On("ID").Return(tests.DefaultMockID)
|
s.runnerMock.On("ID").Return(tests.DefaultMockID)
|
||||||
|
s.runnerMock.On("Environment").Return(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger))
|
||||||
s.runnerManager.On("Get", tests.DefaultMockID).Return(s.runnerMock, nil)
|
s.runnerManager.On("Get", tests.DefaultMockID).Return(s.runnerMock, nil)
|
||||||
s.recorder = httptest.NewRecorder()
|
s.recorder = httptest.NewRecorder()
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,6 @@ import (
|
|||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/openHPI/poseidon/internal/runner"
|
"github.com/openHPI/poseidon/internal/runner"
|
||||||
"github.com/openHPI/poseidon/pkg/dto"
|
"github.com/openHPI/poseidon/pkg/dto"
|
||||||
"github.com/openHPI/poseidon/pkg/logging"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
@ -337,7 +336,8 @@ func (wp *webSocketProxy) writeMessage(messageType int, data []byte) error {
|
|||||||
// connectToRunner is the endpoint for websocket connections.
|
// connectToRunner is the endpoint for websocket connections.
|
||||||
func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) {
|
func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) {
|
||||||
targetRunner, _ := runner.FromContext(request.Context())
|
targetRunner, _ := runner.FromContext(request.Context())
|
||||||
logging.AddRunnerID(request, targetRunner.ID())
|
addMonitoringData(request, targetRunner)
|
||||||
|
|
||||||
executionID := request.URL.Query().Get(ExecutionIDKey)
|
executionID := request.URL.Query().Get(ExecutionIDKey)
|
||||||
if !targetRunner.ExecutionExists(executionID) {
|
if !targetRunner.ExecutionExists(executionID) {
|
||||||
writeNotFound(writer, ErrUnknownExecutionID)
|
writeNotFound(writer, ErrUnknownExecutionID)
|
||||||
|
49
internal/runner/execution_environment.go
Normal file
49
internal/runner/execution_environment.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
package runner
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/openHPI/poseidon/pkg/dto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ExecutionEnvironment are groups of runner that share the configuration stored in the environment.
|
||||||
|
type ExecutionEnvironment interface {
|
||||||
|
json.Marshaler
|
||||||
|
|
||||||
|
// ID returns the id of the environment.
|
||||||
|
ID() dto.EnvironmentID
|
||||||
|
SetID(id dto.EnvironmentID)
|
||||||
|
// PrewarmingPoolSize sets the number of idle runner of this environment that should be prewarmed.
|
||||||
|
PrewarmingPoolSize() uint
|
||||||
|
SetPrewarmingPoolSize(count uint)
|
||||||
|
// ApplyPrewarmingPoolSize creates idle runners according to the PrewarmingPoolSize.
|
||||||
|
ApplyPrewarmingPoolSize() error
|
||||||
|
// CPULimit sets the share of cpu that a runner should receive at minimum.
|
||||||
|
CPULimit() uint
|
||||||
|
SetCPULimit(limit uint)
|
||||||
|
// MemoryLimit sets the amount of memory that should be available for each runner.
|
||||||
|
MemoryLimit() uint
|
||||||
|
SetMemoryLimit(limit uint)
|
||||||
|
// Image sets the image of the runner, e.g. Docker image.
|
||||||
|
Image() string
|
||||||
|
SetImage(image string)
|
||||||
|
// NetworkAccess sets if a runner should have network access and if ports should be mapped.
|
||||||
|
NetworkAccess() (bool, []uint16)
|
||||||
|
SetNetworkAccess(allow bool, ports []uint16)
|
||||||
|
// SetConfigFrom copies all above attributes from the passed environment to the object itself.
|
||||||
|
SetConfigFrom(environment ExecutionEnvironment)
|
||||||
|
|
||||||
|
// Register saves this environment at the executor.
|
||||||
|
Register() error
|
||||||
|
// Delete removes this environment and all it's runner from the executor and Poseidon itself.
|
||||||
|
Delete() error
|
||||||
|
|
||||||
|
// Sample returns and removes an arbitrary available runner.
|
||||||
|
// ok is true iff a runner was returned.
|
||||||
|
Sample() (r Runner, ok bool)
|
||||||
|
// AddRunner adds an existing runner to the idle runners of the environment.
|
||||||
|
AddRunner(r Runner)
|
||||||
|
// DeleteRunner removes an idle runner from the environment.
|
||||||
|
DeleteRunner(id string)
|
||||||
|
// IdleRunnerCount returns the number of idle runners of the environment.
|
||||||
|
IdleRunnerCount() int
|
||||||
|
}
|
@ -1,52 +1,6 @@
|
|||||||
package runner
|
package runner
|
||||||
|
|
||||||
import (
|
import "github.com/openHPI/poseidon/pkg/dto"
|
||||||
"encoding/json"
|
|
||||||
"github.com/openHPI/poseidon/pkg/dto"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ExecutionEnvironment are groups of runner that share the configuration stored in the environment.
|
|
||||||
type ExecutionEnvironment interface {
|
|
||||||
json.Marshaler
|
|
||||||
|
|
||||||
// ID returns the id of the environment.
|
|
||||||
ID() dto.EnvironmentID
|
|
||||||
SetID(id dto.EnvironmentID)
|
|
||||||
// PrewarmingPoolSize sets the number of idle runner of this environment that should be prewarmed.
|
|
||||||
PrewarmingPoolSize() uint
|
|
||||||
SetPrewarmingPoolSize(count uint)
|
|
||||||
// ApplyPrewarmingPoolSize creates idle runners according to the PrewarmingPoolSize.
|
|
||||||
ApplyPrewarmingPoolSize() error
|
|
||||||
// CPULimit sets the share of cpu that a runner should receive at minimum.
|
|
||||||
CPULimit() uint
|
|
||||||
SetCPULimit(limit uint)
|
|
||||||
// MemoryLimit sets the amount of memory that should be available for each runner.
|
|
||||||
MemoryLimit() uint
|
|
||||||
SetMemoryLimit(limit uint)
|
|
||||||
// Image sets the image of the runner, e.g. Docker image.
|
|
||||||
Image() string
|
|
||||||
SetImage(image string)
|
|
||||||
// NetworkAccess sets if a runner should have network access and if ports should be mapped.
|
|
||||||
NetworkAccess() (bool, []uint16)
|
|
||||||
SetNetworkAccess(allow bool, ports []uint16)
|
|
||||||
// SetConfigFrom copies all above attributes from the passed environment to the object itself.
|
|
||||||
SetConfigFrom(environment ExecutionEnvironment)
|
|
||||||
|
|
||||||
// Register saves this environment at the executor.
|
|
||||||
Register() error
|
|
||||||
// Delete removes this environment and all it's runner from the executor and Poseidon itself.
|
|
||||||
Delete() error
|
|
||||||
|
|
||||||
// Sample returns and removes an arbitrary available runner.
|
|
||||||
// ok is true iff a runner was returned.
|
|
||||||
Sample() (r Runner, ok bool)
|
|
||||||
// AddRunner adds an existing runner to the idle runners of the environment.
|
|
||||||
AddRunner(r Runner)
|
|
||||||
// DeleteRunner removes an idle runner from the environment.
|
|
||||||
DeleteRunner(id string)
|
|
||||||
// IdleRunnerCount returns the number of idle runners of the environment.
|
|
||||||
IdleRunnerCount() int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Manager keeps track of the used and unused runners of all execution environments in order to provide unused
|
// Manager keeps track of the used and unused runners of all execution environments in order to provide unused
|
||||||
// runners to new clients and ensure no runner is used twice.
|
// runners to new clients and ensure no runner is used twice.
|
||||||
|
@ -50,12 +50,30 @@ func AddEnvironmentID(r *http.Request, id dto.EnvironmentID) {
|
|||||||
p.AddTag("environment_id", strconv.Itoa(int(id)))
|
p.AddTag("environment_id", strconv.Itoa(int(id)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddEnvironmentType adds the type of the used environment to the influxdb data point.
|
||||||
|
func AddEnvironmentType(r *http.Request, t string) {
|
||||||
|
p := pointFromContext(r.Context())
|
||||||
|
p.AddTag("environment_type", t)
|
||||||
|
}
|
||||||
|
|
||||||
// AddRunnerID adds the runner id to the influx data point for the current request.
|
// AddRunnerID adds the runner id to the influx data point for the current request.
|
||||||
func AddRunnerID(r *http.Request, id string) {
|
func AddRunnerID(r *http.Request, id string) {
|
||||||
p := pointFromContext(r.Context())
|
p := pointFromContext(r.Context())
|
||||||
p.AddTag("runner_id", id)
|
p.AddTag("runner_id", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddIdleRunner adds the count of idle runners of the used environment to the influxdb data point.
|
||||||
|
func AddIdleRunner(r *http.Request, count int) {
|
||||||
|
p := pointFromContext(r.Context())
|
||||||
|
p.AddField("idle_runner", strconv.Itoa(count))
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddPrewarmingPoolSize adds the prewarming pool size of the used environment to the influxdb data point.
|
||||||
|
func AddPrewarmingPoolSize(r *http.Request, count uint) {
|
||||||
|
p := pointFromContext(r.Context())
|
||||||
|
p.AddField("prewarming_pool_size", strconv.Itoa(int(count)))
|
||||||
|
}
|
||||||
|
|
||||||
// AddRequestSize adds the size of the request body to the influx data point for the current request.
|
// AddRequestSize adds the size of the request body to the influx data point for the current request.
|
||||||
func AddRequestSize(r *http.Request) {
|
func AddRequestSize(r *http.Request) {
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
|
Reference in New Issue
Block a user