use the proxy as a package
This commit is contained in:
@@ -122,7 +122,8 @@ func runCommand(command string, args []string, config pkg.CLIConfig, cmdHandler
|
||||
var info *API.Info = nil
|
||||
|
||||
if commandStruct.DaemonConnected {
|
||||
info, err := util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger)
|
||||
var err error
|
||||
info, err = util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to connect to daemon\n")
|
||||
os.Exit(1)
|
||||
|
||||
8
go.mod
8
go.mod
@@ -1,6 +1,6 @@
|
||||
module github.com/juls0730/flux
|
||||
|
||||
go 1.23.3
|
||||
go 1.24.2
|
||||
|
||||
require (
|
||||
github.com/briandowns/spinner v1.23.1
|
||||
@@ -10,7 +10,11 @@ require (
|
||||
github.com/mattn/go-sqlite3 v1.14.24
|
||||
)
|
||||
|
||||
require go.uber.org/multierr v1.10.0 // indirect
|
||||
require (
|
||||
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd // indirect
|
||||
go.uber.org/multierr v1.10.0 // indirect
|
||||
golang.org/x/sync v0.14.0 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/Microsoft/go-winio v0.4.14 // indirect
|
||||
|
||||
4
go.sum
4
go.sum
@@ -43,6 +43,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0=
|
||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd h1:JNazPdlAs307Gtaqmb+wfCjcOzO3MRXxg9nf0bAKAnc=
|
||||
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd/go.mod h1:CnRvcleiS2kvK1N2PeQmeoRP5EXpBDpHPkg72vAUaSg=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
@@ -113,6 +115,8 @@ golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
|
||||
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/google/uuid"
|
||||
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
||||
"github.com/juls0730/flux/pkg/API"
|
||||
"github.com/juls0730/sentinel"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@@ -128,7 +129,7 @@ func (flux *FluxServer) StartApp(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl)
|
||||
newProxy, err := sentinel.NewDeploymentProxy(deploymentInternalUrl.String(), proxyManagerService.GetTransport)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
_ "embed"
|
||||
|
||||
@@ -19,7 +20,7 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/juls0730/flux/internal/docker"
|
||||
"github.com/juls0730/flux/internal/services/appManagerService"
|
||||
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
||||
"github.com/juls0730/sentinel"
|
||||
|
||||
"github.com/juls0730/flux/pkg"
|
||||
"github.com/juls0730/flux/pkg/API"
|
||||
@@ -44,7 +45,7 @@ type FluxServer struct {
|
||||
|
||||
docker *docker.DockerClient
|
||||
|
||||
proxy *proxyManagerService.ProxyManager
|
||||
proxy *sentinel.ProxyManager
|
||||
appManager *appManagerService.AppManager
|
||||
|
||||
rootDir string
|
||||
@@ -149,7 +150,8 @@ func NewServer() *FluxServer {
|
||||
// blocking until the iamge is pulled
|
||||
io.Copy(io.Discard, events)
|
||||
|
||||
flux.proxy = proxyManagerService.NewProxyManager(flux.logger)
|
||||
requestLogger := &RequestLogger{logger: flux.logger}
|
||||
flux.proxy = sentinel.NewProxyManager(requestLogger)
|
||||
|
||||
flux.appManager = appManagerService.NewAppManager(flux.db, flux.docker, flux.proxy, flux.logger)
|
||||
err = flux.appManager.Init()
|
||||
@@ -167,7 +169,12 @@ func (s *FluxServer) Stop() {
|
||||
func (s *FluxServer) ListenAndServe() error {
|
||||
s.logger.Infow("Starting server", zap.String("daemon_host", s.config.DaemonHost), zap.String("proxy_host", s.config.ProxyHost))
|
||||
|
||||
go s.proxy.ListenAndServe(s.config.ProxyHost)
|
||||
go func() {
|
||||
err := s.proxy.ListenAndServe(s.config.ProxyHost)
|
||||
if err != nil {
|
||||
s.logger.Errorw("Failed to start proxy", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
return http.ListenAndServe(s.config.DaemonHost, nil)
|
||||
}
|
||||
|
||||
@@ -253,3 +260,11 @@ func (s *FluxServer) UploadAppCode(code io.Reader, appId uuid.UUID) (string, err
|
||||
|
||||
return outputPath, nil
|
||||
}
|
||||
|
||||
type RequestLogger struct {
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func (r *RequestLogger) LogRequest(host string, status int, latency time.Duration, ip, method, path string) {
|
||||
r.logger.Infow("Request", zap.String("host", host), zap.Int("status", status), zap.Duration("latency", latency), zap.String("ip", ip), zap.String("method", method), zap.String("path", path))
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/juls0730/flux/internal/docker"
|
||||
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
||||
"github.com/juls0730/flux/pkg"
|
||||
"github.com/juls0730/sentinel"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@@ -169,7 +170,7 @@ func (deployment *Deployment) Status(ctx context.Context, dockerClient *docker.D
|
||||
}
|
||||
|
||||
// Takes an existing deployment, and gracefully upgrades the app to a new image
|
||||
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error {
|
||||
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *sentinel.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error {
|
||||
// copy the old head container since Upgrade updates the container in place
|
||||
oldHeadContainer := *deployment.Head()
|
||||
|
||||
@@ -202,14 +203,18 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
|
||||
}
|
||||
|
||||
// Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully drained of connections
|
||||
oldProxy, ok := proxyManager.Load(deployment.URL)
|
||||
var oldProxy *sentinel.Proxy
|
||||
var ok bool = false
|
||||
if value, exists := proxyManager.Load(deployment.URL); exists {
|
||||
oldProxy, ok = value.(*sentinel.Proxy)
|
||||
}
|
||||
|
||||
newDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient)
|
||||
if err != nil {
|
||||
logger.Errorw("Failed to get internal url", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
newProxy, err := proxyManagerService.NewDeploymentProxy(*newDeploymentInternalUrl)
|
||||
newProxy, err := sentinel.NewDeploymentProxy(newDeploymentInternalUrl.String(), proxyManagerService.GetTransport)
|
||||
if err != nil {
|
||||
logger.Errorw("Failed to create deployment proxy", zap.Error(err))
|
||||
return err
|
||||
@@ -221,7 +226,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
|
||||
|
||||
// gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers
|
||||
if ok {
|
||||
go oldProxy.GracefulShutdown(logger, func() {
|
||||
go oldProxy.GracefulShutdown(func() {
|
||||
err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID)
|
||||
if err != nil {
|
||||
logger.Errorw("Failed to stop container", zap.Error(err))
|
||||
@@ -234,7 +239,12 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
|
||||
}
|
||||
})
|
||||
} else {
|
||||
err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
|
||||
err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID)
|
||||
if err != nil {
|
||||
logger.Errorw("Failed to stop container", zap.Error(err))
|
||||
}
|
||||
|
||||
err = dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
|
||||
if err != nil {
|
||||
logger.Errorw("Failed to remove container", zap.Error(err))
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
||||
"github.com/juls0730/flux/internal/util"
|
||||
"github.com/juls0730/flux/pkg"
|
||||
"github.com/juls0730/sentinel"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@@ -18,12 +19,12 @@ type AppManager struct {
|
||||
util.TypedMap[uuid.UUID, *models.App]
|
||||
nameIndex util.TypedMap[string, uuid.UUID]
|
||||
logger *zap.SugaredLogger
|
||||
proxyManager *proxyManagerService.ProxyManager
|
||||
proxyManager *sentinel.ProxyManager
|
||||
dockerClient *docker.DockerClient
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, logger *zap.SugaredLogger) *AppManager {
|
||||
func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *sentinel.ProxyManager, logger *zap.SugaredLogger) *AppManager {
|
||||
return &AppManager{
|
||||
db: db,
|
||||
dockerClient: dockerClient,
|
||||
@@ -87,7 +88,7 @@ func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, p
|
||||
return nil, fmt.Errorf("failed to get internal url: %v", err)
|
||||
}
|
||||
|
||||
newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl)
|
||||
newProxy, err := sentinel.NewDeploymentProxy(deploymentInternalUrl.String(), proxyManagerService.GetTransport)
|
||||
if err != nil {
|
||||
appManager.logger.Errorw("Failed to create deployment proxy", zap.Error(err))
|
||||
return nil, fmt.Errorf("failed to create deployment proxy: %v", err)
|
||||
@@ -293,7 +294,7 @@ func (am *AppManager) Init() error {
|
||||
continue
|
||||
}
|
||||
|
||||
proxy, err := proxyManagerService.NewDeploymentProxy(*proxyURL)
|
||||
proxy, err := sentinel.NewDeploymentProxy(proxyURL.String(), proxyManagerService.GetTransport)
|
||||
if err != nil {
|
||||
am.logger.Errorw("Failed to create proxy", zap.Error(err))
|
||||
continue
|
||||
|
||||
@@ -1,134 +1,18 @@
|
||||
package proxyManagerService
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/juls0730/flux/internal/util"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type DeploymentId int64
|
||||
|
||||
// this is the object that oversees the proxying of requests to the correct deployment
|
||||
type ProxyManager struct {
|
||||
util.TypedMap[string, *Proxy]
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func NewProxyManager(logger *zap.SugaredLogger) *ProxyManager {
|
||||
return &ProxyManager{
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (proxyManager *ProxyManager) ListenAndServe(host string) error {
|
||||
if host == "" {
|
||||
host = "0.0.0.0:7465"
|
||||
}
|
||||
|
||||
proxyManager.logger.Infof("Proxy server starting on http://%s", host)
|
||||
if err := http.ListenAndServe(host, proxyManager); err != nil && err != http.ErrServerClosed {
|
||||
return fmt.Errorf("failed to start proxy server: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stops forwarding traffic to a deployment
|
||||
func (proxyManager *ProxyManager) RemoveDeployment(host string) {
|
||||
proxyManager.logger.Debugw("Removing proxy", zap.String("host", host))
|
||||
proxyManager.Delete(host)
|
||||
}
|
||||
|
||||
// Starts forwarding traffic to a deployment. The deployment must be ready to recieve requests before this is called.
|
||||
func (proxyManager *ProxyManager) AddProxy(host string, proxy *Proxy) {
|
||||
proxyManager.logger.Debugw("Adding proxy", zap.String("host", host))
|
||||
proxyManager.Store(host, proxy)
|
||||
}
|
||||
|
||||
// This function is responsible for taking an http request and forwarding it to the correct deployment
|
||||
func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
host := r.Host
|
||||
|
||||
proxyManager.logger.Debugw("Proxying request", zap.String("host", host))
|
||||
proxy, ok := proxyManager.Load(host)
|
||||
if !ok {
|
||||
http.Error(w, "Not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
proxy.proxyFunc.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
type Proxy struct {
|
||||
forwardingFor url.URL
|
||||
proxyFunc *httputil.ReverseProxy
|
||||
shutdownTimeout time.Duration
|
||||
activeRequests int64
|
||||
}
|
||||
|
||||
const PROXY_SHUTDOWN_TIMEOUT = 30 * time.Second
|
||||
|
||||
// Creates a proxy for a given deployment
|
||||
func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) {
|
||||
proxy := &Proxy{
|
||||
forwardingFor: forwardingFor,
|
||||
shutdownTimeout: PROXY_SHUTDOWN_TIMEOUT,
|
||||
activeRequests: 0,
|
||||
}
|
||||
|
||||
proxy.proxyFunc = &httputil.ReverseProxy{
|
||||
Director: func(req *http.Request) {
|
||||
req.URL = &url.URL{
|
||||
Scheme: forwardingFor.Scheme,
|
||||
Host: forwardingFor.Host,
|
||||
Path: req.URL.Path,
|
||||
}
|
||||
req.Host = forwardingFor.Host
|
||||
atomic.AddInt64(&proxy.activeRequests, 1)
|
||||
},
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: 100,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
MaxIdleConnsPerHost: 100,
|
||||
},
|
||||
ModifyResponse: func(resp *http.Response) error {
|
||||
atomic.AddInt64(&proxy.activeRequests, -1)
|
||||
return nil
|
||||
func GetTransport(target string) *http.Transport {
|
||||
return &http.Transport{
|
||||
Proxy: func(r *http.Request) (*url.URL, error) {
|
||||
return url.Parse(target)
|
||||
},
|
||||
MaxIdleConns: 100,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
MaxIdleConnsPerHost: 100,
|
||||
}
|
||||
|
||||
return proxy, nil
|
||||
}
|
||||
|
||||
// Drains connections from a proxy
|
||||
func (p *Proxy) GracefulShutdown(logger *zap.SugaredLogger, shutdownFunc func()) {
|
||||
logger.Debugw("Shutting down proxy", zap.String("host", p.forwardingFor.Host))
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), p.shutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
done := false
|
||||
for !done {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Debugw("Proxy shutdown timed out", zap.String("host", p.forwardingFor.Host))
|
||||
|
||||
done = true
|
||||
default:
|
||||
if atomic.LoadInt64(&p.activeRequests) == 0 {
|
||||
logger.Debugw("Proxy shutdown completed successfully", zap.String("host", p.forwardingFor.Host))
|
||||
done = true
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
shutdownFunc()
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ func (dt *MutexLock[T]) Unlock(id T) {
|
||||
dt.mu.Lock()
|
||||
defer dt.mu.Unlock()
|
||||
|
||||
// Remove the app from deployed tracking
|
||||
// Remove the object from the map (functionally unlocking it)
|
||||
if cancel, exists := dt.deployed[id]; exists {
|
||||
// Cancel the context
|
||||
cancel()
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
package pkg
|
||||
|
||||
const Version = "2025.05.08-14"
|
||||
const Version = "2025.05.15-18"
|
||||
|
||||
Reference in New Issue
Block a user