From 3afcdeaba8ff35914154d57f2301eb0edc9ae0c9 Mon Sep 17 00:00:00 2001 From: Konrad Hanff Date: Thu, 20 May 2021 08:47:51 +0200 Subject: [PATCH] Execute commands in runner via WebSocket MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This enables executing commands in runners and forwarding input and output between the runner and the websocket to the client. Co-authored-by: Maximilian Paß --- api/dto/dto.go | 115 +++++++++++++++- api/runners.go | 11 +- api/websocket.go | 254 ++++++++++++++++++++++++++++++++++-- ci/demo-job.tpl.nomad | 2 +- environment/job.go | 1 + nomad/api_querier.go | 15 +++ nomad/job.go | 1 + runner/execution_storage.go | 46 +++++++ runner/manager.go | 6 +- runner/runner.go | 106 +++++++-------- 10 files changed, 479 insertions(+), 78 deletions(-) create mode 100644 runner/execution_storage.go diff --git a/api/dto/dto.go b/api/dto/dto.go index 3049606..dbd8e90 100644 --- a/api/dto/dto.go +++ b/api/dto/dto.go @@ -1,5 +1,11 @@ package dto +import ( + "encoding/json" + "errors" + "fmt" +) + // RunnerRequest is the expected json structure of the request body for the ProvideRunner function. type RunnerRequest struct { ExecutionEnvironmentId int `json:"executionEnvironmentId"` @@ -13,6 +19,16 @@ type ExecutionRequest struct { Environment map[string]string } +func (er *ExecutionRequest) FullCommand() []string { + var command []string + command = append(command, "env", "-") + for variable, value := range er.Environment { + command = append(command, fmt.Sprintf("%s=%s", variable, value)) + } + command = append(command, "sh", "-c", er.Command) + return command +} + // ExecutionEnvironmentRequest is the expected json structure of the request body for the create execution environment function. // nolint:unused,structcheck type ExecutionEnvironmentRequest struct { @@ -33,9 +49,102 @@ type RunnerResponse struct { // TODO: specify content of the struct type FileCreation struct{} -// WebsocketResponse is the expected response when creating an execution for a runner. -type WebsocketResponse struct { - WebsocketUrl string `json:"websocketUrl"` +// ExecutionResponse is the expected response when creating an execution for a runner. +type ExecutionResponse struct { + WebSocketUrl string `json:"websocketUrl"` +} + +// WebSocketMessageType is the type for the messages from Poseidon to the client. +type WebSocketMessageType string + +const ( + WebSocketOutputStdout WebSocketMessageType = "stdout" + WebSocketOutputStderr WebSocketMessageType = "stderr" + WebSocketOutputError WebSocketMessageType = "error" + WebSocketMetaStart WebSocketMessageType = "start" + WebSocketMetaTimeout WebSocketMessageType = "timeout" + WebSocketExit WebSocketMessageType = "exit" +) + +// WebSocketMessage is the type for all messages send in the WebSocket to the client. +// Depending on the MessageType the Data or ExitCode might not be included in the marshaled json message. +type WebSocketMessage struct { + Type WebSocketMessageType + Data string + ExitCode uint8 +} + +// MarshalJSON implements the json.Marshaler interface. +// This converts the WebSocketMessage into the expected schema (see docs/websocket.schema.json). +func (m WebSocketMessage) MarshalJSON() ([]byte, error) { + switch m.Type { + case WebSocketOutputStdout, WebSocketOutputStderr, WebSocketOutputError: + return json.Marshal(struct { + MessageType WebSocketMessageType `json:"type"` + Data string `json:"data"` + }{m.Type, m.Data}) + case WebSocketMetaStart, WebSocketMetaTimeout: + return json.Marshal(struct { + MessageType WebSocketMessageType `json:"type"` + }{m.Type}) + case WebSocketExit: + return json.Marshal(struct { + MessageType WebSocketMessageType `json:"type"` + ExitCode uint8 `json:"data"` + }{m.Type, m.ExitCode}) + } + return nil, errors.New("unhandled WebSocket message type") +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +// It is used by tests in order to ReceiveNextWebSocketMessage. +func (m *WebSocketMessage) UnmarshalJSON(rawMessage []byte) error { + messageMap := make(map[string]interface{}) + err := json.Unmarshal(rawMessage, &messageMap) + if err != nil { + return err + } + messageType, ok := messageMap["type"] + if !ok { + return errors.New("missing key type") + } + messageTypeString, ok := messageType.(string) + if !ok { + return errors.New("value of key type must be a string") + } + switch messageType := WebSocketMessageType(messageTypeString); messageType { + case WebSocketExit: + data, ok := messageMap["data"] + if !ok { + return errors.New("missing key data") + } + // json.Unmarshal converts any number to a float64 in the massageMap, so we must first cast it to the float. + exit, ok := data.(float64) + if !ok { + return errors.New("value of key data must be a number") + } + if exit != float64(uint8(exit)) { + return errors.New("value of key data must be uint8") + } + m.Type = messageType + m.ExitCode = uint8(exit) + case WebSocketOutputStdout, WebSocketOutputStderr, WebSocketOutputError: + data, ok := messageMap["data"] + if !ok { + return errors.New("missing key data") + } + text, ok := data.(string) + if !ok { + return errors.New("value of key data must be a string") + } + m.Type = messageType + m.Data = text + case WebSocketMetaStart, WebSocketMetaTimeout: + m.Type = messageType + default: + return errors.New("unknown WebSocket message type") + } + return nil } // ClientError is the response interface if the request is not valid. diff --git a/api/runners.go b/api/runners.go index a358c7e..68d94ac 100644 --- a/api/runners.go +++ b/api/runners.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "github.com/google/uuid" "github.com/gorilla/mux" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/config" @@ -81,20 +82,22 @@ func (r *RunnerController) execute(writer http.ResponseWriter, request *http.Req writeInternalServerError(writer, err, dto.ErrorUnknown) return } - id, err := targetRunner.AddExecution(*executionRequest) + newUuid, err := uuid.NewRandom() if err != nil { - log.WithError(err).Error("Could not store execution.") + log.WithError(err).Error("Could not create execution id") writeInternalServerError(writer, err, dto.ErrorUnknown) return } - websocketUrl := url.URL{ + id := runner.ExecutionId(newUuid.String()) + targetRunner.Add(id, executionRequest) + webSocketUrl := url.URL{ Scheme: scheme, Host: request.Host, Path: path.String(), RawQuery: fmt.Sprintf("%s=%s", ExecutionIdKey, id), } - sendJson(writer, &dto.WebsocketResponse{WebsocketUrl: websocketUrl.String()}, http.StatusOK) + sendJson(writer, &dto.ExecutionResponse{WebSocketUrl: webSocketUrl.String()}, http.StatusOK) } // The findRunnerMiddleware looks up the runnerId for routes containing it diff --git a/api/websocket.go b/api/websocket.go index b3085a2..7aca176 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -1,34 +1,264 @@ package api import ( + "context" + "encoding/json" "errors" "github.com/gorilla/websocket" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" "gitlab.hpi.de/codeocean/codemoon/poseidon/runner" + "io" "net/http" ) +type webSocketConnection interface { + WriteMessage(messageType int, data []byte) error + Close() error + NextReader() (messageType int, r io.Reader, err error) + CloseHandler() func(code int, text string) error + SetCloseHandler(handler func(code int, text string) error) +} + +type WebSocketReader interface { + io.Reader + readInputLoop() context.CancelFunc +} + +// codeOceanToRawReader is an io.Reader implementation that provides the content of the WebSocket connection to CodeOcean. +// You have to start the Reader by calling readInputLoop. After that you can use the Read function. +type codeOceanToRawReader struct { + connection webSocketConnection + + // A buffered channel of bytes is used to store data coming from CodeOcean via WebSocket + // and retrieve it when Read(..) is called. Since channels are thread-safe, we use one here + // instead of bytes.Buffer. + buffer chan byte +} + +func newCodeOceanToRawReader(connection webSocketConnection) *codeOceanToRawReader { + return &codeOceanToRawReader{ + connection: connection, + buffer: make(chan byte, 1024), + } +} + +// readInputLoop asynchronously reads from the WebSocket connection and buffers the user's input. +// This is necessary because input must be read for the connection to handle special messages like close and call the +// CloseHandler. +func (cr *codeOceanToRawReader) readInputLoop() context.CancelFunc { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + readMessage := make(chan bool) + for { + var messageType int + var reader io.Reader + var err error + + go func() { + messageType, reader, err = cr.connection.NextReader() + readMessage <- true + }() + select { + case <-readMessage: + case <-ctx.Done(): + return + } + + if err != nil { + log.WithError(err).Warn("Error reading client message") + return + } + if messageType != websocket.TextMessage { + log.WithField("messageType", messageType).Warn("Received message of wrong type") + return + } + + message, err := io.ReadAll(reader) + if err != nil { + log.WithError(err).Warn("error while reading WebSocket message") + return + } + for _, character := range message { + select { + case cr.buffer <- character: + case <-ctx.Done(): + return + } + } + } + }() + return cancel +} + +// Read implements the io.Reader interface. +// It returns bytes from the buffer. +func (cr *codeOceanToRawReader) Read(p []byte) (n int, err error) { + for n = 0; n < len(p); n++ { + select { + case p[n] = <-cr.buffer: + default: + return + } + } + return +} + +// rawToCodeOceanWriter is an io.Writer implementation that, when written to, wraps the written data in the appropriate +// json structure and sends it to the CodeOcean via WebSocket. +type rawToCodeOceanWriter struct { + proxy *webSocketProxy + outputType dto.WebSocketMessageType +} + +// Write implements the io.Writer interface. +// The passed data is forwarded to the WebSocket to CodeOcean. +func (rc *rawToCodeOceanWriter) Write(p []byte) (int, error) { + err := rc.proxy.sendToClient(dto.WebSocketMessage{Type: rc.outputType, Data: string(p)}) + return len(p), err +} + +// webSocketProxy is an encapsulation of logic for forwarding between Runners and CodeOcean. +type webSocketProxy struct { + userExit chan bool + connection webSocketConnection + Stdin WebSocketReader + Stdout io.Writer + Stderr io.Writer +} + +// upgradeConnection upgrades a connection to a websocket and returns a webSocketProxy for this connection. +func upgradeConnection(writer http.ResponseWriter, request *http.Request) (webSocketConnection, error) { + connUpgrader := websocket.Upgrader{} + connection, err := connUpgrader.Upgrade(writer, request, nil) + if err != nil { + log.WithError(err).Warn("Connection upgrade failed") + return nil, err + } + return connection, nil +} + +// newWebSocketProxy returns a initiated and started webSocketProxy. +// As this proxy is already started, a start message is send to the client. +func newWebSocketProxy(connection webSocketConnection) (*webSocketProxy, error) { + stdin := newCodeOceanToRawReader(connection) + proxy := &webSocketProxy{ + connection: connection, + Stdin: stdin, + userExit: make(chan bool), + } + proxy.Stdout = &rawToCodeOceanWriter{proxy: proxy, outputType: dto.WebSocketOutputStdout} + proxy.Stderr = &rawToCodeOceanWriter{proxy: proxy, outputType: dto.WebSocketOutputStderr} + + err := proxy.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaStart}) + if err != nil { + return nil, err + } + + closeHandler := connection.CloseHandler() + connection.SetCloseHandler(func(code int, text string) error { + // The default close handler always returns nil, so the error can be safely ignored. + _ = closeHandler(code, text) + close(proxy.userExit) + return nil + }) + return proxy, nil +} + +// waitForExit waits for an exit of either the runner (when the command terminates) or the client closing the WebSocket +// and handles WebSocket exit messages. +func (wp *webSocketProxy) waitForExit(exit <-chan runner.ExitInfo, cancelExecution context.CancelFunc) { + defer wp.close() + + cancelInputLoop := wp.Stdin.readInputLoop() + var exitInfo runner.ExitInfo + select { + case exitInfo = <-exit: + cancelInputLoop() + log.Info("Execution returned") + case <-wp.userExit: + cancelInputLoop() + cancelExecution() + log.Info("Client closed the connection") + return + } + + if exitInfo.Err == context.DeadlineExceeded { + err := wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketMetaTimeout}) + if err != nil { + return + } + return + } else if exitInfo.Err != nil { + errorMessage := "Error executing the request" + log.WithError(exitInfo.Err).Warn(errorMessage) + _ = wp.sendToClient(dto.WebSocketMessage{Type: dto.WebSocketOutputError, Data: errorMessage}) + return + } + log.WithField("exit_code", exitInfo.Code).Debug() + + err := wp.sendToClient(dto.WebSocketMessage{ + Type: dto.WebSocketExit, + ExitCode: exitInfo.Code, + }) + if err != nil { + return + } +} + +func (wp *webSocketProxy) sendToClient(message dto.WebSocketMessage) error { + encodedMessage, err := json.Marshal(message) + if err != nil { + log.WithField("message", message).WithError(err).Warn("Marshal error") + wp.closeWithError("Error creating message") + return err + } + err = wp.connection.WriteMessage(websocket.TextMessage, encodedMessage) + if err != nil { + errorMessage := "Error writing the exit message" + log.WithError(err).Warn(errorMessage) + wp.closeWithError(errorMessage) + return err + } + return nil +} + +func (wp *webSocketProxy) closeWithError(message string) { + err := wp.connection.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, message)) + if err != nil { + log.WithError(err).Warn("Error during websocket close") + } +} + +func (wp *webSocketProxy) close() { + err := wp.connection.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + _ = wp.connection.Close() + if err != nil { + log.WithError(err).Warn("Error during websocket close") + } +} + // connectToRunner is the endpoint for websocket connections. func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *http.Request) { targetRunner, _ := runner.FromContext(request.Context()) executionId := runner.ExecutionId(request.URL.Query().Get(ExecutionIdKey)) - _, ok := targetRunner.Execution(executionId) + executionRequest, ok := targetRunner.Pop(executionId) if !ok { writeNotFound(writer, errors.New("executionId does not exist")) return } - log. - WithField("runnerId", targetRunner.Id()). - WithField("executionId", executionId). - Info("Running execution") - connUpgrader := websocket.Upgrader{} - connClient, err := connUpgrader.Upgrade(writer, request, nil) - if err != nil { - log.WithError(err).Warn("Connection upgrade failed") - return - } - err = connClient.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + + connection, err := upgradeConnection(writer, request) if err != nil { writeInternalServerError(writer, err, dto.ErrorUnknown) + return } + proxy, err := newWebSocketProxy(connection) + if err != nil { + return + } + + log.WithField("runnerId", targetRunner.Id()).WithField("executionId", executionId).Info("Running execution") + exit, cancel := targetRunner.Execute(executionRequest, proxy.Stdin, proxy.Stdout, proxy.Stderr) + + proxy.waitForExit(exit, cancel) } diff --git a/ci/demo-job.tpl.nomad b/ci/demo-job.tpl.nomad index a321378..7dcf20e 100644 --- a/ci/demo-job.tpl.nomad +++ b/ci/demo-job.tpl.nomad @@ -23,7 +23,7 @@ job "python" { weight = 100 } - task "python-task" { + task "default-task" { driver = "docker" kill_timeout = "0s" kill_signal = "SIGKILL" diff --git a/environment/job.go b/environment/job.go index 935eb18..d274669 100644 --- a/environment/job.go +++ b/environment/job.go @@ -13,6 +13,7 @@ import ( const ( DefaultTaskDriver = "docker" TaskNameFormat = "%s-task" + TaskName = "python-job-task" ) // defaultJobHCL holds our default job in HCL format. diff --git a/nomad/api_querier.go b/nomad/api_querier.go index a07d78f..4505068 100644 --- a/nomad/api_querier.go +++ b/nomad/api_querier.go @@ -3,6 +3,7 @@ package nomad import ( "context" nomadApi "github.com/hashicorp/nomad/api" + "io" "net/url" ) @@ -23,6 +24,10 @@ type apiQuerier interface { // DeleteRunner deletes the runner with the given Id. DeleteRunner(runnerId string) (err error) + // ExecuteCommand runs a command in the passed allocation. + ExecuteCommand(allocationID string, ctx context.Context, command []string, + stdin io.Reader, stdout, stderr io.Writer) (int, error) + // loadRunners loads all allocations of the specified job. loadRunners(jobId string) (allocationListStub []*nomadApi.AllocationListStub, err error) @@ -60,6 +65,16 @@ func (nc *nomadApiClient) DeleteRunner(runnerId string) (err error) { return err } +func (nc *nomadApiClient) ExecuteCommand(allocationID string, + ctx context.Context, command []string, + stdin io.Reader, stdout, stderr io.Writer) (int, error) { + allocation, _, err := nc.client.Allocations().Info(allocationID, nil) + if err != nil { + return 1, err + } + return nc.client.Allocations().Exec(ctx, allocation, TaskName, true, command, stdin, stdout, stderr, nil, nil) +} + func (nc *nomadApiClient) loadRunners(jobId string) (allocationListStub []*nomadApi.AllocationListStub, err error) { allocationListStub, _, err = nc.client.Jobs().Allocations(jobId, true, nil) return diff --git a/nomad/job.go b/nomad/job.go index 6890565..4fe11f9 100644 --- a/nomad/job.go +++ b/nomad/job.go @@ -7,6 +7,7 @@ import ( const ( TaskGroupNameFormat = "%s-group" + TaskName = "default-task" ) // LoadJobList loads the list of jobs from the Nomad api. diff --git a/runner/execution_storage.go b/runner/execution_storage.go new file mode 100644 index 0000000..74a881c --- /dev/null +++ b/runner/execution_storage.go @@ -0,0 +1,46 @@ +package runner + +import ( + "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" + "sync" +) + +// ExecutionStorage stores executions. +type ExecutionStorage interface { + // Add adds a runner to the storage. + // It overwrites the existing execution if an execution with the same id already exists. + Add(id ExecutionId, executionRequest *dto.ExecutionRequest) + + // Pop deletes the execution with the given id from the storage and returns it. + // If no such execution exists, ok is false and true otherwise. + Pop(id ExecutionId) (request *dto.ExecutionRequest, ok bool) +} + +// localExecutionStorage stores execution objects in the local application memory. +// ToDo: Create implementation that use some persistent storage like a database +type localExecutionStorage struct { + sync.RWMutex + executions map[ExecutionId]*dto.ExecutionRequest +} + +// NewLocalExecutionStorage responds with an ExecutionStorage implementation. +// This implementation stores the data thread-safe in the local application memory. +func NewLocalExecutionStorage() *localExecutionStorage { + return &localExecutionStorage{ + executions: make(map[ExecutionId]*dto.ExecutionRequest), + } +} + +func (s *localExecutionStorage) Add(id ExecutionId, executionRequest *dto.ExecutionRequest) { + s.Lock() + defer s.Unlock() + s.executions[id] = executionRequest +} + +func (s *localExecutionStorage) Pop(id ExecutionId) (request *dto.ExecutionRequest, ok bool) { + s.Lock() + defer s.Unlock() + request, ok = s.executions[id] + delete(s.executions, id) + return +} diff --git a/runner/manager.go b/runner/manager.go index ffd1eb1..c946173 100644 --- a/runner/manager.go +++ b/runner/manager.go @@ -16,6 +16,10 @@ var ( type EnvironmentId int +func (e EnvironmentId) toString() string { + return string(rune(e)) +} + type NomadJobId string // Manager keeps track of the used and unused runners of all execution environments in order to provide unused runners to new clients and ensure no runner is used twice. @@ -153,7 +157,7 @@ func (m *NomadRunnerManager) unusedRunners(environmentId EnvironmentId, fetchedR if !ok { _, ok = job.idleRunners.Get(runnerId) if !ok { - newRunners = append(newRunners, NewRunner(runnerId)) + newRunners = append(newRunners, NewNomadAllocation(runnerId, m.apiClient)) } } } diff --git a/runner/runner.go b/runner/runner.go index 34de519..c95e5dc 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -3,9 +3,10 @@ package runner import ( "context" "encoding/json" - "github.com/google/uuid" "gitlab.hpi.de/codeocean/codemoon/poseidon/api/dto" - "sync" + "gitlab.hpi.de/codeocean/codemoon/poseidon/nomad" + "io" + "time" ) // ContextKey is the type for keys in a request context. @@ -23,18 +24,11 @@ type Runner interface { // Id returns the id of the runner. Id() string - // AddExecution saves the supplied ExecutionRequest for the runner and returns an ExecutionId to retrieve it again. - AddExecution(dto.ExecutionRequest) (ExecutionId, error) + ExecutionStorage - // Execution looks up an ExecutionId for the runner and returns the associated RunnerRequest. - // If this request does not exit, ok is false, else true. - Execution(ExecutionId) (executionRequest dto.ExecutionRequest, ok bool) - - // DeleteExecution deletes the execution of the runner with the specified id. - DeleteExecution(ExecutionId) - - // Execute executes the execution with the given ID. - Execute(ExecutionId) + // Execute runs the given execution request and forwards from and to the given reader and writers. + // An ExitInfo is sent to the exit channel on command completion. + Execute(request *dto.ExecutionRequest, stdin io.Reader, stdout, stderr io.Writer) (exit <-chan ExitInfo, cancel context.CancelFunc) // Copy copies the specified files into the runner. Copy(dto.FileCreation) @@ -42,21 +36,56 @@ type Runner interface { // NomadAllocation is an abstraction to communicate with Nomad allocations. type NomadAllocation struct { - sync.RWMutex - id string - ch chan bool - executions map[ExecutionId]dto.ExecutionRequest + ExecutionStorage + id string + api nomad.ExecutorApi } // NewRunner creates a new runner with the provided id. func NewRunner(id string) Runner { + return NewNomadAllocation(id, nil) +} + +// NewNomadAllocation creates a new Nomad allocation with the provided id. +func NewNomadAllocation(id string, apiClient nomad.ExecutorApi) *NomadAllocation { return &NomadAllocation{ - id: id, - ch: make(chan bool), - executions: make(map[ExecutionId]dto.ExecutionRequest), + id: id, + api: apiClient, + ExecutionStorage: NewLocalExecutionStorage(), } } +func (r *NomadAllocation) Id() string { + return r.id +} + +type ExitInfo struct { + Code uint8 + Err error +} + +func (r *NomadAllocation) Execute(request *dto.ExecutionRequest, stdin io.Reader, stdout, stderr io.Writer) (<-chan ExitInfo, context.CancelFunc) { + command := request.FullCommand() + var ctx context.Context + var cancel context.CancelFunc + if request.TimeLimit == 0 { + ctx, cancel = context.WithCancel(context.Background()) + } else { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(request.TimeLimit)*time.Second) + } + exit := make(chan ExitInfo) + go func() { + exitCode, err := r.api.ExecuteCommand(r.Id(), ctx, command, stdin, stdout, stderr) + exit <- ExitInfo{uint8(exitCode), err} + close(exit) + }() + return exit, cancel +} + +func (r *NomadAllocation) Copy(files dto.FileCreation) { + +} + // MarshalJSON implements json.Marshaler interface. // This exports private attributes like the id too. func (r *NomadAllocation) MarshalJSON() ([]byte, error) { @@ -67,43 +96,6 @@ func (r *NomadAllocation) MarshalJSON() ([]byte, error) { }) } -func (r *NomadAllocation) Id() string { - return r.id -} - -func (r *NomadAllocation) Execution(id ExecutionId) (executionRequest dto.ExecutionRequest, ok bool) { - r.RLock() - defer r.RUnlock() - executionRequest, ok = r.executions[id] - return -} - -func (r *NomadAllocation) AddExecution(request dto.ExecutionRequest) (ExecutionId, error) { - r.Lock() - defer r.Unlock() - idUuid, err := uuid.NewRandom() - if err != nil { - return ExecutionId(""), err - } - id := ExecutionId(idUuid.String()) - r.executions[id] = request - return id, err -} - -func (r *NomadAllocation) Execute(id ExecutionId) { - -} - -func (r *NomadAllocation) Copy(files dto.FileCreation) { - -} - -func (r *NomadAllocation) DeleteExecution(id ExecutionId) { - r.Lock() - defer r.Unlock() - delete(r.executions, id) -} - // NewContext creates a context containing a runner. func NewContext(ctx context.Context, runner Runner) context.Context { return context.WithValue(ctx, runnerContextKey, runner)