// polyphem/cmd/poseidon/main.go

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"net"
	"net/http"
	"os"
	"os/signal"
	"regexp"
	"runtime"
	"runtime/debug"
	"runtime/pprof"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/coreos/go-systemd/v22/activation"
	"github.com/coreos/go-systemd/v22/daemon"
	"github.com/getsentry/sentry-go"
	sentryhttp "github.com/getsentry/sentry-go/http"
	"github.com/gorilla/mux"
	"k8s.io/client-go/kubernetes"

	"github.com/openHPI/poseidon/internal/api"
	"github.com/openHPI/poseidon/internal/config"
	"github.com/openHPI/poseidon/internal/environment"
	"github.com/openHPI/poseidon/internal/runner"
	"github.com/openHPI/poseidon/pkg/dto"
	"github.com/openHPI/poseidon/pkg/logging"
	"github.com/openHPI/poseidon/pkg/monitoring"
)
var (
	gracefulShutdownWait = 15 * time.Second
	log                  = logging.GetLogger("main")
	// pgoEnabled reports whether the binary was built with profile-guided optimization (PGO).
	// It is set at compile time by our Makefile and is therefore a string, not a bool.
	pgoEnabled = "false"
)
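// getVcsRevision returns the VCS revision that the Go toolchain embedded into the binary,
// optionally shortened to seven characters, with a "-modified" suffix if the working tree
// was dirty at build time.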
func getVcsRevision(short bool) string {
	vcsRevision := "unknown"
	vcsModified := false

	if info, ok := debug.ReadBuildInfo(); ok {
		for _, setting := range info.Settings {
			if setting.Key == "vcs.revision" {
				vcsRevision = setting.Value
			} else if setting.Key == "vcs.modified" {
				var err error
				vcsModified, err = strconv.ParseBool(setting.Value)
				if err != nil {
					vcsModified = true // Fall back to true so that a parsing problem remains visible.
					log.WithError(err).Error("Could not parse the vcs.modified setting")
				}
			}
		}
	}

	const shortRevisionLength = 7
	if short && len(vcsRevision) >= shortRevisionLength {
		vcsRevision = vcsRevision[:shortRevisionLength]
	}
	if vcsModified {
		return vcsRevision + "-modified"
	}
	return vcsRevision
}
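// initializeUserAgent replaces the placeholders in the User-Agent strings with the
// current VCS revision and the configured logging filter token.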
func initializeUserAgent() {
	dto.UserAgentOut = strings.ReplaceAll(dto.UserAgentOut, dto.UserAgentVCSPlaceholder, getVcsRevision(true))
	dto.UserAgentFiltered = strings.ReplaceAll(dto.UserAgentFiltered, dto.UserAgentVCSPlaceholder, getVcsRevision(true))
	dto.UserAgentFiltered = strings.ReplaceAll(
		dto.UserAgentFiltered, dto.UserAgentFilterTokenPlaceholder, config.Config.Server.LoggingFilterToken)
}
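// initSentry initializes the Sentry SDK, defaulting the release to the current VCS revision
// and tagging every transaction with the PGO and profiling build flags.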
func initSentry(options *sentry.ClientOptions, profilingEnabled bool) {
	if options.Release == "" {
		commit := getVcsRevision(false)
		options.Release = commit
	}

	options.BeforeSendTransaction = func(event *sentry.Event, _ *sentry.EventHint) *sentry.Event {
		if event.Tags == nil {
			event.Tags = make(map[string]string)
		}
		event.Tags["go_pgo"] = pgoEnabled
		event.Tags["go_profiling"] = strconv.FormatBool(profilingEnabled)
		return event
	}

	if err := sentry.Init(*options); err != nil {
		log.Errorf("sentry.Init: %s", err)
	}
}
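// shutdownSentry recovers from a panic, reports it to Sentry, and flushes pending events.
// It must be deferred in the goroutine it is supposed to guard.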
func shutdownSentry() {
	if err := recover(); err != nil {
		sentry.CurrentHub().Recover(err)
		sentry.Flush(logging.GracefulSentryShutdown)
	}
}
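// initProfiling starts the CPU profiler if enabled and returns a cancel function
// that stops the profiler and closes the profile file.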
func initProfiling(options config.Profiling) (cancel func()) {
	if !options.CPUEnabled {
		return func() {}
	}

	profile, err := os.Create(options.CPUFile)
	if err != nil {
		log.WithError(err).Error("Error while opening the profile file")
		return func() {}
	}

	log.Debug("Starting CPU profiler")
	if err := pprof.StartCPUProfile(profile); err != nil {
		log.WithError(err).Error("Error while starting the CPU profiler")
	}

	return func() {
		log.Debug("Stopping CPU profiler")
		pprof.StopCPUProfile()
		if err := profile.Close(); err != nil {
			log.WithError(err).Error("Error while closing profile file")
		}
	}
}
// watchMemoryAndAlert monitors the memory usage of Poseidon and dumps heap and goroutine
// profiles to stderr if it exceeds the configured threshold. It runs until ctx is canceled.
func watchMemoryAndAlert(ctx context.Context, options config.Profiling) {
	if options.MemoryInterval == 0 {
		return
	}

	var exceeded bool
	for {
		var stats runtime.MemStats
		runtime.ReadMemStats(&stats)
		log.WithField("heap", stats.HeapAlloc).Trace("Current Memory Usage")

		const megabytesToBytes = 1000 * 1000
		if !exceeded && stats.HeapAlloc >= uint64(options.MemoryThreshold)*megabytesToBytes {
			exceeded = true
			log.WithField("heap", stats.HeapAlloc).Warn("Memory Threshold exceeded")
			if err := pprof.Lookup("heap").WriteTo(os.Stderr, 1); err != nil {
				log.WithError(err).Warn("Failed to log the heap profile")
			}
			if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 1); err != nil {
				log.WithError(err).Warn("Failed to log the goroutines")
			}
		} else if exceeded {
			exceeded = false
			log.WithField("heap", stats.HeapAlloc).Info("Memory Threshold no longer exceeded")
		}

		select {
		case <-time.After(time.Duration(options.MemoryInterval) * time.Millisecond):
			continue
		case <-ctx.Done():
			return
		}
	}
}
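// runServer notifies systemd of readiness and serves the API on all configured listeners,
// canceling the application context once serving stops.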
func runServer(router *mux.Router, server *http.Server, cancel context.CancelFunc) {
	defer cancel()
	defer shutdownSentry() // shutdownSentry must run in this goroutine so that recover catches panics from serving.
	httpListeners := getHTTPListeners(server)
	notifySystemd(router)
	serveHTTPListeners(server, httpListeners)
}
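// getHTTPListeners returns the sockets to serve on: the listeners passed in by systemd
// when socket activation is configured (e.g. via a .socket unit with ListenStream=),
// or a freshly opened TCP listener on the configured address otherwise.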
func getHTTPListeners(server *http.Server) (httpListeners []net.Listener) {
	var err error
	if config.Config.Server.SystemdSocketActivation {
		httpListeners, err = activation.Listeners()
	} else {
		var httpListener net.Listener
		httpListener, err = net.Listen("tcp", server.Addr)
		httpListeners = append(httpListeners, httpListener)
	}

	if err != nil || len(httpListeners) == 0 {
		log.WithError(err).
			WithField("listeners", httpListeners).
			WithField("systemd_socket", config.Config.Server.SystemdSocketActivation).
			Fatal("Failed listening to any socket")
		return nil
	}
	return httpListeners
}
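// serveHTTPListeners serves the HTTP server on every listener in its own goroutine
// and blocks until all of them have stopped.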
func serveHTTPListeners(server *http.Server, httpListeners []net.Listener) {
	var wg sync.WaitGroup
	wg.Add(len(httpListeners))
	for _, l := range httpListeners {
		go func(listener net.Listener) {
			defer wg.Done()
			log.WithField("address", listener.Addr()).Info("Serving Listener")
			serveHTTPListener(server, listener)
		}(l)
	}
	wg.Wait()
}
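// serveHTTPListener serves the HTTP server on a single listener, with TLS if configured,
// and logs whether serving ended through a regular shutdown or an error.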
func serveHTTPListener(server *http.Server, l net.Listener) {
	var err error
	if config.Config.Server.TLS.Active {
		server.TLSConfig = config.TLSConfig
		log.WithField("CertFile", config.Config.Server.TLS.CertFile).
			WithField("KeyFile", config.Config.Server.TLS.KeyFile).
			Debug("Using TLS")
		err = server.ServeTLS(l, config.Config.Server.TLS.CertFile, config.Config.Server.TLS.KeyFile)
	} else {
		err = server.Serve(l)
	}

	if errors.Is(err, http.ErrServerClosed) {
		log.WithError(err).WithField("listener", l.Addr()).Info("Server closed")
	} else {
		log.WithError(err).WithField("listener", l.Addr()).Error("Error during listening and serving")
	}
}
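// notifySystemd signals readiness to systemd (for units with Type=notify) and, if the
// systemd watchdog is enabled (e.g. via WatchdogSec=), starts the watchdog loop.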
func notifySystemd(router *mux.Router) {
	notify, err := daemon.SdNotify(false, daemon.SdNotifyReady)
	switch {
	case err == nil && !notify:
		log.Debug("Systemd Readiness Notification not supported")
	case err != nil:
		log.WithError(err).WithField("notify", notify).Warn("Failed notifying Readiness to Systemd")
	default:
		log.Trace("Notified Readiness to Systemd")
	}

	interval, err := daemon.SdWatchdogEnabled(false)
	switch {
	case err != nil:
		log.WithError(err).Error("Failed checking Systemd Watchdog support")
		return
	case interval == 0:
		log.Debug("Systemd Watchdog not enabled")
		return
	}
	go systemdWatchdogLoop(context.Background(), router, interval)
}
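// systemdWatchdogLoop periodically checks Poseidon's health endpoint and notifies the
// systemd watchdog on success. It notifies twice per watchdog interval so that a single
// slow or missed health check does not immediately trigger the watchdog.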
func systemdWatchdogLoop(ctx context.Context, router *mux.Router, interval time.Duration) {
	healthRoute, err := router.Get(api.HealthPath).URL()
	if err != nil {
		log.WithError(err).Error("Failed to parse Health route")
		return
	}
	healthURL := config.Config.Server.URL().String() + healthRoute.String()
	// Workaround for certificate subject names: replace unspecified listen addresses
	// with localhost so that the health check matches the certificate.
	unspecifiedAddresses := regexp.MustCompile(`0\.0\.0\.0|\[::]`)
	healthURL = unspecifiedAddresses.ReplaceAllString(healthURL, "localhost")

	client := &http.Client{}
	if config.Config.Server.TLS.Active {
		tlsConfig := &tls.Config{RootCAs: x509.NewCertPool()} // #nosec G402 The default MinTLSVersion is secure.
		caCertBytes, err := os.ReadFile(config.Config.Server.TLS.CAFile)
		if err != nil {
			log.WithError(err).Warn("Cannot read TLS CA file")
		} else {
			ok := tlsConfig.RootCAs.AppendCertsFromPEM(caCertBytes)
			log.WithField("success", ok).Trace("Loaded CA certificate")
		}
		client.Transport = &http.Transport{TLSClientConfig: tlsConfig}
	}

	// notificationIntervalFactor defines how many more notifications we send than required.
	const notificationIntervalFactor = 2
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval / notificationIntervalFactor):
			notifySystemdWatchdog(ctx, healthURL, client)
		}
	}
}
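// notifySystemdWatchdog performs one health check against healthURL and, if Poseidon
// responds at all, sends the watchdog keep-alive notification to systemd.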
func notifySystemdWatchdog(ctx context.Context, healthURL string, client *http.Client) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, http.NoBody)
	if err != nil {
		return
	}
	req.Header.Set("User-Agent", dto.UserAgentFiltered)

	resp, err := client.Do(req)
	if err != nil {
		log.WithError(err).Debug("Failed watchdog health check")
		return
	}
	_ = resp.Body.Close()
	// We do not check for resp.StatusCode == 503 as Poseidon's error recovery will try to handle such errors
	// by itself. The Watchdog should just check that Poseidon handles HTTP requests at all.

	notify, err := daemon.SdNotify(false, daemon.SdNotifyWatchdog)
	switch {
	case err == nil && !notify:
		log.Debug("Systemd Watchdog Notification not supported")
	case err != nil:
		log.WithError(err).WithField("notify", notify).Warn("Failed notifying Systemd Watchdog")
	default:
		log.Trace("Notified Systemd Watchdog")
	}
}
type managerCreator func(ctx context.Context) (
	runnerManager runner.Manager, environmentManager environment.ManagerHandler)

// createManagerHandler adds the managers of the passed managerCreator to the chain of responsibility.
func createManagerHandler(ctx context.Context, handler managerCreator, enabled bool,
	nextRunnerManager runner.Manager, nextEnvironmentManager environment.ManagerHandler) (
	runnerManager runner.Manager, environmentManager environment.ManagerHandler) {
	if !enabled {
		return nextRunnerManager, nextEnvironmentManager
	}

	runnerManager, environmentManager = handler(ctx)
	runnerManager.SetNextHandler(nextRunnerManager)
	environmentManager.SetNextHandler(nextEnvironmentManager)
	return runnerManager, environmentManager
}
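// createKubernetesManager initializes the Kubernetes client and builds the runner and
// environment managers on top of it.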
func createKubernetesManager(ctx context.Context) (runner.Manager, environment.ManagerHandler) {
	// API initialization
	kubernetesClient, err := kubernetes.NewForConfig(config.Config.Kubernetes.Config)
	if err != nil {
		log.WithError(err).Fatal("Failed to create kubernetes client")
	}

	runnerManager := runner.NewKubernetesRunnerManager(ctx, kubernetesClient)
	environmentManager := environment.NewKubernetesEnvironmentManager(runnerManager, kubernetesClient)
	return runnerManager, environmentManager
}
// initRouter builds a router that serves the API with the chain of responsibility for multiple managers.
func initRouter(ctx context.Context) *mux.Router {
	runnerManager, environmentManager := createManagerHandler(ctx, createKubernetesManager,
		config.Config.Kubernetes.Enabled, nil, nil)
	return api.NewRouter(runnerManager, environmentManager)
}
// initServer creates a server that serves the routes provided by the router.
func initServer(router *mux.Router) *http.Server {
	sentryHandler := sentryhttp.New(sentryhttp.Options{}).Handle(router)
	const readTimeout = 15 * time.Second
	const idleTimeout = 60 * time.Second

	return &http.Server{
		Addr: config.Config.Server.URL().Host,
		// A WriteTimeout would prohibit long-running requests such as creating an execution environment.
		// See also https://github.com/openHPI/poseidon/pull/68.
		// WriteTimeout: time.Second * 15,
		ReadHeaderTimeout: readTimeout,
		ReadTimeout:       readTimeout,
		IdleTimeout:       idleTimeout,
		Handler:           sentryHandler,
	}
}
// shutdownOnOSSignal listens for signals from the operating system.
// On SIGUSR1 it stops profiling; on SIGINT, SIGTERM, or SIGABRT it shuts the server down,
// waiting up to gracefulShutdownWait to close remaining connections.
func shutdownOnOSSignal(ctx context.Context, server *http.Server, stopProfiling func()) {
	// Wait for SIGINT, SIGTERM, or SIGABRT to shut down.
	shutdownSignals := make(chan os.Signal, 1)
	signal.Notify(shutdownSignals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGABRT)

	// Wait for SIGUSR1 to write the profile.
	writeProfileSignal := make(chan os.Signal, 1)
	signal.Notify(writeProfileSignal, syscall.SIGUSR1)

	select {
	case <-ctx.Done():
		os.Exit(1)
	case <-writeProfileSignal:
		log.Info("Received SIGUSR1...")
		stopProfiling()
		// Continue listening for signals, replacing `stopProfiling` with a no-op.
		shutdownOnOSSignal(ctx, server, func() {})
	case <-shutdownSignals:
		log.Info("Received shutdown signal, shutting down...")
		defer stopProfiling()

		ctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownWait)
		defer cancel()
		if err := server.Shutdown(ctx); err != nil {
			log.WithError(err).Warn("error shutting server down")
		}
	}
}
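// main wires everything together: configuration, logging, Sentry, InfluxDB monitoring,
// profiling, the manager chain, and the HTTP server, then blocks until an OS signal
// triggers the shutdown.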
func main() {
	if err := config.InitConfig(); err != nil {
		log.WithError(err).Warn("Could not initialize configuration")
	}
	initializeUserAgent()
	logging.InitializeLogging(config.Config.Logger.Level, config.Config.Logger.Formatter)
	initSentry(&config.Config.Sentry, config.Config.Profiling.CPUEnabled)

	cancelInflux := monitoring.InitializeInfluxDB(&config.Config.InfluxDB)
	defer cancelInflux()

	stopProfiling := initProfiling(config.Config.Profiling)

	ctx, cancel := context.WithCancel(context.Background())
	go watchMemoryAndAlert(ctx, config.Config.Profiling)

	router := initRouter(ctx)
	server := initServer(router)
	go runServer(router, server, cancel)

	shutdownOnOSSignal(ctx, server, stopProfiling)
}