almost there

This commit is contained in:
Zoe
2024-12-07 02:35:38 -06:00
parent f1ad13a216
commit d27cc71f1d
12 changed files with 707 additions and 586 deletions

View File

@@ -2,7 +2,6 @@ package server
import (
"context"
"database/sql"
"fmt"
"log"
"net/http"
@@ -12,209 +11,119 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/juls0730/fluxd/models"
)
type ContainerProxy struct {
routes *RouteCache
db *sql.DB
cm *ContainerManager
activeConns int64
var ReverseProxy *Proxy
type Proxy struct {
deployments sync.Map
}
type RouteCache struct {
m sync.Map
func (p *Proxy) AddDeployment(deployment *Deployment) {
log.Printf("Adding deployment %s\n", deployment.URL)
p.deployments.Store(deployment.URL, deployment)
}
type containerRoute struct {
containerID string
port int
url string
proxy *httputil.ReverseProxy
isActive bool
}
func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host := r.Host
func (rc *RouteCache) GetRoute(appUrl string) *containerRoute {
container, exists := rc.m.Load(appUrl)
if !exists {
return nil
}
return container.(*containerRoute)
}
func (rc *RouteCache) SetRoute(appUrl string, container *containerRoute) {
rc.m.Store(appUrl, container)
}
func (rc *RouteCache) DeleteRoute(appUrl string) {
rc.m.Delete(appUrl)
}
func (cp *ContainerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Extract app name from host
appUrl := r.Host
container := cp.routes.GetRoute(appUrl)
if container == nil {
http.Error(w, "Container not found", http.StatusNotFound)
deployment, ok := p.deployments.Load(host)
if !ok {
http.Error(w, "Not found", http.StatusNotFound)
return
}
container.proxy.ServeHTTP(w, r)
}
atomic.AddInt64(&deployment.(*Deployment).Proxy.activeRequests, 1)
func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, containerID string) error {
containerJSON, err := cp.cm.dockerClient.ContainerInspect(context.Background(), containerID)
if err != nil {
log.Printf("Failed to inspect container: %v\n", err)
return err
}
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, projectConfig.Port))
if err != nil {
log.Printf("Failed to parse URL: %v\n", err)
return err
}
proxy := cp.createProxy(containerUrl)
newRoute := &containerRoute{
url: projectConfig.Url,
proxy: proxy,
port: projectConfig.Port,
isActive: true,
}
cp.routes.SetRoute(projectConfig.Url, newRoute)
return nil
}
func (cp *ContainerProxy) createProxy(url *url.URL) *httputil.ReverseProxy {
proxy := httputil.NewSingleHostReverseProxy(url)
originalDirector := proxy.Director
proxy.Director = func(req *http.Request) {
atomic.AddInt64(&cp.activeConns, 1)
// Validate URL before directing
if url == nil {
log.Printf("URL is nil")
return
}
originalDirector(req)
}
proxy.ModifyResponse = func(resp *http.Response) error {
atomic.AddInt64(&cp.activeConns, -1)
return nil
}
// Handle errors
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
atomic.AddInt64(&cp.activeConns, -1)
http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
// Ensure request body is closed
if r.Body != nil {
r.Body.Close()
}
}
return proxy
}
func (cp *ContainerProxy) RemoveContainer(containerID string) error {
var deploymentID int64
if err := cp.db.QueryRow("SELECT deployment_id FROM containers WHERE id = ?", containerID).Scan(&deploymentID); err != nil {
return err
}
var url string
if err := cp.db.QueryRow("SELECT url FROM deployments WHERE id = ?", deploymentID).Scan(&url); err != nil {
return err
}
container := cp.routes.GetRoute(url)
container := deployment.(*Deployment).Proxy.currentHead
if container == nil {
return fmt.Errorf("container not found")
http.Error(w, "No active container found", http.StatusNotFound)
return
}
container.isActive = false
containerJSON, err := dockerClient.ContainerInspect(context.Background(), string(container.ContainerID[:]))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, container.Deployment.Port))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
proxy := &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL = containerUrl
req.Host = containerUrl.Host
},
ModifyResponse: func(resp *http.Response) error {
atomic.AddInt64(&deployment.(*Deployment).Proxy.activeRequests, -1)
return nil
},
}
proxy.ServeHTTP(w, r)
}
type DeploymentProxy struct {
deployment *Deployment
currentHead *Container
gracePeriod time.Duration
activeRequests int64
}
func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) {
ctx, cancel := context.WithTimeout(context.Background(), dp.gracePeriod)
defer cancel()
// Create a channel to signal when wait group is done
for {
select {
case <-ctx.Done():
cp.routes.DeleteRoute(url)
return nil
break
default:
if atomic.LoadInt64(&cp.activeConns) == 0 {
cp.routes.DeleteRoute(url)
return nil
if atomic.LoadInt64(&dp.activeRequests) == 0 {
break
}
}
}
}
func (cp *ContainerProxy) ScanRoutes() {
rows, err := cp.db.Query("SELECT url, id FROM deployments")
if err != nil {
log.Printf("Failed to query deployments: %v\n", err)
return
}
defer rows.Close()
var containers []models.Containers
for rows.Next() {
var url string
var deploymentID int64
if err := rows.Scan(&url, &deploymentID); err != nil {
log.Printf("Failed to scan deployment: %v\n", err)
return
time.Sleep(time.Second)
}
rows, err := cp.db.Query("SELECT * FROM containers WHERE deployment_id = ?", deploymentID)
if atomic.LoadInt64(&dp.activeRequests) == 0 || ctx.Err() != nil {
break
}
}
for _, container := range oldContainers {
err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:]))
if err != nil {
log.Printf("Failed to query containers: %v\n", err)
return
}
defer rows.Close()
for rows.Next() {
var container models.Containers
if err := rows.Scan(&container.ID, &container.ContainerID, &container.Head, &container.DeploymentID, &container.CreatedAt); err != nil {
log.Printf("Failed to scan container: %v\n", err)
return
}
fmt.Printf("Found container: %s\n", container.ContainerID)
containers = append(containers, container)
log.Printf("Failed to remove container: %v\n", err)
}
}
}
func (cp *ContainerProxy) Start() {
cp.ScanRoutes()
func InitProxy(apps *AppManager) {
ReverseProxy = &Proxy{}
apps.Range(func(key, value interface{}) bool {
app := value.(*App)
ReverseProxy.AddDeployment(&app.Deployment)
return true
})
}
func InitReverseProxy() {
InitProxy(Apps)
port := os.Getenv("FLUXD_PROXY_PORT")
if port == "" {
port = "7465"
}
server := &http.Server{
Addr: fmt.Sprintf(":%s", port),
Handler: cp,
}
go func() {
log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
if err := http.ListenAndServe(fmt.Sprintf(":%s", port), ReverseProxy); err != nil && err != http.ErrServerClosed {
log.Fatalf("Proxy server error: %v", err)
}
}()