package sentinel import ( "context" "fmt" "net/http" "net/http/httputil" "net/url" "sync/atomic" "time" "golang.org/x/sync/syncmap" ) type RequestLogger interface { // LogRequest is called after an HTTP request has been processed by the proxy. // It provides details about the request and its outcome. LogRequest(host string, status int, latency time.Duration, ip, method, path string) } // this is the object that oversees the proxying of requests to the correct deployment type ProxyManager struct { // string -> *Proxy syncmap.Map requestLogger RequestLogger } func NewProxyManager(RequestLogger RequestLogger) *ProxyManager { return &ProxyManager{ Map: syncmap.Map{}, requestLogger: RequestLogger, } } func (proxyManager *ProxyManager) ListenAndServe(host string) error { if err := http.ListenAndServe(host, proxyManager); err != nil && err != http.ErrServerClosed { return fmt.Errorf("failed to start proxy server: %v", err) } return nil } // Stops forwarding traffic to a deployment func (proxyManager *ProxyManager) RemoveDeployment(host string) { proxyManager.Delete(host) } // Starts forwarding traffic to a deployment. The deployment must be ready to recieve requests before this is called. func (proxyManager *ProxyManager) AddProxy(host string, proxy *Proxy) { proxyManager.Store(host, proxy) } // This function is responsible for taking an http request and forwarding it to the correct deployment func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) { start := time.Now() host := r.Host path := r.URL.Path method := r.Method ip := getClientIP(r) proxy, ok := proxyManager.Load(host) if !ok { http.Error(w, "Not found", http.StatusNotFound) if proxyManager.requestLogger != nil { proxyManager.requestLogger.LogRequest(host, http.StatusNotFound, time.Since(start), ip, method, path) } return } // Create a custom ResponseWriter to capture the status code rw := &ResponseWriterInterceptor{ResponseWriter: w, statusCode: http.StatusOK} proxy.(*Proxy).proxyFunc.ServeHTTP(rw, r) latency := time.Since(start) statusCode := rw.statusCode if proxyManager.requestLogger != nil { proxyManager.requestLogger.LogRequest(host, statusCode, latency, ip, method, path) } } // getClientIP retrieves the client's IP address from the request. // It handles cases where the IP might be forwarded by proxies. func getClientIP(r *http.Request) string { if forwarded := r.Header.Get("X-Forwarded-For"); forwarded != "" { return forwarded } return r.RemoteAddr } // ResponseWriterInterceptor is a custom http.ResponseWriter that captures the status code. type ResponseWriterInterceptor struct { http.ResponseWriter statusCode int } func (rw *ResponseWriterInterceptor) WriteHeader(code int) { rw.statusCode = code rw.ResponseWriter.WriteHeader(code) } type Proxy struct { target string proxyFunc *httputil.ReverseProxy shutdownTimeout time.Duration activeRequests int64 } // TODO: make this configurable? const PROXY_SHUTDOWN_TIMEOUT = 30 * time.Second // Creates a proxy for a given deployment func NewDeploymentProxy(target string, transportFunc func(string) *http.Transport) (*Proxy, error) { proxy := &Proxy{ target: target, shutdownTimeout: PROXY_SHUTDOWN_TIMEOUT, activeRequests: 0, } transport := transportFunc(target) proxy.proxyFunc = &httputil.ReverseProxy{ Director: func(req *http.Request) { req.URL = &url.URL{ Scheme: "http", Host: req.Host, Path: req.URL.Path, } atomic.AddInt64(&proxy.activeRequests, 1) }, Transport: transport, ModifyResponse: func(resp *http.Response) error { atomic.AddInt64(&proxy.activeRequests, -1) return nil }, ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { atomic.AddInt64(&proxy.activeRequests, -1) w.WriteHeader(http.StatusInternalServerError) }, } return proxy, nil } // waits for the proxy to be drained of connections within the shutdown timeout, then calls the shutdownFunc (the proxy should be removes or replaced in the ProxyMaager) func (p *Proxy) GracefulShutdown(shutdownFunc func()) error { ctx, cancel := context.WithTimeout(context.Background(), p.shutdownTimeout) defer cancel() for { select { case <-ctx.Done(): if shutdownFunc != nil { shutdownFunc() } return fmt.Errorf("proxy shutdown timed out for %s", p.target) default: } if atomic.LoadInt64(&p.activeRequests) == 0 { break } time.Sleep(time.Second) } if shutdownFunc != nil { shutdownFunc() } return nil }