From d27cc71f1d9005e63d5dd05297a74901f4e0a768 Mon Sep 17 00:00:00 2001 From: juls0730 <62722391+juls0730@users.noreply.github.com> Date: Sat, 7 Dec 2024 02:35:38 -0600 Subject: [PATCH] almost there --- cmd/cli/main.go | 96 +++++++++- cmd/daemon/main.go | 6 +- models/app.go | 20 -- models/docker.go | 15 -- pkg/config.go | 9 + pkg/responses.go | 8 + server/container.go | 92 +++++----- server/deploy.go | 299 +++++++++++++++++------------- server/deployment.go | 426 +++++++++++++++++++++++++++---------------- server/proxy.go | 243 ++++++++---------------- server/schema.sql | 17 +- server/server.go | 62 +++---- 12 files changed, 707 insertions(+), 586 deletions(-) delete mode 100644 models/app.go delete mode 100644 models/docker.go create mode 100644 pkg/config.go create mode 100644 pkg/responses.go diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 4b4bb1d..a5c1caa 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -11,13 +11,15 @@ import ( "mime/multipart" "net/http" "os" + "os/signal" "path/filepath" "strconv" "strings" + "syscall" "time" "github.com/briandowns/spinner" - "github.com/juls0730/fluxd/models" + "github.com/juls0730/fluxd/pkg" ) //go:embed config.json @@ -98,7 +100,7 @@ func getProjectName() string { } defer fluxConfigFile.Close() - var config models.ProjectConfig + var config pkg.ProjectConfig if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { fmt.Printf("Failed to decode flux.json: %v\n", err) os.Exit(1) @@ -113,11 +115,23 @@ func getProjectName() string { } func main() { + loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond) if len(os.Args) < 2 { fmt.Println("Usage: flux ") os.Exit(1) } + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-signalChannel + if loadingSpinner.Active() { + loadingSpinner.Stop() + } + + os.Exit(0) + }() + command := os.Args[1] if _, err := os.Stat(filepath.Join(configPath, "config.json")); err != nil { @@ -151,7 +165,6 @@ func main() { os.Exit(1) } - loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond) loadingSpinner.Suffix = " Deploying" loadingSpinner.Start() @@ -293,10 +306,63 @@ func main() { fmt.Printf("Successfully started %s\n", projectName) case "delete": + if len(os.Args) == 3 { + if os.Args[2] == "all" { + var response string + fmt.Print("Are you sure you want to delete all projects? this will delete all volumes and containers associated and cannot be undone. \n[y/N] ") + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + os.Exit(0) + } + + response = "" + + fmt.Printf("Are you really sure you want to delete all projects? \n[y/N] ") + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + os.Exit(0) + } + + req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments", nil) + if err != nil { + fmt.Printf("Failed to delete deployments: %v\n", err) + os.Exit(1) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Printf("Failed to delete deployments: %v\n", err) + os.Exit(1) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Printf("error reading response body: %v\n", err) + os.Exit(1) + } + + if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { + responseBody = responseBody[:len(responseBody)-1] + } + + fmt.Printf("Delete failed: %s\n", responseBody) + os.Exit(1) + } + + fmt.Printf("Successfully deleted all projects\n") + return + } + } + projectName := getProjectName() // ask for confirmation - fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N]", projectName) + fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N] ", projectName) var response string fmt.Scanln(&response) @@ -340,7 +406,22 @@ func main() { os.Exit(1) } - var apps []models.App + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Printf("error reading response body: %v\n", err) + os.Exit(1) + } + + if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { + responseBody = responseBody[:len(responseBody)-1] + } + + fmt.Printf("List failed: %s\n", responseBody) + os.Exit(1) + } + + var apps []pkg.App if err := json.NewDecoder(resp.Body).Decode(&apps); err != nil { fmt.Printf("Failed to decode apps: %v\n", err) os.Exit(1) @@ -355,7 +436,7 @@ func main() { fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) } case "init": - var projectConfig models.ProjectConfig + var projectConfig pkg.ProjectConfig var response string if len(os.Args) > 2 { @@ -379,7 +460,8 @@ func main() { fmt.Println("What port does your project listen to?") fmt.Scanln(&response) - projectConfig.Port, err = strconv.Atoi(response) + port, err := strconv.ParseUint(response, 10, 16) + projectConfig.Port = uint16(port) if err != nil || projectConfig.Port < 1 || projectConfig.Port > 65535 { fmt.Println("That doesnt look like a valid port", err) os.Exit(1) diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index a43e901..637af4d 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -9,11 +9,13 @@ import ( func main() { fluxServer := server.NewServer() + server.InitReverseProxy() - go fluxServer.Proxy.Start() + // go fluxServer.Proxy.Start() http.HandleFunc("POST /deploy", fluxServer.DeployHandler) - http.HandleFunc("DELETE /deploy/{name}", fluxServer.DeleteDeployHandler) + http.HandleFunc("DELETE /deployments", fluxServer.DeleteAllDeploymentsHandler) + http.HandleFunc("DELETE /deployments/{name}", fluxServer.DeleteDeployHandler) http.HandleFunc("POST /start/{name}", fluxServer.StartDeployHandler) http.HandleFunc("POST /stop/{name}", fluxServer.StopDeployHandler) http.HandleFunc("GET /apps", fluxServer.ListAppsHandler) diff --git a/models/app.go b/models/app.go deleted file mode 100644 index f2bfed8..0000000 --- a/models/app.go +++ /dev/null @@ -1,20 +0,0 @@ -package models - -type ProjectConfig struct { - Name string `json:"name,omitempty"` - Url string `json:"url,omitempty"` - Port int `json:"port,omitempty"` - EnvFile string `json:"env_file,omitempty"` - Environment []string `json:"environment,omitempty"` -} - -type App struct { - ID int64 `json:"id,omitempty"` - Name string `json:"name,omitempty"` - Image string `json:"image,omitempty"` - ProjectPath string `json:"project_path,omitempty"` - ProjectConfig ProjectConfig `json:"project_config,omitempty"` - DeploymentID int64 `json:"deployment_id,omitempty"` - CreatedAt string `json:"created_at,omitempty"` - DeploymentStatus string `json:"deployment_status,omitempty"` -} diff --git a/models/docker.go b/models/docker.go deleted file mode 100644 index 787d3a2..0000000 --- a/models/docker.go +++ /dev/null @@ -1,15 +0,0 @@ -package models - -type Containers struct { - ID string `json:"id"` - Head bool `json:"head"` // if the container is the head of the deployment - ContainerID string `json:"container_id"` - DeploymentID int64 `json:"deployment_id"` - CreatedAt string `json:"created_at"` -} - -type Deployments struct { - ID int64 `json:"id"` - URL string `json:"url"` - CreatedAt string `json:"created_at"` -} diff --git a/pkg/config.go b/pkg/config.go new file mode 100644 index 0000000..36066bd --- /dev/null +++ b/pkg/config.go @@ -0,0 +1,9 @@ +package pkg + +type ProjectConfig struct { + Name string `json:"name,omitempty"` + Url string `json:"url,omitempty"` + Port uint16 `json:"port,omitempty"` + EnvFile string `json:"env_file,omitempty"` + Environment []string `json:"environment,omitempty"` +} diff --git a/pkg/responses.go b/pkg/responses.go new file mode 100644 index 0000000..994396c --- /dev/null +++ b/pkg/responses.go @@ -0,0 +1,8 @@ +package pkg + +type App struct { + ID int64 `json:"id,omitempty"` + Name string `json:"name,omitempty"` + DeploymentID int64 `json:"deployment_id,omitempty"` + DeploymentStatus string `json:"deployment_status,omitempty"` +} diff --git a/server/container.go b/server/container.go index 7bd42f0..bc19a80 100644 --- a/server/container.go +++ b/server/container.go @@ -15,25 +15,30 @@ import ( "github.com/docker/docker/api/types/volume" "github.com/docker/docker/client" "github.com/joho/godotenv" - "github.com/juls0730/fluxd/models" + "github.com/juls0730/fluxd/pkg" ) -type ContainerManager struct { - dockerClient *client.Client +var dockerClient *client.Client + +type Container struct { + ID int64 `json:"id"` + Head bool `json:"head"` // if the container is the head of the deployment + Deployment *Deployment + ContainerID [64]byte `json:"container_id"` + DeploymentID int64 `json:"deployment_id"` } -func NewContainerManager() *ContainerManager { - dockerClient, err := client.NewClientWithOpts(client.FromEnv) +func init() { + log.Printf("Initializing Docker client...\n") + + var err error + dockerClient, err = client.NewClientWithOpts(client.FromEnv) if err != nil { log.Fatalf("Failed to create Docker client: %v", err) } - - return &ContainerManager{ - dockerClient: dockerClient, - } } -func (cm *ContainerManager) CreateContainer(ctx context.Context, imageName, projectPath string, projectConfig models.ProjectConfig) (string, error) { +func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (string, error) { log.Printf("Deploying container with image %s\n", imageName) containerName := fmt.Sprintf("%s-%s", projectConfig.Name, time.Now().Format("20060102-150405")) @@ -55,10 +60,10 @@ func (cm *ContainerManager) CreateContainer(ctx context.Context, imageName, proj } } - vol, err := cm.dockerClient.VolumeCreate(ctx, volume.CreateOptions{ + vol, err := dockerClient.VolumeCreate(ctx, volume.CreateOptions{ Driver: "local", DriverOpts: map[string]string{}, - Name: fmt.Sprintf("%s-volume", projectConfig.Name), + Name: fmt.Sprintf("flux_%s-volume", projectConfig.Name), }) if err != nil { return "", fmt.Errorf("Failed to create volume: %v", err) @@ -67,25 +72,15 @@ func (cm *ContainerManager) CreateContainer(ctx context.Context, imageName, proj log.Printf("Volume %s created at %s\n", vol.Name, vol.Mountpoint) log.Printf("Creating container %s...\n", containerName) - resp, err := cm.dockerClient.ContainerCreate(ctx, &container.Config{ + resp, err := dockerClient.ContainerCreate(ctx, &container.Config{ Image: imageName, Env: projectConfig.Environment, - // ExposedPorts: nat.PortSet{ - // nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): {}, - // }, Volumes: map[string]struct{}{ vol.Name: {}, }, }, &container.HostConfig{ - // PortBindings: nat.PortMap{ - // nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): []nat.PortBinding{ - // { - // HostIP: "0.0.0.0", - // HostPort: strconv.Itoa(projectConfig.Port), - // }, - // }, - // }, + NetworkMode: "bridge", Mounts: []mount.Mount{ { Type: mount.TypeVolume, @@ -107,28 +102,37 @@ func (cm *ContainerManager) CreateContainer(ctx context.Context, imageName, proj return resp.ID, nil } -func (cm *ContainerManager) StartContainer(ctx context.Context, containerID string) error { - return cm.dockerClient.ContainerStart(ctx, containerID, container.StartOptions{}) +func (c *Container) Start(ctx context.Context) error { + return dockerClient.ContainerStart(ctx, string(c.ContainerID[:]), container.StartOptions{}) } -func (cm *ContainerManager) StopContainer(ctx context.Context, containerID string) error { - return cm.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}) +func (c *Container) Stop(ctx context.Context) error { + return dockerClient.ContainerStop(ctx, string(c.ContainerID[:]), container.StopOptions{}) +} + +func (c *Container) Remove(ctx context.Context) error { + return RemoveDockerContainer(ctx, string(c.ContainerID[:])) +} + +func (c *Container) Wait(ctx context.Context, port uint16) error { + return WaitForDockerContainer(ctx, string(c.ContainerID[:]), port) } // RemoveContainer stops and removes a container, but be warned that this will not remove the container from the database -func (cm *ContainerManager) RemoveContainer(ctx context.Context, containerID string) error { - if err := cm.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil { +func RemoveDockerContainer(ctx context.Context, containerID string) error { + if err := dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil { return fmt.Errorf("Failed to stop existing container: %v", err) } - if err := cm.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil { + if err := dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil { return fmt.Errorf("Failed to remove existing container: %v", err) } return nil } -func (cm *ContainerManager) WaitForContainer(ctx context.Context, containerID string, containerPort int) error { +// scuffed af "health check" for docker containers +func WaitForDockerContainer(ctx context.Context, containerID string, containerPort uint16) error { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -138,7 +142,7 @@ func (cm *ContainerManager) WaitForContainer(ctx context.Context, containerID st return fmt.Errorf("container failed to become ready in time") default: - containerJSON, err := cm.dockerClient.ContainerInspect(ctx, containerID) + containerJSON, err := dockerClient.ContainerInspect(ctx, containerID) if err != nil { return err } @@ -155,9 +159,9 @@ func (cm *ContainerManager) WaitForContainer(ctx context.Context, containerID st } } -func (cm *ContainerManager) GracefullyRemoveContainer(ctx context.Context, containerID string) error { +func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) error { timeout := 30 - err := cm.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{ + err := dockerClient.ContainerStop(ctx, containerID, container.StopOptions{ Timeout: &timeout, }) if err != nil { @@ -170,15 +174,15 @@ func (cm *ContainerManager) GracefullyRemoveContainer(ctx context.Context, conta for { select { case <-ctx.Done(): - return cm.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) + return dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) default: - containerJSON, err := cm.dockerClient.ContainerInspect(ctx, containerID) + containerJSON, err := dockerClient.ContainerInspect(ctx, containerID) if err != nil { return err } if !containerJSON.State.Running { - return cm.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) + return dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) } time.Sleep(time.Second) @@ -186,26 +190,26 @@ func (cm *ContainerManager) GracefullyRemoveContainer(ctx context.Context, conta } } -func (cm *ContainerManager) RemoveVolume(ctx context.Context, volumeID string) error { - if err := cm.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { +func RemoveVolume(ctx context.Context, volumeID string) error { + if err := dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { return fmt.Errorf("Failed to remove existing volume: %v", err) } return nil } -func (cm *ContainerManager) findExistingContainers(ctx context.Context, containerPrefix string) ([]string, error) { - containers, err := cm.dockerClient.ContainerList(ctx, container.ListOptions{ +func findExistingDockerContainers(ctx context.Context, containerPrefix string) (map[string]bool, error) { + containers, err := dockerClient.ContainerList(ctx, container.ListOptions{ All: true, }) if err != nil { return nil, err } - var existingContainers []string + var existingContainers map[string]bool = make(map[string]bool) for _, container := range containers { if strings.HasPrefix(container.Names[0], fmt.Sprintf("/%s-", containerPrefix)) { - existingContainers = append(existingContainers, container.ID) + existingContainers[container.ID] = true } } diff --git a/server/deploy.go b/server/deploy.go index 95bbefd..c185a47 100644 --- a/server/deploy.go +++ b/server/deploy.go @@ -1,14 +1,21 @@ package server import ( + "database/sql" "encoding/json" "fmt" "log" "mime/multipart" "net/http" + "os" "os/exec" + "path/filepath" - "github.com/juls0730/fluxd/models" + "github.com/juls0730/fluxd/pkg" +) + +var ( + appInsertStmt *sql.Stmt ) type DeployRequest struct { @@ -17,7 +24,7 @@ type DeployRequest struct { } type DeployResponse struct { - App models.App `json:"app"` + App App `json:"app"` } func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { @@ -44,25 +51,15 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } defer deployRequest.Code.Close() - var projectConfig models.ProjectConfig + var projectConfig pkg.ProjectConfig if err := json.NewDecoder(deployRequest.Config).Decode(&projectConfig); err != nil { log.Printf("Failed to decode config: %v\n", err) http.Error(w, "Invalid flux.json", http.StatusBadRequest) return } - if projectConfig.Name == "" { - http.Error(w, "No project name specified", http.StatusBadRequest) - return - } - - if projectConfig.Url == "" { - http.Error(w, "No deployment url specified", http.StatusBadRequest) - return - } - - if projectConfig.Port == 0 { - http.Error(w, "No port specified", http.StatusBadRequest) + if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 { + http.Error(w, "Invalid flux.json, a name, url, and port must be specified", http.StatusBadRequest) return } @@ -86,7 +83,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } log.Printf("Building image for project %s...\n", projectConfig.Name) - imageName := fmt.Sprintf("%s-image", projectConfig.Name) + imageName := fmt.Sprintf("flux_%s-image", projectConfig.Name) buildCmd := exec.Command("pack", "build", imageName, "--builder", s.config.Builder) buildCmd.Dir = projectPath err = buildCmd.Run() @@ -96,59 +93,61 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - var app models.App - s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE name = ?", projectConfig.Name).Scan(&app.ID, &app.Name, &app.DeploymentID) + app := Apps.GetApp(projectConfig.Name) - if app.ID == 0 { - configBytes, err := json.Marshal(projectConfig) - if err != nil { - log.Printf("Failed to marshal project config: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return + if app == nil { + app = &App{ + Name: projectConfig.Name, } + log.Printf("Creating deployment %s...\n", app.Name) - containerID, err := s.containerManager.CreateContainer(r.Context(), imageName, projectPath, projectConfig) + containerID, err := CreateDockerContainer(r.Context(), imageName, projectPath, projectConfig) if err != nil { log.Printf("Failed to create container: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - deploymentID, err := s.CreateDeployment(r.Context(), projectConfig, containerID) + deployment, err := CreateDeployment(containerID, projectConfig.Port, projectConfig.Url, s.db) + app.Deployment = deployment if err != nil { log.Printf("Failed to create deployment: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } + if appInsertStmt == nil { + appInsertStmt, err = s.db.Prepare("INSERT INTO apps (name, deployment_id) VALUES ($1, $2) RETURNING id, name, deployment_id") + if err != nil { + log.Printf("Failed to prepare statement: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + // create app in the database - appResult, err := s.db.Exec("INSERT INTO apps (name, image, project_path, project_config, deployment_id) VALUES (?, ?, ?, ?, ?)", projectConfig.Name, imageName, projectPath, configBytes, deploymentID) + err = appInsertStmt.QueryRow(projectConfig.Name, deployment.ID).Scan(&app.ID, &app.Name, &app.DeploymentID) if err != nil { log.Printf("Failed to insert app: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - appID, err := appResult.LastInsertId() - if err != nil { - log.Printf("Failed to get app id: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE id = ?", appID).Scan(&app.ID, &app.Name, &app.DeploymentID) - - err = s.StartDeployment(r.Context(), app.DeploymentID) + err = deployment.Start(r.Context()) if err != nil { log.Printf("Failed to start deployment: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } + + Apps.AddApp(app.Name, app) } else { + log.Printf("Upgrading deployment %s...\n", app.Name) + // if deploy is not started, start it - deploymentStatus, err := s.GetDeploymentStatus(r.Context(), app.DeploymentID) - if deploymentStatus != "started" || err != nil { - err = s.StartDeployment(r.Context(), app.DeploymentID) + deploymentStatus, err := app.Deployment.Status(r.Context()) + if deploymentStatus != "running" || err != nil { + err = app.Deployment.Start(r.Context()) if err != nil { log.Printf("Failed to start deployment: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -156,7 +155,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } } - err = s.UpgradeDeployment(r.Context(), app.DeploymentID, projectConfig, imageName, projectPath) + err = app.Deployment.Upgrade(r.Context(), projectConfig, imageName, projectPath, s) if err != nil { log.Printf("Failed to upgrade deployment: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -167,26 +166,31 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { log.Printf("App %s deployed successfully!\n", app.Name) json.NewEncoder(w).Encode(DeployResponse{ - App: app, + App: *app, }) } func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") - var app struct { - id int64 - name string - deployment_id int64 - } - s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE name = ?", name).Scan(&app.id, &app.name, &app.deployment_id) - - if app.id == 0 { + app := Apps.GetApp(name) + if app == nil { http.Error(w, "App not found", http.StatusNotFound) return } - err := s.StartDeployment(r.Context(), app.deployment_id) + status, err := app.Deployment.Status(r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if status == "running" { + http.Error(w, "App is already running", http.StatusBadRequest) + return + } + + err = app.Deployment.Start(r.Context()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -198,19 +202,24 @@ func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") - var app struct { - id int64 - name string - deployment_id int64 - } - s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE name = ?", name).Scan(&app.id, &app.name, &app.deployment_id) - - if app.id == 0 { + app := Apps.GetApp(name) + if app == nil { http.Error(w, "App not found", http.StatusNotFound) return } - err := s.StopDeployment(r.Context(), app.deployment_id) + status, err := app.Deployment.Status(r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if status == "stopped" { + http.Error(w, "App is already stopped", http.StatusBadRequest) + return + } + + err = app.Deployment.Stop(r.Context()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -221,47 +230,32 @@ func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) { func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") + var err error - var app struct { - id int - name string - deployment_id int - } - s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE name = ?", name).Scan(&app.id, &app.name, &app.deployment_id) - - if app.id == 0 { + app := Apps.GetApp(name) + if app == nil { http.Error(w, "App not found", http.StatusNotFound) return } - var containerId []string - rows, err := s.db.Query("SELECT container_id FROM containers WHERE deployment_id = ?", app.deployment_id) - if err != nil { - log.Printf("Failed to query containers: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer rows.Close() + log.Printf("Deleting deployment %s...\n", name) - for rows.Next() { - var newContainerId string - if err := rows.Scan(&newContainerId); err != nil { - log.Printf("Failed to scan container id: %v\n", err) + for _, container := range app.Deployment.Containers { + err = RemoveDockerContainer(r.Context(), string(container.ContainerID[:])) + if err != nil { + log.Printf("Failed to remove container: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - - containerId = append(containerId, newContainerId) } - log.Printf("Deleting deployment %s...\n", name) - - for _, container := range containerId { - s.containerManager.RemoveContainer(r.Context(), container) + err = RemoveVolume(r.Context(), fmt.Sprintf("flux_%s-volume", name)) + if err != nil { + log.Printf("Failed to remove volume: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return } - s.containerManager.RemoveVolume(r.Context(), fmt.Sprintf("%s-volume", name)) - tx, err := s.db.Begin() if err != nil { log.Printf("Failed to begin transaction: %v\n", err) @@ -269,7 +263,7 @@ func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) return } - _, err = tx.Exec("DELETE FROM deployments WHERE id = ?", app.deployment_id) + _, err = tx.Exec("DELETE FROM deployments WHERE id = ?", app.DeploymentID) if err != nil { tx.Rollback() log.Printf("Failed to delete deployment: %v\n", err) @@ -277,7 +271,7 @@ func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) return } - _, err = tx.Exec("DELETE FROM containers WHERE deployment_id = ?", app.deployment_id) + _, err = tx.Exec("DELETE FROM containers WHERE deployment_id = ?", app.DeploymentID) if err != nil { tx.Rollback() log.Printf("Failed to delete containers: %v\n", err) @@ -285,7 +279,7 @@ func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) return } - _, err = tx.Exec("DELETE FROM apps WHERE id = ?", app.id) + _, err = tx.Exec("DELETE FROM apps WHERE id = ?", app.ID) if err != nil { tx.Rollback() log.Printf("Failed to delete app: %v\n", err) @@ -299,48 +293,109 @@ func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) return } + projectPath := filepath.Join(s.rootDir, "apps", name) + err = os.RemoveAll(projectPath) + if err != nil { + log.Printf("Failed to remove project directory: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + Apps.DeleteApp(name) + + w.WriteHeader(http.StatusOK) +} + +func (s *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) { + var err error + + for _, app := range Apps.GetAllApps() { + for _, container := range app.Deployment.Containers { + err = RemoveDockerContainer(r.Context(), string(container.ContainerID[:])) + if err != nil { + log.Printf("Failed to remove container: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + err = RemoveVolume(r.Context(), fmt.Sprintf("flux_%s-volume", app.Name)) + if err != nil { + log.Printf("Failed to remove volume: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + tx, err := s.db.Begin() + if err != nil { + log.Printf("Failed to begin transaction: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + _, err = tx.Exec("DELETE FROM deployments") + if err != nil { + tx.Rollback() + log.Printf("Failed to delete deployments: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + _, err = tx.Exec("DELETE FROM containers") + if err != nil { + tx.Rollback() + log.Printf("Failed to delete containers: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + _, err = tx.Exec("DELETE FROM apps") + if err != nil { + tx.Rollback() + log.Printf("Failed to delete apps: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := tx.Commit(); err != nil { + log.Printf("Failed to commit transaction: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := os.RemoveAll(filepath.Join(s.rootDir, "apps")); err != nil { + log.Printf("Failed to remove apps directory: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := os.RemoveAll(filepath.Join(s.rootDir, "deployments")); err != nil { + log.Printf("Failed to remove deployments directory: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) } func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) { - var apps []models.App - rows, err := s.db.Query("SELECT * FROM apps") - if err != nil { - log.Printf("Failed to query apps: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer rows.Close() - - for rows.Next() { - var app models.App - var configBytes string - if err := rows.Scan(&app.ID, &app.Name, &app.Image, &app.ProjectPath, &configBytes, &app.DeploymentID, &app.CreatedAt); err != nil { - log.Printf("Failed to scan app: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - err = json.Unmarshal([]byte(configBytes), &app.ProjectConfig) - if err != nil { - log.Printf("Failed to unmarshal project config: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - apps = append(apps, app) - } - // for each app, get the deployment status - for i, app := range apps { - deploymentStatus, err := s.GetDeploymentStatus(r.Context(), app.DeploymentID) + var apps []*pkg.App + for _, app := range Apps.GetAllApps() { + var extApp pkg.App + deploymentStatus, err := app.Deployment.Status(r.Context()) if err != nil { log.Printf("Failed to get deployment status: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - apps[i].DeploymentStatus = deploymentStatus + extApp.ID = app.ID + extApp.Name = app.Name + extApp.DeploymentID = app.DeploymentID + extApp.DeploymentStatus = deploymentStatus + apps = append(apps, &extApp) } w.Header().Set("Content-Type", "application/json") diff --git a/server/deployment.go b/server/deployment.go index 794583b..866500a 100644 --- a/server/deployment.go +++ b/server/deployment.go @@ -2,98 +2,291 @@ package server import ( "context" - "encoding/json" + "database/sql" "fmt" "log" + "sync" + "time" - "github.com/juls0730/fluxd/models" + "github.com/juls0730/fluxd/pkg" ) -// Creates a deployment and containers in the database -func (s *FluxServer) CreateDeployment(ctx context.Context, projectConfig models.ProjectConfig, containerID string) (int64, error) { - deploymentResult, err := s.db.Exec("INSERT INTO deployments (url) VALUES (?)", projectConfig.Url) - if err != nil { - log.Printf("Failed to insert deployment: %v\n", err) - return 0, err - } +var ( + Apps *AppManager = new(AppManager) + deploymentInsertStmt *sql.Stmt + containerInsertStmt *sql.Stmt +) - deploymentID, err := deploymentResult.LastInsertId() - if err != nil { - log.Printf("Failed to get deployment id: %v\n", err) - return 0, err - } - - _, err = s.db.Exec("INSERT INTO containers (container_id, deployment_id, head) VALUES (?, ?, ?)", containerID, deploymentID, true) - if err != nil { - log.Printf("Failed to get container id: %v\n", err) - return 0, err - } - - return deploymentID, nil +type AppManager struct { + sync.Map } -func (s *FluxServer) UpgradeDeployment(ctx context.Context, deploymentID int64, projectConfig models.ProjectConfig, imageName string, projectPath string) error { - configBytes, err := json.Marshal(projectConfig) - if err != nil { - log.Printf("Failed to marshal project config: %v\n", err) - return err +type App struct { + ID int64 `json:"id,omitempty"` + Deployment Deployment `json:"-"` + Name string `json:"name,omitempty"` + DeploymentID int64 `json:"deployment_id,omitempty"` +} + +type Deployment struct { + ID int64 `json:"id"` + Containers []Container `json:"-"` + Proxy *DeploymentProxy `json:"-"` + URL string `json:"url"` + Port uint16 `json:"port"` +} + +func (am *AppManager) GetApp(name string) *App { + app, exists := am.Load(name) + if !exists { + return nil } - existingContainers, err := s.containerManager.findExistingContainers(ctx, projectConfig.Name) + return app.(*App) +} + +func (am *AppManager) GetAllApps() []*App { + var apps []*App + am.Range(func(key, value interface{}) bool { + if app, ok := value.(*App); ok { + apps = append(apps, app) + } + return true + }) + return apps +} + +func (am *AppManager) AddApp(name string, app *App) { + am.Store(name, app) +} + +func (am *AppManager) DeleteApp(name string) { + am.Delete(name) +} + +func (am *AppManager) Init() { + log.Printf("Initializing deployments...\n") + + if DB == nil { + log.Panicf("DB is nil") + } + + rows, err := DB.Query("SELECT id, name, deployment_id FROM apps") + if err != nil { + log.Printf("Failed to query apps: %v\n", err) + return + } + defer rows.Close() + + var apps []App + for rows.Next() { + var app App + if err := rows.Scan(&app.ID, &app.Name, &app.DeploymentID); err != nil { + log.Printf("Failed to scan app: %v\n", err) + return + } + apps = append(apps, app) + } + + for _, app := range apps { + var deployment Deployment + var headContainer *Container + DB.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port) + deployment.Containers = make([]Container, 0) + + rows, err := DB.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) + if err != nil { + log.Printf("Failed to query containers: %v\n", err) + return + } + + for rows.Next() { + var container Container + var containerIDString string + rows.Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) + container.Deployment = &deployment + copy(container.ContainerID[:], containerIDString) + + if container.Head { + headContainer = &container + } + + deployment.Containers = append(deployment.Containers, container) + } + + deployment.Proxy = &DeploymentProxy{ + deployment: &deployment, + currentHead: headContainer, + gracePeriod: time.Second * 30, + activeRequests: 0, + } + app.Deployment = deployment + + Apps.AddApp(app.Name, &app) + } +} + +// Creates a deployment and containers in the database +func CreateDeployment(containerID string, port uint16, appUrl string, db *sql.DB) (Deployment, error) { + var deployment Deployment + var err error + + if deploymentInsertStmt == nil { + deploymentInsertStmt, err = db.Prepare("INSERT INTO deployments (url, port) VALUES ($1, $2) RETURNING id, url, port") + if err != nil { + log.Printf("Failed to prepare statement: %v\n", err) + return Deployment{}, err + } + } + + err = deploymentInsertStmt.QueryRow(appUrl, port).Scan(&deployment.ID, &deployment.URL, &deployment.Port) + if err != nil { + log.Printf("Failed to insert deployment: %v\n", err) + return Deployment{}, err + } + + var container Container + if containerInsertStmt == nil { + containerInsertStmt, err = db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head") + if err != nil { + log.Printf("Failed to prepare statement: %v\n", err) + return Deployment{}, err + } + } + + var containerIDString string + err = containerInsertStmt.QueryRow(containerID, deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) + if err != nil { + log.Printf("Failed to get container id: %v\n", err) + return Deployment{}, err + } + copy(container.ContainerID[:], containerIDString) + + deployment.Proxy = &DeploymentProxy{ + deployment: &deployment, + currentHead: &container, + gracePeriod: time.Second * 30, + activeRequests: 0, + } + + container.Deployment = &deployment + deployment.Containers = append(deployment.Containers, container) + ReverseProxy.AddDeployment(&deployment) + + return deployment, nil +} + +func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string, s *FluxServer) error { + existingContainers, err := findExistingDockerContainers(ctx, projectConfig.Name) if err != nil { return fmt.Errorf("Failed to find existing containers: %v", err) } - fmt.Printf("There are %d existing containers\n", len(existingContainers)) - // Deploy new container before deleting old one - containerID, err := s.containerManager.CreateContainer(ctx, imageName, projectPath, projectConfig) + containerID, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig) if err != nil { log.Printf("Failed to create container: %v\n", err) return err } - // calls AddContainer in proxy - err = s.containerManager.StartContainer(ctx, containerID) + var container Container + if containerInsertStmt == nil { + containerInsertStmt, err = DB.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head") + if err != nil { + log.Printf("Failed to prepare statement: %v\n", err) + return err + } + } + + var containerIDString string + err = containerInsertStmt.QueryRow(containerID, deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) + if err != nil { + log.Printf("Failed to get container id: %v\n", err) + return err + } + container.Deployment = deployment + + copy(container.ContainerID[:], containerIDString) + deployment.Containers = append(deployment.Containers, container) + + log.Printf("Starting container %s...\n", containerID) + err = container.Start(ctx) if err != nil { log.Printf("Failed to start container: %v\n", err) return err } - if err := s.containerManager.WaitForContainer(ctx, containerID, projectConfig.Port); err != nil { + if err := container.Wait(ctx, projectConfig.Port); err != nil { log.Printf("Failed to wait for container: %v\n", err) return err } - if _, err := s.db.Exec("INSERT INTO containers (container_id, deployment_id, head) VALUES (?, ?, ?)", containerID, deploymentID, true); err != nil { - log.Printf("Failed to insert container: %v\n", err) + tx, err := s.db.Begin() + if err != nil { + log.Printf("Failed to begin transaction: %v\n", err) return err } - // update app in the database - if _, err := s.db.Exec("UPDATE apps SET project_config = ?, deployment_id = ? WHERE name = ?", configBytes, deploymentID, projectConfig.Name); err != nil { + if _, err := tx.Exec("UPDATE deployments SET url = ?, port = ? WHERE id = ?", projectConfig.Url, projectConfig.Port, deployment.ID); err != nil { + log.Printf("Failed to update deployment: %v\n", err) + tx.Rollback() + return err + } + + if _, err := tx.Exec("UPDATE apps SET deployment_id = ? WHERE name = ?", deployment.ID, projectConfig.Name); err != nil { log.Printf("Failed to update app: %v\n", err) + tx.Rollback() return err } - // TODO: swap containers if they are running and have the same image so that we can have a constant uptime - tx, err := s.db.Begin() + if err := tx.Commit(); err != nil { + log.Printf("Failed to commit transaction: %v\n", err) + return err + } + + tx, err = s.db.Begin() if err != nil { log.Printf("Failed to begin transaction: %v\n", err) return err } - for _, existingContainer := range existingContainers { - log.Printf("Stopping existing container: %s\n", existingContainer[0:12]) - - tx.Exec("DELETE FROM containers WHERE container_id = ?", existingContainer) - err = s.containerManager.GracefullyRemoveContainer(ctx, existingContainer) - if err != nil { - tx.Rollback() - return err - } + // Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully shutdown + oldProxy := deployment.Proxy + deployment.Proxy = &DeploymentProxy{ + deployment: deployment, + currentHead: &container, + gracePeriod: time.Second * 30, + activeRequests: 0, } + var containers []Container + var oldContainers []*Container + for _, container := range deployment.Containers { + if existingContainers[string(container.ContainerID[:])] { + log.Printf("Stopping existing container: %s\n", container.ContainerID[0:12]) + + _, err = tx.Exec("DELETE FROM containers WHERE container_id = ?", string(container.ContainerID[:])) + oldContainers = append(oldContainers, &container) + + if err != nil { + tx.Rollback() + return err + } + + continue + } + + containers = append(containers, container) + } + + if oldProxy != nil { + go oldProxy.GracefulShutdown(oldContainers) + } + + deployment.Containers = containers + + ReverseProxy.AddDeployment(deployment) + if err := tx.Commit(); err != nil { log.Printf("Failed to commit transaction: %v\n", err) return err @@ -102,112 +295,42 @@ func (s *FluxServer) UpgradeDeployment(ctx context.Context, deploymentID int64, return nil } -func (s *FluxServer) StartDeployment(ctx context.Context, deploymentID int64) error { - var containerIds []string - rows, err := s.db.Query("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID) - if err != nil { - log.Printf("Failed to query containers: %v\n", err) - return err - } - defer rows.Close() - - for rows.Next() { - var newContainerId string - if err := rows.Scan(&newContainerId); err != nil { - log.Printf("Failed to scan container id: %v\n", err) - return err +func arrayContains(arr []string, str string) bool { + for _, a := range arr { + if a == str { + return true } - - containerIds = append(containerIds, newContainerId) } - var projectConfigStr []byte - s.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr) - var projectConfig models.ProjectConfig - if err := json.Unmarshal(projectConfigStr, &projectConfig); err != nil { - return err - } - if projectConfig.Name == "" { - return fmt.Errorf("No project config found for deployment") - } + return false +} - for _, containerId := range containerIds { - err := s.containerManager.StartContainer(ctx, containerId) - s.Proxy.AddContainer(projectConfig, containerId) +func (d *Deployment) Start(ctx context.Context) error { + for _, container := range d.Containers { + err := container.Start(ctx) if err != nil { log.Printf("Failed to start container: %v\n", err) return err } } - tx, err := s.db.Begin() - if err != nil { - log.Printf("Failed to begin transaction: %v\n", err) - return err - } - - if err := tx.Commit(); err != nil { - log.Printf("Failed to commit transaction: %v\n", err) - return err - } - return nil } -func (s *FluxServer) StopDeployment(ctx context.Context, deploymentID int64) error { - var containerIds []string - rows, err := s.db.Query("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID) - if err != nil { - log.Printf("Failed to query containers: %v\n", err) - return err - } - defer rows.Close() - - for rows.Next() { - var newContainerId string - if err := rows.Scan(&newContainerId); err != nil { - log.Printf("Failed to scan container id: %v\n", err) - return err - } - - containerIds = append(containerIds, newContainerId) - } - - var projectConfigStr []byte - s.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr) - var projectConfig models.ProjectConfig - if err := json.Unmarshal(projectConfigStr, &projectConfig); err != nil { - return err - } - if projectConfig.Name == "" { - return fmt.Errorf("No project config found for deployment") - } - - for _, containerId := range containerIds { - err := s.containerManager.StopContainer(ctx, containerId) - s.Proxy.RemoveContainer(containerId) +func (d *Deployment) Stop(ctx context.Context) error { + for _, container := range d.Containers { + err := container.Stop(ctx) if err != nil { log.Printf("Failed to start container: %v\n", err) return err } } - tx, err := s.db.Begin() - if err != nil { - log.Printf("Failed to begin transaction: %v\n", err) - return err - } - - if err := tx.Commit(); err != nil { - log.Printf("Failed to commit transaction: %v\n", err) - return err - } - return nil } -func (s *FluxServer) GetStatus(ctx context.Context, containerID string) (string, error) { - containerJSON, err := s.containerManager.dockerClient.ContainerInspect(ctx, containerID) +func (c *Container) GetStatus(ctx context.Context) (string, error) { + containerJSON, err := dockerClient.ContainerInspect(ctx, string(c.ContainerID[:])) if err != nil { return "", err } @@ -215,31 +338,20 @@ func (s *FluxServer) GetStatus(ctx context.Context, containerID string) (string, return containerJSON.State.Status, nil } -func (s *FluxServer) GetDeploymentStatus(ctx context.Context, deploymentID int64) (string, error) { - var deployment models.Deployments - s.db.QueryRow("SELECT id, url FROM deployments WHERE id = ?", deploymentID).Scan(&deployment.ID, &deployment.URL) - - var containerIds []string - rows, err := s.db.Query("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID) - if err != nil { - log.Printf("Failed to query containers: %v\n", err) - return "", err - } - defer rows.Close() - - for rows.Next() { - var newContainerId string - if err := rows.Scan(&newContainerId); err != nil { - log.Printf("Failed to scan container id: %v\n", err) - return "", err - } - - containerIds = append(containerIds, newContainerId) - } - +func (d *Deployment) Status(ctx context.Context) (string, error) { var status string - for _, containerId := range containerIds { - containerStatus, err := s.GetStatus(ctx, containerId) + if d == nil { + fmt.Printf("Deployment is nil\n") + return "stopped", nil + } + + if d.Containers == nil { + fmt.Printf("Containers are nil\n") + return "stopped", nil + } + + for _, container := range d.Containers { + containerStatus, err := container.GetStatus(ctx) if err != nil { log.Printf("Failed to get container status: %v\n", err) return "", err diff --git a/server/proxy.go b/server/proxy.go index 723583f..e6cbdb3 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -2,7 +2,6 @@ package server import ( "context" - "database/sql" "fmt" "log" "net/http" @@ -12,209 +11,119 @@ import ( "sync" "sync/atomic" "time" - - "github.com/juls0730/fluxd/models" ) -type ContainerProxy struct { - routes *RouteCache - db *sql.DB - cm *ContainerManager - activeConns int64 +var ReverseProxy *Proxy + +type Proxy struct { + deployments sync.Map } -type RouteCache struct { - m sync.Map +func (p *Proxy) AddDeployment(deployment *Deployment) { + log.Printf("Adding deployment %s\n", deployment.URL) + p.deployments.Store(deployment.URL, deployment) } -type containerRoute struct { - containerID string - port int - url string - proxy *httputil.ReverseProxy - isActive bool -} +func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + host := r.Host -func (rc *RouteCache) GetRoute(appUrl string) *containerRoute { - - container, exists := rc.m.Load(appUrl) - if !exists { - return nil - } - - return container.(*containerRoute) -} - -func (rc *RouteCache) SetRoute(appUrl string, container *containerRoute) { - rc.m.Store(appUrl, container) -} - -func (rc *RouteCache) DeleteRoute(appUrl string) { - rc.m.Delete(appUrl) -} - -func (cp *ContainerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Extract app name from host - appUrl := r.Host - - container := cp.routes.GetRoute(appUrl) - if container == nil { - http.Error(w, "Container not found", http.StatusNotFound) + deployment, ok := p.deployments.Load(host) + if !ok { + http.Error(w, "Not found", http.StatusNotFound) return } - container.proxy.ServeHTTP(w, r) -} + atomic.AddInt64(&deployment.(*Deployment).Proxy.activeRequests, 1) -func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, containerID string) error { - containerJSON, err := cp.cm.dockerClient.ContainerInspect(context.Background(), containerID) - if err != nil { - log.Printf("Failed to inspect container: %v\n", err) - return err - } - - containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, projectConfig.Port)) - if err != nil { - log.Printf("Failed to parse URL: %v\n", err) - return err - } - proxy := cp.createProxy(containerUrl) - - newRoute := &containerRoute{ - url: projectConfig.Url, - proxy: proxy, - port: projectConfig.Port, - isActive: true, - } - - cp.routes.SetRoute(projectConfig.Url, newRoute) - return nil -} - -func (cp *ContainerProxy) createProxy(url *url.URL) *httputil.ReverseProxy { - proxy := httputil.NewSingleHostReverseProxy(url) - - originalDirector := proxy.Director - proxy.Director = func(req *http.Request) { - atomic.AddInt64(&cp.activeConns, 1) - - // Validate URL before directing - if url == nil { - log.Printf("URL is nil") - return - } - - originalDirector(req) - } - - proxy.ModifyResponse = func(resp *http.Response) error { - atomic.AddInt64(&cp.activeConns, -1) - return nil - } - - // Handle errors - proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { - atomic.AddInt64(&cp.activeConns, -1) - - http.Error(w, "Service unavailable", http.StatusServiceUnavailable) - - // Ensure request body is closed - if r.Body != nil { - r.Body.Close() - } - } - - return proxy -} - -func (cp *ContainerProxy) RemoveContainer(containerID string) error { - var deploymentID int64 - if err := cp.db.QueryRow("SELECT deployment_id FROM containers WHERE id = ?", containerID).Scan(&deploymentID); err != nil { - return err - } - - var url string - if err := cp.db.QueryRow("SELECT url FROM deployments WHERE id = ?", deploymentID).Scan(&url); err != nil { - return err - } - - container := cp.routes.GetRoute(url) + container := deployment.(*Deployment).Proxy.currentHead if container == nil { - return fmt.Errorf("container not found") + http.Error(w, "No active container found", http.StatusNotFound) + return } - container.isActive = false + containerJSON, err := dockerClient.ContainerInspect(context.Background(), string(container.ContainerID[:])) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, container.Deployment.Port)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + proxy := &httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL = containerUrl + req.Host = containerUrl.Host + }, + ModifyResponse: func(resp *http.Response) error { + atomic.AddInt64(&deployment.(*Deployment).Proxy.activeRequests, -1) + return nil + }, + } + + proxy.ServeHTTP(w, r) +} + +type DeploymentProxy struct { + deployment *Deployment + currentHead *Container + gracePeriod time.Duration + activeRequests int64 +} + +func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) { + ctx, cancel := context.WithTimeout(context.Background(), dp.gracePeriod) defer cancel() + // Create a channel to signal when wait group is done for { select { case <-ctx.Done(): - cp.routes.DeleteRoute(url) - return nil + break default: - if atomic.LoadInt64(&cp.activeConns) == 0 { - cp.routes.DeleteRoute(url) - return nil + if atomic.LoadInt64(&dp.activeRequests) == 0 { + break } - } - } -} -func (cp *ContainerProxy) ScanRoutes() { - rows, err := cp.db.Query("SELECT url, id FROM deployments") - if err != nil { - log.Printf("Failed to query deployments: %v\n", err) - return - } - defer rows.Close() - - var containers []models.Containers - for rows.Next() { - var url string - var deploymentID int64 - if err := rows.Scan(&url, &deploymentID); err != nil { - log.Printf("Failed to scan deployment: %v\n", err) - return + time.Sleep(time.Second) } - rows, err := cp.db.Query("SELECT * FROM containers WHERE deployment_id = ?", deploymentID) + if atomic.LoadInt64(&dp.activeRequests) == 0 || ctx.Err() != nil { + break + } + } + + for _, container := range oldContainers { + err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:])) if err != nil { - log.Printf("Failed to query containers: %v\n", err) - return - } - defer rows.Close() - - for rows.Next() { - var container models.Containers - if err := rows.Scan(&container.ID, &container.ContainerID, &container.Head, &container.DeploymentID, &container.CreatedAt); err != nil { - log.Printf("Failed to scan container: %v\n", err) - return - } - - fmt.Printf("Found container: %s\n", container.ContainerID) - - containers = append(containers, container) + log.Printf("Failed to remove container: %v\n", err) } } } -func (cp *ContainerProxy) Start() { - cp.ScanRoutes() +func InitProxy(apps *AppManager) { + ReverseProxy = &Proxy{} + + apps.Range(func(key, value interface{}) bool { + app := value.(*App) + ReverseProxy.AddDeployment(&app.Deployment) + return true + }) +} + +func InitReverseProxy() { + InitProxy(Apps) port := os.Getenv("FLUXD_PROXY_PORT") if port == "" { port = "7465" } - server := &http.Server{ - Addr: fmt.Sprintf(":%s", port), - Handler: cp, - } - go func() { log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port) - if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + if err := http.ListenAndServe(fmt.Sprintf(":%s", port), ReverseProxy); err != nil && err != http.ErrServerClosed { log.Fatalf("Proxy server error: %v", err) } }() diff --git a/server/schema.sql b/server/schema.sql index a876b86..5b560c8 100644 --- a/server/schema.sql +++ b/server/schema.sql @@ -1,11 +1,13 @@ +CREATE TABLE IF NOT EXISTS deployments ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + port INTEGER NOT NULL +); + CREATE TABLE IF NOT EXISTS apps ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, - image TEXT NOT NULL, - project_path TEXT NOT NULL, - project_config TEXT NOT NULL, deployment_id INTEGER, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(deployment_id) REFERENCES deployments(id) ); @@ -14,12 +16,5 @@ CREATE TABLE IF NOT EXISTS containers ( container_id TEXT NOT NULL, head BOOLEAN NOT NULL, deployment_id INTEGER NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(deployment_id) REFERENCES deployments(id) -); - -CREATE TABLE IF NOT EXISTS deployments ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - url TEXT NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); \ No newline at end of file diff --git a/server/server.go b/server/server.go index 7ce9abd..e9858a4 100644 --- a/server/server.go +++ b/server/server.go @@ -4,48 +4,38 @@ import ( "archive/tar" "compress/gzip" "database/sql" - "embed" "encoding/json" "io" "log" "os" "path/filepath" - "github.com/juls0730/fluxd/models" + _ "embed" + + "github.com/juls0730/fluxd/pkg" _ "github.com/mattn/go-sqlite3" ) -//go:embed schema.sql -var schema embed.FS - -var DefaultConfig = FluxServerConfig{ - Builder: "paketobuildpacks/builder-jammy-tiny", -} +var ( + //go:embed schema.sql + schemaBytes []byte + DefaultConfig = FluxServerConfig{ + Builder: "paketobuildpacks/builder-jammy-tiny", + } + DB *sql.DB +) type FluxServerConfig struct { Builder string `json:"builder"` } type FluxServer struct { - containerManager *ContainerManager - config FluxServerConfig - db *sql.DB - Proxy *ContainerProxy - rootDir string + config FluxServerConfig + db *sql.DB + rootDir string } -// var rootDir string - -// func init() { -// rootDir = os.Getenv("FLUXD_ROOT_DIR") -// if rootDir == "" { -// rootDir = "/var/fluxd" -// } -// } - func NewServer() *FluxServer { - containerManager := NewContainerManager() - var serverConfig FluxServerConfig rootDir := os.Getenv("FLUXD_ROOT_DIR") @@ -84,36 +74,26 @@ func NewServer() *FluxServer { log.Fatalf("Failed to create apps directory: %v\n", err) } - db, err := sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db")) + DB, err = sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db")) if err != nil { log.Fatalf("Failed to open database: %v\n", err) } - // create database schema - schemaBytes, err := schema.ReadFile("schema.sql") - if err != nil { - log.Fatalf("Failed to read schema file: %v\n", err) - } - - _, err = db.Exec(string(schemaBytes)) + _, err = DB.Exec(string(schemaBytes)) if err != nil { log.Fatalf("Failed to create database schema: %v\n", err) } + Apps.Init() + return &FluxServer{ - containerManager: containerManager, - config: serverConfig, - db: db, - Proxy: &ContainerProxy{ - routes: &RouteCache{}, - db: db, - cm: containerManager, - }, + config: serverConfig, + db: DB, rootDir: rootDir, } } -func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig models.ProjectConfig) (string, error) { +func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConfig) (string, error) { projectPath := filepath.Join(s.rootDir, "apps", projectConfig.Name) if err := os.MkdirAll(projectPath, 0755); err != nil { log.Printf("Failed to create project directory: %v\n", err)