From 11561e864cf3a832f8775227a4578361419a752f Mon Sep 17 00:00:00 2001 From: juls0730 <62722391+juls0730@users.noreply.github.com> Date: Wed, 4 Dec 2024 06:16:28 -0600 Subject: [PATCH] random fixes and a REVERSE PROXY!!! --- cmd/cli/main.go | 154 +++++++++++++++++----------------- cmd/daemon/main.go | 2 + go.mod | 2 +- models/app.go | 10 +-- models/docker.go | 2 +- server/container.go | 85 +++++++++++++++---- server/deploy.go | 33 +++++--- server/deployment.go | 85 +++++++++++++------ server/proxy.go | 192 +++++++++++++++++++++++++++++++++++++++++++ server/schema.sql | 2 +- server/server.go | 8 +- 11 files changed, 443 insertions(+), 132 deletions(-) create mode 100644 server/proxy.go diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 4c78b84..4b4bb1d 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -12,6 +12,7 @@ import ( "net/http" "os" "path/filepath" + "strconv" "strings" "time" @@ -81,6 +82,36 @@ func compressDirectory() ([]byte, error) { return buf.Bytes(), nil } +func getProjectName() string { + var projectName string + + if len(os.Args) < 3 { + if _, err := os.Stat("flux.json"); err != nil { + fmt.Printf("Usage: flux %[1]s , or run flux %[1]s in the project directory\n", os.Args[1]) + os.Exit(1) + } + + fluxConfigFile, err := os.Open("flux.json") + if err != nil { + fmt.Printf("Failed to open flux.json: %v\n", err) + os.Exit(1) + } + defer fluxConfigFile.Close() + + var config models.ProjectConfig + if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { + fmt.Printf("Failed to decode flux.json: %v\n", err) + os.Exit(1) + } + + projectName = config.Name + } else { + projectName = os.Args[2] + } + + return projectName +} + func main() { if len(os.Args) < 2 { fmt.Println("Usage: flux ") @@ -115,6 +146,11 @@ func main() { switch command { case "deploy": + if _, err := os.Stat("flux.json"); err != nil { + fmt.Printf("No flux.json found, please run flux init first\n") + os.Exit(1) + } + loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond) loadingSpinner.Suffix = " Deploying" loadingSpinner.Start() @@ -205,31 +241,7 @@ func main() { loadingSpinner.Stop() fmt.Println("Deployed successfully!") case "stop": - var projectName string - - if len(os.Args) < 3 { - if _, err := os.Stat("flux.json"); err != nil { - fmt.Printf("Usage: flux delete , or run flux delete in the project directory\n") - os.Exit(1) - } - - fluxConfigFile, err := os.Open("flux.json") - if err != nil { - fmt.Printf("Failed to open flux.json: %v\n", err) - os.Exit(1) - } - defer fluxConfigFile.Close() - - var config models.ProjectConfig - if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { - fmt.Printf("Failed to decode flux.json: %v\n", err) - os.Exit(1) - } - - projectName = config.Name - } else { - projectName = os.Args[2] - } + projectName := getProjectName() req, err := http.Post(config.DeamonURL+"/stop/"+projectName, "application/json", nil) if err != nil { @@ -255,31 +267,7 @@ func main() { fmt.Printf("Successfully stopped %s\n", projectName) case "start": - var projectName string - - if len(os.Args) < 3 { - if _, err := os.Stat("flux.json"); err != nil { - fmt.Printf("Usage: flux delete , or run flux delete in the project directory\n") - os.Exit(1) - } - - fluxConfigFile, err := os.Open("flux.json") - if err != nil { - fmt.Printf("Failed to open flux.json: %v\n", err) - os.Exit(1) - } - defer fluxConfigFile.Close() - - var config models.ProjectConfig - if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { - fmt.Printf("Failed to decode flux.json: %v\n", err) - os.Exit(1) - } - - projectName = config.Name - } else { - projectName = os.Args[2] - } + projectName := getProjectName() req, err := http.Post(config.DeamonURL+"/start/"+projectName, "application/json", nil) if err != nil { @@ -305,31 +293,7 @@ func main() { fmt.Printf("Successfully started %s\n", projectName) case "delete": - var projectName string - - if len(os.Args) < 3 { - if _, err := os.Stat("flux.json"); err != nil { - fmt.Printf("Usage: flux delete , or run flux delete in the project directory\n") - os.Exit(1) - } - - fluxConfigFile, err := os.Open("flux.json") - if err != nil { - fmt.Printf("Failed to open flux.json: %v\n", err) - os.Exit(1) - } - defer fluxConfigFile.Close() - - var config models.ProjectConfig - if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { - fmt.Printf("Failed to decode flux.json: %v\n", err) - os.Exit(1) - } - - projectName = config.Name - } else { - projectName = os.Args[2] - } + projectName := getProjectName() // ask for confirmation fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N]", projectName) @@ -390,6 +354,46 @@ func main() { for _, app := range apps { fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) } + case "init": + var projectConfig models.ProjectConfig + + var response string + if len(os.Args) > 2 { + response = os.Args[2] + } else { + fmt.Println("What is the name of your project?") + fmt.Scanln(&response) + projectConfig.Name = response + } + + fmt.Println("What URL should your project listen to?") + fmt.Scanln(&response) + if strings.HasPrefix(response, "http") { + strings.TrimPrefix(response, "http://") + strings.TrimPrefix(response, "https://") + } + + response = strings.Split(response, "/")[0] + + projectConfig.Url = response + + fmt.Println("What port does your project listen to?") + fmt.Scanln(&response) + projectConfig.Port, err = strconv.Atoi(response) + if err != nil || projectConfig.Port < 1 || projectConfig.Port > 65535 { + fmt.Println("That doesnt look like a valid port", err) + os.Exit(1) + } + + configBytes, err := json.MarshalIndent(projectConfig, "", " ") + if err != nil { + fmt.Printf("Failed to marshal project config: %v\n", err) + os.Exit(1) + } + + os.WriteFile("flux.json", configBytes, 0644) + + fmt.Printf("Successfully initialized project %s\n", projectConfig.Name) default: fmt.Println("Unknown command:", command) } diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index a5dba20..a43e901 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -10,6 +10,8 @@ import ( func main() { fluxServer := server.NewServer() + go fluxServer.Proxy.Start() + http.HandleFunc("POST /deploy", fluxServer.DeployHandler) http.HandleFunc("DELETE /deploy/{name}", fluxServer.DeleteDeployHandler) http.HandleFunc("POST /start/{name}", fluxServer.StartDeployHandler) diff --git a/go.mod b/go.mod index ec9df84..45dc159 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.23.3 require ( github.com/briandowns/spinner v1.23.1 github.com/docker/docker v27.3.1+incompatible - github.com/docker/go-connections v0.5.0 github.com/joho/godotenv v1.5.1 github.com/mattn/go-sqlite3 v1.14.24 ) @@ -14,6 +13,7 @@ require ( github.com/Microsoft/go-winio v0.4.14 // indirect github.com/containerd/log v0.1.0 // indirect github.com/distribution/reference v0.6.0 // indirect + github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/fatih/color v1.7.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect diff --git a/models/app.go b/models/app.go index 4ab4d26..f2bfed8 100644 --- a/models/app.go +++ b/models/app.go @@ -1,11 +1,11 @@ package models type ProjectConfig struct { - Name string `json:"name"` - Urls []string `json:"urls"` - Port int `json:"port"` - EnvFile string `json:"env_file"` - Environment []string `json:"environment"` + Name string `json:"name,omitempty"` + Url string `json:"url,omitempty"` + Port int `json:"port,omitempty"` + EnvFile string `json:"env_file,omitempty"` + Environment []string `json:"environment,omitempty"` } type App struct { diff --git a/models/docker.go b/models/docker.go index cff7004..58f27c1 100644 --- a/models/docker.go +++ b/models/docker.go @@ -9,6 +9,6 @@ type Containers struct { type Deployments struct { ID int64 `json:"id"` - URLs string `json:"urls"` + URL string `json:"url"` CreatedAt string `json:"created_at"` } diff --git a/server/container.go b/server/container.go index 0c9c2a8..7bd42f0 100644 --- a/server/container.go +++ b/server/container.go @@ -4,9 +4,9 @@ import ( "context" "fmt" "log" + "net/http" "os" "path/filepath" - "strconv" "strings" "time" @@ -14,7 +14,6 @@ import ( "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/volume" "github.com/docker/docker/client" - "github.com/docker/go-connections/nat" "github.com/joho/godotenv" "github.com/juls0730/fluxd/models" ) @@ -71,22 +70,22 @@ func (cm *ContainerManager) CreateContainer(ctx context.Context, imageName, proj resp, err := cm.dockerClient.ContainerCreate(ctx, &container.Config{ Image: imageName, Env: projectConfig.Environment, - ExposedPorts: nat.PortSet{ - nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): {}, - }, + // ExposedPorts: nat.PortSet{ + // nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): {}, + // }, Volumes: map[string]struct{}{ vol.Name: {}, }, }, &container.HostConfig{ - PortBindings: nat.PortMap{ - nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): []nat.PortBinding{ - { - HostIP: "0.0.0.0", - HostPort: strconv.Itoa(projectConfig.Port), - }, - }, - }, + // PortBindings: nat.PortMap{ + // nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): []nat.PortBinding{ + // { + // HostIP: "0.0.0.0", + // HostPort: strconv.Itoa(projectConfig.Port), + // }, + // }, + // }, Mounts: []mount.Mount{ { Type: mount.TypeVolume, @@ -129,6 +128,64 @@ func (cm *ContainerManager) RemoveContainer(ctx context.Context, containerID str return nil } +func (cm *ContainerManager) WaitForContainer(ctx context.Context, containerID string, containerPort int) error { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("container failed to become ready in time") + + default: + containerJSON, err := cm.dockerClient.ContainerInspect(ctx, containerID) + if err != nil { + return err + } + + if containerJSON.State.Running { + resp, err := http.Get(fmt.Sprintf("http://%s:%d/", containerJSON.NetworkSettings.IPAddress, containerPort)) + if err == nil && resp.StatusCode == http.StatusOK { + return nil + } + } + + time.Sleep(time.Second) + } + } +} + +func (cm *ContainerManager) GracefullyRemoveContainer(ctx context.Context, containerID string) error { + timeout := 30 + err := cm.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{ + Timeout: &timeout, + }) + if err != nil { + return fmt.Errorf("Failed to stop container: %v", err) + } + + ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + return cm.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) + default: + containerJSON, err := cm.dockerClient.ContainerInspect(ctx, containerID) + if err != nil { + return err + } + + if !containerJSON.State.Running { + return cm.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) + } + + time.Sleep(time.Second) + } + } +} + func (cm *ContainerManager) RemoveVolume(ctx context.Context, volumeID string) error { if err := cm.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { return fmt.Errorf("Failed to remove existing volume: %v", err) @@ -147,7 +204,7 @@ func (cm *ContainerManager) findExistingContainers(ctx context.Context, containe var existingContainers []string for _, container := range containers { - if strings.HasPrefix(container.Names[0], fmt.Sprintf("/%s", containerPrefix)) { + if strings.HasPrefix(container.Names[0], fmt.Sprintf("/%s-", containerPrefix)) { existingContainers = append(existingContainers, container.ID) } } diff --git a/server/deploy.go b/server/deploy.go index 1f5afe9..95bbefd 100644 --- a/server/deploy.go +++ b/server/deploy.go @@ -56,8 +56,8 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - if projectConfig.Urls == nil || len(projectConfig.Urls) == 0 { - http.Error(w, "No deployment urls specified", http.StatusBadRequest) + if projectConfig.Url == "" { + http.Error(w, "No deployment url specified", http.StatusBadRequest) return } @@ -66,7 +66,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - log.Printf("Deploying project %s to %s\n", projectConfig.Name, projectConfig.Urls) + log.Printf("Deploying project %s to %s\n", projectConfig.Name, projectConfig.Url) projectPath, err := s.UploadAppCode(deployRequest.Code, projectConfig) if err != nil { @@ -97,7 +97,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } var app models.App - s.db.QueryRow("SELECT id FROM apps WHERE name = ?", projectConfig.Name).Scan(&app.ID) + s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE name = ?", projectConfig.Name).Scan(&app.ID, &app.Name, &app.DeploymentID) if app.ID == 0 { configBytes, err := json.Marshal(projectConfig) @@ -137,7 +137,25 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE id = ?", appID).Scan(&app.ID, &app.Name, &app.DeploymentID) + + err = s.StartDeployment(r.Context(), app.DeploymentID) + if err != nil { + log.Printf("Failed to start deployment: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } else { + // if deploy is not started, start it + deploymentStatus, err := s.GetDeploymentStatus(r.Context(), app.DeploymentID) + if deploymentStatus != "started" || err != nil { + err = s.StartDeployment(r.Context(), app.DeploymentID) + if err != nil { + log.Printf("Failed to start deployment: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + err = s.UpgradeDeployment(r.Context(), app.DeploymentID, projectConfig, imageName, projectPath) if err != nil { log.Printf("Failed to upgrade deployment: %v\n", err) @@ -146,13 +164,6 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } } - err = s.StartDeployment(r.Context(), app.DeploymentID) - if err != nil { - log.Printf("Failed to start deployment: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - log.Printf("App %s deployed successfully!\n", app.Name) json.NewEncoder(w).Encode(DeployResponse{ diff --git a/server/deployment.go b/server/deployment.go index e3738cf..d0d1fbd 100644 --- a/server/deployment.go +++ b/server/deployment.go @@ -5,14 +5,13 @@ import ( "encoding/json" "fmt" "log" - "strings" "github.com/juls0730/fluxd/models" ) // Creates a deployment and containers in the database func (s *FluxServer) CreateDeployment(ctx context.Context, projectConfig models.ProjectConfig, containerID string) (int64, error) { - deploymentResult, err := s.db.Exec("INSERT INTO deployments (urls) VALUES (?)", strings.Join(projectConfig.Urls, ",")) + deploymentResult, err := s.db.Exec("INSERT INTO deployments (url) VALUES (?)", projectConfig.Url) if err != nil { log.Printf("Failed to insert deployment: %v\n", err) return 0, err @@ -45,33 +44,28 @@ func (s *FluxServer) UpgradeDeployment(ctx context.Context, deploymentID int64, return fmt.Errorf("Failed to find existing containers: %v", err) } - tx, err := s.db.Begin() - if err != nil { - log.Printf("Failed to begin transaction: %v\n", err) - return err - } - // TODO: swap containers if they are running and have the same image so that we can have a constant uptime - for _, existingContainer := range existingContainers { - log.Printf("Stopping existing container: %s\n", existingContainer[0:12]) - - tx.Exec("DELETE FROM containers WHERE container_id = ?", existingContainer) - err = s.containerManager.RemoveContainer(ctx, existingContainer) - if err != nil { - return err - } - } - - if err := tx.Commit(); err != nil { - log.Printf("Failed to commit transaction: %v\n", err) - return err - } + fmt.Printf("There are %d existing containers\n", len(existingContainers)) + // Deploy new container before deleting old one containerID, err := s.containerManager.CreateContainer(ctx, imageName, projectPath, projectConfig) if err != nil { log.Printf("Failed to create container: %v\n", err) return err } + err = s.containerManager.StartContainer(ctx, containerID) + if err != nil { + log.Printf("Failed to start container: %v\n", err) + return err + } + + if err := s.containerManager.WaitForContainer(ctx, containerID, projectConfig.Port); err != nil { + log.Printf("Failed to wait for container: %v\n", err) + return err + } + + s.Proxy.AddContainer(projectConfig, containerID) + s.db.Exec("INSERT INTO containers (container_id, deployment_id) VALUES (?, ?)", containerID, deploymentID) // update app in the database @@ -80,6 +74,29 @@ func (s *FluxServer) UpgradeDeployment(ctx context.Context, deploymentID int64, return err } + // TODO: swap containers if they are running and have the same image so that we can have a constant uptime + tx, err := s.db.Begin() + if err != nil { + log.Printf("Failed to begin transaction: %v\n", err) + return err + } + + for _, existingContainer := range existingContainers { + log.Printf("Stopping existing container: %s\n", existingContainer[0:12]) + + tx.Exec("DELETE FROM containers WHERE container_id = ?", existingContainer) + err = s.containerManager.GracefullyRemoveContainer(ctx, existingContainer) + if err != nil { + tx.Rollback() + return err + } + } + + if err := tx.Commit(); err != nil { + log.Printf("Failed to commit transaction: %v\n", err) + return err + } + return nil } @@ -102,8 +119,19 @@ func (s *FluxServer) StartDeployment(ctx context.Context, deploymentID int64) er containerIds = append(containerIds, newContainerId) } + var projectConfigStr []byte + s.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr) + var projectConfig models.ProjectConfig + if err := json.Unmarshal(projectConfigStr, &projectConfig); err != nil { + return err + } + if projectConfig.Name == "" { + return fmt.Errorf("No project config found for deployment") + } + for _, containerId := range containerIds { err := s.containerManager.StartContainer(ctx, containerId) + s.Proxy.AddContainer(projectConfig, containerId) if err != nil { log.Printf("Failed to start container: %v\n", err) return err @@ -143,8 +171,19 @@ func (s *FluxServer) StopDeployment(ctx context.Context, deploymentID int64) err containerIds = append(containerIds, newContainerId) } + var projectConfigStr []byte + s.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr) + var projectConfig models.ProjectConfig + if err := json.Unmarshal(projectConfigStr, &projectConfig); err != nil { + return err + } + if projectConfig.Name == "" { + return fmt.Errorf("No project config found for deployment") + } + for _, containerId := range containerIds { err := s.containerManager.StopContainer(ctx, containerId) + s.Proxy.RemoveContainer(containerId) if err != nil { log.Printf("Failed to start container: %v\n", err) return err @@ -176,7 +215,7 @@ func (s *FluxServer) GetStatus(ctx context.Context, containerID string) (string, func (s *FluxServer) GetDeploymentStatus(ctx context.Context, deploymentID int64) (string, error) { var deployment models.Deployments - s.db.QueryRow("SELECT id, urls FROM deployments WHERE id = ?", deploymentID).Scan(&deployment.ID, &deployment.URLs) + s.db.QueryRow("SELECT id, url FROM deployments WHERE id = ?", deploymentID).Scan(&deployment.ID, &deployment.URL) var containerIds []string rows, err := s.db.Query("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID) diff --git a/server/proxy.go b/server/proxy.go new file mode 100644 index 0000000..bfdc6e9 --- /dev/null +++ b/server/proxy.go @@ -0,0 +1,192 @@ +package server + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + "net/http/httputil" + "net/url" + "sync" + "sync/atomic" + "time" + + "github.com/juls0730/fluxd/models" +) + +type ContainerProxy struct { + mu sync.RWMutex + urlMap map[string]*containerRoute + db *sql.DB + cm *ContainerManager + activeConns int64 +} + +type containerRoute struct { + containerID string + port int + url string + proxy *httputil.ReverseProxy + isActive bool +} + +func (cp *ContainerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + cp.mu.RLock() + defer cp.mu.RUnlock() + + // Extract app name from host + appUrl := r.Host + var container *containerRoute + container, exists := cp.urlMap[appUrl] + if !exists || !container.isActive { + container = &containerRoute{ + url: appUrl, + } + var deploymentID int64 + cp.db.QueryRow("SELECT id FROM deployments WHERE url = ?", appUrl).Scan(&deploymentID) + if deploymentID == 0 { + fmt.Printf("No deployment found for url: %s\n", appUrl) + http.Error(w, "Container not found", http.StatusNotFound) + return + } + + cp.db.QueryRow("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID).Scan(&container.containerID) + if container.containerID == "" { + fmt.Printf("No container found for deployment: %d\n", deploymentID) + http.Error(w, "Container not found", http.StatusNotFound) + return + } + + var projectConfigStr string + if err := cp.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr); err != nil || projectConfigStr == "" { + http.Error(w, "Container not found", http.StatusNotFound) + return + } + var projectConfig models.ProjectConfig + if err := json.Unmarshal([]byte(projectConfigStr), &projectConfig); err != nil { + http.Error(w, "Failed to parse json", http.StatusNotFound) + return + } + container.port = projectConfig.Port + + cp.urlMap[appUrl] = container + } + + if container.proxy == nil { + containerJSON, err := cp.cm.dockerClient.ContainerInspect(r.Context(), container.containerID) + if err != nil { + log.Printf("Failed to inspect container: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + url, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, container.port)) + if err != nil { + log.Printf("Failed to parse URL: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + container.proxy = httputil.NewSingleHostReverseProxy(url) + } + + container.proxy.ServeHTTP(w, r) +} + +func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, containerID string) error { + cp.mu.Lock() + defer cp.mu.Unlock() + + containerJSON, err := cp.cm.dockerClient.ContainerInspect(context.Background(), containerID) + if err != nil { + log.Printf("Failed to inspect container: %v\n", err) + return err + } + containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, projectConfig.Port)) + if err != nil { + return err + } + + proxy := httputil.NewSingleHostReverseProxy(containerUrl) + + originalDirector := proxy.Director + proxy.Director = func(req *http.Request) { + atomic.AddInt64(&cp.activeConns, 1) + originalDirector(req) + } + + proxy.ModifyResponse = func(resp *http.Response) error { + atomic.AddInt64(&cp.activeConns, -1) + return nil + } + + // Handle errors + proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { + log.Printf("Proxy error: %v", err) + http.Error(w, "Service unavailable", http.StatusServiceUnavailable) + } + + newRoute := &containerRoute{ + url: projectConfig.Url, + proxy: proxy, + port: projectConfig.Port, + isActive: true, + } + + cp.urlMap[projectConfig.Url] = newRoute + return nil +} + +func (cp *ContainerProxy) RemoveContainer(containerID string) error { + cp.mu.Lock() + defer cp.mu.Unlock() + + var deploymentID int64 + if err := cp.db.QueryRow("SELECT deployment_id FROM containers WHERE id = ?", containerID).Scan(&deploymentID); err != nil { + return err + } + + var url string + if err := cp.db.QueryRow("SELECT url FROM deployments WHERE id = ?", deploymentID).Scan(&url); err != nil { + return err + } + + container, exists := cp.urlMap[url] + if !exists { + return fmt.Errorf("container not found") + } + + container.isActive = false + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + delete(cp.urlMap, url) + return nil + default: + if atomic.LoadInt64(&cp.activeConns) == 0 { + delete(cp.urlMap, url) + return nil + } + time.Sleep(100 * time.Millisecond) + } + } +} + +func (cp *ContainerProxy) Start() { + server := &http.Server{ + Addr: ":7465", + Handler: cp, + } + + go func() { + log.Printf("Proxy server starting on http://127.0.0.1:7465\n") + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Proxy server error: %v", err) + } + }() +} diff --git a/server/schema.sql b/server/schema.sql index a58cd03..2c6bf6f 100644 --- a/server/schema.sql +++ b/server/schema.sql @@ -19,6 +19,6 @@ CREATE TABLE IF NOT EXISTS containers ( CREATE TABLE IF NOT EXISTS deployments ( id INTEGER PRIMARY KEY AUTOINCREMENT, - urls TEXT NOT NULL, + url TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); \ No newline at end of file diff --git a/server/server.go b/server/server.go index 98e0f71..f50eb84 100644 --- a/server/server.go +++ b/server/server.go @@ -30,6 +30,7 @@ type FluxServer struct { containerManager *ContainerManager config FluxServerConfig db *sql.DB + Proxy *ContainerProxy rootDir string } @@ -103,7 +104,12 @@ func NewServer() *FluxServer { containerManager: containerManager, config: serverConfig, db: db, - rootDir: rootDir, + Proxy: &ContainerProxy{ + urlMap: make(map[string]*containerRoute), + db: db, + cm: containerManager, + }, + rootDir: rootDir, } }