diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 57f95dd..f6690df 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -15,7 +15,6 @@ import ( "path/filepath" "strconv" "strings" - "syscall" "time" "github.com/briandowns/spinner" @@ -31,12 +30,26 @@ type Config struct { DeamonURL string `json:"deamon_url"` } -func compressDirectory() ([]byte, error) { +func compressDirectory(compression pkg.Compression) ([]byte, error) { var buf bytes.Buffer - gzWriter := gzip.NewWriter(&buf) - tarWriter := tar.NewWriter(gzWriter) + var err error - err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error { + var gzWriter *gzip.Writer + if compression.Enabled { + gzWriter, err = gzip.NewWriterLevel(&buf, compression.Level) + if err != nil { + return nil, err + } + } + + var tarWriter *tar.Writer + if gzWriter != nil { + tarWriter = tar.NewWriter(gzWriter) + } else { + tarWriter = tar.NewWriter(&buf) + } + + err = filepath.Walk(".", func(path string, info os.FileInfo, err error) error { if err != nil { return err } @@ -51,7 +64,7 @@ func compressDirectory() ([]byte, error) { } header.Name = path - if err := tarWriter.WriteHeader(header); err != nil { + if err = tarWriter.WriteHeader(header); err != nil { return err } @@ -62,7 +75,7 @@ func compressDirectory() ([]byte, error) { } defer file.Close() - if _, err := io.Copy(tarWriter, file); err != nil { + if _, err = io.Copy(tarWriter, file); err != nil { return err } } @@ -74,55 +87,56 @@ func compressDirectory() ([]byte, error) { return nil, err } - if err := tarWriter.Close(); err != nil { + if err = tarWriter.Close(); err != nil { return nil, err } - if err := gzWriter.Close(); err != nil { - return nil, err + + if gzWriter != nil { + if err = gzWriter.Close(); err != nil { + return nil, err + } } return buf.Bytes(), nil } -func getProjectName() string { +func getProjectName(command string, args []string) (string, error) { var projectName string - if len(os.Args) < 3 { + if len(args) == 0 { if _, err := os.Stat("flux.json"); err != nil { - fmt.Printf("Usage: flux %[1]s , or run flux %[1]s in the project directory\n", os.Args[1]) - os.Exit(1) + return "", fmt.Errorf("Usage: flux %[1]s , or run flux %[1]s in the project directory\n", command) } fluxConfigFile, err := os.Open("flux.json") if err != nil { - fmt.Printf("Failed to open flux.json: %v\n", err) - os.Exit(1) + return "", fmt.Errorf("Failed to open flux.json: %v\n", err) } defer fluxConfigFile.Close() var config pkg.ProjectConfig if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { - fmt.Printf("Failed to decode flux.json: %v\n", err) - os.Exit(1) + return "", fmt.Errorf("Failed to decode flux.json: %v\n", err) } projectName = config.Name } else { - projectName = os.Args[2] + projectName = args[0] } - return projectName + return projectName, nil } -func main() { +func runCommand(command string, args []string, config Config, info pkg.Info) error { loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond) - if len(os.Args) < 2 { - fmt.Println("Usage: flux ") - os.Exit(1) - } + defer func() { + if loadingSpinner.Active() { + loadingSpinner.Stop() + } + }() signalChannel := make(chan os.Signal, 1) - signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(signalChannel, os.Interrupt) go func() { <-signalChannel if loadingSpinner.Active() { @@ -132,7 +146,297 @@ func main() { os.Exit(0) }() - command := os.Args[1] + switch command { + case "deploy": + if _, err := os.Stat("flux.json"); err != nil { + return fmt.Errorf("No flux.json found, please run flux init first\n") + } + + loadingSpinner.Suffix = " Deploying" + loadingSpinner.Start() + + buf, err := compressDirectory(info.Compression) + if err != nil { + return fmt.Errorf("Failed to compress directory: %v\n", err) + } + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + configPart, err := writer.CreateFormFile("config", "flux.json") + + if err != nil { + return fmt.Errorf("Failed to create config part: %v\n", err) + } + + fluxConfigFile, err := os.Open("flux.json") + if err != nil { + return fmt.Errorf("Failed to open flux.json: %v\n", err) + } + defer fluxConfigFile.Close() + + if _, err := io.Copy(configPart, fluxConfigFile); err != nil { + return fmt.Errorf("Failed to write config part: %v\n", err) + } + + codePart, err := writer.CreateFormFile("code", "code.tar.gz") + if err != nil { + return fmt.Errorf("Failed to create code part: %v\n", err) + } + + if _, err := codePart.Write(buf); err != nil { + return fmt.Errorf("Failed to write code part: %v\n", err) + } + + if err := writer.Close(); err != nil { + return fmt.Errorf("Failed to close writer: %v\n", err) + } + + resp, err := http.Post(config.DeamonURL+"/deploy", "multipart/form-data; boundary="+writer.Boundary(), body) + if err != nil { + return fmt.Errorf("Failed to send request: %v\n", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v\n", err) + } + + if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { + responseBody = responseBody[:len(responseBody)-1] + } + + return fmt.Errorf("Deploy failed: %s\n", responseBody) + } + + loadingSpinner.Stop() + fmt.Println("Deployed successfully!") + case "stop": + projectName, err := getProjectName(command, args) + if err != nil { + return err + } + + req, err := http.Post(config.DeamonURL+"/stop/"+projectName, "application/json", nil) + if err != nil { + return fmt.Errorf("Failed to stop app: %v\n", err) + } + defer req.Body.Close() + + if req.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v\n", err) + } + + if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { + responseBody = responseBody[:len(responseBody)-1] + } + + return fmt.Errorf("Stop failed: %s\n", responseBody) + } + + fmt.Printf("Successfully stopped %s\n", projectName) + case "start": + projectName, err := getProjectName(command, args) + if err != nil { + return err + } + + req, err := http.Post(config.DeamonURL+"/start/"+projectName, "application/json", nil) + if err != nil { + return fmt.Errorf("Failed to start app: %v\n", err) + } + defer req.Body.Close() + + if req.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v\n", err) + } + + if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { + responseBody = responseBody[:len(responseBody)-1] + } + + return fmt.Errorf("Start failed: %s\n", responseBody) + } + + fmt.Printf("Successfully started %s\n", projectName) + case "delete": + if len(args) == 1 { + if args[0] == "all" { + var response string + fmt.Print("Are you sure you want to delete all projects? this will delete all volumes and containers associated and cannot be undone. \n[y/N] ") + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } + + response = "" + + fmt.Printf("Are you really sure you want to delete all projects? \n[y/N] ") + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } + + req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments", nil) + if err != nil { + return fmt.Errorf("Failed to delete deployments: %v\n", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to delete deployments: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v\n", err) + } + + if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { + responseBody = responseBody[:len(responseBody)-1] + } + + return fmt.Errorf("delete failed: %s", responseBody) + } + + fmt.Printf("Successfully deleted all projects\n") + return nil + } + } + + projectName, err := getProjectName(command, args) + if err != nil { + return err + } + + // ask for confirmation + fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N] ", projectName) + var response string + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } + + req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments/"+projectName, nil) + if err != nil { + return fmt.Errorf("failed to delete app: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to delete app: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } + + if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { + responseBody = responseBody[:len(responseBody)-1] + } + + return fmt.Errorf("delete failed: %s", responseBody) + } + + fmt.Printf("Successfully deleted %s\n", projectName) + case "list": + resp, err := http.Get(config.DeamonURL + "/apps") + if err != nil { + return fmt.Errorf("failed to get apps: %v", err) + } + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } + + if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { + responseBody = responseBody[:len(responseBody)-1] + } + + return fmt.Errorf("list failed: %s", responseBody) + } + + var apps []pkg.App + if err := json.NewDecoder(resp.Body).Decode(&apps); err != nil { + return fmt.Errorf("failed to decode apps: %v", err) + } + + if len(apps) == 0 { + fmt.Println("No apps found") + return nil + } + + for _, app := range apps { + fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) + } + case "init": + var projectConfig pkg.ProjectConfig + + var response string + if len(args) > 1 { + response = args[0] + } else { + fmt.Println("What is the name of your project?") + fmt.Scanln(&response) + } + + projectConfig.Name = response + + fmt.Println("What URL should your project listen to?") + fmt.Scanln(&response) + if strings.HasPrefix(response, "http") { + strings.TrimPrefix(response, "http://") + strings.TrimPrefix(response, "https://") + } + + response = strings.Split(response, "/")[0] + + projectConfig.Url = response + + fmt.Println("What port does your project listen to?") + fmt.Scanln(&response) + port, err := strconv.ParseUint(response, 10, 16) + projectConfig.Port = uint16(port) + if err != nil || projectConfig.Port < 1 || projectConfig.Port > 65535 { + return fmt.Errorf("That doesnt look like a valid port, try a number between 1 and 65535") + } + + configBytes, err := json.MarshalIndent(projectConfig, "", " ") + if err != nil { + return fmt.Errorf("failed to parse project config: %v", err) + } + + os.WriteFile("flux.json", configBytes, 0644) + + fmt.Printf("Successfully initialized project %s\n", projectConfig.Name) + default: + return fmt.Errorf("unknown command: %s", command) + } + + return nil +} + +func main() { + if len(os.Args) < 2 { + fmt.Println("Usage: flux ") + os.Exit(1) + } if _, err := os.Stat(filepath.Join(configPath, "config.json")); err != nil { if err := os.MkdirAll(configPath, 0755); err != nil { @@ -158,325 +462,35 @@ func main() { os.Exit(1) } - switch command { - case "deploy": - if _, err := os.Stat("flux.json"); err != nil { - fmt.Printf("No flux.json found, please run flux init first\n") - os.Exit(1) - } + command := os.Args[1] + args := os.Args[2:] - loadingSpinner.Suffix = " Deploying" - loadingSpinner.Start() + resp, err := http.Get(config.DeamonURL + "/heartbeat") + if err != nil { + fmt.Println("Failed to connect to daemon") + os.Exit(1) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + fmt.Println("Failed to connect to daemon") + os.Exit(1) + } - buf, err := compressDirectory() - if err != nil { - loadingSpinner.Stop() + var info pkg.Info + err = json.NewDecoder(resp.Body).Decode(&info) + if err != nil { + fmt.Printf("Failed to decode info: %v\n", err) + os.Exit(1) + } - fmt.Printf("Failed to compress directory: %v\n", err) - os.Exit(1) - } + if resp.StatusCode != http.StatusOK { + fmt.Println("Failed to connect to daemon") + os.Exit(1) + } - body := &bytes.Buffer{} - writer := multipart.NewWriter(body) - configPart, err := writer.CreateFormFile("config", "flux.json") - - if err != nil { - loadingSpinner.Stop() - - fmt.Printf("Failed to create config part: %v\n", err) - os.Exit(1) - } - - fluxConfigFile, err := os.Open("flux.json") - if err != nil { - loadingSpinner.Stop() - - fmt.Printf("Failed to open flux.json: %v\n", err) - os.Exit(1) - } - defer fluxConfigFile.Close() - - if _, err := io.Copy(configPart, fluxConfigFile); err != nil { - loadingSpinner.Stop() - - fmt.Printf("Failed to write config part: %v\n", err) - os.Exit(1) - } - - codePart, err := writer.CreateFormFile("code", "code.tar.gz") - if err != nil { - loadingSpinner.Stop() - - fmt.Printf("Failed to create code part: %v\n", err) - os.Exit(1) - } - - if _, err := codePart.Write(buf); err != nil { - loadingSpinner.Stop() - - fmt.Printf("Failed to write code part: %v\n", err) - os.Exit(1) - } - - if err := writer.Close(); err != nil { - loadingSpinner.Stop() - - fmt.Printf("Failed to close writer: %v\n", err) - os.Exit(1) - } - - resp, err := http.Post(config.DeamonURL+"/deploy", "multipart/form-data; boundary="+writer.Boundary(), body) - if err != nil { - loadingSpinner.Stop() - - fmt.Printf("Failed to send request: %v\n", err) - os.Exit(1) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - loadingSpinner.Stop() - - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - fmt.Printf("error reading response body: %v\n", err) - os.Exit(1) - } - - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } - - fmt.Printf("Deploy failed: %s\n", responseBody) - os.Exit(1) - } - - loadingSpinner.Stop() - fmt.Println("Deployed successfully!") - case "stop": - projectName := getProjectName() - - req, err := http.Post(config.DeamonURL+"/stop/"+projectName, "application/json", nil) - if err != nil { - fmt.Printf("Failed to stop app: %v\n", err) - os.Exit(1) - } - defer req.Body.Close() - - if req.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(req.Body) - if err != nil { - fmt.Printf("error reading response body: %v\n", err) - os.Exit(1) - } - - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } - - fmt.Printf("Stop failed: %s\n", responseBody) - os.Exit(1) - } - - fmt.Printf("Successfully stopped %s\n", projectName) - case "start": - projectName := getProjectName() - - req, err := http.Post(config.DeamonURL+"/start/"+projectName, "application/json", nil) - if err != nil { - fmt.Printf("Failed to start app: %v\n", err) - os.Exit(1) - } - defer req.Body.Close() - - if req.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(req.Body) - if err != nil { - fmt.Printf("error reading response body: %v\n", err) - os.Exit(1) - } - - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } - - fmt.Printf("Start failed: %s\n", responseBody) - os.Exit(1) - } - - fmt.Printf("Successfully started %s\n", projectName) - case "delete": - if len(os.Args) == 3 { - if os.Args[2] == "all" { - var response string - fmt.Print("Are you sure you want to delete all projects? this will delete all volumes and containers associated and cannot be undone. \n[y/N] ") - fmt.Scanln(&response) - - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - os.Exit(0) - } - - response = "" - - fmt.Printf("Are you really sure you want to delete all projects? \n[y/N] ") - fmt.Scanln(&response) - - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - os.Exit(0) - } - - req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments", nil) - if err != nil { - fmt.Printf("Failed to delete deployments: %v\n", err) - os.Exit(1) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Printf("Failed to delete deployments: %v\n", err) - os.Exit(1) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - fmt.Printf("error reading response body: %v\n", err) - os.Exit(1) - } - - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } - - fmt.Printf("Delete failed: %s\n", responseBody) - os.Exit(1) - } - - fmt.Printf("Successfully deleted all projects\n") - return - } - } - - projectName := getProjectName() - - // ask for confirmation - fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N] ", projectName) - var response string - fmt.Scanln(&response) - - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - os.Exit(0) - } - - req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments/"+projectName, nil) - if err != nil { - fmt.Printf("Failed to delete app: %v\n", err) - os.Exit(1) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Printf("Failed to delete app: %v\n", err) - os.Exit(1) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - fmt.Printf("error reading response body: %v\n", err) - os.Exit(1) - } - - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } - - fmt.Printf("Delete failed: %s\n", responseBody) - os.Exit(1) - } - - fmt.Printf("Successfully deleted %s\n", projectName) - case "list": - resp, err := http.Get(config.DeamonURL + "/apps") - if err != nil { - fmt.Printf("Failed to get apps: %v\n", err) - os.Exit(1) - } - - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - fmt.Printf("error reading response body: %v\n", err) - os.Exit(1) - } - - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } - - fmt.Printf("List failed: %s\n", responseBody) - os.Exit(1) - } - - var apps []pkg.App - if err := json.NewDecoder(resp.Body).Decode(&apps); err != nil { - fmt.Printf("Failed to decode apps: %v\n", err) - os.Exit(1) - } - - if len(apps) == 0 { - fmt.Println("No apps found") - os.Exit(0) - } - - for _, app := range apps { - fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) - } - case "init": - var projectConfig pkg.ProjectConfig - - var response string - if len(os.Args) > 2 { - response = os.Args[2] - } else { - fmt.Println("What is the name of your project?") - fmt.Scanln(&response) - projectConfig.Name = response - } - - fmt.Println("What URL should your project listen to?") - fmt.Scanln(&response) - if strings.HasPrefix(response, "http") { - strings.TrimPrefix(response, "http://") - strings.TrimPrefix(response, "https://") - } - - response = strings.Split(response, "/")[0] - - projectConfig.Url = response - - fmt.Println("What port does your project listen to?") - fmt.Scanln(&response) - port, err := strconv.ParseUint(response, 10, 16) - projectConfig.Port = uint16(port) - if err != nil || projectConfig.Port < 1 || projectConfig.Port > 65535 { - fmt.Println("That doesnt look like a valid port", err) - os.Exit(1) - } - - configBytes, err := json.MarshalIndent(projectConfig, "", " ") - if err != nil { - fmt.Printf("Failed to marshal project config: %v\n", err) - os.Exit(1) - } - - os.WriteFile("flux.json", configBytes, 0644) - - fmt.Printf("Successfully initialized project %s\n", projectConfig.Name) - default: - fmt.Println("Unknown command:", command) + err = runCommand(command, args, config, info) + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) } } diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index 637af4d..1c25b70 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -3,15 +3,13 @@ package main import ( "log" "net/http" + _ "net/http/pprof" "github.com/juls0730/fluxd/server" ) func main() { fluxServer := server.NewServer() - server.InitReverseProxy() - - // go fluxServer.Proxy.Start() http.HandleFunc("POST /deploy", fluxServer.DeployHandler) http.HandleFunc("DELETE /deployments", fluxServer.DeleteAllDeploymentsHandler) @@ -19,6 +17,7 @@ func main() { http.HandleFunc("POST /start/{name}", fluxServer.StartDeployHandler) http.HandleFunc("POST /stop/{name}", fluxServer.StopDeployHandler) http.HandleFunc("GET /apps", fluxServer.ListAppsHandler) + http.HandleFunc("GET /heartbeat", fluxServer.DaemonInfoHandler) log.Printf("Fluxd started on http://127.0.0.1:5647\n") log.Fatal(http.ListenAndServe(":5647", nil)) diff --git a/pkg/responses.go b/pkg/responses.go index 994396c..7152a28 100644 --- a/pkg/responses.go +++ b/pkg/responses.go @@ -6,3 +6,12 @@ type App struct { DeploymentID int64 `json:"deployment_id,omitempty"` DeploymentStatus string `json:"deployment_status,omitempty"` } + +type Compression struct { + Enabled bool `json:"enabled"` + Level int `json:"level,omitempty"` +} + +type Info struct { + Compression Compression `json:"compression"` +} diff --git a/server/container.go b/server/container.go index c15f5a7..3fa6217 100644 --- a/server/container.go +++ b/server/container.go @@ -20,10 +20,17 @@ import ( var dockerClient *client.Client +type Volume struct { + ID int64 `json:"id"` + VolumeID string `json:"volume_id"` + ContainerID int64 `json:"container_id"` +} + type Container struct { ID int64 `json:"id"` Head bool `json:"head"` // if the container is the head of the deployment Deployment *Deployment + Volumes []Volume `json:"volumes"` ContainerID [64]byte `json:"container_id"` DeploymentID int64 `json:"deployment_id"` } @@ -38,7 +45,26 @@ func init() { } } -func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (string, error) { +func CreateVolume(ctx context.Context, name string) (vol *Volume, err error) { + dockerVolume, err := dockerClient.VolumeCreate(ctx, volume.CreateOptions{ + Driver: "local", + DriverOpts: map[string]string{}, + Name: name, + }) + if err != nil { + return nil, fmt.Errorf("Failed to create volume: %v", err) + } + + log.Printf("Volume %s created at %s\n", dockerVolume.Name, dockerVolume.Mountpoint) + + vol = &Volume{ + VolumeID: dockerVolume.Name, + } + + return vol, nil +} + +func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (c *Container, err error) { log.Printf("Deploying container with image %s\n", imageName) containerName := fmt.Sprintf("%s-%s", projectConfig.Name, time.Now().Format("20060102-150405")) @@ -46,13 +72,13 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p if projectConfig.EnvFile != "" { envBytes, err := os.Open(filepath.Join(projectPath, projectConfig.EnvFile)) if err != nil { - return "", fmt.Errorf("Failed to open env file: %v", err) + return nil, fmt.Errorf("Failed to open env file: %v", err) } defer envBytes.Close() envVars, err := godotenv.Parse(envBytes) if err != nil { - return "", fmt.Errorf("Failed to parse env file: %v", err) + return nil, fmt.Errorf("Failed to parse env file: %v", err) } for key, value := range envVars { @@ -60,23 +86,14 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p } } - vol, err := dockerClient.VolumeCreate(ctx, volume.CreateOptions{ - Driver: "local", - DriverOpts: map[string]string{}, - Name: fmt.Sprintf("flux_%s-volume", projectConfig.Name), - }) - if err != nil { - return "", fmt.Errorf("Failed to create volume: %v", err) - } - - log.Printf("Volume %s created at %s\n", vol.Name, vol.Mountpoint) + vol, err := CreateVolume(ctx, fmt.Sprintf("flux_%s-volume", projectConfig.Name)) log.Printf("Creating container %s...\n", containerName) resp, err := dockerClient.ContainerCreate(ctx, &container.Config{ Image: imageName, Env: projectConfig.Environment, Volumes: map[string]struct{}{ - vol.Name: {}, + vol.VolumeID: {}, }, }, &container.HostConfig{ @@ -85,7 +102,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p Mounts: []mount.Mount{ { Type: mount.TypeVolume, - Source: vol.Name, + Source: vol.VolumeID, Target: "/workspace", ReadOnly: false, }, @@ -96,11 +113,16 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p containerName, ) if err != nil { - return "", fmt.Errorf("Failed to create container: %v", err) + return nil, fmt.Errorf("Failed to create container: %v", err) + } + + c = &Container{ + ContainerID: [64]byte([]byte(resp.ID)), + Volumes: []Volume{*vol}, } log.Printf("Created new container: %s\n", containerName) - return resp.ID, nil + return c, nil } func (c *Container) Start(ctx context.Context) error { @@ -112,7 +134,43 @@ func (c *Container) Stop(ctx context.Context) error { } func (c *Container) Remove(ctx context.Context) error { - return RemoveDockerContainer(ctx, string(c.ContainerID[:])) + err := RemoveDockerContainer(ctx, string(c.ContainerID[:])) + + if err != nil { + return fmt.Errorf("Failed to remove container (%s): %v", c.ContainerID[:12], err) + } + + tx, err := Flux.db.Begin() + if err != nil { + log.Printf("Failed to begin transaction: %v\n", err) + return err + } + + _, err = tx.Exec("DELETE FROM containers WHERE container_id = ?", c.ContainerID[:]) + if err != nil { + tx.Rollback() + return err + } + + for _, volume := range c.Volumes { + if err := RemoveVolume(ctx, volume.VolumeID); err != nil { + tx.Rollback() + return fmt.Errorf("Failed to remove volume (%s): %v", volume.VolumeID, err) + } + + _, err = tx.Exec("DELETE FROM volumes WHERE volume_id = ?", volume.VolumeID) + if err != nil { + tx.Rollback() + return err + } + } + + if err := tx.Commit(); err != nil { + log.Printf("Failed to commit transaction: %v\n", err) + return err + } + + return nil } func (c *Container) Wait(ctx context.Context, port uint16) error { @@ -122,11 +180,11 @@ func (c *Container) Wait(ctx context.Context, port uint16) error { // RemoveContainer stops and removes a container, but be warned that this will not remove the container from the database func RemoveDockerContainer(ctx context.Context, containerID string) error { if err := dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil { - return fmt.Errorf("Failed to stop existing container: %v", err) + return fmt.Errorf("Failed to stop container (%s): %v", containerID[:12], err) } if err := dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil { - return fmt.Errorf("Failed to remove existing container: %v", err) + return fmt.Errorf("Failed to remove container (%s): %v", containerID[:12], err) } return nil @@ -192,8 +250,10 @@ func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) er } func RemoveVolume(ctx context.Context, volumeID string) error { + log.Printf("Removed volume %s\n", volumeID) + if err := dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { - return fmt.Errorf("Failed to remove existing volume: %v", err) + return fmt.Errorf("Failed to remove volume (%s): %v", volumeID, err) } return nil diff --git a/server/deploy.go b/server/deploy.go index 4626a70..ce61813 100644 --- a/server/deploy.go +++ b/server/deploy.go @@ -7,9 +7,7 @@ import ( "log" "mime/multipart" "net/http" - "os" "os/exec" - "path/filepath" "github.com/juls0730/fluxd/pkg" ) @@ -93,7 +91,11 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - app := Apps.GetApp(projectConfig.Name) + if Flux.appManager == nil { + panic("App manager is nil") + } + + app := Flux.appManager.GetApp(projectConfig.Name) if app == nil { app = &App{ @@ -101,14 +103,14 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } log.Printf("Creating deployment %s...\n", app.Name) - containerID, err := CreateDockerContainer(r.Context(), imageName, projectPath, projectConfig) - if err != nil { + container, err := CreateDockerContainer(r.Context(), imageName, projectPath, projectConfig) + if err != nil || container == nil { log.Printf("Failed to create container: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - deployment, err := CreateDeployment(containerID, projectConfig.Port, projectConfig.Url, s.db) + deployment, err := CreateDeployment(*container, projectConfig.Port, projectConfig.Url, s.db) app.Deployment = deployment if err != nil { log.Printf("Failed to create deployment: %v\n", err) @@ -154,9 +156,9 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - ReverseProxy.AddDeployment(&deployment) + Flux.proxy.AddDeployment(&deployment) - Apps.AddApp(app.Name, app) + Flux.appManager.AddApp(app.Name, app) } else { log.Printf("Upgrading deployment %s...\n", app.Name) @@ -171,7 +173,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } } - err = app.Deployment.Upgrade(r.Context(), projectConfig, imageName, projectPath, s) + err = app.Deployment.Upgrade(r.Context(), projectConfig, imageName, projectPath) if err != nil { log.Printf("Failed to upgrade deployment: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -189,7 +191,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") - app := Apps.GetApp(name) + app := Flux.appManager.GetApp(name) if app == nil { http.Error(w, "App not found", http.StatusNotFound) return @@ -218,7 +220,7 @@ func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") - app := Apps.GetApp(name) + app := Flux.appManager.GetApp(name) if app == nil { http.Error(w, "App not found", http.StatusNotFound) return @@ -246,159 +248,37 @@ func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) { func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") - var err error - - app := Apps.GetApp(name) - if app == nil { - http.Error(w, "App not found", http.StatusNotFound) - return - } log.Printf("Deleting deployment %s...\n", name) - for _, container := range app.Deployment.Containers { - err = RemoveDockerContainer(r.Context(), string(container.ContainerID[:])) - if err != nil { - log.Printf("Failed to remove container: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } + err := Flux.appManager.DeleteApp(name) - err = RemoveVolume(r.Context(), fmt.Sprintf("flux_%s-volume", name)) if err != nil { - log.Printf("Failed to remove volume: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - tx, err := s.db.Begin() - if err != nil { - log.Printf("Failed to begin transaction: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - _, err = tx.Exec("DELETE FROM deployments WHERE id = ?", app.DeploymentID) - if err != nil { - tx.Rollback() - log.Printf("Failed to delete deployment: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - _, err = tx.Exec("DELETE FROM containers WHERE deployment_id = ?", app.DeploymentID) - if err != nil { - tx.Rollback() - log.Printf("Failed to delete containers: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - _, err = tx.Exec("DELETE FROM apps WHERE id = ?", app.ID) - if err != nil { - tx.Rollback() log.Printf("Failed to delete app: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusNotFound) return } - if err := tx.Commit(); err != nil { - log.Printf("Failed to commit transaction: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - projectPath := filepath.Join(s.rootDir, "apps", name) - err = os.RemoveAll(projectPath) - if err != nil { - log.Printf("Failed to remove project directory: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - Apps.DeleteApp(name) - w.WriteHeader(http.StatusOK) } func (s *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) { - var err error - - for _, app := range Apps.GetAllApps() { - for _, container := range app.Deployment.Containers { - err = RemoveDockerContainer(r.Context(), string(container.ContainerID[:])) - if err != nil { - log.Printf("Failed to remove container: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } - - err = RemoveVolume(r.Context(), fmt.Sprintf("flux_%s-volume", app.Name)) + for _, app := range Flux.appManager.GetAllApps() { + err := app.Remove(r.Context()) if err != nil { - log.Printf("Failed to remove volume: %v\n", err) + log.Printf("Failed to remove app: %v\n", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } } - tx, err := s.db.Begin() - if err != nil { - log.Printf("Failed to begin transaction: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - _, err = tx.Exec("DELETE FROM deployments") - if err != nil { - tx.Rollback() - log.Printf("Failed to delete deployments: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - _, err = tx.Exec("DELETE FROM containers") - if err != nil { - tx.Rollback() - log.Printf("Failed to delete containers: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - _, err = tx.Exec("DELETE FROM apps") - if err != nil { - tx.Rollback() - log.Printf("Failed to delete apps: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - if err := tx.Commit(); err != nil { - log.Printf("Failed to commit transaction: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - if err := os.RemoveAll(filepath.Join(s.rootDir, "apps")); err != nil { - log.Printf("Failed to remove apps directory: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - if err := os.RemoveAll(filepath.Join(s.rootDir, "deployments")); err != nil { - log.Printf("Failed to remove deployments directory: %v\n", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.WriteHeader(http.StatusOK) } func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) { // for each app, get the deployment status var apps []*pkg.App - for _, app := range Apps.GetAllApps() { + for _, app := range Flux.appManager.GetAllApps() { var extApp pkg.App deploymentStatus, err := app.Deployment.Status(r.Context()) if err != nil { @@ -417,3 +297,10 @@ func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(apps) } + +func (s *FluxServer) DaemonInfoHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pkg.Info{ + Compression: s.config.Compression, + }) +} diff --git a/server/deployment.go b/server/deployment.go index 6f57c5f..313ad15 100644 --- a/server/deployment.go +++ b/server/deployment.go @@ -5,15 +5,18 @@ import ( "database/sql" "fmt" "log" + "os" + "path/filepath" "sync" "github.com/juls0730/fluxd/pkg" ) var ( - Apps *AppManager = new(AppManager) deploymentInsertStmt *sql.Stmt containerInsertStmt *sql.Stmt + volumeInsertStmt *sql.Stmt + updateVolumeStmt *sql.Stmt ) type AppManager struct { @@ -27,6 +30,28 @@ type App struct { DeploymentID int64 `json:"deployment_id,omitempty"` } +func (app *App) Remove(ctx context.Context) error { + err := app.Deployment.Remove(ctx) + if err != nil { + log.Printf("Failed to remove deployment: %v\n", err) + return err + } + + _, err = Flux.db.Exec("DELETE FROM apps WHERE id = ?", app.ID) + if err != nil { + log.Printf("Failed to delete app: %v\n", err) + return err + } + + projectPath := filepath.Join(Flux.rootDir, "apps", app.Name) + err = os.RemoveAll(projectPath) + if err != nil { + return fmt.Errorf("Failed to remove project directory: %v", err) + } + + return nil +} + type Deployment struct { ID int64 `json:"id"` Containers []Container `json:"-"` @@ -59,18 +84,30 @@ func (am *AppManager) AddApp(name string, app *App) { am.Store(name, app) } -func (am *AppManager) DeleteApp(name string) { +func (am *AppManager) DeleteApp(name string) error { + app := am.GetApp(name) + if app == nil { + return fmt.Errorf("App not found") + } + + err := app.Remove(context.Background()) + if err != nil { + return err + } + am.Delete(name) + + return nil } -func (am *AppManager) Init() { +func (am *AppManager) Init(db *sql.DB) { log.Printf("Initializing deployments...\n") - if DB == nil { + if db == nil { log.Panicf("DB is nil") } - rows, err := DB.Query("SELECT id, name, deployment_id FROM apps") + rows, err := db.Query("SELECT id, name, deployment_id FROM apps") if err != nil { log.Printf("Failed to query apps: %v\n", err) return @@ -90,14 +127,15 @@ func (am *AppManager) Init() { for _, app := range apps { var deployment Deployment var headContainer *Container - DB.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port) + db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port) deployment.Containers = make([]Container, 0) - rows, err := DB.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) + rows, err = db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) if err != nil { log.Printf("Failed to query containers: %v\n", err) return } + defer rows.Close() for rows.Next() { var container Container @@ -113,6 +151,24 @@ func (am *AppManager) Init() { deployment.Containers = append(deployment.Containers, container) } + for i, container := range deployment.Containers { + var volumes []Volume + rows, err := db.Query("SELECT id, volume_id, container_id FROM volumes WHERE container_id = ?", container.ID) + if err != nil { + log.Printf("Failed to query volumes: %v\n", err) + return + } + defer rows.Close() + + for rows.Next() { + var volume Volume + rows.Scan(&volume.ID, &volume.VolumeID, &volume.ContainerID) + volumes = append(volumes, volume) + } + + deployment.Containers[i].Volumes = volumes + } + deployment.Proxy, err = NewDeploymentProxy(&deployment, headContainer) if err != nil { log.Printf("Failed to create deployment proxy: %v\n", err) @@ -121,12 +177,12 @@ func (am *AppManager) Init() { app.Deployment = deployment - Apps.AddApp(app.Name, &app) + am.AddApp(app.Name, &app) } } // Creates a deployment and containers in the database -func CreateDeployment(containerID string, port uint16, appUrl string, db *sql.DB) (Deployment, error) { +func CreateDeployment(container Container, port uint16, appUrl string, db *sql.DB) (Deployment, error) { var deployment Deployment var err error @@ -144,7 +200,6 @@ func CreateDeployment(containerID string, port uint16, appUrl string, db *sql.DB return Deployment{}, err } - var container Container if containerInsertStmt == nil { containerInsertStmt, err = db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head") if err != nil { @@ -154,35 +209,50 @@ func CreateDeployment(containerID string, port uint16, appUrl string, db *sql.DB } var containerIDString string - err = containerInsertStmt.QueryRow(containerID, deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) + err = containerInsertStmt.QueryRow(container.ContainerID[:], deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) if err != nil { log.Printf("Failed to get container id: %v\n", err) return Deployment{}, err } copy(container.ContainerID[:], containerIDString) + for i, volume := range container.Volumes { + if volumeInsertStmt == nil { + volumeInsertStmt, err = db.Prepare("INSERT INTO volumes (volume_id, container_id) VALUES (?, ?) RETURNING id, volume_id, container_id") + if err != nil { + log.Printf("Failed to prepare statement: %v\n", err) + return Deployment{}, err + } + } + + if err := volumeInsertStmt.QueryRow(volume.VolumeID, container.ID).Scan(&container.Volumes[i].ID, &container.Volumes[i].VolumeID, &container.Volumes[i].ContainerID); err != nil { + log.Printf("Failed to insert volume: %v\n", err) + return Deployment{}, err + } + } + container.Deployment = &deployment deployment.Containers = append(deployment.Containers, container) return deployment, nil } -func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string, s *FluxServer) error { +func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string) error { existingContainers, err := findExistingDockerContainers(ctx, projectConfig.Name) if err != nil { return fmt.Errorf("Failed to find existing containers: %v", err) } // Deploy new container before deleting old one - containerID, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig) - if err != nil { + c, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig) + if err != nil || c == nil { log.Printf("Failed to create container: %v\n", err) return err } - var container Container + var container Container = *c if containerInsertStmt == nil { - containerInsertStmt, err = DB.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head") + containerInsertStmt, err = Flux.db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head") if err != nil { log.Printf("Failed to prepare statement: %v\n", err) return err @@ -190,17 +260,48 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro } var containerIDString string - err = containerInsertStmt.QueryRow(containerID, deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) + err = containerInsertStmt.QueryRow(container.ContainerID[:], deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) if err != nil { log.Printf("Failed to get container id: %v\n", err) return err } container.Deployment = deployment + // the space time complexity of this is pretty bad, but it works + for _, existingContainer := range deployment.Containers { + if !existingContainer.Head { + continue + } + + for _, volume := range existingContainer.Volumes { + var targetVolume *Volume + for i, volume := range container.Volumes { + if volume.VolumeID == volume.VolumeID { + targetVolume = &container.Volumes[i] + break + } + } + + if updateVolumeStmt == nil { + updateVolumeStmt, err = Flux.db.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, container_id") + if err != nil { + log.Printf("Failed to prepare statement: %v\n", err) + return err + } + } + + err := updateVolumeStmt.QueryRow(container.ID, volume.ID).Scan(&targetVolume.ID, &targetVolume.VolumeID, &targetVolume.ContainerID) + if err != nil { + log.Printf("Failed to update volume: %v\n", err) + return err + } + } + } + copy(container.ContainerID[:], containerIDString) deployment.Containers = append(deployment.Containers, container) - log.Printf("Starting container %s...\n", containerID) + log.Printf("Starting container %s...\n", container.ContainerID[:]) err = container.Start(ctx) if err != nil { log.Printf("Failed to start container: %v\n", err) @@ -212,7 +313,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro return err } - tx, err := s.db.Begin() + tx, err := Flux.db.Begin() if err != nil { log.Printf("Failed to begin transaction: %v\n", err) return err @@ -235,12 +336,6 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro return err } - tx, err = s.db.Begin() - if err != nil { - log.Printf("Failed to begin transaction: %v\n", err) - return err - } - // Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully shutdown oldProxy := deployment.Proxy deployment.Proxy, err = NewDeploymentProxy(deployment, &container) @@ -249,13 +344,19 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro return err } + tx, err = Flux.db.Begin() + if err != nil { + log.Printf("Failed to begin transaction: %v\n", err) + return err + } + var containers []Container var oldContainers []*Container for _, container := range deployment.Containers { if existingContainers[string(container.ContainerID[:])] { - log.Printf("Stopping existing container: %s\n", container.ContainerID[0:12]) + log.Printf("Deleting container from db: %s\n", container.ContainerID[0:12]) - _, err = tx.Exec("DELETE FROM containers WHERE container_id = ?", string(container.ContainerID[:])) + _, err = tx.Exec("DELETE FROM containers WHERE id = ?", container.ID) oldContainers = append(oldContainers, &container) if err != nil { @@ -269,19 +370,26 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro containers = append(containers, container) } - if oldProxy != nil { - go oldProxy.GracefulShutdown(oldContainers) - } - - deployment.Containers = containers - - ReverseProxy.AddDeployment(deployment) - if err := tx.Commit(); err != nil { log.Printf("Failed to commit transaction: %v\n", err) return err } + if oldProxy != nil { + go oldProxy.GracefulShutdown(oldContainers) + } else { + for _, container := range oldContainers { + err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:])) + if err != nil { + log.Printf("Failed to remove container: %v\n", err) + } + } + } + + deployment.Containers = containers + + Flux.proxy.AddDeployment(deployment) + return nil } @@ -295,6 +403,24 @@ func arrayContains(arr []string, str string) bool { return false } +func (d *Deployment) Remove(ctx context.Context) error { + for _, container := range d.Containers { + err := container.Remove(ctx) + if err != nil { + log.Printf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err) + return err + } + } + + _, err := Flux.db.Exec("DELETE FROM deployments WHERE id = ?", d.ID) + if err != nil { + log.Printf("Failed to delete deployment: %v\n", err) + return err + } + + return nil +} + func (d *Deployment) Start(ctx context.Context) error { for _, container := range d.Containers { err := container.Start(ctx) diff --git a/server/proxy.go b/server/proxy.go index 885f63d..b224469 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -7,14 +7,11 @@ import ( "net/http" "net/http/httputil" "net/url" - "os" "sync" "sync/atomic" "time" ) -var ReverseProxy *Proxy - type Proxy struct { deployments sync.Map } @@ -111,32 +108,7 @@ func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) { for _, container := range oldContainers { err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:])) if err != nil { - log.Printf("Failed to remove container: %v\n", err) + log.Printf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err) } } } - -func InitProxy(apps *AppManager) { - ReverseProxy = &Proxy{} - - apps.Range(func(key, value interface{}) bool { - app := value.(*App) - ReverseProxy.AddDeployment(&app.Deployment) - return true - }) -} - -func InitReverseProxy() { - InitProxy(Apps) - port := os.Getenv("FLUXD_PROXY_PORT") - if port == "" { - port = "7465" - } - - go func() { - log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port) - if err := http.ListenAndServe(fmt.Sprintf(":%s", port), ReverseProxy); err != nil && err != http.ErrServerClosed { - log.Fatalf("Proxy server error: %v", err) - } - }() -} diff --git a/server/schema.sql b/server/schema.sql index 5b560c8..e6cd682 100644 --- a/server/schema.sql +++ b/server/schema.sql @@ -1,20 +1,27 @@ CREATE TABLE IF NOT EXISTS deployments ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - url TEXT NOT NULL, + id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE, + url TEXT NOT NULL UNIQUE, port INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS apps ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, + id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE, + name TEXT NOT NULL UNIQUE, deployment_id INTEGER, FOREIGN KEY(deployment_id) REFERENCES deployments(id) ); CREATE TABLE IF NOT EXISTS containers ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE, container_id TEXT NOT NULL, head BOOLEAN NOT NULL, deployment_id INTEGER NOT NULL, FOREIGN KEY(deployment_id) REFERENCES deployments(id) +); + +CREATE TABLE IF NOT EXISTS volumes ( + id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE, + volume_id TEXT NOT NULL, + container_id INTEGER NOT NULL, + FOREIGN KEY(container_id) REFERENCES containers(id) ); \ No newline at end of file diff --git a/server/server.go b/server/server.go index e9858a4..25bfd3d 100644 --- a/server/server.go +++ b/server/server.go @@ -5,8 +5,10 @@ import ( "compress/gzip" "database/sql" "encoding/json" + "fmt" "io" "log" + "net/http" "os" "path/filepath" @@ -21,18 +23,25 @@ var ( schemaBytes []byte DefaultConfig = FluxServerConfig{ Builder: "paketobuildpacks/builder-jammy-tiny", + Compression: pkg.Compression{ + Enabled: false, + Level: 0, + }, } - DB *sql.DB + Flux *FluxServer ) type FluxServerConfig struct { - Builder string `json:"builder"` + Builder string `json:"builder"` + Compression pkg.Compression `json:"compression"` } type FluxServer struct { - config FluxServerConfig - db *sql.DB - rootDir string + config FluxServerConfig + db *sql.DB + proxy *Proxy + rootDir string + appManager *AppManager } func NewServer() *FluxServer { @@ -74,40 +83,79 @@ func NewServer() *FluxServer { log.Fatalf("Failed to create apps directory: %v\n", err) } - DB, err = sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db")) + db, err := sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db")) if err != nil { log.Fatalf("Failed to open database: %v\n", err) } - _, err = DB.Exec(string(schemaBytes)) + _, err = db.Exec(string(schemaBytes)) if err != nil { log.Fatalf("Failed to create database schema: %v\n", err) } - Apps.Init() + appManager := new(AppManager) + appManager.Init(db) - return &FluxServer{ - config: serverConfig, - db: DB, - rootDir: rootDir, + proxy := &Proxy{} + + appManager.Range(func(key, value interface{}) bool { + app := value.(*App) + proxy.AddDeployment(&app.Deployment) + return true + }) + + port := os.Getenv("FLUXD_PROXY_PORT") + if port == "" { + port = "7465" } + + go func() { + log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port) + if err := http.ListenAndServe(fmt.Sprintf(":%s", port), proxy); err != nil && err != http.ErrServerClosed { + log.Fatalf("Proxy server error: %v", err) + } + }() + + Flux = &FluxServer{ + config: serverConfig, + db: db, + proxy: proxy, + appManager: appManager, + rootDir: rootDir, + } + + return Flux } func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConfig) (string, error) { + var err error projectPath := filepath.Join(s.rootDir, "apps", projectConfig.Name) - if err := os.MkdirAll(projectPath, 0755); err != nil { + if err = os.MkdirAll(projectPath, 0755); err != nil { log.Printf("Failed to create project directory: %v\n", err) return "", err } - gzReader, err := gzip.NewReader(code) - if err != nil { - log.Printf("Failed to create gzip reader: %v\n", err) - return "", err - } - defer gzReader.Close() + var gzReader *gzip.Reader + defer func() { + if gzReader != nil { + gzReader.Close() + } + }() - tarReader := tar.NewReader(gzReader) + if s.config.Compression.Enabled { + gzReader, err = gzip.NewReader(code) + if err != nil { + log.Printf("Failed to create gzip reader: %v\n", err) + return "", err + } + } + var tarReader *tar.Reader + + if gzReader != nil { + tarReader = tar.NewReader(gzReader) + } else { + tarReader = tar.NewReader(code) + } log.Printf("Extracting files for %s...\n", projectPath) for { @@ -126,12 +174,12 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf // Handle different file types switch header.Typeflag { case tar.TypeDir: - if err := os.MkdirAll(path, 0755); err != nil { + if err = os.MkdirAll(path, 0755); err != nil { log.Printf("Failed to extract directory: %v\n", err) return "", err } case tar.TypeReg: - if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + if err = os.MkdirAll(filepath.Dir(path), 0755); err != nil { log.Printf("Failed to extract directory: %v\n", err) return "", err } @@ -143,7 +191,7 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf } defer outFile.Close() - if _, err := io.Copy(outFile, tarReader); err != nil { + if _, err = io.Copy(outFile, tarReader); err != nil { log.Printf("Failed to copy file during extraction: %v\n", err) return "", err }