small cleanput and switch to sentinel
This commit is contained in:
221
main.go
221
main.go
@@ -11,9 +11,7 @@ import (
|
||||
"math/rand/v2"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/rpc"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
@@ -21,11 +19,11 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
"github.com/juls0730/gloom/libs"
|
||||
"github.com/juls0730/sentinel"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
@@ -54,10 +52,10 @@ type GLoom struct {
|
||||
hostMap libs.SyncMap[string, bool]
|
||||
|
||||
DB *sql.DB
|
||||
ProxyManager *ProxyManager
|
||||
ProxyManager *sentinel.ProxyManager
|
||||
}
|
||||
|
||||
func NewGloom(proxyManager *ProxyManager) (*GLoom, error) {
|
||||
func NewGloom(proxyManager *sentinel.ProxyManager) (*GLoom, error) {
|
||||
pluginsDir := os.Getenv("PLUGINS_DIR")
|
||||
if pluginsDir == "" {
|
||||
pluginsDir = "plugs"
|
||||
@@ -132,11 +130,11 @@ func NewGloom(proxyManager *ProxyManager) (*GLoom, error) {
|
||||
}
|
||||
|
||||
func (gloom *GLoom) LoadInitialPlugins() error {
|
||||
slog.Debug("Loading initial plugins")
|
||||
slog.Info("Loading initial plugins")
|
||||
|
||||
for _, plugin := range gloom.preloadPlugins {
|
||||
if err := gloom.RegisterPlugin(filepath.Join(gloom.pluginDir, plugin.File), plugin.File, plugin.Domains); err != nil {
|
||||
slog.Warn("Failed to register plugin", "pluginPath", plugin.File, "error", err)
|
||||
panic(fmt.Errorf("failed to load preload plugin %s: %w (make sure its in %s)", plugin.File, err, gloom.pluginDir))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,6 +162,8 @@ func (gloom *GLoom) LoadInitialPlugins() error {
|
||||
}
|
||||
}
|
||||
|
||||
slog.Info("Loaded initial plugins")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -233,7 +233,15 @@ func (gloom *GLoom) RegisterPlugin(pluginPath string, name string, domains []str
|
||||
}
|
||||
process := cmd.Process
|
||||
|
||||
timeout := time.After(5 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
_ = process.Signal(os.Interrupt)
|
||||
return fmt.Errorf("timed out waiting for pluginHost to start (this is likely a GLoom bug)")
|
||||
default:
|
||||
}
|
||||
_, err := os.Stat(controlPath)
|
||||
if err == nil {
|
||||
break
|
||||
@@ -276,16 +284,22 @@ func (gloom *GLoom) RegisterPlugin(pluginPath string, name string, domains []str
|
||||
}
|
||||
}
|
||||
|
||||
proxy, err := NewDeploymentProxy(socketPath)
|
||||
proxy, err := sentinel.NewDeploymentProxy(socketPath, NewUnixSocketTransport)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var oldProxy *Proxy
|
||||
var oldProxy *sentinel.Proxy
|
||||
for _, domain := range domains {
|
||||
var ok bool
|
||||
oldProxy, ok = gloom.ProxyManager.Load(domain)
|
||||
// there can only be one in a set of domains. If a is the domains already attached to the proxy, and b is
|
||||
if value, exists := gloom.ProxyManager.Load(domain); exists {
|
||||
oldProxy = value.(*sentinel.Proxy)
|
||||
ok = true
|
||||
} else {
|
||||
ok = false
|
||||
}
|
||||
|
||||
// there can only be one proxy in a set of domains. If a is the domains already attached to the proxy, and b is
|
||||
// a superset of a, but the new members of b are not in any other set, then we can be sure there is just one
|
||||
if ok {
|
||||
break
|
||||
@@ -307,7 +321,11 @@ func (gloom *GLoom) RegisterPlugin(pluginPath string, name string, domains []str
|
||||
|
||||
if oldProxy != nil {
|
||||
go func() {
|
||||
oldProxy.GracefulShutdown(nil)
|
||||
slog.Debug("Gracefully shutting down old proxy")
|
||||
err := oldProxy.GracefulShutdown(nil)
|
||||
if err != nil {
|
||||
slog.Warn("Failed to gracefully shutdown old proxy", "error", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -352,7 +370,7 @@ func (gloom *GLoom) StartRPCServer() error {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Printf("RPC server running on port 7143\n")
|
||||
slog.Info("RPC server running on port 7143\n")
|
||||
go func() {
|
||||
for {
|
||||
conn, err := listener.Accept()
|
||||
@@ -439,7 +457,6 @@ func (rpc *GloomRPC) UploadPlugin(plugin PluginUpload, reply *string) error {
|
||||
}
|
||||
|
||||
var plugExists bool
|
||||
// TODO: make name a consistent identifier
|
||||
slog.Debug("Checking if plugin exists", "pluginPath", pluginPath, "pluginName", plugin.Name)
|
||||
rpc.gloom.DB.QueryRow("SELECT 1 FROM plugins WHERE name = ?", plugin.Name).Scan(&plugExists)
|
||||
slog.Debug("Plugin exists", "pluginExists", plugExists)
|
||||
@@ -515,7 +532,7 @@ func (rpc *GloomRPC) UploadPlugin(plugin PluginUpload, reply *string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("Plugin uploaded successfully")
|
||||
slog.Debug("Plugin uploaded successfully")
|
||||
|
||||
if err := rpc.gloom.RegisterPlugin(pluginPath, plugin.Name, domains); err != nil {
|
||||
os.Remove(pluginPath)
|
||||
@@ -524,26 +541,23 @@ func (rpc *GloomRPC) UploadPlugin(plugin PluginUpload, reply *string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if !plugExists {
|
||||
_, err = rpc.gloom.DB.Exec("INSERT INTO plugins (path, name, domains) VALUES (?, ?, ?)", pluginPath, plugin.Name, strings.Join(plugin.Domains, ","))
|
||||
if err != nil {
|
||||
*reply = fmt.Sprintf("Plugin upload failed: %v", err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if plugExists {
|
||||
_, err = rpc.gloom.DB.Exec("UPDATE plugins SET domains = ?, path = ? WHERE name = ?", strings.Join(plugin.Domains, ","), pluginPath, plugin.Name)
|
||||
if err != nil {
|
||||
*reply = fmt.Sprintf("Plugin upload failed: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if plugExists {
|
||||
// exit out early otherwise we risk creating multiple of the same plugin and causing undefined behavior
|
||||
*reply = "Plugin updated successfully"
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = rpc.gloom.DB.Exec("INSERT INTO plugins (path, name, domains) VALUES (?, ?, ?)", pluginPath, plugin.Name, strings.Join(plugin.Domains, ","))
|
||||
if err != nil {
|
||||
*reply = fmt.Sprintf("Plugin upload failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
*reply = "Plugin uploaded successfully"
|
||||
return nil
|
||||
}
|
||||
@@ -594,7 +608,7 @@ func main() {
|
||||
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level}))
|
||||
slog.SetDefault(logger)
|
||||
|
||||
proxyManager := NewProxyManager()
|
||||
proxyManager := sentinel.NewProxyManager(RequestLogger{})
|
||||
|
||||
gloom, err := NewGloom(proxyManager)
|
||||
if err != nil {
|
||||
@@ -607,89 +621,15 @@ func main() {
|
||||
|
||||
gloom.LoadInitialPlugins()
|
||||
|
||||
fmt.Println("Server running at http://localhost:3000")
|
||||
slog.Info("Server running at http://localhost:3000")
|
||||
if err := gloom.ProxyManager.ListenAndServe("127.0.0.1:3000"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// this is the object that oversees the proxying of requests to the correct deployment
|
||||
type ProxyManager struct {
|
||||
libs.SyncMap[string, *Proxy]
|
||||
}
|
||||
type RequestLogger struct{}
|
||||
|
||||
func NewProxyManager() *ProxyManager {
|
||||
return &ProxyManager{}
|
||||
}
|
||||
|
||||
func (proxyManager *ProxyManager) ListenAndServe(host string) error {
|
||||
slog.Info("Proxy server starting", "url", host)
|
||||
if err := http.ListenAndServe(host, proxyManager); err != nil && err != http.ErrServerClosed {
|
||||
return fmt.Errorf("failed to start proxy server: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stops forwarding traffic to a deployment
|
||||
func (proxyManager *ProxyManager) RemoveDeployment(host string) {
|
||||
slog.Info("Removing proxy", "host", host)
|
||||
proxyManager.Delete(host)
|
||||
}
|
||||
|
||||
// Starts forwarding traffic to a deployment. The deployment must be ready to recieve requests before this is called.
|
||||
func (proxyManager *ProxyManager) AddProxy(host string, proxy *Proxy) {
|
||||
slog.Debug("Adding proxy", "host", host)
|
||||
proxyManager.Store(host, proxy)
|
||||
}
|
||||
|
||||
// This function is responsible for taking an http request and forwarding it to the correct deployment
|
||||
func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
host := r.Host
|
||||
path := r.URL.Path
|
||||
method := r.Method
|
||||
ip := getClientIP(r)
|
||||
|
||||
slog.Debug("Proxying request", "host", host, "path", path, "method", method, "ip", ip)
|
||||
proxy, ok := proxyManager.Load(host)
|
||||
if !ok {
|
||||
http.Error(w, "Not found", http.StatusNotFound)
|
||||
logRequest(host, http.StatusNotFound, time.Since(start), ip, method, path)
|
||||
return
|
||||
}
|
||||
|
||||
// Create a custom ResponseWriter to capture the status code
|
||||
rw := &ResponseWriterInterceptor{ResponseWriter: w, statusCode: http.StatusOK}
|
||||
|
||||
proxy.proxyFunc.ServeHTTP(rw, r)
|
||||
|
||||
latency := time.Since(start)
|
||||
statusCode := rw.statusCode
|
||||
|
||||
logRequest(host, statusCode, latency, ip, method, path)
|
||||
}
|
||||
|
||||
// getClientIP retrieves the client's IP address from the request.
|
||||
// It handles cases where the IP might be forwarded by proxies.
|
||||
func getClientIP(r *http.Request) string {
|
||||
if forwarded := r.Header.Get("X-Forwarded-For"); forwarded != "" {
|
||||
return forwarded
|
||||
}
|
||||
return r.RemoteAddr
|
||||
}
|
||||
|
||||
// ResponseWriterInterceptor is a custom http.ResponseWriter that captures the status code.
|
||||
type ResponseWriterInterceptor struct {
|
||||
http.ResponseWriter
|
||||
statusCode int
|
||||
}
|
||||
|
||||
func (rw *ResponseWriterInterceptor) WriteHeader(code int) {
|
||||
rw.statusCode = code
|
||||
rw.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
func logRequest(app string, status int, latency time.Duration, ip, method, path string) {
|
||||
func (RequestLogger) LogRequest(app string, status int, latency time.Duration, ip, method, path string) {
|
||||
slog.Info("Proxy Request",
|
||||
slog.String("time", time.Now().Format(time.RFC3339)),
|
||||
slog.Int("status", status),
|
||||
@@ -710,85 +650,12 @@ func (d *unixDialer) DialContext(ctx context.Context, network, address string) (
|
||||
return net.Dial("unix", d.socketPath)
|
||||
}
|
||||
|
||||
func NewUnixSocketTransport(socketPath string) *http.Transport {
|
||||
func NewUnixSocketTransport(socket string) *http.Transport {
|
||||
return &http.Transport{
|
||||
DialContext: (&unixDialer{socketPath: socketPath}).DialContext,
|
||||
}
|
||||
}
|
||||
|
||||
type Proxy struct {
|
||||
socket string
|
||||
proxyFunc *httputil.ReverseProxy
|
||||
shutdownTimeout time.Duration
|
||||
activeRequests int64
|
||||
}
|
||||
|
||||
const PROXY_SHUTDOWN_TIMEOUT = 30 * time.Second
|
||||
|
||||
// Creates a proxy for a given deployment
|
||||
func NewDeploymentProxy(socket string) (*Proxy, error) {
|
||||
proxy := &Proxy{
|
||||
socket: socket,
|
||||
shutdownTimeout: PROXY_SHUTDOWN_TIMEOUT,
|
||||
activeRequests: 0,
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
DialContext: (&unixDialer{socketPath: socket}).DialContext,
|
||||
MaxIdleConns: 100,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
MaxIdleConnsPerHost: 100,
|
||||
ForceAttemptHTTP2: false,
|
||||
}
|
||||
|
||||
proxy.proxyFunc = &httputil.ReverseProxy{
|
||||
Director: func(req *http.Request) {
|
||||
req.URL = &url.URL{
|
||||
Scheme: "http",
|
||||
Host: req.Host,
|
||||
Path: req.URL.Path,
|
||||
}
|
||||
atomic.AddInt64(&proxy.activeRequests, 1)
|
||||
},
|
||||
Transport: transport,
|
||||
ModifyResponse: func(resp *http.Response) error {
|
||||
atomic.AddInt64(&proxy.activeRequests, -1)
|
||||
return nil
|
||||
},
|
||||
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
||||
slog.Error("Proxy error", "error", err)
|
||||
atomic.AddInt64(&proxy.activeRequests, -1)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
},
|
||||
}
|
||||
|
||||
return proxy, nil
|
||||
}
|
||||
|
||||
func (p *Proxy) GracefulShutdown(shutdownFunc func()) {
|
||||
slog.Debug("Shutting down proxy", "socket", p.socket)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), p.shutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
done := false
|
||||
for !done {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
slog.Debug("Proxy shutdown timed out", "socket", p.socket)
|
||||
|
||||
done = true
|
||||
default:
|
||||
if atomic.LoadInt64(&p.activeRequests) == 0 {
|
||||
slog.Debug("Proxy shutdown completed successfully", "socket", p.socket)
|
||||
done = true
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
if shutdownFunc != nil {
|
||||
shutdownFunc()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user