fix proxy race conditions

This commit is contained in:
Zoe
2024-12-04 23:08:17 -06:00
parent 7a71275551
commit f1ad13a216
5 changed files with 93 additions and 97 deletions

View File

@@ -2,6 +2,7 @@ package models
type Containers struct { type Containers struct {
ID string `json:"id"` ID string `json:"id"`
Head bool `json:"head"` // if the container is the head of the deployment
ContainerID string `json:"container_id"` ContainerID string `json:"container_id"`
DeploymentID int64 `json:"deployment_id"` DeploymentID int64 `json:"deployment_id"`
CreatedAt string `json:"created_at"` CreatedAt string `json:"created_at"`

View File

@@ -23,7 +23,7 @@ func (s *FluxServer) CreateDeployment(ctx context.Context, projectConfig models.
return 0, err return 0, err
} }
_, err = s.db.Exec("INSERT INTO containers (container_id, deployment_id) VALUES (?, ?)", containerID, deploymentID) _, err = s.db.Exec("INSERT INTO containers (container_id, deployment_id, head) VALUES (?, ?, ?)", containerID, deploymentID, true)
if err != nil { if err != nil {
log.Printf("Failed to get container id: %v\n", err) log.Printf("Failed to get container id: %v\n", err)
return 0, err return 0, err
@@ -65,7 +65,10 @@ func (s *FluxServer) UpgradeDeployment(ctx context.Context, deploymentID int64,
return err return err
} }
s.db.Exec("INSERT INTO containers (container_id, deployment_id) VALUES (?, ?)", containerID, deploymentID) if _, err := s.db.Exec("INSERT INTO containers (container_id, deployment_id, head) VALUES (?, ?, ?)", containerID, deploymentID, true); err != nil {
log.Printf("Failed to insert container: %v\n", err)
return err
}
// update app in the database // update app in the database
if _, err := s.db.Exec("UPDATE apps SET project_config = ?, deployment_id = ? WHERE name = ?", configBytes, deploymentID, projectConfig.Name); err != nil { if _, err := s.db.Exec("UPDATE apps SET project_config = ?, deployment_id = ? WHERE name = ?", configBytes, deploymentID, projectConfig.Name); err != nil {

View File

@@ -3,7 +3,6 @@ package server
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
@@ -18,13 +17,16 @@ import (
) )
type ContainerProxy struct { type ContainerProxy struct {
mu sync.RWMutex routes *RouteCache
urlMap map[string]*containerRoute
db *sql.DB db *sql.DB
cm *ContainerManager cm *ContainerManager
activeConns int64 activeConns int64
} }
type RouteCache struct {
m sync.Map
}
type containerRoute struct { type containerRoute struct {
containerID string containerID string
port int port int
@@ -33,107 +35,49 @@ type containerRoute struct {
isActive bool isActive bool
} }
func (cp *ContainerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (rc *RouteCache) GetRoute(appUrl string) *containerRoute {
cp.mu.RLock()
// defer cp.mu.RUnlock()
// Extract app name from host container, exists := rc.m.Load(appUrl)
appUrl := r.Host if !exists {
var container *containerRoute return nil
container, exists := cp.urlMap[appUrl]
if !exists || !container.isActive {
container = &containerRoute{
url: appUrl,
}
var deploymentID int64
cp.db.QueryRow("SELECT id FROM deployments WHERE url = ?", appUrl).Scan(&deploymentID)
if deploymentID == 0 {
fmt.Printf("No deployment found for url: %s\n", appUrl)
http.Error(w, "Container not found", http.StatusNotFound)
return
}
cp.db.QueryRow("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID).Scan(&container.containerID)
if container.containerID == "" {
fmt.Printf("No container found for deployment: %d\n", deploymentID)
http.Error(w, "Container not found", http.StatusNotFound)
return
}
var projectConfigStr string
if err := cp.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr); err != nil || projectConfigStr == "" {
http.Error(w, "Container not found", http.StatusNotFound)
return
}
var projectConfig models.ProjectConfig
if err := json.Unmarshal([]byte(projectConfigStr), &projectConfig); err != nil {
http.Error(w, "Failed to parse json", http.StatusNotFound)
return
}
container.port = projectConfig.Port
// cp.urlMap[appUrl] = container
} }
if container.proxy == nil { return container.(*containerRoute)
containerJSON, err := cp.cm.dockerClient.ContainerInspect(r.Context(), container.containerID) }
if err != nil {
log.Printf("Failed to inspect container: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if containerJSON.State.Status != "running" { func (rc *RouteCache) SetRoute(appUrl string, container *containerRoute) {
log.Printf("Container %s is not running\n", container.containerID) rc.m.Store(appUrl, container)
http.Error(w, "Container not running", http.StatusInternalServerError) }
return
}
url, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, container.port)) func (rc *RouteCache) DeleteRoute(appUrl string) {
if err != nil { rc.m.Delete(appUrl)
log.Printf("Failed to parse URL: %v\n", err) }
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// container.proxy = httputil.NewSingleHostReverseProxy(url)
container.proxy = cp.createProxy(url)
if container.proxy == nil {
log.Printf("Failed to create proxy for container %s\n", container.containerID)
http.Error(w, "Failed to create proxy", http.StatusInternalServerError)
container.isActive = false
return
}
cp.mu.RUnlock() func (cp *ContainerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cp.mu.Lock() // Extract app name from host
cp.urlMap[appUrl] = container appUrl := r.Host
cp.mu.Unlock()
} else { container := cp.routes.GetRoute(appUrl)
cp.mu.RUnlock() if container == nil {
http.Error(w, "Container not found", http.StatusNotFound)
return
} }
container.proxy.ServeHTTP(w, r) container.proxy.ServeHTTP(w, r)
} }
func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, containerID string) error { func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, containerID string) error {
cp.mu.Lock()
defer cp.mu.Unlock()
containerJSON, err := cp.cm.dockerClient.ContainerInspect(context.Background(), containerID) containerJSON, err := cp.cm.dockerClient.ContainerInspect(context.Background(), containerID)
if err != nil { if err != nil {
log.Printf("Failed to inspect container: %v\n", err) log.Printf("Failed to inspect container: %v\n", err)
return err return err
} }
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, projectConfig.Port)) containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, projectConfig.Port))
if err != nil { if err != nil {
log.Printf("Failed to parse URL: %v\n", err)
return err return err
} }
container, ok := cp.urlMap[projectConfig.Url]
if ok && container.proxy != nil {
container.isActive = true
return nil
}
proxy := cp.createProxy(containerUrl) proxy := cp.createProxy(containerUrl)
newRoute := &containerRoute{ newRoute := &containerRoute{
@@ -143,7 +87,7 @@ func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, conta
isActive: true, isActive: true,
} }
cp.urlMap[projectConfig.Url] = newRoute cp.routes.SetRoute(projectConfig.Url, newRoute)
return nil return nil
} }
@@ -153,6 +97,13 @@ func (cp *ContainerProxy) createProxy(url *url.URL) *httputil.ReverseProxy {
originalDirector := proxy.Director originalDirector := proxy.Director
proxy.Director = func(req *http.Request) { proxy.Director = func(req *http.Request) {
atomic.AddInt64(&cp.activeConns, 1) atomic.AddInt64(&cp.activeConns, 1)
// Validate URL before directing
if url == nil {
log.Printf("URL is nil")
return
}
originalDirector(req) originalDirector(req)
} }
@@ -163,18 +114,20 @@ func (cp *ContainerProxy) createProxy(url *url.URL) *httputil.ReverseProxy {
// Handle errors // Handle errors
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
log.Printf("Proxy error: %v", err) atomic.AddInt64(&cp.activeConns, -1)
http.Error(w, "Service unavailable", http.StatusServiceUnavailable) http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
r.Body.Close()
// Ensure request body is closed
if r.Body != nil {
r.Body.Close()
}
} }
return proxy return proxy
} }
func (cp *ContainerProxy) RemoveContainer(containerID string) error { func (cp *ContainerProxy) RemoveContainer(containerID string) error {
cp.mu.Lock()
defer cp.mu.Unlock()
var deploymentID int64 var deploymentID int64
if err := cp.db.QueryRow("SELECT deployment_id FROM containers WHERE id = ?", containerID).Scan(&deploymentID); err != nil { if err := cp.db.QueryRow("SELECT deployment_id FROM containers WHERE id = ?", containerID).Scan(&deploymentID); err != nil {
return err return err
@@ -185,8 +138,8 @@ func (cp *ContainerProxy) RemoveContainer(containerID string) error {
return err return err
} }
container, exists := cp.urlMap[url] container := cp.routes.GetRoute(url)
if !exists { if container == nil {
return fmt.Errorf("container not found") return fmt.Errorf("container not found")
} }
@@ -198,19 +151,57 @@ func (cp *ContainerProxy) RemoveContainer(containerID string) error {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
delete(cp.urlMap, url) cp.routes.DeleteRoute(url)
return nil return nil
default: default:
if atomic.LoadInt64(&cp.activeConns) == 0 { if atomic.LoadInt64(&cp.activeConns) == 0 {
delete(cp.urlMap, url) cp.routes.DeleteRoute(url)
return nil return nil
} }
time.Sleep(100 * time.Millisecond) }
}
}
func (cp *ContainerProxy) ScanRoutes() {
rows, err := cp.db.Query("SELECT url, id FROM deployments")
if err != nil {
log.Printf("Failed to query deployments: %v\n", err)
return
}
defer rows.Close()
var containers []models.Containers
for rows.Next() {
var url string
var deploymentID int64
if err := rows.Scan(&url, &deploymentID); err != nil {
log.Printf("Failed to scan deployment: %v\n", err)
return
}
rows, err := cp.db.Query("SELECT * FROM containers WHERE deployment_id = ?", deploymentID)
if err != nil {
log.Printf("Failed to query containers: %v\n", err)
return
}
defer rows.Close()
for rows.Next() {
var container models.Containers
if err := rows.Scan(&container.ID, &container.ContainerID, &container.Head, &container.DeploymentID, &container.CreatedAt); err != nil {
log.Printf("Failed to scan container: %v\n", err)
return
}
fmt.Printf("Found container: %s\n", container.ContainerID)
containers = append(containers, container)
} }
} }
} }
func (cp *ContainerProxy) Start() { func (cp *ContainerProxy) Start() {
cp.ScanRoutes()
port := os.Getenv("FLUXD_PROXY_PORT") port := os.Getenv("FLUXD_PROXY_PORT")
if port == "" { if port == "" {
port = "7465" port = "7465"

View File

@@ -12,6 +12,7 @@ CREATE TABLE IF NOT EXISTS apps (
CREATE TABLE IF NOT EXISTS containers ( CREATE TABLE IF NOT EXISTS containers (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
container_id TEXT NOT NULL, container_id TEXT NOT NULL,
head BOOLEAN NOT NULL,
deployment_id INTEGER NOT NULL, deployment_id INTEGER NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(deployment_id) REFERENCES deployments(id) FOREIGN KEY(deployment_id) REFERENCES deployments(id)

View File

@@ -105,7 +105,7 @@ func NewServer() *FluxServer {
config: serverConfig, config: serverConfig,
db: db, db: db,
Proxy: &ContainerProxy{ Proxy: &ContainerProxy{
urlMap: make(map[string]*containerRoute), routes: &RouteCache{},
db: db, db: db,
cm: containerManager, cm: containerManager,
}, },