@ -3,7 +3,6 @@ package environment
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
nomadApi "github.com/hashicorp/nomad/api"
|
||||
@ -12,16 +11,13 @@ import (
|
||||
"github.com/openHPI/poseidon/internal/runner"
|
||||
"github.com/openHPI/poseidon/pkg/dto"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
portNumberBase = 10
|
||||
)
|
||||
|
||||
var (
|
||||
ErrorUpdatingExecutionEnvironment = errors.New("errors occurred when updating environment")
|
||||
)
|
||||
|
||||
type NomadEnvironment struct {
|
||||
jobHCL string
|
||||
job *nomadApi.Job
|
||||
@ -173,6 +169,11 @@ func (n *NomadEnvironment) SetNetworkAccess(allow bool, exposedPorts []uint16) {
|
||||
// Register creates a Nomad job based on the default job configuration and the given parameters.
|
||||
// It registers the job with Nomad and waits until the registration completes.
|
||||
func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error {
|
||||
// To avoid docker image issues. See https://github.com/openHPI/poseidon/issues/69
|
||||
if err := n.Delete(apiClient); err != nil {
|
||||
return fmt.Errorf("failed to remove the environment: %w", err)
|
||||
}
|
||||
|
||||
nomad.SetForcePullFlag(n.job, true) // This must be the default as otherwise new runners could have different images.
|
||||
evalID, err := apiClient.RegisterNomadJob(n.job)
|
||||
if err != nil {
|
||||
@ -188,7 +189,7 @@ func (n *NomadEnvironment) Register(apiClient nomad.ExecutorAPI) error {
|
||||
}
|
||||
|
||||
func (n *NomadEnvironment) Delete(apiClient nomad.ExecutorAPI) error {
|
||||
err := n.removeRunners(apiClient, uint(n.idleRunners.Length()))
|
||||
err := n.removeRunners(apiClient)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -205,36 +206,10 @@ func (n *NomadEnvironment) Scale(apiClient nomad.ExecutorAPI) error {
|
||||
if required > 0 {
|
||||
return n.createRunners(apiClient, uint(required), true)
|
||||
} else {
|
||||
return n.removeRunners(apiClient, uint(-required))
|
||||
return n.removeIdleRunners(apiClient, uint(-required))
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NomadEnvironment) UpdateRunnerSpecs(apiClient nomad.ExecutorAPI) error {
|
||||
runners, err := apiClient.LoadRunnerIDs(n.ID().ToString())
|
||||
if err != nil {
|
||||
return fmt.Errorf("update environment couldn't load runners: %w", err)
|
||||
}
|
||||
|
||||
var occurredError error
|
||||
for _, id := range runners {
|
||||
// avoid taking the address of the loop variable
|
||||
runnerID := id
|
||||
updatedRunnerJob := n.DeepCopyJob()
|
||||
updatedRunnerJob.ID = &runnerID
|
||||
updatedRunnerJob.Name = &runnerID
|
||||
nomad.SetForcePullFlag(updatedRunnerJob, true)
|
||||
|
||||
err := apiClient.RegisterRunnerJob(updatedRunnerJob)
|
||||
if err != nil {
|
||||
if occurredError == nil {
|
||||
occurredError = ErrorUpdatingExecutionEnvironment
|
||||
}
|
||||
occurredError = fmt.Errorf("%w; new api error for runner %s - %v", occurredError, id, err)
|
||||
}
|
||||
}
|
||||
return occurredError
|
||||
}
|
||||
|
||||
func (n *NomadEnvironment) Sample(apiClient nomad.ExecutorAPI) (runner.Runner, bool) {
|
||||
r, ok := n.idleRunners.Sample()
|
||||
if ok {
|
||||
@ -347,7 +322,7 @@ func (n *NomadEnvironment) createRunner(apiClient nomad.ExecutorAPI, forcePull b
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI, count uint) error {
|
||||
func (n *NomadEnvironment) removeIdleRunners(apiClient nomad.ExecutorAPI, count uint) error {
|
||||
log.WithField("runnersToDelete", count).WithField("id", n.ID()).Debug("Removing idle runners")
|
||||
for i := 0; i < int(count); i++ {
|
||||
r, ok := n.idleRunners.Sample()
|
||||
@ -361,3 +336,29 @@ func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI, count uint
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NomadEnvironment) removeRunners(apiClient nomad.ExecutorAPI) error {
|
||||
// Only to avoid timing issues as an idle runner is also removed when Nomad has deleted the allocation.
|
||||
for _, r := range n.idleRunners.List() {
|
||||
n.idleRunners.Delete(r.ID())
|
||||
}
|
||||
|
||||
ids, err := apiClient.LoadRunnerIDs(nomad.RunnerJobID(n.ID(), ""))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load runner ids: %w", err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, id := range ids {
|
||||
wg.Add(1)
|
||||
go func(jobID string) {
|
||||
defer wg.Done()
|
||||
deleteErr := apiClient.DeleteJob(jobID)
|
||||
if deleteErr != nil {
|
||||
err = deleteErr
|
||||
}
|
||||
}(id)
|
||||
}
|
||||
wg.Wait()
|
||||
return err
|
||||
}
|
||||
|
@ -110,8 +110,10 @@ func TestRegisterFailsWhenNomadJobRegistrationFails(t *testing.T) {
|
||||
expectedErr := tests.ErrDefault
|
||||
|
||||
apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", expectedErr)
|
||||
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
|
||||
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||
|
||||
environment := &NomadEnvironment{"", &nomadApi.Job{}, nil}
|
||||
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
|
||||
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
|
||||
err := environment.Register(apiClientMock)
|
||||
|
||||
@ -125,8 +127,10 @@ func TestRegisterTemplateJobSucceedsWhenMonitoringEvaluationSucceeds(t *testing.
|
||||
|
||||
apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil)
|
||||
apiClientMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil)
|
||||
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
|
||||
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||
|
||||
environment := &NomadEnvironment{"", &nomadApi.Job{}, nil}
|
||||
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
|
||||
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
|
||||
err := environment.Register(apiClientMock)
|
||||
|
||||
@ -139,8 +143,10 @@ func TestRegisterTemplateJobReturnsErrorWhenMonitoringEvaluationFails(t *testing
|
||||
|
||||
apiClientMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return(evaluationID, nil)
|
||||
apiClientMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(tests.ErrDefault)
|
||||
apiClientMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
|
||||
apiClientMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||
|
||||
environment := &NomadEnvironment{"", &nomadApi.Job{}, nil}
|
||||
environment := &NomadEnvironment{"", &nomadApi.Job{}, runner.NewLocalRunnerStorage()}
|
||||
environment.SetID(tests.DefaultEnvironmentIDAsInteger)
|
||||
err := environment.Register(apiClientMock)
|
||||
|
||||
|
@ -134,10 +134,6 @@ func (m *NomadEnvironmentManager) CreateOrUpdate(id dto.EnvironmentID, request d
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error registering template job in API: %w", err)
|
||||
}
|
||||
err = environment.UpdateRunnerSpecs(m.api)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error updating runner jobs in API: %w", err)
|
||||
}
|
||||
err = environment.Scale(m.api)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error scaling template job in API: %w", err)
|
||||
|
@ -54,6 +54,8 @@ func (s *CreateOrUpdateTestSuite) SetupTest() {
|
||||
|
||||
func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentReturnsError() {
|
||||
s.apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", tests.ErrDefault)
|
||||
s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
|
||||
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||
s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false)
|
||||
s.runnerManagerMock.On("SetEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true)
|
||||
_, err := s.manager.CreateOrUpdate(dto.EnvironmentID(tests.DefaultEnvironmentIDAsInteger), s.request)
|
||||
@ -62,6 +64,8 @@ func (s *CreateOrUpdateTestSuite) TestReturnsErrorIfCreatesOrUpdateEnvironmentRe
|
||||
|
||||
func (s *CreateOrUpdateTestSuite) TestCreateOrUpdatesSetsForcePullFlag() {
|
||||
s.apiMock.On("RegisterNomadJob", mock.AnythingOfType("*api.Job")).Return("", nil)
|
||||
s.apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
|
||||
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||
s.runnerManagerMock.On("GetEnvironment", mock.AnythingOfType("dto.EnvironmentID")).Return(nil, false)
|
||||
s.runnerManagerMock.On("SetEnvironment", mock.AnythingOfType("*environment.NomadEnvironment")).Return(true)
|
||||
s.apiMock.On("MonitorEvaluation", mock.AnythingOfType("string"), mock.Anything).Return(nil)
|
||||
@ -131,6 +135,8 @@ func TestNewNomadEnvironmentManager(t *testing.T) {
|
||||
func TestNomadEnvironmentManager_Get(t *testing.T) {
|
||||
apiMock := &nomad.ExecutorAPIMock{}
|
||||
mockWatchAllocations(apiMock)
|
||||
apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
|
||||
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
|
||||
call := apiMock.On("LoadEnvironmentJobs")
|
||||
call.Run(func(args mock.Arguments) {
|
||||
call.ReturnArguments = mock.Arguments{[]*nomadApi.Job{}, nil}
|
||||
|
Reference in New Issue
Block a user