Add updating cached allocations
This commit is contained in:

committed by
Maximilian Pass

parent
66821dbfc8
commit
3f572261c2
@@ -1,9 +1,12 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
nomadApi "github.com/hashicorp/nomad/api"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/logging"
|
||||
"gitlab.hpi.de/codeocean/codemoon/poseidon/nomad"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -49,12 +52,14 @@ type NomadRunnerManager struct {
|
||||
usedRunners Storage
|
||||
}
|
||||
|
||||
func NewNomadRunnerManager(apiClient nomad.ExecutorApi) *NomadRunnerManager {
|
||||
return &NomadRunnerManager{
|
||||
func NewNomadRunnerManager(apiClient nomad.ExecutorApi, ctx context.Context) *NomadRunnerManager {
|
||||
m := &NomadRunnerManager{
|
||||
apiClient,
|
||||
NewLocalNomadJobStorage(),
|
||||
NewLocalRunnerStorage(),
|
||||
}
|
||||
go m.updateRunners(ctx)
|
||||
return m
|
||||
}
|
||||
|
||||
type NomadJob struct {
|
||||
@@ -113,6 +118,41 @@ func (m *NomadRunnerManager) Return(r Runner) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m *NomadRunnerManager) updateRunners(ctx context.Context) {
|
||||
onCreate := func(alloc *nomadApi.Allocation) {
|
||||
log.WithField("id", alloc.ID).Debug("Allocation started")
|
||||
|
||||
intJobID, err := strconv.Atoi(alloc.JobID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
job, ok := m.jobs.Get(EnvironmentId(intJobID))
|
||||
if ok {
|
||||
job.idleRunners.Add(NewRunner(alloc.ID))
|
||||
}
|
||||
}
|
||||
onStop := func(alloc *nomadApi.Allocation) {
|
||||
log.WithField("id", alloc.ID).Debug("Allocation stopped")
|
||||
|
||||
intJobID, err := strconv.Atoi(alloc.JobID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
job, ok := m.jobs.Get(EnvironmentId(intJobID))
|
||||
if ok {
|
||||
job.idleRunners.Delete(alloc.ID)
|
||||
m.usedRunners.Delete(alloc.ID)
|
||||
}
|
||||
}
|
||||
|
||||
err := m.apiClient.WatchAllocations(ctx, onCreate, onStop)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed updating runners")
|
||||
}
|
||||
}
|
||||
|
||||
// Refresh Big ToDo: Improve this function!! State out that it also rescales the job; Provide context to be terminable...
|
||||
func (m *NomadRunnerManager) refreshEnvironment(id EnvironmentId) {
|
||||
job, ok := m.jobs.Get(id)
|
||||
|
Reference in New Issue
Block a user