From fb2588cc3a84fdba443f5bfb387037c688b68619 Mon Sep 17 00:00:00 2001 From: Zoe <62722391+juls0730@users.noreply.github.com> Date: Thu, 15 May 2025 18:32:14 +0000 Subject: [PATCH] use the proxy as a package --- cmd/cli/main.go | 3 +- go.mod | 8 +- go.sum | 4 + internal/handlers/app.go | 3 +- internal/handlers/server.go | 23 +++- internal/models/deployment.go | 20 ++- .../services/appManagerService/appmanager.go | 9 +- internal/services/proxy/proxy.go | 130 +----------------- internal/util/lock.go | 2 +- pkg/version.go | 2 +- 10 files changed, 62 insertions(+), 142 deletions(-) diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 9c09c1c..d85517b 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -122,7 +122,8 @@ func runCommand(command string, args []string, config pkg.CLIConfig, cmdHandler var info *API.Info = nil if commandStruct.DaemonConnected { - info, err := util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger) + var err error + info, err = util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger) if err != nil { fmt.Printf("Failed to connect to daemon\n") os.Exit(1) diff --git a/go.mod b/go.mod index cf57874..07b42e0 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/juls0730/flux -go 1.23.3 +go 1.24.2 require ( github.com/briandowns/spinner v1.23.1 @@ -10,7 +10,11 @@ require ( github.com/mattn/go-sqlite3 v1.14.24 ) -require go.uber.org/multierr v1.10.0 // indirect +require ( + github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd // indirect + go.uber.org/multierr v1.10.0 // indirect + golang.org/x/sync v0.14.0 // indirect +) require ( github.com/Microsoft/go-winio v0.4.14 // indirect diff --git a/go.sum b/go.sum index 2e3e8ee..8c6cdd9 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd h1:JNazPdlAs307Gtaqmb+wfCjcOzO3MRXxg9nf0bAKAnc= +github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd/go.mod h1:CnRvcleiS2kvK1N2PeQmeoRP5EXpBDpHPkg72vAUaSg= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -113,6 +115,8 @@ golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/internal/handlers/app.go b/internal/handlers/app.go index 547c541..dd724be 100644 --- a/internal/handlers/app.go +++ b/internal/handlers/app.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" proxyManagerService "github.com/juls0730/flux/internal/services/proxy" "github.com/juls0730/flux/pkg/API" + "github.com/juls0730/sentinel" "go.uber.org/zap" ) @@ -128,7 +129,7 @@ func (flux *FluxServer) StartApp(w http.ResponseWriter, r *http.Request) { return } - newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl) + newProxy, err := sentinel.NewDeploymentProxy(deploymentInternalUrl.String(), proxyManagerService.GetTransport) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/internal/handlers/server.go b/internal/handlers/server.go index caebee2..add211f 100644 --- a/internal/handlers/server.go +++ b/internal/handlers/server.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "strconv" + "time" _ "embed" @@ -19,7 +20,7 @@ import ( "github.com/google/uuid" "github.com/juls0730/flux/internal/docker" "github.com/juls0730/flux/internal/services/appManagerService" - proxyManagerService "github.com/juls0730/flux/internal/services/proxy" + "github.com/juls0730/sentinel" "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" @@ -44,7 +45,7 @@ type FluxServer struct { docker *docker.DockerClient - proxy *proxyManagerService.ProxyManager + proxy *sentinel.ProxyManager appManager *appManagerService.AppManager rootDir string @@ -149,7 +150,8 @@ func NewServer() *FluxServer { // blocking until the iamge is pulled io.Copy(io.Discard, events) - flux.proxy = proxyManagerService.NewProxyManager(flux.logger) + requestLogger := &RequestLogger{logger: flux.logger} + flux.proxy = sentinel.NewProxyManager(requestLogger) flux.appManager = appManagerService.NewAppManager(flux.db, flux.docker, flux.proxy, flux.logger) err = flux.appManager.Init() @@ -167,7 +169,12 @@ func (s *FluxServer) Stop() { func (s *FluxServer) ListenAndServe() error { s.logger.Infow("Starting server", zap.String("daemon_host", s.config.DaemonHost), zap.String("proxy_host", s.config.ProxyHost)) - go s.proxy.ListenAndServe(s.config.ProxyHost) + go func() { + err := s.proxy.ListenAndServe(s.config.ProxyHost) + if err != nil { + s.logger.Errorw("Failed to start proxy", zap.Error(err)) + } + }() return http.ListenAndServe(s.config.DaemonHost, nil) } @@ -253,3 +260,11 @@ func (s *FluxServer) UploadAppCode(code io.Reader, appId uuid.UUID) (string, err return outputPath, nil } + +type RequestLogger struct { + logger *zap.SugaredLogger +} + +func (r *RequestLogger) LogRequest(host string, status int, latency time.Duration, ip, method, path string) { + r.logger.Infow("Request", zap.String("host", host), zap.Int("status", status), zap.Duration("latency", latency), zap.String("ip", ip), zap.String("method", method), zap.String("path", path)) +} diff --git a/internal/models/deployment.go b/internal/models/deployment.go index 9b3fa07..b1bc9f5 100644 --- a/internal/models/deployment.go +++ b/internal/models/deployment.go @@ -9,6 +9,7 @@ import ( "github.com/juls0730/flux/internal/docker" proxyManagerService "github.com/juls0730/flux/internal/services/proxy" "github.com/juls0730/flux/pkg" + "github.com/juls0730/sentinel" "go.uber.org/zap" ) @@ -169,7 +170,7 @@ func (deployment *Deployment) Status(ctx context.Context, dockerClient *docker.D } // Takes an existing deployment, and gracefully upgrades the app to a new image -func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error { +func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *sentinel.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error { // copy the old head container since Upgrade updates the container in place oldHeadContainer := *deployment.Head() @@ -202,14 +203,18 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr } // Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully drained of connections - oldProxy, ok := proxyManager.Load(deployment.URL) + var oldProxy *sentinel.Proxy + var ok bool = false + if value, exists := proxyManager.Load(deployment.URL); exists { + oldProxy, ok = value.(*sentinel.Proxy) + } newDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient) if err != nil { logger.Errorw("Failed to get internal url", zap.Error(err)) return err } - newProxy, err := proxyManagerService.NewDeploymentProxy(*newDeploymentInternalUrl) + newProxy, err := sentinel.NewDeploymentProxy(newDeploymentInternalUrl.String(), proxyManagerService.GetTransport) if err != nil { logger.Errorw("Failed to create deployment proxy", zap.Error(err)) return err @@ -221,7 +226,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr // gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers if ok { - go oldProxy.GracefulShutdown(logger, func() { + go oldProxy.GracefulShutdown(func() { err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID) if err != nil { logger.Errorw("Failed to stop container", zap.Error(err)) @@ -234,7 +239,12 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr } }) } else { - err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID) + err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID) + if err != nil { + logger.Errorw("Failed to stop container", zap.Error(err)) + } + + err = dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID) if err != nil { logger.Errorw("Failed to remove container", zap.Error(err)) } diff --git a/internal/services/appManagerService/appmanager.go b/internal/services/appManagerService/appmanager.go index 9d642af..3ab8adb 100644 --- a/internal/services/appManagerService/appmanager.go +++ b/internal/services/appManagerService/appmanager.go @@ -11,6 +11,7 @@ import ( proxyManagerService "github.com/juls0730/flux/internal/services/proxy" "github.com/juls0730/flux/internal/util" "github.com/juls0730/flux/pkg" + "github.com/juls0730/sentinel" "go.uber.org/zap" ) @@ -18,12 +19,12 @@ type AppManager struct { util.TypedMap[uuid.UUID, *models.App] nameIndex util.TypedMap[string, uuid.UUID] logger *zap.SugaredLogger - proxyManager *proxyManagerService.ProxyManager + proxyManager *sentinel.ProxyManager dockerClient *docker.DockerClient db *sql.DB } -func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, logger *zap.SugaredLogger) *AppManager { +func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *sentinel.ProxyManager, logger *zap.SugaredLogger) *AppManager { return &AppManager{ db: db, dockerClient: dockerClient, @@ -87,7 +88,7 @@ func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, p return nil, fmt.Errorf("failed to get internal url: %v", err) } - newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl) + newProxy, err := sentinel.NewDeploymentProxy(deploymentInternalUrl.String(), proxyManagerService.GetTransport) if err != nil { appManager.logger.Errorw("Failed to create deployment proxy", zap.Error(err)) return nil, fmt.Errorf("failed to create deployment proxy: %v", err) @@ -293,7 +294,7 @@ func (am *AppManager) Init() error { continue } - proxy, err := proxyManagerService.NewDeploymentProxy(*proxyURL) + proxy, err := sentinel.NewDeploymentProxy(proxyURL.String(), proxyManagerService.GetTransport) if err != nil { am.logger.Errorw("Failed to create proxy", zap.Error(err)) continue diff --git a/internal/services/proxy/proxy.go b/internal/services/proxy/proxy.go index 69e701b..de7f683 100644 --- a/internal/services/proxy/proxy.go +++ b/internal/services/proxy/proxy.go @@ -1,134 +1,18 @@ package proxyManagerService import ( - "context" - "fmt" "net/http" - "net/http/httputil" "net/url" - "sync/atomic" "time" - - "github.com/juls0730/flux/internal/util" - "go.uber.org/zap" ) -type DeploymentId int64 - -// this is the object that oversees the proxying of requests to the correct deployment -type ProxyManager struct { - util.TypedMap[string, *Proxy] - logger *zap.SugaredLogger -} - -func NewProxyManager(logger *zap.SugaredLogger) *ProxyManager { - return &ProxyManager{ - logger: logger, - } -} - -func (proxyManager *ProxyManager) ListenAndServe(host string) error { - if host == "" { - host = "0.0.0.0:7465" - } - - proxyManager.logger.Infof("Proxy server starting on http://%s", host) - if err := http.ListenAndServe(host, proxyManager); err != nil && err != http.ErrServerClosed { - return fmt.Errorf("failed to start proxy server: %v", err) - } - return nil -} - -// Stops forwarding traffic to a deployment -func (proxyManager *ProxyManager) RemoveDeployment(host string) { - proxyManager.logger.Debugw("Removing proxy", zap.String("host", host)) - proxyManager.Delete(host) -} - -// Starts forwarding traffic to a deployment. The deployment must be ready to recieve requests before this is called. -func (proxyManager *ProxyManager) AddProxy(host string, proxy *Proxy) { - proxyManager.logger.Debugw("Adding proxy", zap.String("host", host)) - proxyManager.Store(host, proxy) -} - -// This function is responsible for taking an http request and forwarding it to the correct deployment -func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) { - host := r.Host - - proxyManager.logger.Debugw("Proxying request", zap.String("host", host)) - proxy, ok := proxyManager.Load(host) - if !ok { - http.Error(w, "Not found", http.StatusNotFound) - return - } - - proxy.proxyFunc.ServeHTTP(w, r) -} - -type Proxy struct { - forwardingFor url.URL - proxyFunc *httputil.ReverseProxy - shutdownTimeout time.Duration - activeRequests int64 -} - -const PROXY_SHUTDOWN_TIMEOUT = 30 * time.Second - -// Creates a proxy for a given deployment -func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) { - proxy := &Proxy{ - forwardingFor: forwardingFor, - shutdownTimeout: PROXY_SHUTDOWN_TIMEOUT, - activeRequests: 0, - } - - proxy.proxyFunc = &httputil.ReverseProxy{ - Director: func(req *http.Request) { - req.URL = &url.URL{ - Scheme: forwardingFor.Scheme, - Host: forwardingFor.Host, - Path: req.URL.Path, - } - req.Host = forwardingFor.Host - atomic.AddInt64(&proxy.activeRequests, 1) - }, - Transport: &http.Transport{ - MaxIdleConns: 100, - IdleConnTimeout: 90 * time.Second, - MaxIdleConnsPerHost: 100, - }, - ModifyResponse: func(resp *http.Response) error { - atomic.AddInt64(&proxy.activeRequests, -1) - return nil +func GetTransport(target string) *http.Transport { + return &http.Transport{ + Proxy: func(r *http.Request) (*url.URL, error) { + return url.Parse(target) }, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + MaxIdleConnsPerHost: 100, } - - return proxy, nil -} - -// Drains connections from a proxy -func (p *Proxy) GracefulShutdown(logger *zap.SugaredLogger, shutdownFunc func()) { - logger.Debugw("Shutting down proxy", zap.String("host", p.forwardingFor.Host)) - - ctx, cancel := context.WithTimeout(context.Background(), p.shutdownTimeout) - defer cancel() - - done := false - for !done { - select { - case <-ctx.Done(): - logger.Debugw("Proxy shutdown timed out", zap.String("host", p.forwardingFor.Host)) - - done = true - default: - if atomic.LoadInt64(&p.activeRequests) == 0 { - logger.Debugw("Proxy shutdown completed successfully", zap.String("host", p.forwardingFor.Host)) - done = true - } - - time.Sleep(time.Second) - } - } - - shutdownFunc() } diff --git a/internal/util/lock.go b/internal/util/lock.go index ce953bd..4bc25a9 100644 --- a/internal/util/lock.go +++ b/internal/util/lock.go @@ -41,7 +41,7 @@ func (dt *MutexLock[T]) Unlock(id T) { dt.mu.Lock() defer dt.mu.Unlock() - // Remove the app from deployed tracking + // Remove the object from the map (functionally unlocking it) if cancel, exists := dt.deployed[id]; exists { // Cancel the context cancel() diff --git a/pkg/version.go b/pkg/version.go index 4ceacaf..93b23d6 100644 --- a/pkg/version.go +++ b/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "2025.05.08-14" +const Version = "2025.05.15-18"