diff --git a/api/dto/dto.go b/api/dto/dto.go index 126f1f2..ed1a7ea 100644 --- a/api/dto/dto.go +++ b/api/dto/dto.go @@ -42,9 +42,17 @@ type ExecutionEnvironmentRequest struct { ExposedPorts []uint16 `json:"exposedPorts"` } +// MappedPort contains the mapping from exposed port inside the container to the host address +// outside the container. +type MappedPort struct { + ExposedPort uint `json:"exposedPort"` + HostAddress string `json:"hostAddress"` +} + // RunnerResponse is the expected response when providing a runner. type RunnerResponse struct { - ID string `json:"runnerId"` + ID string `json:"runnerId"` + MappedPorts []*MappedPort `json:"mappedPorts"` } // ExecutionResponse is the expected response when creating an execution for a runner. diff --git a/api/runners.go b/api/runners.go index 9259171..8e65655 100644 --- a/api/runners.go +++ b/api/runners.go @@ -62,7 +62,7 @@ func (r *RunnerController) provide(writer http.ResponseWriter, request *http.Req } return } - sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID()}, http.StatusOK) + sendJSON(writer, &dto.RunnerResponse{ID: nextRunner.ID(), MappedPorts: nextRunner.MappedPorts()}, http.StatusOK) } // updateFileSystem handles the files API route. diff --git a/api/runners_test.go b/api/runners_test.go index d4f9818..5cd0496 100644 --- a/api/runners_test.go +++ b/api/runners_test.go @@ -28,7 +28,7 @@ type MiddlewareTestSuite struct { func (s *MiddlewareTestSuite) SetupTest() { s.manager = &runner.ManagerMock{} - s.runner = runner.NewNomadJob(tests.DefaultRunnerID, nil, nil) + s.runner = runner.NewNomadJob(tests.DefaultRunnerID, nil, nil, nil) s.capturedRunner = nil s.runnerRequest = func(runnerId string) *http.Request { path, err := s.router.Get("test-runner-id").URL(RunnerIDKey, runnerId) @@ -91,7 +91,7 @@ type RunnerRouteTestSuite struct { func (s *RunnerRouteTestSuite) SetupTest() { s.runnerManager = &runner.ManagerMock{} s.router = NewRouter(s.runnerManager, nil) - s.runner = runner.NewNomadJob("some-id", nil, nil) + s.runner = runner.NewNomadJob("some-id", nil, nil, nil) s.executionID = "execution-id" s.runner.Add(s.executionID, &dto.ExecutionRequest{}) s.runnerManager.On("Get", s.runner.ID()).Return(s.runner, nil) diff --git a/api/websocket_test.go b/api/websocket_test.go index 52a6cb2..b0a1f38 100644 --- a/api/websocket_test.go +++ b/api/websocket_test.go @@ -370,9 +370,9 @@ func TestCodeOceanToRawReaderReturnsOnlyAfterOneByteWasReadFromConnection(t *tes // --- Test suite specific test helpers --- -func newNomadAllocationWithMockedAPIClient(runnerID string) (r runner.Runner, executorAPIMock *nomad.ExecutorAPIMock) { - executorAPIMock = &nomad.ExecutorAPIMock{} - r = runner.NewNomadJob(runnerID, executorAPIMock, nil) +func newNomadAllocationWithMockedAPIClient(runnerID string) (r runner.Runner, mock *nomad.ExecutorAPIMock) { + mock = &nomad.ExecutorAPIMock{} + r = runner.NewNomadJob(runnerID, nil, mock, nil) return } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index e0333f6..dee10b5 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -144,6 +144,21 @@ paths: description: The UUID of the provided runner type: string example: 123e4567-e89b-12d3-a456-426614174000 + mappedPorts: + description: Array containing the addresses of the mapped ports specified in the execution environment. + type: array + items: + description: The exposedPort inside the container is reachable on the returned hostAddress. + type: object + properties: + exposedPort: + description: The port inside the container. + type: uint + example: 80 + hostAddress: + description: The address which can be contacted to reach the mapped port. + type: string + example: 10.224.6.18:23832 "400": $ref: "#/components/responses/BadRequest" "401": diff --git a/nomad/api_querier.go b/nomad/api_querier.go index 93108b1..c6ae626 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -40,6 +40,9 @@ type apiQuerier interface { // job returns the job of the given jobID. job(jobID string) (job *nomadApi.Job, err error) + // allocation returns the first allocation of the given job. + allocation(jobID string) (*nomadApi.Allocation, error) + // RegisterNomadJob registers a job with Nomad. // It returns the evaluation ID that can be used when listening to the Nomad event stream. RegisterNomadJob(job *nomadApi.Job) (string, error) @@ -193,3 +196,18 @@ func (nc *nomadAPIClient) job(jobID string) (job *nomadApi.Job, err error) { job, _, err = nc.client.Jobs().Info(jobID, nil) return } + +func (nc *nomadAPIClient) allocation(jobID string) (alloc *nomadApi.Allocation, err error) { + allocs, _, err := nc.client.Jobs().Allocations(jobID, false, nil) + if err != nil { + return nil, fmt.Errorf("error requesting Nomad job allocations: %w", err) + } + if len(allocs) == 0 { + return nil, ErrorNoAllocationFound + } + alloc, _, err = nc.client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + return nil, fmt.Errorf("error requesting Nomad allocation info: %w", err) + } + return alloc, nil +} diff --git a/nomad/api_querier_mock.go b/nomad/api_querier_mock.go index f927ee4..b4e00c1 100644 --- a/nomad/api_querier_mock.go +++ b/nomad/api_querier_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.8.0. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package nomad @@ -179,6 +179,29 @@ func (_m *apiQuerierMock) SetJobScale(jobId string, count uint, reason string) e return r0 } +// allocation provides a mock function with given fields: jobID +func (_m *apiQuerierMock) allocation(jobID string) (*api.Allocation, error) { + ret := _m.Called(jobID) + + var r0 *api.Allocation + if rf, ok := ret.Get(0).(func(string) *api.Allocation); ok { + r0 = rf(jobID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*api.Allocation) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(jobID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // init provides a mock function with given fields: nomadURL, nomadNamespace func (_m *apiQuerierMock) init(nomadURL *url.URL, nomadNamespace string) error { ret := _m.Called(nomadURL, nomadNamespace) diff --git a/nomad/executor_api_mock.go b/nomad/executor_api_mock.go index abaae17..de53d2d 100644 --- a/nomad/executor_api_mock.go +++ b/nomad/executor_api_mock.go @@ -234,6 +234,29 @@ func (_m *ExecutorAPIMock) LoadRunnerJobs(environmentID string) ([]*api.Job, err return r0, r1 } +// LoadRunnerPorts provides a mock function with given fields: runnerID +func (_m *ExecutorAPIMock) LoadRunnerPortMappings(runnerID string) ([]api.PortMapping, error) { + ret := _m.Called(runnerID) + + var r0 []api.PortMapping + if rf, ok := ret.Get(0).(func(string) []api.PortMapping); ok { + r0 = rf(runnerID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]api.PortMapping) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(runnerID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // MarkRunnerAsUsed provides a mock function with given fields: runnerID, duration func (_m *ExecutorAPIMock) MarkRunnerAsUsed(runnerID string, duration int) error { ret := _m.Called(runnerID, duration) @@ -348,6 +371,29 @@ func (_m *ExecutorAPIMock) WatchAllocations(ctx context.Context, onNewAllocation return r0 } +// allocation provides a mock function with given fields: jobID +func (_m *ExecutorAPIMock) allocation(jobID string) (*api.Allocation, error) { + ret := _m.Called(jobID) + + var r0 *api.Allocation + if rf, ok := ret.Get(0).(func(string) *api.Allocation); ok { + r0 = rf(jobID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*api.Allocation) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(jobID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // init provides a mock function with given fields: nomadURL, nomadNamespace func (_m *ExecutorAPIMock) init(nomadURL *url.URL, nomadNamespace string) error { ret := _m.Called(nomadURL, nomadNamespace) diff --git a/nomad/nomad.go b/nomad/nomad.go index 60d4fae..c8eca7e 100644 --- a/nomad/nomad.go +++ b/nomad/nomad.go @@ -21,6 +21,7 @@ var ( ErrorEvaluation = errors.New("evaluation could not complete") ErrorPlacingAllocations = errors.New("failed to place all allocations") ErrorLoadingJob = errors.New("failed to load job") + ErrorNoAllocatedResourcesFound = errors.New("no allocated resources found") ) type AllocationProcessor func(*nomadApi.Allocation) @@ -39,6 +40,9 @@ type ExecutorAPI interface { // get stopped. LoadRunnerIDs(environmentID string) (runnerIds []string, err error) + // LoadRunnerPortMappings returns the mapped ports of the runner. + LoadRunnerPortMappings(runnerID string) ([]nomadApi.PortMapping, error) + // RegisterTemplateJob creates a template job based on the default job configuration and the given parameters. // It registers the job and waits until the registration completes. RegisterTemplateJob(defaultJob *nomadApi.Job, id string, @@ -105,6 +109,17 @@ func (a *APIClient) LoadRunnerIDs(environmentID string) (runnerIDs []string, err return runnerIDs, nil } +func (a *APIClient) LoadRunnerPortMappings(runnerID string) ([]nomadApi.PortMapping, error) { + alloc, err := a.apiQuerier.allocation(runnerID) + if err != nil { + return nil, fmt.Errorf("error querying allocation for runner %s: %w", runnerID, err) + } + if alloc.AllocatedResources == nil { + return nil, ErrorNoAllocatedResourcesFound + } + return alloc.AllocatedResources.Shared.Ports, nil +} + func (a *APIClient) LoadRunnerJobs(environmentID string) ([]*nomadApi.Job, error) { runnerIDs, err := a.LoadRunnerIDs(environmentID) if err != nil { diff --git a/runner/manager.go b/runner/manager.go index cd322ac..0902b88 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -238,7 +238,12 @@ func (m *NomadRunnerManager) loadSingleJob(job *nomadApi.Job, environmentLogger return } isUsed := configTaskGroup.Meta[nomad.ConfigMetaUsedKey] == nomad.ConfigMetaUsedValue - newJob := NewNomadJob(*job.ID, m.apiClient, m) + portMappings, err := m.apiClient.LoadRunnerPortMappings(*job.ID) + if err != nil { + environmentLogger.WithError(err).Warn("Error loading runner portMappings") + return + } + newJob := NewNomadJob(*job.ID, portMappings, m.apiClient, m) if isUsed { m.usedRunners.Add(newJob) timeout, err := strconv.Atoi(configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey]) @@ -277,7 +282,11 @@ func (m *NomadRunnerManager) onAllocationAdded(alloc *nomadApi.Allocation) { job, ok := m.environments.Get(environmentID) if ok { - job.idleRunners.Add(NewNomadJob(alloc.JobID, m.apiClient, m)) + var mappedPorts []nomadApi.PortMapping + if alloc.AllocatedResources != nil { + mappedPorts = alloc.AllocatedResources.Shared.Ports + } + job.idleRunners.Add(NewNomadJob(alloc.JobID, mappedPorts, m.apiClient, m)) } } diff --git a/runner/runner.go b/runner/runner.go index c9da54e..d4976b5 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + nomadApi "github.com/hashicorp/nomad/api" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" "io" @@ -131,6 +132,8 @@ func (t *InactivityTimerImplementation) TimeoutPassed() bool { type Runner interface { // ID returns the id of the runner. ID() string + // MappedPorts returns the mapped ports of the runner. + MappedPorts() []*dto.MappedPort ExecutionStorage InactivityTimer @@ -154,14 +157,18 @@ type Runner interface { type NomadJob struct { ExecutionStorage InactivityTimer - id string - api nomad.ExecutorAPI + id string + portMappings []nomadApi.PortMapping + api nomad.ExecutorAPI } // NewNomadJob creates a new NomadJob with the provided id. -func NewNomadJob(id string, apiClient nomad.ExecutorAPI, manager Manager) *NomadJob { +func NewNomadJob(id string, portMappings []nomadApi.PortMapping, + apiClient nomad.ExecutorAPI, manager Manager, +) *NomadJob { job := &NomadJob{ id: id, + portMappings: portMappings, api: apiClient, ExecutionStorage: NewLocalExecutionStorage(), } @@ -173,6 +180,17 @@ func (r *NomadJob) ID() string { return r.id } +func (r *NomadJob) MappedPorts() []*dto.MappedPort { + ports := make([]*dto.MappedPort, 0, len(r.portMappings)) + for _, portMapping := range r.portMappings { + ports = append(ports, &dto.MappedPort{ + ExposedPort: uint(portMapping.To), + HostAddress: fmt.Sprintf("%s:%d", portMapping.HostIP, portMapping.Value), + }) + } + return ports +} + type ExitInfo struct { Code uint8 Err error diff --git a/runner/runner_mock.go b/runner/runner_mock.go index a48537e..cda0c17 100644 --- a/runner/runner_mock.go +++ b/runner/runner_mock.go @@ -62,6 +62,22 @@ func (_m *RunnerMock) ID() string { return r0 } +// MappedPorts provides a mock function with given fields: +func (_m *RunnerMock) MappedPorts() []*dto.MappedPort { + ret := _m.Called() + + var r0 []*dto.MappedPort + if rf, ok := ret.Get(0).(func() []*dto.MappedPort); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*dto.MappedPort) + } + } + + return r0 +} + // Pop provides a mock function with given fields: id func (_m *RunnerMock) Pop(id ExecutionID) (*dto.ExecutionRequest, bool) { ret := _m.Called(id) diff --git a/runner/runner_test.go b/runner/runner_test.go index 484e878..be8b6f3 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -21,19 +21,19 @@ import ( ) func TestIdIsStored(t *testing.T) { - runner := NewNomadJob(tests.DefaultJobID, nil, nil) + runner := NewNomadJob(tests.DefaultJobID, nil, nil, nil) assert.Equal(t, tests.DefaultJobID, runner.ID()) } func TestMarshalRunner(t *testing.T) { - runner := NewNomadJob(tests.DefaultJobID, nil, nil) + runner := NewNomadJob(tests.DefaultJobID, nil, nil, nil) marshal, err := json.Marshal(runner) assert.NoError(t, err) assert.Equal(t, "{\"runnerId\":\""+tests.DefaultJobID+"\"}", string(marshal)) } func TestExecutionRequestIsStored(t *testing.T) { - runner := NewNomadJob(tests.DefaultJobID, nil, nil) + runner := NewNomadJob(tests.DefaultJobID, nil, nil, nil) executionRequest := &dto.ExecutionRequest{ Command: "command", TimeLimit: 10, @@ -48,7 +48,7 @@ func TestExecutionRequestIsStored(t *testing.T) { } func TestNewContextReturnsNewContextWithRunner(t *testing.T) { - runner := NewNomadJob(tests.DefaultRunnerID, nil, nil) + runner := NewNomadJob(tests.DefaultRunnerID, nil, nil, nil) ctx := context.Background() newCtx := NewContext(ctx, runner) storedRunner, ok := newCtx.Value(runnerContextKey).(Runner) @@ -59,7 +59,7 @@ func TestNewContextReturnsNewContextWithRunner(t *testing.T) { } func TestFromContextReturnsRunner(t *testing.T) { - runner := NewNomadJob(tests.DefaultRunnerID, nil, nil) + runner := NewNomadJob(tests.DefaultRunnerID, nil, nil, nil) ctx := NewContext(context.Background(), runner) storedRunner, ok := FromContext(ctx) @@ -389,5 +389,5 @@ func (s *InactivityTimerTestSuite) TestTimerIsInactiveWhenDurationIsZero() { // NewRunner creates a new runner with the provided id and manager. func NewRunner(id string, manager Manager) Runner { - return NewNomadJob(id, nil, manager) + return NewNomadJob(id, nil, nil, manager) }