package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"net"
	"net/http"
	"os"
	"os/signal"
	"regexp"
	"runtime"
	"runtime/debug"
	"runtime/pprof"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/coreos/go-systemd/v22/activation"
	"github.com/coreos/go-systemd/v22/daemon"
	"github.com/getsentry/sentry-go"
	sentryhttp "github.com/getsentry/sentry-go/http"
	"github.com/gorilla/mux"
	"github.com/openHPI/poseidon/internal/api"
	"github.com/openHPI/poseidon/internal/config"
	"github.com/openHPI/poseidon/internal/environment"
	"github.com/openHPI/poseidon/internal/runner"
	"github.com/openHPI/poseidon/pkg/dto"
	"github.com/openHPI/poseidon/pkg/logging"
	"github.com/openHPI/poseidon/pkg/monitoring"
	"k8s.io/client-go/kubernetes"
)

var (
	gracefulShutdownWait = 15 * time.Second
	log                  = logging.GetLogger("main")
	// If pgoEnabled is "true", the binary was built with PGO enabled.
	// It is set during compilation with our Makefile and is a string because
	// the linker's -X flag can only set string variables.
	pgoEnabled = "false"
)

// getVcsRevision returns the VCS revision embedded in the build info, optionally
// shortened to seven characters, with a "-modified" suffix for dirty builds.
func getVcsRevision(short bool) string {
	vcsRevision := "unknown"
	vcsModified := false

	if info, ok := debug.ReadBuildInfo(); ok {
		for _, setting := range info.Settings {
			switch setting.Key {
			case "vcs.revision":
				vcsRevision = setting.Value
			case "vcs.modified":
				var err error
				vcsModified, err = strconv.ParseBool(setting.Value)
				if err != nil {
					// Fall back to true so that the reported revision shows something is wrong.
					vcsModified = true
					log.WithError(err).Error("Could not parse the vcs.modified setting")
				}
			}
		}
	}

	if short && len(vcsRevision) >= 7 {
		vcsRevision = vcsRevision[:7]
	}
	if vcsModified {
		return vcsRevision + "-modified"
	}
	return vcsRevision
}

// initializeUserAgent fills the placeholders in the outgoing User-Agent strings.
func initializeUserAgent() {
	dto.UserAgentOut = strings.ReplaceAll(dto.UserAgentOut, dto.UserAgentVCSPlaceholder, getVcsRevision(true))
	dto.UserAgentFiltered = strings.ReplaceAll(dto.UserAgentFiltered, dto.UserAgentVCSPlaceholder, getVcsRevision(true))
	dto.UserAgentFiltered = strings.ReplaceAll(dto.UserAgentFiltered, dto.UserAgentFilterTokenPlaceholder, config.Config.Server.LoggingFilterToken)
}

// initSentry configures the Sentry client and tags every transaction with build and profiling metadata.
func initSentry(options *sentry.ClientOptions, profilingEnabled bool) {
	if options.Release == "" {
		options.Release = getVcsRevision(false)
	}

	options.BeforeSendTransaction = func(event *sentry.Event, _ *sentry.EventHint) *sentry.Event {
		if event.Tags == nil {
			event.Tags = make(map[string]string)
		}
		event.Tags["go_pgo"] = pgoEnabled
		event.Tags["go_profiling"] = strconv.FormatBool(profilingEnabled)
		return event
	}

	if err := sentry.Init(*options); err != nil {
		log.Errorf("sentry.Init: %s", err)
	}
}

// shutdownSentry recovers from a panic, reports it to Sentry, and flushes pending events.
func shutdownSentry() {
	if err := recover(); err != nil {
		sentry.CurrentHub().Recover(err)
		sentry.Flush(logging.GracefulSentryShutdown)
	}
}

// initProfiling starts CPU profiling if enabled and returns a function that stops it again.
func initProfiling(options config.Profiling) (cancel func()) {
	if !options.CPUEnabled {
		return func() {}
	}

	profile, err := os.Create(options.CPUFile)
	if err != nil {
		log.WithError(err).Error("Error while opening the profile file")
	}

	log.Debug("Starting CPU profiler")
	if err := pprof.StartCPUProfile(profile); err != nil {
		log.WithError(err).Error("Error while starting the CPU profiler")
	}

	return func() {
		log.Debug("Stopping CPU profiler")
		pprof.StopCPUProfile()
		if err := profile.Close(); err != nil {
			log.WithError(err).Error("Error while closing profile file")
		}
	}
}
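// Usage sketch (not part of the build): a CPU profile written by initProfiling
// can be inspected after shutdown with the standard Go tooling, for example:
//
//	go tool pprof ./poseidon cpu.pprof
//
// The binary path and the profile file name are placeholders; the actual file
// is whatever config.Config.Profiling.CPUFile points to.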
// watchMemoryAndAlert monitors the memory usage of Poseidon and sends an alert if it exceeds a threshold.
func watchMemoryAndAlert(ctx context.Context, options config.Profiling) {
	if options.MemoryInterval == 0 {
		return
	}

	var exceeded bool
	for {
		var stats runtime.MemStats
		runtime.ReadMemStats(&stats)
		log.WithField("heap", stats.HeapAlloc).Trace("Current Memory Usage")

		const megabytesToBytes = 1000 * 1000
		if !exceeded && stats.HeapAlloc >= uint64(options.MemoryThreshold)*megabytesToBytes {
			exceeded = true
			log.WithField("heap", stats.HeapAlloc).Warn("Memory Threshold exceeded")

			if err := pprof.Lookup("heap").WriteTo(os.Stderr, 1); err != nil {
				log.WithError(err).Warn("Failed to log the heap profile")
			}
			if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 1); err != nil {
				log.WithError(err).Warn("Failed to log the goroutines")
			}
		} else if exceeded {
			exceeded = false
			log.WithField("heap", stats.HeapAlloc).Info("Memory Threshold no longer exceeded")
		}

		select {
		case <-time.After(time.Duration(options.MemoryInterval) * time.Millisecond):
			continue
		case <-ctx.Done():
			return
		}
	}
}

// runServer starts serving the API on all configured listeners and cancels the passed context on return.
func runServer(router *mux.Router, server *http.Server, cancel context.CancelFunc) {
	defer cancel()
	defer shutdownSentry() // shutdownSentry must be executed in the main goroutine.

	httpListeners := getHTTPListeners(server)
	notifySystemd(router)
	serveHTTPListeners(server, httpListeners)
}

// getHTTPListeners returns the listeners passed in via systemd socket activation,
// or a freshly opened TCP listener on the configured address.
func getHTTPListeners(server *http.Server) (httpListeners []net.Listener) {
	var err error
	if config.Config.Server.SystemdSocketActivation {
		httpListeners, err = activation.Listeners()
	} else {
		var httpListener net.Listener
		httpListener, err = net.Listen("tcp", server.Addr)
		httpListeners = append(httpListeners, httpListener)
	}
	if err != nil || len(httpListeners) == 0 {
		log.WithError(err).
			WithField("listeners", httpListeners).
			WithField("systemd_socket", config.Config.Server.SystemdSocketActivation).
			Fatal("Failed listening to any socket")
		return nil
	}
	return httpListeners
}

// serveHTTPListeners serves the API on each listener in its own goroutine and blocks until all of them returned.
func serveHTTPListeners(server *http.Server, httpListeners []net.Listener) {
	var wg sync.WaitGroup
	wg.Add(len(httpListeners))
	for _, l := range httpListeners {
		go func(listener net.Listener) {
			defer wg.Done()
			log.WithField("address", listener.Addr()).Info("Serving Listener")
			serveHTTPListener(server, listener)
		}(l)
	}
	wg.Wait()
}
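// Sketch of a systemd socket unit that would drive SystemdSocketActivation
// (unit name and address are illustrative, not taken from this repository):
//
//	[Socket]
//	ListenStream=127.0.0.1:7200
//
//	[Install]
//	WantedBy=sockets.target
//
// With such a unit, systemd opens the socket itself and hands it over via the
// LISTEN_FDS protocol, which activation.Listeners() unpacks in getHTTPListeners.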
// serveHTTPListener serves the API on the given listener, with TLS if configured.
func serveHTTPListener(server *http.Server, l net.Listener) {
	var err error
	if config.Config.Server.TLS.Active {
		server.TLSConfig = config.TLSConfig
		log.WithField("CertFile", config.Config.Server.TLS.CertFile).
			WithField("KeyFile", config.Config.Server.TLS.KeyFile).
			Debug("Using TLS")
		err = server.ServeTLS(l, config.Config.Server.TLS.CertFile, config.Config.Server.TLS.KeyFile)
	} else {
		err = server.Serve(l)
	}

	if errors.Is(err, http.ErrServerClosed) {
		log.WithError(err).WithField("listener", l.Addr()).Info("Server closed")
	} else {
		log.WithError(err).WithField("listener", l.Addr()).Error("Error during listening and serving")
	}
}

// notifySystemd notifies systemd that Poseidon is ready and starts the watchdog loop if the watchdog is enabled.
func notifySystemd(router *mux.Router) {
	notify, err := daemon.SdNotify(false, daemon.SdNotifyReady)
	switch {
	case err == nil && !notify:
		log.Debug("Systemd Readiness Notification not supported")
	case err != nil:
		log.WithError(err).WithField("notify", notify).Warn("Failed notifying Readiness to Systemd")
	default:
		log.Trace("Notified Readiness to Systemd")
	}

	interval, err := daemon.SdWatchdogEnabled(false)
	if err != nil || interval == 0 {
		log.WithError(err).Error("Systemd Watchdog not supported")
		return
	}
	go systemdWatchdogLoop(context.Background(), router, interval)
}

// systemdWatchdogLoop checks Poseidon's health route twice per watchdog interval and renews the systemd watchdog.
func systemdWatchdogLoop(ctx context.Context, router *mux.Router, interval time.Duration) {
	healthRoute, err := router.Get(api.HealthPath).URL()
	if err != nil {
		log.WithError(err).Error("Failed to parse Health route")
		return
	}
	healthURL := config.Config.Server.URL().String() + healthRoute.String()
	// Workaround for certificate subject names: unspecified listen addresses are not valid request hosts.
	unspecifiedAddresses := regexp.MustCompile(`0\.0\.0\.0|\[::]`)
	healthURL = unspecifiedAddresses.ReplaceAllString(healthURL, "localhost")

	client := &http.Client{}
	if config.Config.Server.TLS.Active {
		tlsConfig := &tls.Config{RootCAs: x509.NewCertPool()} // #nosec G402 The default MinTLSVersion is secure.
		caCertBytes, err := os.ReadFile(config.Config.Server.TLS.CAFile)
		if err != nil {
			log.WithError(err).Warn("Cannot read tls ca file")
		} else {
			ok := tlsConfig.RootCAs.AppendCertsFromPEM(caCertBytes)
			log.WithField("success", ok).Trace("Loaded CA certificate")
		}
		client.Transport = &http.Transport{TLSClientConfig: tlsConfig}
	}

	// notificationIntervalFactor defines how many more notifications we send than required.
	const notificationIntervalFactor = 2
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval / notificationIntervalFactor):
			notifySystemdWatchdog(ctx, healthURL, client)
		}
	}
}

// notifySystemdWatchdog checks the health route and, on success, renews the systemd watchdog.
func notifySystemdWatchdog(ctx context.Context, healthURL string, client *http.Client) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, http.NoBody)
	if err != nil {
		return
	}
	req.Header.Set("User-Agent", dto.UserAgentFiltered)

	resp, err := client.Do(req)
	if err != nil {
		// We do not check for resp.StatusCode == 503 as Poseidon's error recovery will try to handle such
		// errors by itself. The Watchdog should just check that Poseidon handles http requests at all.
		log.WithError(err).Debug("Failed watchdog health check")
		return
	}
	_ = resp.Body.Close()

	notify, err := daemon.SdNotify(false, daemon.SdNotifyWatchdog)
	switch {
	case err == nil && !notify:
		log.Debug("Systemd Watchdog Notification not supported")
	case err != nil:
		log.WithError(err).WithField("notify", notify).Warn("Failed notifying Systemd Watchdog")
	default:
		log.Trace("Notified Systemd Watchdog")
	}
}

// managerCreator constructs a runner manager and an environment manager for one runtime.
type managerCreator func(ctx context.Context) (
	runnerManager runner.Manager, environmentManager environment.ManagerHandler)
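// Sketch of how a second runtime could join the chain of responsibility in
// initRouter below; createOtherManager and its config flag are hypothetical:
//
//	runnerManager, environmentManager := createOtherManager(ctx)
//	runnerManager, environmentManager = createManagerHandler(
//		ctx, createKubernetesManager, config.Config.Kubernetes.Enabled, runnerManager, environmentManager)
//
// Each manager serves the environments it owns and delegates all other requests
// to the next handler in the chain.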
// createManagerHandler adds the managers of the passed managerCreator to the chain of responsibility.
func createManagerHandler(ctx context.Context, handler managerCreator, enabled bool,
	nextRunnerManager runner.Manager, nextEnvironmentManager environment.ManagerHandler) (
	runnerManager runner.Manager, environmentManager environment.ManagerHandler) {
	if !enabled {
		return nextRunnerManager, nextEnvironmentManager
	}

	runnerManager, environmentManager = handler(ctx)
	runnerManager.SetNextHandler(nextRunnerManager)
	environmentManager.SetNextHandler(nextEnvironmentManager)
	return runnerManager, environmentManager
}

// createKubernetesManager initializes the Kubernetes client and the managers backed by it.
func createKubernetesManager(ctx context.Context) (runner.Manager, environment.ManagerHandler) {
	// API initialization
	kubernetesClient, err := kubernetes.NewForConfig(config.Config.Kubernetes.Config)
	if err != nil {
		log.WithError(err).Fatal("Failed to create kubernetes client")
	}

	runnerManager := runner.NewKubernetesRunnerManager(ctx, kubernetesClient)
	environmentManager := environment.NewKubernetesEnvironmentManager(runnerManager, kubernetesClient)
	return runnerManager, environmentManager
}

// initRouter builds a router that serves the API with the chain of responsibility for multiple managers.
func initRouter(ctx context.Context) *mux.Router {
	runnerManager, environmentManager := createManagerHandler(
		ctx, createKubernetesManager, config.Config.Kubernetes.Enabled, nil, nil)
	return api.NewRouter(runnerManager, environmentManager)
}

// initServer creates a server that serves the routes provided by the router.
func initServer(router *mux.Router) *http.Server {
	sentryHandler := sentryhttp.New(sentryhttp.Options{}).Handle(router)
	const readTimeout = 15 * time.Second
	const idleTimeout = 60 * time.Second

	return &http.Server{
		Addr: config.Config.Server.URL().Host,
		// A WriteTimeout would prohibit long-running requests such as creating an execution environment.
		// See also https://github.com/openHPI/poseidon/pull/68.
		// WriteTimeout: time.Second * 15,
		ReadHeaderTimeout: readTimeout,
		ReadTimeout:       readTimeout,
		IdleTimeout:       idleTimeout,
		Handler:           sentryHandler,
	}
}
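// Profiling workflow sketch: with Profiling.CPUEnabled set, sending SIGUSR1
// (handled in shutdownOnOSSignal below) flushes the CPU profile while the
// service keeps running, for example:
//
//	kill -USR1 "$(pidof poseidon)"
//
// The process name is a placeholder for however the binary is deployed.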
// shutdownOnOSSignal listens for signals from the operating system.
// On a shutdown signal, the server shuts down, but waits up to 15 seconds to close remaining connections.
func shutdownOnOSSignal(ctx context.Context, server *http.Server, stopProfiling func()) {
	// Wait for SIGINT, SIGTERM, or SIGABRT to shut down.
	shutdownSignals := make(chan os.Signal, 1)
	signal.Notify(shutdownSignals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGABRT)

	// Wait for SIGUSR1 to write the CPU profile.
	writeProfileSignal := make(chan os.Signal, 1)
	signal.Notify(writeProfileSignal, syscall.SIGUSR1)

	select {
	case <-ctx.Done():
		os.Exit(1)
	case <-writeProfileSignal:
		log.Info("Received SIGUSR1...")
		stopProfiling()
		// Continue listening on signals and replace `stopProfiling` with an empty function.
		shutdownOnOSSignal(ctx, server, func() {})
	case <-shutdownSignals:
		log.Info("Received shutdown signal, shutting down...")
		defer stopProfiling()

		shutdownCtx, cancel := context.WithTimeout(context.Background(), gracefulShutdownWait)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			log.WithError(err).Warn("error shutting server down")
		}
	}
}

func main() {
	if err := config.InitConfig(); err != nil {
		log.WithError(err).Warn("Could not initialize configuration")
	}
	initializeUserAgent()
	logging.InitializeLogging(config.Config.Logger.Level, config.Config.Logger.Formatter)
	initSentry(&config.Config.Sentry, config.Config.Profiling.CPUEnabled)

	cancelInflux := monitoring.InitializeInfluxDB(&config.Config.InfluxDB)
	defer cancelInflux()

	ctx, cancel := context.WithCancel(context.Background())

	stopProfiling := initProfiling(config.Config.Profiling)
	go watchMemoryAndAlert(ctx, config.Config.Profiling)

	router := initRouter(ctx)
	server := initServer(router)
	go runServer(router, server, cancel)

	shutdownOnOSSignal(ctx, server, stopProfiling)
}
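// Operational sketch: the readiness and watchdog notifications in notifySystemd
// and systemdWatchdogLoop assume a service unit along these lines (values are
// illustrative):
//
//	[Service]
//	Type=notify
//	WatchdogSec=30s
//
// WatchdogSec determines the interval returned by daemon.SdWatchdogEnabled;
// systemdWatchdogLoop then checks the health route twice per interval
// (notificationIntervalFactor) before renewing the watchdog.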