Recover Runner Allocations on startup.

This commit is contained in:
Maximilian Paß
2023-03-31 17:26:44 +01:00
committed by Sebastian Serth
parent 038d71ff51
commit 8950ce29d8
5 changed files with 89 additions and 909 deletions

View File

@ -93,13 +93,19 @@ type ExecutorAPI interface {
type APIClient struct {
apiQuerier
evaluations map[string]chan error
// allocations contain management data for all pending and running allocations.
allocations storage.Storage[*allocationData]
isListening bool
}
// NewExecutorAPI creates a new api client.
// One client is usually sufficient for the complete runtime of the API.
func NewExecutorAPI(nomadConfig *config.Nomad) (ExecutorAPI, error) {
client := &APIClient{apiQuerier: &nomadAPIClient{}, evaluations: map[string]chan error{}}
client := &APIClient{
apiQuerier: &nomadAPIClient{},
evaluations: map[string]chan error{},
allocations: storage.NewMonitoredLocalStorage[*allocationData](monitoring.MeasurementNomadAllocations, nil, 0, nil),
}
err := client.init(nomadConfig)
return client, err
}
@ -138,6 +144,8 @@ func (a *APIClient) LoadRunnerPortMappings(runnerID string) ([]nomadApi.PortMapp
}
func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*nomadApi.Job, error) {
go a.initializeAllocations()
runnerIDs, err := a.LoadRunnerIDs(RunnerJobID(environmentID, ""))
if err != nil {
return []*nomadApi.Job{}, fmt.Errorf("couldn't load jobs: %w", err)
@ -192,15 +200,13 @@ func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationP
if err != nil {
return fmt.Errorf("failed retrieving allocation stream: %w", err)
}
// allocations contain management data for all pending and running allocations.
allocations := storage.NewMonitoredLocalStorage[*allocationData](monitoring.MeasurementNomadAllocations, nil, 0, nil)
handler := func(event *nomadApi.Event) (bool, error) {
switch event.Topic {
case nomadApi.TopicEvaluation:
return false, handleEvaluationEvent(a.evaluations, event)
case nomadApi.TopicAllocation:
return false, handleAllocationEvent(startTime, allocations, event, callbacks)
return false, handleAllocationEvent(startTime, a.allocations, event, callbacks)
default:
return false, nil
}
@ -212,6 +218,24 @@ func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationP
return err
}
func (a *APIClient) initializeAllocations() {
allocationStubs, err := a.listAllocations()
if err != nil {
log.WithError(err).Warn("Could not initialize allocations")
} else {
for _, stub := range allocationStubs {
if IsEnvironmentTemplateID(stub.JobID) {
continue
} else if stub.ClientStatus == structs.AllocClientStatusPending ||
stub.ClientStatus == structs.AllocClientStatusRunning {
log.WithField("jobID", stub.JobID).WithField("status", stub.ClientStatus).Debug("Recovered Runner")
a.allocations.Add(stub.ID,
&allocationData{allocClientStatus: stub.ClientStatus, start: time.Unix(0, stub.CreateTime)})
}
}
}
}
// nomadAPIEventHandler is a function that receives a nomadApi.Event and processes it.
// It is called by an event listening loop. For each received event, the function is called.
// If done is true, the calling function knows that it should break out of the event listening