From f1ad13a216969b20a7fc3481f4fba709bb87ba49 Mon Sep 17 00:00:00 2001 From: juls0730 <62722391+juls0730@users.noreply.github.com> Date: Wed, 4 Dec 2024 23:08:17 -0600 Subject: [PATCH] fix proxy race conditions --- models/docker.go | 1 + server/deployment.go | 7 +- server/proxy.go | 179 ++++++++++++++++++++----------------------- server/schema.sql | 1 + server/server.go | 2 +- 5 files changed, 93 insertions(+), 97 deletions(-) diff --git a/models/docker.go b/models/docker.go index 58f27c1..787d3a2 100644 --- a/models/docker.go +++ b/models/docker.go @@ -2,6 +2,7 @@ package models type Containers struct { ID string `json:"id"` + Head bool `json:"head"` // if the container is the head of the deployment ContainerID string `json:"container_id"` DeploymentID int64 `json:"deployment_id"` CreatedAt string `json:"created_at"` diff --git a/server/deployment.go b/server/deployment.go index 7b208ea..794583b 100644 --- a/server/deployment.go +++ b/server/deployment.go @@ -23,7 +23,7 @@ func (s *FluxServer) CreateDeployment(ctx context.Context, projectConfig models. return 0, err } - _, err = s.db.Exec("INSERT INTO containers (container_id, deployment_id) VALUES (?, ?)", containerID, deploymentID) + _, err = s.db.Exec("INSERT INTO containers (container_id, deployment_id, head) VALUES (?, ?, ?)", containerID, deploymentID, true) if err != nil { log.Printf("Failed to get container id: %v\n", err) return 0, err @@ -65,7 +65,10 @@ func (s *FluxServer) UpgradeDeployment(ctx context.Context, deploymentID int64, return err } - s.db.Exec("INSERT INTO containers (container_id, deployment_id) VALUES (?, ?)", containerID, deploymentID) + if _, err := s.db.Exec("INSERT INTO containers (container_id, deployment_id, head) VALUES (?, ?, ?)", containerID, deploymentID, true); err != nil { + log.Printf("Failed to insert container: %v\n", err) + return err + } // update app in the database if _, err := s.db.Exec("UPDATE apps SET project_config = ?, deployment_id = ? WHERE name = ?", configBytes, deploymentID, projectConfig.Name); err != nil { diff --git a/server/proxy.go b/server/proxy.go index 6c2c3ce..723583f 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -3,7 +3,6 @@ package server import ( "context" "database/sql" - "encoding/json" "fmt" "log" "net/http" @@ -18,13 +17,16 @@ import ( ) type ContainerProxy struct { - mu sync.RWMutex - urlMap map[string]*containerRoute + routes *RouteCache db *sql.DB cm *ContainerManager activeConns int64 } +type RouteCache struct { + m sync.Map +} + type containerRoute struct { containerID string port int @@ -33,107 +35,49 @@ type containerRoute struct { isActive bool } -func (cp *ContainerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - cp.mu.RLock() - // defer cp.mu.RUnlock() +func (rc *RouteCache) GetRoute(appUrl string) *containerRoute { - // Extract app name from host - appUrl := r.Host - var container *containerRoute - container, exists := cp.urlMap[appUrl] - if !exists || !container.isActive { - container = &containerRoute{ - url: appUrl, - } - var deploymentID int64 - cp.db.QueryRow("SELECT id FROM deployments WHERE url = ?", appUrl).Scan(&deploymentID) - if deploymentID == 0 { - fmt.Printf("No deployment found for url: %s\n", appUrl) - http.Error(w, "Container not found", http.StatusNotFound) - return - } - - cp.db.QueryRow("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID).Scan(&container.containerID) - if container.containerID == "" { - fmt.Printf("No container found for deployment: %d\n", deploymentID) - http.Error(w, "Container not found", http.StatusNotFound) - return - } - - var projectConfigStr string - if err := cp.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr); err != nil || projectConfigStr == "" { - http.Error(w, "Container not found", http.StatusNotFound) - return - } - var projectConfig models.ProjectConfig - if err := json.Unmarshal([]byte(projectConfigStr), &projectConfig); err != nil { - http.Error(w, "Failed to parse json", http.StatusNotFound) - return - } - container.port = projectConfig.Port - - // cp.urlMap[appUrl] = container + container, exists := rc.m.Load(appUrl) + if !exists { + return nil } - if container.proxy == nil { - containerJSON, err := cp.cm.dockerClient.ContainerInspect(r.Context(), container.containerID) - if err != nil { - log.Printf("Failed to inspect container: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + return container.(*containerRoute) +} - if containerJSON.State.Status != "running" { - log.Printf("Container %s is not running\n", container.containerID) - http.Error(w, "Container not running", http.StatusInternalServerError) - return - } +func (rc *RouteCache) SetRoute(appUrl string, container *containerRoute) { + rc.m.Store(appUrl, container) +} - url, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, container.port)) - if err != nil { - log.Printf("Failed to parse URL: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - // container.proxy = httputil.NewSingleHostReverseProxy(url) - container.proxy = cp.createProxy(url) - if container.proxy == nil { - log.Printf("Failed to create proxy for container %s\n", container.containerID) - http.Error(w, "Failed to create proxy", http.StatusInternalServerError) - container.isActive = false - return - } +func (rc *RouteCache) DeleteRoute(appUrl string) { + rc.m.Delete(appUrl) +} - cp.mu.RUnlock() - cp.mu.Lock() - cp.urlMap[appUrl] = container - cp.mu.Unlock() - } else { - cp.mu.RUnlock() +func (cp *ContainerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Extract app name from host + appUrl := r.Host + + container := cp.routes.GetRoute(appUrl) + if container == nil { + http.Error(w, "Container not found", http.StatusNotFound) + return } container.proxy.ServeHTTP(w, r) } func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, containerID string) error { - cp.mu.Lock() - defer cp.mu.Unlock() - containerJSON, err := cp.cm.dockerClient.ContainerInspect(context.Background(), containerID) if err != nil { log.Printf("Failed to inspect container: %v\n", err) return err } + containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, projectConfig.Port)) if err != nil { + log.Printf("Failed to parse URL: %v\n", err) return err } - - container, ok := cp.urlMap[projectConfig.Url] - if ok && container.proxy != nil { - container.isActive = true - return nil - } proxy := cp.createProxy(containerUrl) newRoute := &containerRoute{ @@ -143,7 +87,7 @@ func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, conta isActive: true, } - cp.urlMap[projectConfig.Url] = newRoute + cp.routes.SetRoute(projectConfig.Url, newRoute) return nil } @@ -153,6 +97,13 @@ func (cp *ContainerProxy) createProxy(url *url.URL) *httputil.ReverseProxy { originalDirector := proxy.Director proxy.Director = func(req *http.Request) { atomic.AddInt64(&cp.activeConns, 1) + + // Validate URL before directing + if url == nil { + log.Printf("URL is nil") + return + } + originalDirector(req) } @@ -163,18 +114,20 @@ func (cp *ContainerProxy) createProxy(url *url.URL) *httputil.ReverseProxy { // Handle errors proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { - log.Printf("Proxy error: %v", err) + atomic.AddInt64(&cp.activeConns, -1) + http.Error(w, "Service unavailable", http.StatusServiceUnavailable) - r.Body.Close() + + // Ensure request body is closed + if r.Body != nil { + r.Body.Close() + } } return proxy } func (cp *ContainerProxy) RemoveContainer(containerID string) error { - cp.mu.Lock() - defer cp.mu.Unlock() - var deploymentID int64 if err := cp.db.QueryRow("SELECT deployment_id FROM containers WHERE id = ?", containerID).Scan(&deploymentID); err != nil { return err @@ -185,8 +138,8 @@ func (cp *ContainerProxy) RemoveContainer(containerID string) error { return err } - container, exists := cp.urlMap[url] - if !exists { + container := cp.routes.GetRoute(url) + if container == nil { return fmt.Errorf("container not found") } @@ -198,19 +151,57 @@ func (cp *ContainerProxy) RemoveContainer(containerID string) error { for { select { case <-ctx.Done(): - delete(cp.urlMap, url) + cp.routes.DeleteRoute(url) return nil default: if atomic.LoadInt64(&cp.activeConns) == 0 { - delete(cp.urlMap, url) + cp.routes.DeleteRoute(url) return nil } - time.Sleep(100 * time.Millisecond) + } + } +} + +func (cp *ContainerProxy) ScanRoutes() { + rows, err := cp.db.Query("SELECT url, id FROM deployments") + if err != nil { + log.Printf("Failed to query deployments: %v\n", err) + return + } + defer rows.Close() + + var containers []models.Containers + for rows.Next() { + var url string + var deploymentID int64 + if err := rows.Scan(&url, &deploymentID); err != nil { + log.Printf("Failed to scan deployment: %v\n", err) + return + } + + rows, err := cp.db.Query("SELECT * FROM containers WHERE deployment_id = ?", deploymentID) + if err != nil { + log.Printf("Failed to query containers: %v\n", err) + return + } + defer rows.Close() + + for rows.Next() { + var container models.Containers + if err := rows.Scan(&container.ID, &container.ContainerID, &container.Head, &container.DeploymentID, &container.CreatedAt); err != nil { + log.Printf("Failed to scan container: %v\n", err) + return + } + + fmt.Printf("Found container: %s\n", container.ContainerID) + + containers = append(containers, container) } } } func (cp *ContainerProxy) Start() { + cp.ScanRoutes() port := os.Getenv("FLUXD_PROXY_PORT") if port == "" { port = "7465" diff --git a/server/schema.sql b/server/schema.sql index 2c6bf6f..a876b86 100644 --- a/server/schema.sql +++ b/server/schema.sql @@ -12,6 +12,7 @@ CREATE TABLE IF NOT EXISTS apps ( CREATE TABLE IF NOT EXISTS containers ( id INTEGER PRIMARY KEY AUTOINCREMENT, container_id TEXT NOT NULL, + head BOOLEAN NOT NULL, deployment_id INTEGER NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(deployment_id) REFERENCES deployments(id) diff --git a/server/server.go b/server/server.go index f50eb84..7ce9abd 100644 --- a/server/server.go +++ b/server/server.go @@ -105,7 +105,7 @@ func NewServer() *FluxServer { config: serverConfig, db: db, Proxy: &ContainerProxy{ - urlMap: make(map[string]*containerRoute), + routes: &RouteCache{}, db: db, cm: containerManager, },