From 64764a980924ee3dc11e429d364ff6ae81836500 Mon Sep 17 00:00:00 2001 From: sirkrypt0 <22522058+sirkrypt0@users.noreply.github.com> Date: Wed, 7 Jul 2021 12:44:30 +0200 Subject: [PATCH] Return mapped ports when requesting runners We now store the mapped ports returned by Nomad locally in our runner struct and return them when requesting the runner. The returned ip address is in most Nomad setups not reachable from external users. --- api/dto/dto.go | 10 ++++++++- api/runners.go | 2 +- api/runners_test.go | 4 ++-- api/websocket_test.go | 6 ++--- docs/swagger.yaml | 15 +++++++++++++ nomad/api_querier.go | 18 +++++++++++++++ nomad/api_querier_mock.go | 25 ++++++++++++++++++++- nomad/executor_api_mock.go | 46 ++++++++++++++++++++++++++++++++++++++ nomad/nomad.go | 15 +++++++++++++ runner/manager.go | 13 +++++++++-- runner/runner.go | 24 +++++++++++++++++--- runner/runner_mock.go | 16 +++++++++++++ runner/runner_test.go | 12 +++++----- 13 files changed, 187 insertions(+), 19 deletions(-) diff --git a/api/dto/dto.go b/api/dto/dto.go index 126f1f2..ed1a7ea 100644 --- a/api/dto/dto.go +++ b/api/dto/dto.go @@ -42,9 +42,17 @@ type ExecutionEnvironmentRequest struct { ExposedPorts []uint16 `json:"exposedPorts"` } +// MappedPort contains the mapping from exposed port inside the container to the host address +// outside the container. +type MappedPort struct { + ExposedPort uint `json:"exposedPort"` + HostAddress string `json:"hostAddress"` +} + // RunnerResponse is the expected response when providing a runner. type RunnerResponse struct { - ID string `json:"runnerId"` + ID string `json:"runnerId"` + MappedPorts []*MappedPort `json:"mappedPorts"` } // ExecutionResponse is the expected response when creating an execution for a runner. diff --git a/api/runners.go b/api/runners.go index 9259171..8e65655 100644 --- a/api/runners.go +++ b/api/runners.go @@ -62,7 +62,7 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req } return } - sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID()}, http.StatusOK) + sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK) } // updateFileSystem handles the files API route. diff --git a/api/runners_test.go b/api/runners_test.go index d4f9818..5cd0496 100644 --- a/api/runners_test.go +++ b/api/runners_test.go @@ -28,7 +28,7 @@ type MiddlewareTestSuite struct { func (s *MiddlewareTestSuite) SetupTest() { s.manager = &runner.ManagerMock{} - s.runner = runner.NewNomadJob(tests.DefaultRunnerID, nil, nil) + s.runner = runner.NewNomadJob(tests.DefaultRunnerID, nil, nil, nil) s.capturedRunner = nil s.runnerRequest = func(runnerId string) *http.Request { path, err := s.router.Get("test-runner-id").URL(RunnerIDKey, runnerId) @@ -91,7 +91,7 @@ type RunnerRouteTestSuite struct { func (s *RunnerRouteTestSuite) SetupTest() { s.runnerManager = &runner.ManagerMock{} s.router = NewRouter(s.runnerManager, nil) - s.runner = runner.NewNomadJob("some-id", nil, nil) + s.runner = runner.NewNomadJob("some-id", nil, nil, nil) s.executionID = "execution-id" s.runner.Add(s.executionID, &dto.ExecutionRequest{}) s.runnerManager.On("Get", s.runner.ID()).Return(s.runner, nil) diff --git a/api/websocket_test.go b/api/websocket_test.go index 52a6cb2..b0a1f38 100644 --- a/api/websocket_test.go +++ b/api/websocket_test.go @@ -370,9 +370,9 @@ func TestCodeOceanToRawReaderReturnsOnlyAfterOneByteWasReadFromConnection(t *tes // --- Test suite specific test helpers --- -func newNomadAllocationWithMockedAPIClient(runnerID string) (r runner.Runner, executorAPIMock *nomad.ExecutorAPIMock) { - executorAPIMock = &nomad.ExecutorAPIMock{} - r = runner.NewNomadJob(runnerID, executorAPIMock, nil) +func newNomadAllocationWithMockedAPIClient(runnerID string) (r runner.Runner, mock *nomad.ExecutorAPIMock) { + mock = &nomad.ExecutorAPIMock{} + r = runner.NewNomadJob(runnerID, nil, mock, nil) return } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index e0333f6..dee10b5 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -144,6 +144,21 @@ paths: description: The UUID of the provided runner type: string example: 123e4567-e89b-12d3-a456-426614174000 + mappedPorts: + description: Array containing the addresses of the mapped ports specified in the execution environment. + type: array + items: + description: The exposedPort inside the container is reachable on the returned hostAddress. + type: object + properties: + exposedPort: + description: The port inside the container. + type: uint + example: 80 + hostAddress: + description: The address which can be contacted to reach the mapped port. + type: string + example: 10.224.6.18:23832 "400": $ref: "#/components/responses/BadRequest" "401": diff --git a/nomad/api_querier.go b/nomad/api_querier.go index 93108b1..c6ae626 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -40,6 +40,9 @@ type apiQuerier interface { // job returns the job of the given jobID. job(jobID string) (job *nomadApi.Job, err error) + // allocation returns the first allocation of the given job. + allocation(jobID string) (*nomadApi.Allocation, error) + // RegisterNomadJob registers a job with Nomad. // It returns the evaluation ID that can be used when listening to the Nomad event stream. RegisterNomadJob(job *nomadApi.Job) (string, error) @@ -193,3 +196,18 @@ func (nc *nomadAPIClient) job(jobID string) (job *nomadApi.Job, err error) { job, _, err = nc.client.Jobs().Info(jobID, nil) return } + +func (nc *nomadAPIClient) allocation(jobID string) (alloc *nomadApi.Allocation, err error) { + allocs, _, err := nc.client.Jobs().Allocations(jobID, false, nil) + if err != nil { + return nil, fmt.Errorf("error requesting Nomad job allocations: %w", err) + } + if len(allocs) == 0 { + return nil, ErrorNoAllocationFound + } + alloc, _, err = nc.client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + return nil, fmt.Errorf("error requesting Nomad allocation info: %w", err) + } + return alloc, nil +} diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index f927ee4..b4e00c1 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.8.0. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package nomad @@ -179,6 +179,29 @@ func (_m *apiQuerierMock) SetJobScale(jobId string, count uint, reason string) e return r0 } +// allocation provides a mock function with given fields: jobID +func (_m *apiQuerierMock) allocation(jobID string) (*api.Allocation, error) { + ret := _m.Called(jobID) + + var r0 *api.Allocation + if rf, ok := ret.Get(0).(func(string) *api.Allocation); ok { + r0 = rf(jobID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*api.Allocation) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(jobID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // init provides a mock function with given fields: nomadURL, nomadNamespace func (_m *apiQuerierMock) init(nomadURL *url.URL, nomadNamespace string) error { ret := _m.Called(nomadURL, nomadNamespace) diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index abaae17..de53d2d 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -234,6 +234,29 @@ func (_m *ExecutorAPIMock) LoadRunnerJobs(environmentID string) ([]*api.Job, err return r0, r1 } +// LoadRunnerPorts provides a mock function with given fields: runnerID +func (_m *ExecutorAPIMock) LoadRunnerPortMappings(runnerID string) ([]api.PortMapping, error) { + ret := _m.Called(runnerID) + + var r0 []api.PortMapping + if rf, ok := ret.Get(0).(func(string) []api.PortMapping); ok { + r0 = rf(runnerID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]api.PortMapping) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(runnerID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // MarkRunnerAsUsed provides a mock function with given fields: runnerID, duration func (_m *ExecutorAPIMock) MarkRunnerAsUsed(runnerID string, duration int) error { ret := _m.Called(runnerID, duration) @@ -348,6 +371,29 @@ func (_m *ExecutorAPIMock) WatchAllocations(ctx context.Context, onNewAllocation return r0 } +// allocation provides a mock function with given fields: jobID +func (_m *ExecutorAPIMock) allocation(jobID string) (*api.Allocation, error) { + ret := _m.Called(jobID) + + var r0 *api.Allocation + if rf, ok := ret.Get(0).(func(string) *api.Allocation); ok { + r0 = rf(jobID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*api.Allocation) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(jobID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // init provides a mock function with given fields: nomadURL, nomadNamespace func (_m *ExecutorAPIMock) init(nomadURL *url.URL, nomadNamespace string) error { ret := _m.Called(nomadURL, nomadNamespace) diff --git a/nomad/nomad.go b/nomad/nomad.go index 60d4fae..c8eca7e 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -21,6 +21,7 @@ var ( ErrorEvaluation = errors.New("evaluation could not complete") ErrorPlacingAllocations = errors.New("failed to place all allocations") ErrorLoadingJob = errors.New("failed to load job") + ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found") ) type AllocationProcessor func(*nomadApi.Allocation) @@ -39,6 +40,9 @@ type ExecutorAPI interface { // get stopped. LoadRunnerIDs(environmentID string) (runnerIds []string, err error) + // LoadRunnerPortMappings returns the mapped ports of the runner. + LoadRunnerPortMappings(runnerID string) ([]nomadApi.PortMapping, error) + // RegisterTemplateJob creates a template job based on the default job configuration and the given parameters. // It registers the job and waits until the registration completes. RegisterTemplateJob(defaultJob *nomadApi.Job, id string, @@ -105,6 +109,17 @@ func (a *APIClient) LoadRunnerIDs(environmentID string) (runnerIDs []string, err return runnerIDs, nil } +func (a *APIClient) LoadRunnerPortMappings(runnerID string) ([]nomadApi.PortMapping, error) { + alloc, err := a.apiQuerier.allocation(runnerID) + if err != nil { + return nil, fmt.Errorf("error querying allocation for runner %s: %w", runnerID, err) + } + if alloc.AllocatedResources == nil { + return nil, ErrorNoAllocatedResourcesFound + } + return alloc.AllocatedResources.Shared.Ports, nil +} + func (a *APIClient) LoadRunnerJobs(environmentID string) ([]*nomadApi.Job, error) { runnerIDs, err := a.LoadRunnerIDs(environmentID) if err != nil { diff --git a/runner/manager.go b/runner/manager.go index cd322ac..0902b88 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -238,7 +238,12 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger return } isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue - newJob := NewNomadJob(*job.ID, m.apiClient, m) + portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID) + if err != nil { + environmentLogger.WithError(err).Warn("Error loading runner portMappings") + return + } + newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m) if isUsed { m.usedRunners.Add(newJob) timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey]) @@ -277,7 +282,11 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { job, ok := m.environments.Get(environmentID) if ok { - job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient, m)) + var mappedPorts []nomadApi.PortMapping + if alloc.AllocatedResources != nil { + mappedPorts = alloc.AllocatedResources.Shared.Ports + } + job.idleRunners.Add(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m)) } } diff --git a/runner/runner.go b/runner/runner.go index c9da54e..d4976b5 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + nomadApi "github.com/hashicorp/nomad/api" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "io" @@ -131,6 +132,8 @@ func (t *InactivityTimerImplementation) TimeoutPassed() bool { type Runner interface { // ID returns the id of the runner. ID() string + // MappedPorts returns the mapped ports of the runner. + MappedPorts() []*dto.MappedPort ExecutionStorage InactivityTimer @@ -154,14 +157,18 @@ type Runner interface { type NomadJob struct { ExecutionStorage InactivityTimer - id string - api nomad.ExecutorAPI + id string + portMappings []nomadApi.PortMapping + api nomad.ExecutorAPI } // NewNomadJob creates a new NomadJob with the provided id. -func NewNomadJob(id string, apiClient nomad.ExecutorAPI, manager Manager) *NomadJob { +func NewNomadJob(id string, portMappings []nomadApi.PortMapping, + apiClient nomad.ExecutorAPI, manager Manager, +) *NomadJob { job := &NomadJob{ id: id, + portMappings: portMappings, api: apiClient, ExecutionStorage: NewLocalExecutionStorage(), } @@ -173,6 +180,17 @@ func (r *NomadJob) ID() string { return r.id } +func (r *NomadJob) MappedPorts() []*dto.MappedPort { + ports := make([]*dto.MappedPort, 0, len(r.portMappings)) + for _, portMapping := range r.portMappings { + ports = append(ports, &dto.MappedPort{ + ExposedPort: uint(portMapping.To), + HostAddress: fmt.Sprintf("%s:%d", portMapping.HostIP, portMapping.Value), + }) + } + return ports +} + type ExitInfo struct { Code uint8 Err error diff --git a/runner/runner_mock.go b/runner/runner_mock.go index a48537e..cda0c17 100644 --- a/runner/runner_mock.go +++ b/runner/runner_mock.go @@ -62,6 +62,22 @@ func (_m *RunnerMock) ID() string { return r0 } +// MappedPorts provides a mock function with given fields: +func (_m *RunnerMock) MappedPorts() []*dto.MappedPort { + ret := _m.Called() + + var r0 []*dto.MappedPort + if rf, ok := ret.Get(0).(func() []*dto.MappedPort); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*dto.MappedPort) + } + } + + return r0 +} + // Pop provides a mock function with given fields: id func (_m *RunnerMock) Pop(id ExecutionID) (*dto.ExecutionRequest, bool) { ret := _m.Called(id) diff --git a/runner/runner_test.go b/runner/runner_test.go index 484e878..be8b6f3 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -21,19 +21,19 @@ import ( ) func TestIdIsStored(t *testing.T) { - runner := NewNomadJob(tests.DefaultJobID, nil, nil) + runner := NewNomadJob(tests.DefaultJobID, nil, nil, nil) assert.Equal(t, tests.DefaultJobID, runner.ID()) } func TestMarshalRunner(t *testing.T) { - runner := NewNomadJob(tests.DefaultJobID, nil, nil) + runner := NewNomadJob(tests.DefaultJobID, nil, nil, nil) marshal, err := json.Marshal(runner) assert.NoError(t, err) assert.Equal(t, "{\"runnerId\":\""+tests.DefaultJobID+"\"}", string(marshal)) } func TestExecutionRequestIsStored(t *testing.T) { - runner := NewNomadJob(tests.DefaultJobID, nil, nil) + runner := NewNomadJob(tests.DefaultJobID, nil, nil, nil) executionRequest := &dto.ExecutionRequest{ Command: "command", TimeLimit: 10, @@ -48,7 +48,7 @@ func TestExecutionRequestIsStored(t *testing.T) { } func TestNewContextReturnsNewContextWithRunner(t *testing.T) { - runner := NewNomadJob(tests.DefaultRunnerID, nil, nil) + runner := NewNomadJob(tests.DefaultRunnerID, nil, nil, nil) ctx := context.Background() newCtx := NewContext(ctx, runner) storedRunner, ok := newCtx.Value(runnerContextKey).(Runner) @@ -59,7 +59,7 @@ func TestNewContextReturnsNewContextWithRunner(t *testing.T) { } func TestFromContextReturnsRunner(t *testing.T) { - runner := NewNomadJob(tests.DefaultRunnerID, nil, nil) + runner := NewNomadJob(tests.DefaultRunnerID, nil, nil, nil) ctx := NewContext(context.Background(), runner) storedRunner, ok := FromContext(ctx) @@ -389,5 +389,5 @@ func (s *InactivityTimerTestSuite) TestTimerIsInactiveWhenDurationIsZero() { // NewRunner creates a new runner with the provided id and manager. func NewRunner(id string, manager Manager) Runner { - return NewNomadJob(id, nil, manager) + return NewNomadJob(id, nil, nil, manager) }