Implement merge request review comments
This commit is contained in:
@ -1,6 +1,7 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
||||||
@ -27,12 +28,12 @@ func provideRunner(writer http.ResponseWriter, request *http.Request) {
|
|||||||
if err := parseJSONRequestBody(writer, request, runnerRequest); err != nil {
|
if err := parseJSONRequestBody(writer, request, runnerRequest); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
env, err := environment.GetExecutionEnvironment(runnerRequest.ExecutionEnvironmentId)
|
executionEnvironment, err := environment.GetExecutionEnvironment(runnerRequest.ExecutionEnvironmentId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeNotFound(writer, err)
|
writeNotFound(writer, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nextRunner, err := env.NextRunner()
|
nextRunner, err := executionEnvironment.NextRunner()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeInternalServerError(writer, err, dto.ErrorNomadOverload)
|
writeInternalServerError(writer, err, dto.ErrorNomadOverload)
|
||||||
return
|
return
|
||||||
|
@ -8,43 +8,27 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
var connUpgrade = websocket.Upgrader{
|
// connectToRunner is the endpoint for websocket connections.
|
||||||
ReadBufferSize: 1024,
|
|
||||||
WriteBufferSize: 1024,
|
|
||||||
}
|
|
||||||
|
|
||||||
// connectToRunner is a placeholder for now and will become the endpoint for websocket connections.
|
|
||||||
func connectToRunner(writer http.ResponseWriter, request *http.Request) {
|
func connectToRunner(writer http.ResponseWriter, request *http.Request) {
|
||||||
r, _ := runner.FromContext(request.Context())
|
r, _ := runner.FromContext(request.Context())
|
||||||
executionId := request.URL.Query().Get(ExecutionIdKey)
|
executionId := runner.ExecutionId(request.URL.Query().Get(ExecutionIdKey))
|
||||||
executionRequest, ok := r.Execution(runner.ExecutionId(executionId))
|
_, ok := r.Execution(executionId)
|
||||||
if !ok {
|
if !ok {
|
||||||
writeNotFound(writer, errors.New("executionId does not exist"))
|
writeNotFound(writer, errors.New("executionId does not exist"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.
|
log.
|
||||||
|
WithField("runnerId", r.Id()).
|
||||||
WithField("executionId", executionId).
|
WithField("executionId", executionId).
|
||||||
WithField("command", executionRequest.Command).
|
|
||||||
Info("Running execution")
|
Info("Running execution")
|
||||||
connClient, err := connUpgrade.Upgrade(writer, request, nil)
|
connUpgrader := websocket.Upgrader{}
|
||||||
|
connClient, err := connUpgrader.Upgrade(writer, request, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Warn("Connection upgrade failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = connClient.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
||||||
return
|
|
||||||
}
|
|
||||||
defer func(connClient *websocket.Conn) {
|
|
||||||
err := connClient.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
|
||||||
if err != nil {
|
|
||||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
|
||||||
}
|
|
||||||
}(connClient)
|
|
||||||
|
|
||||||
// ToDo: Implement communication forwarding
|
|
||||||
err, ok = r.Execute(runner.ExecutionId(executionId))
|
|
||||||
if !ok {
|
|
||||||
writeBadRequest(writer, errors.New("invalid Execution Id"))
|
|
||||||
return
|
|
||||||
} else if err != nil {
|
|
||||||
writeInternalServerError(writer, err, dto.ErrorUnknown)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,13 +1,12 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/environment/pool"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/environment"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/runner"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
@ -23,19 +22,21 @@ type WebsocketTestSuite struct {
|
|||||||
executionId runner.ExecutionId
|
executionId runner.ExecutionId
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWebsocketTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(WebsocketTestSuite))
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *WebsocketTestSuite) SetupSuite() {
|
func (suite *WebsocketTestSuite) SetupSuite() {
|
||||||
runnerPool := pool.NewLocalRunnerPool()
|
runnerPool := environment.NewLocalRunnerPool()
|
||||||
suite.runner = runner.NewExerciseRunner("testRunner")
|
suite.runner = runner.NewExerciseRunner("testRunner")
|
||||||
runnerPool.AddRunner(suite.runner)
|
runnerPool.Add(suite.runner)
|
||||||
var err error
|
var err error
|
||||||
suite.executionId, err = suite.runner.AddExecution(dto.ExecutionRequest{
|
suite.executionId, err = suite.runner.AddExecution(dto.ExecutionRequest{
|
||||||
Command: "command",
|
Command: "command",
|
||||||
TimeLimit: 10,
|
TimeLimit: 10,
|
||||||
Environment: nil,
|
Environment: nil,
|
||||||
})
|
})
|
||||||
if !suite.NoError(err) {
|
suite.Require().NoError(err)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
router := mux.NewRouter()
|
router := mux.NewRouter()
|
||||||
router.Use(findRunnerMiddleware(runnerPool))
|
router.Use(findRunnerMiddleware(runnerPool))
|
||||||
@ -44,15 +45,11 @@ func (suite *WebsocketTestSuite) SetupSuite() {
|
|||||||
suite.router = router
|
suite.router = router
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *WebsocketTestSuite) url(scheme, runnerId string, executionId runner.ExecutionId) (*url.URL, error) {
|
func (suite *WebsocketTestSuite) websocketUrl(scheme, runnerId string, executionId runner.ExecutionId) (*url.URL, error) {
|
||||||
websocketUrl, err := url.Parse(suite.server.URL)
|
websocketUrl, err := url.Parse(suite.server.URL)
|
||||||
if !suite.NoError(err, "Error: parsing test server url") {
|
suite.Require().NoError(err, "Error: parsing test server url")
|
||||||
return nil, errors.New("could not parse server url")
|
|
||||||
}
|
|
||||||
path, err := suite.router.Get(WebsocketPath).URL(RunnerIdKey, runnerId)
|
path, err := suite.router.Get(WebsocketPath).URL(RunnerIdKey, runnerId)
|
||||||
if !suite.NoError(err) {
|
suite.Require().NoError(err, "could not set runnerId")
|
||||||
return nil, errors.New("could not set runnerId")
|
|
||||||
}
|
|
||||||
websocketUrl.Scheme = scheme
|
websocketUrl.Scheme = scheme
|
||||||
websocketUrl.Path = path.Path
|
websocketUrl.Path = path.Path
|
||||||
websocketUrl.RawQuery = fmt.Sprintf("executionId=%s", executionId)
|
websocketUrl.RawQuery = fmt.Sprintf("executionId=%s", executionId)
|
||||||
@ -63,29 +60,24 @@ func (suite *WebsocketTestSuite) TearDownSuite() {
|
|||||||
suite.server.Close()
|
suite.server.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWebsocketTestSuite(t *testing.T) {
|
func (suite *WebsocketTestSuite) TestWebsocketConnectionCanBeEstablished() {
|
||||||
suite.Run(t, new(WebsocketTestSuite))
|
path, err := suite.websocketUrl("ws", suite.runner.Id(), suite.executionId)
|
||||||
}
|
suite.Require().NoError(err)
|
||||||
|
|
||||||
func (suite *WebsocketTestSuite) TestEstablishWebsocketConnection() {
|
|
||||||
path, err := suite.url("ws", suite.runner.Id(), suite.executionId)
|
|
||||||
if !suite.NoError(err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_, _, err = websocket.DefaultDialer.Dial(path.String(), nil)
|
_, _, err = websocket.DefaultDialer.Dial(path.String(), nil)
|
||||||
if !suite.NoError(err) {
|
suite.Require().NoError(err)
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *WebsocketTestSuite) TestWebsocketReturns404IfExecutionDoesNotExist() {
|
func (suite *WebsocketTestSuite) TestWebsocketReturns404IfExecutionDoesNotExist() {
|
||||||
wsUrl, err := suite.url("http", suite.runner.Id(), "invalid-execution-id")
|
wsUrl, err := suite.websocketUrl("ws", suite.runner.Id(), "invalid-execution-id")
|
||||||
if !suite.NoError(err) {
|
suite.Require().NoError(err)
|
||||||
return
|
_, response, _ := websocket.DefaultDialer.Dial(wsUrl.String(), nil)
|
||||||
}
|
|
||||||
response, err := http.Get(wsUrl.String())
|
|
||||||
if !suite.NoError(err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
suite.Equal(http.StatusNotFound, response.StatusCode)
|
suite.Equal(http.StatusNotFound, response.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *WebsocketTestSuite) TestWebsocketReturns400IfRequestedViaHttp() {
|
||||||
|
wsUrl, err := suite.websocketUrl("http", suite.runner.Id(), suite.executionId)
|
||||||
|
suite.Require().NoError(err)
|
||||||
|
response, err := http.Get(wsUrl.String())
|
||||||
|
suite.Require().NoError(err)
|
||||||
|
suite.Equal(http.StatusBadRequest, response.StatusCode)
|
||||||
|
}
|
||||||
|
@ -5,14 +5,10 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto"
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
|
|
||||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/store"
|
"gitlab.hpi.de/codeocean/codemoon/poseidon/store"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.GetLogger("runner")
|
|
||||||
|
|
||||||
// Status is the type for the status of a Runner.
|
// Status is the type for the status of a Runner.
|
||||||
type Status string
|
type Status string
|
||||||
|
|
||||||
@ -50,10 +46,6 @@ type Runner interface {
|
|||||||
|
|
||||||
// DeleteExecution deletes the execution of the runner with the specified id.
|
// DeleteExecution deletes the execution of the runner with the specified id.
|
||||||
DeleteExecution(ExecutionId)
|
DeleteExecution(ExecutionId)
|
||||||
|
|
||||||
// Execute runs one of the runners Executions by it's id
|
|
||||||
// ok will be false if the runner has no execution with the provided id
|
|
||||||
Execute(ExecutionId) (err error, ok bool)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExerciseRunner is an abstraction to communicate with Nomad allocations.
|
// ExerciseRunner is an abstraction to communicate with Nomad allocations.
|
||||||
@ -128,19 +120,6 @@ func (r *ExerciseRunner) DeleteExecution(id ExecutionId) {
|
|||||||
delete(r.executions, id)
|
delete(r.executions, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ExerciseRunner) Execute(id ExecutionId) (err error, ok bool) {
|
|
||||||
execution, ok := r.Execution(id)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(execution.TimeLimit)*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
log.WithField("Context", ctx).Printf("ToDo: Running execution")
|
|
||||||
// ToDo: Implement command execution
|
|
||||||
// r.nomadApiClient.ExecuteCommand(r.id, ctx, )
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewContext creates a context containing a runner.
|
// NewContext creates a context containing a runner.
|
||||||
func NewContext(ctx context.Context, runner Runner) context.Context {
|
func NewContext(ctx context.Context, runner Runner) context.Context {
|
||||||
return context.WithValue(ctx, runnerContextKey, runner)
|
return context.WithValue(ctx, runnerContextKey, runner)
|
||||||
|
Reference in New Issue
Block a user