From de22bd20c9989ffdc200670dfc0a35e15e8ff96b Mon Sep 17 00:00:00 2001 From: juls0730 <62722391+juls0730@users.noreply.github.com> Date: Sat, 14 Dec 2024 02:49:05 -0600 Subject: [PATCH] fix pipe closed issue, and a lot of other stuff --- Dockerfile | 20 ++ README.md | 53 ++- cmd/flux/handlers/delete.go | 113 ++++++ cmd/flux/handlers/deploy.go | 254 ++++++++++++++ cmd/flux/handlers/init.go | 73 ++++ cmd/flux/handlers/list.go | 55 +++ cmd/flux/handlers/project.go | 36 ++ cmd/flux/handlers/start.go | 48 +++ cmd/flux/handlers/stop.go | 47 +++ cmd/flux/main.go | 649 +---------------------------------- cmd/flux/models/config.go | 5 + cmd/flux/models/writers.go | 69 ++++ cmd/fluxd/main.go | 10 +- docker-compose.yml | 16 + go.mod | 5 +- go.sum | 10 + pkg/responses.go | 2 +- server/app.go | 38 +- server/container.go | 22 +- server/deploy.go | 119 ++++--- server/deployment.go | 40 ++- server/proxy.go | 11 +- server/server.go | 132 ++++--- 23 files changed, 1032 insertions(+), 795 deletions(-) create mode 100644 Dockerfile create mode 100644 cmd/flux/handlers/delete.go create mode 100644 cmd/flux/handlers/deploy.go create mode 100644 cmd/flux/handlers/init.go create mode 100644 cmd/flux/handlers/list.go create mode 100644 cmd/flux/handlers/project.go create mode 100644 cmd/flux/handlers/start.go create mode 100644 cmd/flux/handlers/stop.go create mode 100644 cmd/flux/models/config.go create mode 100644 cmd/flux/models/writers.go create mode 100644 docker-compose.yml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ce01a27 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.23-bookworm as builder + +WORKDIR /app + +COPY go.mod go.sum ./ + +RUN go mod download + +COPY . . + +RUN GOOS=linux go build -o fluxd cmd/fluxd/main.go + +RUN (curl -sSL "https://github.com/buildpacks/pack/releases/download/v0.36.0/pack-v0.36.0-linux.tgz" | tar -C /usr/local/bin/ --no-same-owner -xzv pack) +RUN apt-get install -y ca-certificates + +EXPOSE 5647 7465 + +VOLUME [ "/var/run/docker.sock" ] + +CMD ["/app/fluxd"] diff --git a/README.md b/README.md index 6bbceed..0a5b559 100644 --- a/README.md +++ b/README.md @@ -22,17 +22,62 @@ Flux is a lightweight self-hosted pseudo-PaaS for hosting Golang web apps with e To install and start the Flux daemon using ZQDGR, run the following command: +> [!IMPORTANT] +> CGO is required to build the daemon due to the use of [mattn/go-sqlite3](https://github.com/mattn/go-sqlite3) + +#### Method 1: ZQDGR + ```bash -# method 1 +go install github.com/juls0730/zqdgr@latest + +git clone https://github.com/juls0730/flux.git +cd flux + +# either zqdgr build:daemon sudo ./fluxd -# method 2 +# or FLUXD_ROOT_DIR=$PWD/fluxdd zqdgr run:daemon ``` -> [!IMPORTANT] -> CGO is required to build the daemon due to the use of [mattn/go-sqlite3](https://github.com/mattn/go-sqlite3) +#### Method 2: Docker + +```bash +docker run -d --name fluxd --network host -v /var/run/docker.sock:/var/run/docker.sock -v fluxd-data:/var/fluxd -p 5647:5647 -p 7465:7465 zoeissleeping/fluxd:latest +``` + +#### Method 3: Systemd + +```bash +go install github.com/juls0730/zqdgr@latest + +git clone https://github.com/juls0730/flux.git +cd flux + +zqdgr build:daemon +sudo mv fluxd /usr/local/bin/ + +sudo cat < /etc/systemd/system/fluxd.service +[Unit] +Description=Flux Daemon +After=network.target + +[Service] +ExecStart=/usr/local/bin/fluxd +WorkingDirectory=/var/fluxd +User=fluxuser +Group=fluxgroup +Restart=always +Environment=FLUXD_ROOT_DIR=/var/fluxd + +[Install] +WantedBy=multi-user.target +EOF + +sudo systemctl daemon-reload +sudo systemctl enable --now fluxd +``` ### CLI diff --git a/cmd/flux/handlers/delete.go b/cmd/flux/handlers/delete.go new file mode 100644 index 0000000..99fa62a --- /dev/null +++ b/cmd/flux/handlers/delete.go @@ -0,0 +1,113 @@ +package handlers + +import ( + "fmt" + "io" + "net/http" + "strings" + + "github.com/briandowns/spinner" + "github.com/juls0730/flux/cmd/flux/models" + "github.com/juls0730/flux/pkg" +) + +func DeleteCommand(seekingHelp bool, config models.Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *models.CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux delete [project-name | all] + + Options: + project-name: The name of the project to delete + all: Delete all projects + + Flux will delete the deployment of the app in the current directory or the specified project.`) + return nil + } + + if len(args) == 1 { + if args[0] == "all" { + var response string + fmt.Print("Are you sure you want to delete all projects? this will delete all volumes and containers associated and cannot be undone. \n[y/N] ") + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } + + response = "" + + fmt.Printf("Are you really sure you want to delete all projects? \n[y/N] ") + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } + + req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments", nil) + if err != nil { + return fmt.Errorf("failed to delete deployments: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to delete deployments: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } + + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + + return fmt.Errorf("delete failed: %s", responseBody) + } + + fmt.Printf("Successfully deleted all projects\n") + return nil + } + } + + projectName, err := GetProjectName("delete", args) + if err != nil { + return err + } + + // ask for confirmation + fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N] ", projectName) + var response string + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } + + req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments/"+projectName, nil) + if err != nil { + return fmt.Errorf("failed to delete app: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to delete app: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } + + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + + return fmt.Errorf("delete failed: %s", responseBody) + } + + fmt.Printf("Successfully deleted %s\n", projectName) + + return nil +} diff --git a/cmd/flux/handlers/deploy.go b/cmd/flux/handlers/deploy.go new file mode 100644 index 0000000..15586bc --- /dev/null +++ b/cmd/flux/handlers/deploy.go @@ -0,0 +1,254 @@ +package handlers + +import ( + "archive/tar" + "bufio" + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "os" + "path/filepath" + "regexp" + "strings" + + "github.com/briandowns/spinner" + "github.com/juls0730/flux/cmd/flux/models" + "github.com/juls0730/flux/pkg" +) + +func matchesIgnorePattern(path string, info os.FileInfo, patterns []string) bool { + normalizedPath := filepath.ToSlash(path) + normalizedPath = strings.TrimPrefix(normalizedPath, "./") + + for _, pattern := range patterns { + pattern = strings.TrimSpace(pattern) + if pattern == "" || strings.HasPrefix(pattern, "#") { + continue + } + + regexPattern := convertGitignorePatternToRegex(pattern) + + matched, err := regexp.MatchString(regexPattern, normalizedPath) + if err == nil && matched { + if strings.HasSuffix(pattern, "/") && info.IsDir() { + return true + } + if !info.IsDir() { + dir := filepath.Dir(normalizedPath) + for dir != "." && dir != "/" { + dirPattern := convertGitignorePatternToRegex(pattern) + if matched, _ := regexp.MatchString(dirPattern, filepath.ToSlash(dir)); matched { + return true + } + dir = filepath.Dir(dir) + } + } + return true + } + } + return false +} + +func convertGitignorePatternToRegex(pattern string) string { + pattern = strings.TrimSuffix(pattern, "/") + pattern = regexp.QuoteMeta(pattern) + pattern = strings.ReplaceAll(pattern, "\\*\\*", ".*") + pattern = strings.ReplaceAll(pattern, "\\*", "[^/]*") + pattern = strings.ReplaceAll(pattern, "\\?", ".") + pattern = "(^|.*/)" + pattern + "(/.*)?$" + + return pattern +} + +func compressDirectory(compression pkg.Compression) ([]byte, error) { + var buf bytes.Buffer + var err error + + var ignoredFiles []string + fluxIgnore, err := os.Open(".fluxignore") + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + } + + if fluxIgnore != nil { + defer fluxIgnore.Close() + + scanner := bufio.NewScanner(fluxIgnore) + for scanner.Scan() { + ignoredFiles = append(ignoredFiles, scanner.Text()) + } + } + + var gzWriter *gzip.Writer + if compression.Enabled { + gzWriter, err = gzip.NewWriterLevel(&buf, compression.Level) + if err != nil { + return nil, err + } + } + + var tarWriter *tar.Writer + if gzWriter != nil { + tarWriter = tar.NewWriter(gzWriter) + } else { + tarWriter = tar.NewWriter(&buf) + } + + err = filepath.Walk(".", func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if path == "flux.json" || info.IsDir() || matchesIgnorePattern(path, info, ignoredFiles) { + return nil + } + + header, err := tar.FileInfoHeader(info, "") + if err != nil { + return err + } + header.Name = path + + if err = tarWriter.WriteHeader(header); err != nil { + return err + } + + if !info.IsDir() { + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + + if _, err = io.Copy(tarWriter, file); err != nil { + return err + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + if err = tarWriter.Close(); err != nil { + return nil, err + } + + if gzWriter != nil { + if err = gzWriter.Close(); err != nil { + return nil, err + } + } + + return buf.Bytes(), nil +} + +func DeployCommand(seekingHelp bool, config models.Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *models.CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux deploy + + Flux will deploy the app in the current directory, and start routing traffic to it.`) + return nil + } + + if _, err := os.Stat("flux.json"); err != nil { + return fmt.Errorf("no flux.json found, please run flux init first") + } + + loadingSpinner.Suffix = " Deploying" + loadingSpinner.Start() + + buf, err := compressDirectory(info.Compression) + if err != nil { + return fmt.Errorf("failed to compress directory: %v", err) + } + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + configPart, err := writer.CreateFormFile("config", "flux.json") + + if err != nil { + return fmt.Errorf("failed to create config part: %v", err) + } + + fluxConfigFile, err := os.Open("flux.json") + if err != nil { + return fmt.Errorf("failed to open flux.json: %v", err) + } + defer fluxConfigFile.Close() + + if _, err := io.Copy(configPart, fluxConfigFile); err != nil { + return fmt.Errorf("failed to write config part: %v", err) + } + + codePart, err := writer.CreateFormFile("code", "code.tar.gz") + if err != nil { + return fmt.Errorf("failed to create code part: %v", err) + } + + if _, err := codePart.Write(buf); err != nil { + return fmt.Errorf("failed to write code part: %v", err) + } + + if err := writer.Close(); err != nil { + return fmt.Errorf("failed to close writer: %v", err) + } + + req, err := http.NewRequest("POST", config.DeamonURL+"/deploy", body) + req.Header.Set("Content-Type", writer.FormDataContentType()) + + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %v", err) + } + defer resp.Body.Close() + + customWriter := models.NewCustomStdout(spinnerWriter) + + scanner := bufio.NewScanner(resp.Body) + var event string + var data pkg.DeploymentEvent + var line string + for scanner.Scan() { + line = scanner.Text() + if strings.HasPrefix(line, "data: ") { + if err := json.Unmarshal([]byte(line[6:]), &data); err != nil { + return fmt.Errorf("failed to parse deployment event: %v", err) + } + + switch event { + case "complete": + loadingSpinner.Stop() + fmt.Printf("App %s deployed successfully!\n", data.Message.(map[string]interface{})["name"]) + return nil + case "cmd_output": + customWriter.Printf("... %s\n", data.Message) + case "error": + loadingSpinner.Stop() + return fmt.Errorf("deployment failed: %s", data.Message) + default: + customWriter.Printf("%s\n", data.Message) + } + event = "" + } else if strings.HasPrefix(line, "event: ") { + event = strings.TrimPrefix(line, "event: ") + } + } + + // the stream closed, but we didnt get a "complete" event + line = strings.TrimSuffix(line, "\n") + return fmt.Errorf("deploy failed: %s", line) +} diff --git a/cmd/flux/handlers/init.go b/cmd/flux/handlers/init.go new file mode 100644 index 0000000..33555f8 --- /dev/null +++ b/cmd/flux/handlers/init.go @@ -0,0 +1,73 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + + "github.com/briandowns/spinner" + "github.com/juls0730/flux/cmd/flux/models" + "github.com/juls0730/flux/pkg" +) + +func InitCommand(seekingHelp bool, config models.Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *models.CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux init [project-name] + + Options: + project-name: The name of the project to initialize + + Flux will initialize a new project in the current directory or the specified project.`) + return nil + } + + var projectConfig pkg.ProjectConfig + + var response string + if len(args) > 1 { + response = args[0] + } else { + fmt.Println("What is the name of your project?") + fmt.Scanln(&response) + } + + projectConfig.Name = response + + fmt.Println("What URL should your project listen to?") + fmt.Scanln(&response) + if strings.HasPrefix(response, "http") { + response = strings.TrimPrefix(response, "http://") + response = strings.TrimPrefix(response, "https://") + } + + response = strings.Split(response, "/")[0] + + projectConfig.Url = response + + fmt.Println("What port does your project listen to?") + fmt.Scanln(&response) + port, err := strconv.ParseUint(response, 10, 16) + portErr := fmt.Errorf("that doesnt look like a valid port, try a number between 1024 and 65535") + if port > 65535 { + return portErr + } + + projectConfig.Port = uint16(port) + if err != nil || projectConfig.Port < 1024 { + return portErr + } + + configBytes, err := json.MarshalIndent(projectConfig, "", " ") + if err != nil { + return fmt.Errorf("failed to parse project config: %v", err) + } + + os.WriteFile("flux.json", configBytes, 0644) + + fmt.Printf("Successfully initialized project %s\n", projectConfig.Name) + + return nil +} diff --git a/cmd/flux/handlers/list.go b/cmd/flux/handlers/list.go new file mode 100644 index 0000000..514a1e5 --- /dev/null +++ b/cmd/flux/handlers/list.go @@ -0,0 +1,55 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/briandowns/spinner" + "github.com/juls0730/flux/cmd/flux/models" + "github.com/juls0730/flux/pkg" +) + +func ListCommand(seekingHelp bool, config models.Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *models.CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux list + + Flux will list all the apps in the daemon.`) + return nil + } + + resp, err := http.Get(config.DeamonURL + "/apps") + if err != nil { + return fmt.Errorf("failed to get apps: %v", err) + } + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } + + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + + return fmt.Errorf("list failed: %s", responseBody) + } + + var apps []pkg.App + if err := json.NewDecoder(resp.Body).Decode(&apps); err != nil { + return fmt.Errorf("failed to decode apps: %v", err) + } + + if len(apps) == 0 { + fmt.Println("No apps found") + return nil + } + + for _, app := range apps { + fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) + } + + return nil +} diff --git a/cmd/flux/handlers/project.go b/cmd/flux/handlers/project.go new file mode 100644 index 0000000..f2e8b90 --- /dev/null +++ b/cmd/flux/handlers/project.go @@ -0,0 +1,36 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/juls0730/flux/pkg" +) + +func GetProjectName(command string, args []string) (string, error) { + var projectName string + + if len(args) == 0 { + if _, err := os.Stat("flux.json"); err != nil { + return "", fmt.Errorf("usage: flux %[1]s , or run flux %[1]s in the project directory", command) + } + + fluxConfigFile, err := os.Open("flux.json") + if err != nil { + return "", fmt.Errorf("failed to open flux.json: %v", err) + } + defer fluxConfigFile.Close() + + var config pkg.ProjectConfig + if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { + return "", fmt.Errorf("failed to decode flux.json: %v", err) + } + + projectName = config.Name + } else { + projectName = args[0] + } + + return projectName, nil +} diff --git a/cmd/flux/handlers/start.go b/cmd/flux/handlers/start.go new file mode 100644 index 0000000..6e383ba --- /dev/null +++ b/cmd/flux/handlers/start.go @@ -0,0 +1,48 @@ +package handlers + +import ( + "fmt" + "io" + "net/http" + "strings" + + "github.com/briandowns/spinner" + "github.com/juls0730/flux/cmd/flux/models" + "github.com/juls0730/flux/pkg" +) + +func StartCommand(seekingHelp bool, config models.Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *models.CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux start + + Flux will start the deployment of the app in the current directory.`) + return nil + } + + projectName, err := GetProjectName("start", args) + if err != nil { + return err + } + + req, err := http.Post(config.DeamonURL+"/start/"+projectName, "application/json", nil) + if err != nil { + return fmt.Errorf("failed to start app: %v", err) + } + defer req.Body.Close() + + if req.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } + + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + + return fmt.Errorf("start failed: %s", responseBody) + } + + fmt.Printf("Successfully started %s\n", projectName) + + return nil +} diff --git a/cmd/flux/handlers/stop.go b/cmd/flux/handlers/stop.go new file mode 100644 index 0000000..eddc366 --- /dev/null +++ b/cmd/flux/handlers/stop.go @@ -0,0 +1,47 @@ +package handlers + +import ( + "fmt" + "io" + "net/http" + "strings" + + "github.com/briandowns/spinner" + "github.com/juls0730/flux/cmd/flux/models" + "github.com/juls0730/flux/pkg" +) + +func StopCommand(seekingHelp bool, config models.Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *models.CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux stop + + Flux will stop the deployment of the app in the current directory.`) + return nil + } + + projectName, err := GetProjectName("stop", args) + if err != nil { + return err + } + + req, err := http.Post(config.DeamonURL+"/stop/"+projectName, "application/json", nil) + if err != nil { + return fmt.Errorf("failed to stop app: %v", err) + } + defer req.Body.Close() + + if req.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } + + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + + return fmt.Errorf("stop failed: %s", responseBody) + } + + fmt.Printf("Successfully stopped %s\n", projectName) + return nil +} diff --git a/cmd/flux/main.go b/cmd/flux/main.go index d8bad12..4eea9b3 100644 --- a/cmd/flux/main.go +++ b/cmd/flux/main.go @@ -1,27 +1,20 @@ package main import ( - "archive/tar" - "bufio" - "bytes" - "compress/gzip" _ "embed" "encoding/json" "fmt" - "io" - "mime/multipart" "net/http" "os" "os/signal" "path/filepath" - "regexp" - "strconv" "strings" - "sync" "time" "github.com/agnivade/levenshtein" "github.com/briandowns/spinner" + "github.com/juls0730/flux/cmd/flux/handlers" + "github.com/juls0730/flux/cmd/flux/models" "github.com/juls0730/flux/pkg" ) @@ -30,613 +23,6 @@ var config []byte var configPath = filepath.Join(os.Getenv("HOME"), "/.config/flux") -type Config struct { - DeamonURL string `json:"deamon_url"` -} - -func matchesIgnorePattern(path string, info os.FileInfo, patterns []string) bool { - normalizedPath := filepath.ToSlash(path) - normalizedPath = strings.TrimPrefix(normalizedPath, "./") - - for _, pattern := range patterns { - pattern = strings.TrimSpace(pattern) - if pattern == "" || strings.HasPrefix(pattern, "#") { - continue - } - - regexPattern := convertGitignorePatternToRegex(pattern) - - matched, err := regexp.MatchString(regexPattern, normalizedPath) - if err == nil && matched { - if strings.HasSuffix(pattern, "/") && info.IsDir() { - return true - } - if !info.IsDir() { - dir := filepath.Dir(normalizedPath) - for dir != "." && dir != "/" { - dirPattern := convertGitignorePatternToRegex(pattern) - if matched, _ := regexp.MatchString(dirPattern, filepath.ToSlash(dir)); matched { - return true - } - dir = filepath.Dir(dir) - } - } - return true - } - } - return false -} - -func convertGitignorePatternToRegex(pattern string) string { - pattern = strings.TrimSuffix(pattern, "/") - pattern = regexp.QuoteMeta(pattern) - pattern = strings.ReplaceAll(pattern, "\\*\\*", ".*") - pattern = strings.ReplaceAll(pattern, "\\*", "[^/]*") - pattern = strings.ReplaceAll(pattern, "\\?", ".") - pattern = "(^|.*/)" + pattern + "(/.*)?$" - - return pattern -} - -func compressDirectory(compression pkg.Compression) ([]byte, error) { - var buf bytes.Buffer - var err error - - var ignoredFiles []string - fluxIgnore, err := os.Open(".fluxignore") - if err != nil { - if !os.IsNotExist(err) { - return nil, err - } - } - - if fluxIgnore != nil { - defer fluxIgnore.Close() - - scanner := bufio.NewScanner(fluxIgnore) - for scanner.Scan() { - ignoredFiles = append(ignoredFiles, scanner.Text()) - } - } - - var gzWriter *gzip.Writer - if compression.Enabled { - gzWriter, err = gzip.NewWriterLevel(&buf, compression.Level) - if err != nil { - return nil, err - } - } - - var tarWriter *tar.Writer - if gzWriter != nil { - tarWriter = tar.NewWriter(gzWriter) - } else { - tarWriter = tar.NewWriter(&buf) - } - - err = filepath.Walk(".", func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - if path == "flux.json" || info.IsDir() || matchesIgnorePattern(path, info, ignoredFiles) { - return nil - } - - header, err := tar.FileInfoHeader(info, "") - if err != nil { - return err - } - header.Name = path - - if err = tarWriter.WriteHeader(header); err != nil { - return err - } - - if !info.IsDir() { - file, err := os.Open(path) - if err != nil { - return err - } - defer file.Close() - - if _, err = io.Copy(tarWriter, file); err != nil { - return err - } - } - - return nil - }) - - if err != nil { - return nil, err - } - - if err = tarWriter.Close(); err != nil { - return nil, err - } - - if gzWriter != nil { - if err = gzWriter.Close(); err != nil { - return nil, err - } - } - - return buf.Bytes(), nil -} - -func arrayContains(arr []string, str string) bool { - for _, a := range arr { - if a == str { - return true - } - } - return false -} - -func getProjectName(command string, args []string) (string, error) { - var projectName string - - if len(args) == 0 { - if _, err := os.Stat("flux.json"); err != nil { - return "", fmt.Errorf("usage: flux %[1]s , or run flux %[1]s in the project directory", command) - } - - fluxConfigFile, err := os.Open("flux.json") - if err != nil { - return "", fmt.Errorf("failed to open flux.json: %v", err) - } - defer fluxConfigFile.Close() - - var config pkg.ProjectConfig - if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { - return "", fmt.Errorf("failed to decode flux.json: %v", err) - } - - projectName = config.Name - } else { - projectName = args[0] - } - - return projectName, nil -} - -type CustomSpinnerWriter struct { - currentSpinnerMsg string - lock sync.Mutex -} - -func (w *CustomSpinnerWriter) Write(p []byte) (n int, err error) { - w.lock.Lock() - defer w.lock.Unlock() - - n, err = os.Stdout.Write(p) - if err != nil { - return n, err - } - - w.currentSpinnerMsg = string(p) - - return len(p), nil -} - -type CustomStdout struct { - spinner *CustomSpinnerWriter - lock sync.Mutex -} - -func (w *CustomStdout) Write(p []byte) (n int, err error) { - w.lock.Lock() - defer w.lock.Unlock() - - n, err = os.Stdout.Write([]byte(fmt.Sprintf("\033[2K\r%s", p))) - if err != nil { - return n, err - } - - nn, err := os.Stdout.Write([]byte(w.spinner.currentSpinnerMsg)) - if err != nil { - return n, err - } - - n = nn + n - - return n, nil -} - -func (w *CustomStdout) Printf(format string, a ...interface{}) (n int, err error) { - str := fmt.Sprintf(format, a...) - return w.Write([]byte(str)) -} - -func DeployCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { - if seekingHelp { - fmt.Println(`Usage: - flux deploy - - Flux will deploy the app in the current directory, and start routing traffic to it.`) - return nil - } - - if _, err := os.Stat("flux.json"); err != nil { - return fmt.Errorf("no flux.json found, please run flux init first") - } - - loadingSpinner.Suffix = " Deploying" - loadingSpinner.Start() - - buf, err := compressDirectory(info.Compression) - if err != nil { - return fmt.Errorf("failed to compress directory: %v", err) - } - - body := &bytes.Buffer{} - writer := multipart.NewWriter(body) - configPart, err := writer.CreateFormFile("config", "flux.json") - - if err != nil { - return fmt.Errorf("failed to create config part: %v", err) - } - - fluxConfigFile, err := os.Open("flux.json") - if err != nil { - return fmt.Errorf("failed to open flux.json: %v", err) - } - defer fluxConfigFile.Close() - - if _, err := io.Copy(configPart, fluxConfigFile); err != nil { - return fmt.Errorf("failed to write config part: %v", err) - } - - codePart, err := writer.CreateFormFile("code", "code.tar.gz") - if err != nil { - return fmt.Errorf("failed to create code part: %v", err) - } - - if _, err := codePart.Write(buf); err != nil { - return fmt.Errorf("failed to write code part: %v", err) - } - - if err := writer.Close(); err != nil { - return fmt.Errorf("failed to close writer: %v", err) - } - - req, err := http.NewRequest("POST", config.DeamonURL+"/deploy", body) - req.Header.Set("Content-Type", writer.FormDataContentType()) - - if err != nil { - return fmt.Errorf("failed to create request: %v", err) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to send request: %v", err) - } - defer resp.Body.Close() - - customWriter := &CustomStdout{ - spinner: spinnerWriter, - lock: sync.Mutex{}, - } - - scanner := bufio.NewScanner(resp.Body) - var event string - var data pkg.DeploymentEvent - for scanner.Scan() { - line := scanner.Text() - if strings.HasPrefix(line, "data: ") { - if err := json.Unmarshal([]byte(line[6:]), &data); err != nil { - return fmt.Errorf("failed to parse deployment event: %v", err) - } - - switch event { - case "complete": - loadingSpinner.Stop() - var deploymentResponse struct { - App pkg.App `json:"app"` - } - if err := json.Unmarshal([]byte(data.Message), &deploymentResponse); err != nil { - return fmt.Errorf("failed to parse deployment response: %v", err) - } - fmt.Printf("App %s deployed successfully!\n", deploymentResponse.App.Name) - return nil - case "cmd_output": - customWriter.Printf("... %s\n", data.Message) - case "error": - loadingSpinner.Stop() - return fmt.Errorf("deployment failed: %s", data.Message) - default: - customWriter.Printf("%s\n", data.Message) - } - event = "" - } else if strings.HasPrefix(line, "event: ") { - event = strings.TrimPrefix(line, "event: ") - } - } - - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } - - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - - return fmt.Errorf("deploy failed: %s", responseBody) - } - - return nil -} - -func StopCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { - if seekingHelp { - fmt.Println(`Usage: - flux stop - - Flux will stop the deployment of the app in the current directory.`) - return nil - } - - projectName, err := getProjectName("stop", args) - if err != nil { - return err - } - - req, err := http.Post(config.DeamonURL+"/stop/"+projectName, "application/json", nil) - if err != nil { - return fmt.Errorf("failed to stop app: %v", err) - } - defer req.Body.Close() - - if req.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(req.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } - - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - - return fmt.Errorf("stop failed: %s", responseBody) - } - - fmt.Printf("Successfully stopped %s\n", projectName) - return nil -} - -func StartCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { - if seekingHelp { - fmt.Println(`Usage: - flux start - - Flux will start the deployment of the app in the current directory.`) - return nil - } - - projectName, err := getProjectName("start", args) - if err != nil { - return err - } - - req, err := http.Post(config.DeamonURL+"/start/"+projectName, "application/json", nil) - if err != nil { - return fmt.Errorf("failed to start app: %v", err) - } - defer req.Body.Close() - - if req.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(req.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } - - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - - return fmt.Errorf("start failed: %s", responseBody) - } - - fmt.Printf("Successfully started %s\n", projectName) - - return nil -} - -func DeleteCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { - if seekingHelp { - fmt.Println(`Usage: - flux delete [project-name | all] - - Options: - project-name: The name of the project to delete - all: Delete all projects - - Flux will delete the deployment of the app in the current directory or the specified project.`) - return nil - } - - if len(args) == 1 { - if args[0] == "all" { - var response string - fmt.Print("Are you sure you want to delete all projects? this will delete all volumes and containers associated and cannot be undone. \n[y/N] ") - fmt.Scanln(&response) - - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - return nil - } - - response = "" - - fmt.Printf("Are you really sure you want to delete all projects? \n[y/N] ") - fmt.Scanln(&response) - - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - return nil - } - - req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments", nil) - if err != nil { - return fmt.Errorf("failed to delete deployments: %v", err) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to delete deployments: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } - - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - - return fmt.Errorf("delete failed: %s", responseBody) - } - - fmt.Printf("Successfully deleted all projects\n") - return nil - } - } - - projectName, err := getProjectName("delete", args) - if err != nil { - return err - } - - // ask for confirmation - fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N] ", projectName) - var response string - fmt.Scanln(&response) - - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - return nil - } - - req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments/"+projectName, nil) - if err != nil { - return fmt.Errorf("failed to delete app: %v", err) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to delete app: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } - - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - - return fmt.Errorf("delete failed: %s", responseBody) - } - - fmt.Printf("Successfully deleted %s\n", projectName) - - return nil -} - -func ListCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { - if seekingHelp { - fmt.Println(`Usage: - flux list - - Flux will list all the apps in the daemon.`) - return nil - } - - resp, err := http.Get(config.DeamonURL + "/apps") - if err != nil { - return fmt.Errorf("failed to get apps: %v", err) - } - - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } - - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - - return fmt.Errorf("list failed: %s", responseBody) - } - - var apps []pkg.App - if err := json.NewDecoder(resp.Body).Decode(&apps); err != nil { - return fmt.Errorf("failed to decode apps: %v", err) - } - - if len(apps) == 0 { - fmt.Println("No apps found") - return nil - } - - for _, app := range apps { - fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) - } - - return nil -} - -func InitCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { - if seekingHelp { - fmt.Println(`Usage: - flux init [project-name] - - Options: - project-name: The name of the project to initialize - - Flux will initialize a new project in the current directory or the specified project.`) - return nil - } - - var projectConfig pkg.ProjectConfig - - var response string - if len(args) > 1 { - response = args[0] - } else { - fmt.Println("What is the name of your project?") - fmt.Scanln(&response) - } - - projectConfig.Name = response - - fmt.Println("What URL should your project listen to?") - fmt.Scanln(&response) - if strings.HasPrefix(response, "http") { - response = strings.TrimPrefix(response, "http://") - response = strings.TrimPrefix(response, "https://") - } - - response = strings.Split(response, "/")[0] - - projectConfig.Url = response - - fmt.Println("What port does your project listen to?") - fmt.Scanln(&response) - port, err := strconv.ParseUint(response, 10, 16) - projectConfig.Port = uint16(port) - if err != nil || projectConfig.Port < 1 || projectConfig.Port > 65535 { - return fmt.Errorf("that doesnt look like a valid port, try a number between 1 and 65535") - } - - configBytes, err := json.MarshalIndent(projectConfig, "", " ") - if err != nil { - return fmt.Errorf("failed to parse project config: %v", err) - } - - os.WriteFile("flux.json", configBytes, 0644) - - fmt.Printf("Successfully initialized project %s\n", projectConfig.Name) - - return nil -} - var helpStr = `Usage: flux @@ -656,16 +42,16 @@ Use "flux --help" for more information about a command.` var maxDistance = 3 type CommandHandler struct { - commands map[string]func(bool, Config, pkg.Info, *spinner.Spinner, *CustomSpinnerWriter, []string) error + commands map[string]func(bool, models.Config, pkg.Info, *spinner.Spinner, *models.CustomSpinnerWriter, []string) error } -func (h *CommandHandler) RegisterCmd(name string, handler func(bool, Config, pkg.Info, *spinner.Spinner, *CustomSpinnerWriter, []string) error) { +func (h *CommandHandler) RegisterCmd(name string, handler func(bool, models.Config, pkg.Info, *spinner.Spinner, *models.CustomSpinnerWriter, []string) error) { h.commands[name] = handler } -func runCommand(command string, args []string, config Config, info pkg.Info, cmdHandler CommandHandler, try int) error { +func runCommand(command string, args []string, config models.Config, info pkg.Info, cmdHandler CommandHandler, try int) error { if try == 2 { - return fmt.Errorf("Unknown command: %s", command) + return fmt.Errorf("unknown command: %s", command) } seekingHelp := false @@ -674,12 +60,9 @@ func runCommand(command string, args []string, config Config, info pkg.Info, cmd args = args[:len(args)-1] } - spinnerWriter := CustomSpinnerWriter{ - currentSpinnerMsg: "", - lock: sync.Mutex{}, - } + spinnerWriter := models.NewCustomSpinnerWriter() - loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(&spinnerWriter)) + loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(spinnerWriter)) defer func() { if loadingSpinner.Active() { loadingSpinner.Stop() @@ -699,7 +82,7 @@ func runCommand(command string, args []string, config Config, info pkg.Info, cmd handler, ok := cmdHandler.commands[command] if ok { - return handler(seekingHelp, config, info, loadingSpinner, &spinnerWriter, args) + return handler(seekingHelp, config, info, loadingSpinner, spinnerWriter, args) } // diff the command against the list of commands and if we find a command that is more than 80% similar, ask if that's what the user meant @@ -758,7 +141,7 @@ func main() { } } - var config Config + var config models.Config configBytes, err := os.ReadFile(filepath.Join(configPath, "config.json")) if err != nil { fmt.Printf("Failed to read config file: %v\n", err) @@ -797,14 +180,14 @@ func main() { } cmdHandler := CommandHandler{ - commands: make(map[string]func(bool, Config, pkg.Info, *spinner.Spinner, *CustomSpinnerWriter, []string) error), + commands: make(map[string]func(bool, models.Config, pkg.Info, *spinner.Spinner, *models.CustomSpinnerWriter, []string) error), } - cmdHandler.RegisterCmd("deploy", DeployCommand) - cmdHandler.RegisterCmd("stop", StopCommand) - cmdHandler.RegisterCmd("start", StartCommand) - cmdHandler.RegisterCmd("delete", DeleteCommand) - cmdHandler.RegisterCmd("init", InitCommand) + cmdHandler.RegisterCmd("deploy", handlers.DeployCommand) + cmdHandler.RegisterCmd("stop", handlers.StopCommand) + cmdHandler.RegisterCmd("start", handlers.StartCommand) + cmdHandler.RegisterCmd("delete", handlers.DeleteCommand) + cmdHandler.RegisterCmd("init", handlers.InitCommand) err = runCommand(command, args, config, info, cmdHandler, 0) if err != nil { diff --git a/cmd/flux/models/config.go b/cmd/flux/models/config.go new file mode 100644 index 0000000..d5f7e3a --- /dev/null +++ b/cmd/flux/models/config.go @@ -0,0 +1,5 @@ +package models + +type Config struct { + DeamonURL string `json:"deamon_url"` +} diff --git a/cmd/flux/models/writers.go b/cmd/flux/models/writers.go new file mode 100644 index 0000000..f5d542c --- /dev/null +++ b/cmd/flux/models/writers.go @@ -0,0 +1,69 @@ +package models + +import ( + "fmt" + "os" + "sync" +) + +type CustomSpinnerWriter struct { + currentSpinnerMsg string + lock sync.Mutex +} + +func NewCustomSpinnerWriter() *CustomSpinnerWriter { + return &CustomSpinnerWriter{ + currentSpinnerMsg: "", + lock: sync.Mutex{}, + } +} + +func (w *CustomSpinnerWriter) Write(p []byte) (n int, err error) { + w.lock.Lock() + defer w.lock.Unlock() + + n, err = os.Stdout.Write(p) + if err != nil { + return n, err + } + + w.currentSpinnerMsg = string(p) + + return len(p), nil +} + +type CustomStdout struct { + spinner *CustomSpinnerWriter + lock sync.Mutex +} + +func NewCustomStdout(spinner *CustomSpinnerWriter) *CustomStdout { + return &CustomStdout{ + spinner: spinner, + lock: sync.Mutex{}, + } +} + +func (w *CustomStdout) Write(p []byte) (n int, err error) { + w.lock.Lock() + defer w.lock.Unlock() + + n, err = os.Stdout.Write([]byte(fmt.Sprintf("\033[2K\r%s", p))) + if err != nil { + return n, err + } + + nn, err := os.Stdout.Write([]byte(w.spinner.currentSpinnerMsg)) + if err != nil { + return n, err + } + + n = nn + n + + return n, nil +} + +func (w *CustomStdout) Printf(format string, a ...interface{}) (n int, err error) { + str := fmt.Sprintf(format, a...) + return w.Write([]byte(str)) +} diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index 7c62577..cc6393b 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -1,15 +1,16 @@ package main import ( - "log" "net/http" _ "net/http/pprof" "github.com/juls0730/flux/server" + "go.uber.org/zap" ) func main() { fluxServer := server.NewServer() + defer fluxServer.Stop() http.HandleFunc("POST /deploy", fluxServer.DeployHandler) http.HandleFunc("DELETE /deployments", fluxServer.DeleteAllDeploymentsHandler) @@ -19,6 +20,9 @@ func main() { http.HandleFunc("GET /apps", fluxServer.ListAppsHandler) http.HandleFunc("GET /heartbeat", fluxServer.DaemonInfoHandler) - log.Printf("Fluxd started on http://127.0.0.1:5647\n") - log.Fatal(http.ListenAndServe(":5647", nil)) + fluxServer.Logger.Info("Fluxd started on http://127.0.0.1:5647") + err := http.ListenAndServe(":5647", nil) + if err != nil { + fluxServer.Logger.Fatalf("Failed to start server: %v", zap.Error(err)) + } } diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..d035c99 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,16 @@ +services: + daemon: + build: . + container_name: fluxd + ports: + - "5647:5647" + - "7465:7465" + volumes: + - "/var/run/docker.sock:/var/run/docker.sock" + - "fluxd-data:/var/fluxd" + restart: unless-stopped + network_mode: host + +volumes: + fluxd-data: + driver: local diff --git a/go.mod b/go.mod index 9ae452f..0f1631f 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,11 @@ require ( github.com/mattn/go-sqlite3 v1.14.24 ) +require go.uber.org/multierr v1.10.0 // indirect + require ( github.com/Microsoft/go-winio v0.4.14 // indirect - github.com/agnivade/levenshtein v1.2.0 // indirect + github.com/agnivade/levenshtein v1.2.0 github.com/containerd/log v0.1.0 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.5.0 // indirect @@ -35,6 +37,7 @@ require ( go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.32.0 // indirect + go.uber.org/zap v1.27.0 golang.org/x/sys v0.27.0 // indirect golang.org/x/term v0.1.0 // indirect golang.org/x/time v0.8.0 // indirect diff --git a/go.sum b/go.sum index c983eaa..34582f2 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+q github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/agnivade/levenshtein v1.2.0 h1:U9L4IOT0Y3i0TIlUIDJ7rVUziKi/zPbrJGaFrtYH3SY= github.com/agnivade/levenshtein v1.2.0/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= +github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= +github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= github.com/briandowns/spinner v1.23.1 h1:t5fDPmScwUjozhDj4FA46p5acZWIPXYE30qW2Ptu650= github.com/briandowns/spinner v1.23.1/go.mod h1:LaZeM4wm2Ywy6vO571mvhQNRcWfRUnXOs0RcKV0wYKM= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -12,6 +14,8 @@ github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54 h1:SG7nF6SRlWhcT7cNTs5R6Hk4V2lcmLz2NsG2VnInyNo= +github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI= @@ -88,6 +92,12 @@ go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQD go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/pkg/responses.go b/pkg/responses.go index d9847f9..b3c7f15 100644 --- a/pkg/responses.go +++ b/pkg/responses.go @@ -17,5 +17,5 @@ type Info struct { } type DeploymentEvent struct { - Message string `json:"message"` + Message interface{} `json:"message"` } diff --git a/server/app.go b/server/app.go index 41735be..75a242c 100644 --- a/server/app.go +++ b/server/app.go @@ -3,17 +3,17 @@ package server import ( "context" "fmt" - "log" "os" "path/filepath" "sync" "github.com/juls0730/flux/pkg" + "go.uber.org/zap" ) type App struct { ID int64 `json:"id,omitempty"` - Deployment *Deployment `json:"deployment,omitempty"` + Deployment *Deployment `json:"-"` Name string `json:"name,omitempty"` DeploymentID int64 `json:"deployment_id,omitempty"` } @@ -22,12 +22,12 @@ func CreateApp(ctx context.Context, imageName string, projectPath string, projec app := &App{ Name: projectConfig.Name, } - log.Printf("Creating deployment %s...\n", app.Name) + logger.Debugw("Creating deployment", zap.String("name", app.Name)) deployment, err := CreateDeployment(projectConfig.Port, projectConfig.Url, Flux.db) app.Deployment = deployment if err != nil { - log.Printf("Failed to create deployment: %v", err) + logger.Errorw("Failed to create deployment", zap.Error(err)) return nil, err } @@ -60,7 +60,7 @@ func CreateApp(ctx context.Context, imageName string, projectPath string, projec } func (app *App) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string) error { - log.Printf("Upgrading deployment %s...\n", app.Name) + logger.Debugw("Upgrading deployment", zap.String("name", app.Name)) // if deploy is not started, start it deploymentStatus, err := app.Deployment.Status(ctx) @@ -84,15 +84,17 @@ func (app *App) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, im } func (app *App) Remove(ctx context.Context) error { + Flux.appManager.RemoveApp(app.Name) + err := app.Deployment.Remove(ctx) if err != nil { - log.Printf("Failed to remove deployment: %v\n", err) + logger.Errorw("Failed to remove deployment", zap.Error(err)) return err } _, err = Flux.db.Exec("DELETE FROM apps WHERE id = ?", app.ID) if err != nil { - log.Printf("Failed to delete app: %v\n", err) + logger.Errorw("Failed to delete app", zap.Error(err)) return err } @@ -129,6 +131,10 @@ func (am *AppManager) GetAllApps() []*App { return apps } +func (am *AppManager) RemoveApp(name string) { + am.Delete(name) +} + func (am *AppManager) AddApp(name string, app *App) { if app.Deployment.Containers == nil || app.Deployment.Head == nil || len(app.Deployment.Containers) == 0 { panic("nil containers") @@ -154,15 +160,15 @@ func (am *AppManager) DeleteApp(name string) error { } func (am *AppManager) Init() { - log.Printf("Initializing deployments...\n") + logger.Info("Initializing deployments") if Flux.db == nil { - log.Panicf("DB is nil") + logger.Panic("DB is nil") } rows, err := Flux.db.Query("SELECT id, name, deployment_id FROM apps") if err != nil { - log.Printf("Failed to query apps: %v\n", err) + logger.Warnw("Failed to query apps", zap.Error(err)) return } defer rows.Close() @@ -171,7 +177,7 @@ func (am *AppManager) Init() { for rows.Next() { var app App if err := rows.Scan(&app.ID, &app.Name, &app.DeploymentID); err != nil { - log.Printf("Failed to scan app: %v\n", err) + logger.Warnw("Failed to scan app", zap.Error(err)) return } apps = append(apps, app) @@ -185,7 +191,7 @@ func (am *AppManager) Init() { rows, err = Flux.db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) if err != nil { - log.Printf("Failed to query containers: %v\n", err) + logger.Warnw("Failed to query containers", zap.Error(err)) return } defer rows.Close() @@ -199,7 +205,7 @@ func (am *AppManager) Init() { if container.Head { if headContainer != nil { - log.Fatalf("Several containers are marked as head") + logger.Fatal("Several containers are marked as head") } headContainer = &container @@ -207,7 +213,7 @@ func (am *AppManager) Init() { rows, err := Flux.db.Query("SELECT id, volume_id, container_id, mountpoint FROM volumes WHERE container_id = ?", container.ContainerID[:]) if err != nil { - log.Printf("Failed to query volumes: %v\n", err) + logger.Warnw("Failed to query volumes", zap.Error(err)) return } defer rows.Close() @@ -222,7 +228,7 @@ func (am *AppManager) Init() { } if headContainer == nil { - log.Fatalf("head container is nil!") + logger.Fatal("head container is nil!") } deployment.Head = headContainer @@ -231,7 +237,7 @@ func (am *AppManager) Init() { status, err := deployment.Status(context.Background()) if err != nil { - log.Printf("Failed to get deployment status: %v\n", err) + logger.Warnw("Failed to get deployment status", zap.Error(err)) continue } diff --git a/server/container.go b/server/container.go index fa51756..6426761 100644 --- a/server/container.go +++ b/server/container.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "log" "net/http" "os" "path/filepath" @@ -16,6 +15,7 @@ import ( "github.com/docker/docker/api/types/volume" "github.com/joho/godotenv" "github.com/juls0730/flux/pkg" + "go.uber.org/zap" ) var ( @@ -49,7 +49,7 @@ func CreateDockerVolume(ctx context.Context) (vol *Volume, err error) { return nil, fmt.Errorf("failed to create volume: %v", err) } - log.Printf("Volume %s created at %s\n", dockerVolume.Name, dockerVolume.Mountpoint) + logger.Debugw("Volume created", zap.String("volume_id", dockerVolume.Name), zap.String("mountpoint", dockerVolume.Mountpoint)) vol = &Volume{ VolumeID: dockerVolume.Name, @@ -78,7 +78,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p } } - log.Printf("Creating container %s...\n", containerName) + logger.Debugw("Creating container", zap.String("container_id", containerName)) resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{ Image: imageName, Env: projectConfig.Environment, @@ -115,7 +115,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p } func CreateContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig, head bool, deployment *Deployment) (c *Container, err error) { - log.Printf("Creating container with image %s\n", imageName) + logger.Debugw("Creating container with image", zap.String("image", imageName)) if projectConfig.EnvFile != "" { envBytes, err := os.Open(filepath.Join(projectPath, projectConfig.EnvFile)) @@ -145,7 +145,7 @@ func CreateContainer(ctx context.Context, imageName, projectPath string, project if volumeInsertStmt == nil { volumeInsertStmt, err = Flux.db.Prepare("INSERT INTO volumes (volume_id, mountpoint, container_id) VALUES (?, ?, ?) RETURNING id, volume_id, mountpoint, container_id") if err != nil { - log.Printf("Failed to prepare statement: %v\n", err) + logger.Errorw("Failed to prepare statement", zap.Error(err)) return nil, err } } @@ -185,7 +185,7 @@ func CreateContainer(ctx context.Context, imageName, projectPath string, project func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (*Container, error) { // Create new container with new image - log.Printf("Upgrading container %s...\n", c.ContainerID[:12]) + logger.Debugw("Upgrading container", zap.ByteString("container_id", c.ContainerID[:12])) if c.Volumes == nil { return nil, fmt.Errorf("no volumes found for container %s", c.ContainerID[:12]) } @@ -208,7 +208,7 @@ func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string, var containerIDString string err = containerInsertStmt.QueryRow(newContainer.ContainerID[:], c.Head, c.Deployment.ID).Scan(&newContainer.ID, &containerIDString, &newContainer.Head, &newContainer.DeploymentID) if err != nil { - log.Printf("Failed to insert container: %v\n", err) + logger.Errorw("Failed to insert container", zap.Error(err)) return nil, err } copy(newContainer.ContainerID[:], containerIDString) @@ -223,7 +223,7 @@ func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string, vol = &newContainer.Volumes[0] volumeUpdateStmt.QueryRow(newContainer.ContainerID[:], vol.ID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID) - log.Printf("Upgraded container") + logger.Debug("Upgraded container") return newContainer, nil } @@ -245,7 +245,7 @@ func (c *Container) Remove(ctx context.Context) error { tx, err := Flux.db.Begin() if err != nil { - log.Printf("Failed to begin transaction: %v\n", err) + logger.Errorw("Failed to begin transaction", zap.Error(err)) return err } @@ -269,7 +269,7 @@ func (c *Container) Remove(ctx context.Context) error { } if err := tx.Commit(); err != nil { - log.Printf("Failed to commit transaction: %v\n", err) + logger.Errorw("Failed to commit transaction", zap.Error(err)) return err } @@ -362,7 +362,7 @@ func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) er } func RemoveVolume(ctx context.Context, volumeID string) error { - log.Printf("Removed volume %s\n", volumeID) + logger.Debugw("Removed volume", zap.String("volume_id", volumeID)) if err := Flux.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { return fmt.Errorf("failed to remove volume (%s): %v", volumeID, err) diff --git a/server/deploy.go b/server/deploy.go index 4075c1e..0cb03f0 100644 --- a/server/deploy.go +++ b/server/deploy.go @@ -7,13 +7,13 @@ import ( "encoding/json" "fmt" "io" - "log" "mime/multipart" "net/http" "os/exec" "sync" "github.com/juls0730/flux/pkg" + "go.uber.org/zap" ) var ( @@ -74,9 +74,9 @@ func (dt *DeploymentLock) CompleteDeployment(appName string) { var deploymentLock = NewDeploymentLock() type DeploymentEvent struct { - Stage string `json:"stage"` - Message string `json:"message"` - StatusCode int `json:"status,omitempty"` + Stage string `json:"stage"` + Message interface{} `json:"message"` + StatusCode int `json:"status,omitempty"` } func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { @@ -90,7 +90,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { err := r.ParseMultipartForm(10 << 30) // 10 GiB if err != nil { - log.Printf("Failed to parse multipart form: %v\n", err) + logger.Errorw("Failed to parse multipart form", zap.Error(err)) http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -105,7 +105,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { var projectConfig pkg.ProjectConfig if err := json.NewDecoder(deployRequest.Config).Decode(&projectConfig); err != nil { - log.Printf("Failed to decode config: %v\n", err) + logger.Errorw("Failed to decode config", zap.Error(err)) http.Error(w, "Invalid flux.json", http.StatusBadRequest) return @@ -118,7 +118,10 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - defer deploymentLock.CompleteDeployment(projectConfig.Name) + go func() { + <-ctx.Done() + deploymentLock.CompleteDeployment(projectConfig.Name) + }() flusher, ok := w.(http.Flusher) if !ok { @@ -147,9 +150,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - ev := struct { - Message string `json:"message"` - }{ + ev := pkg.DeploymentEvent{ Message: event.Message, } @@ -207,11 +208,11 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - log.Printf("Deploying project %s to %s\n", projectConfig.Name, projectConfig.Url) + logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url)) projectPath, err := s.UploadAppCode(deployRequest.Code, projectConfig) if err != nil { - log.Printf("Failed to upload code: %v\n", err) + logger.Infow("Failed to upload code", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to upload code: %s", err), @@ -221,11 +222,11 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } // Streams the each line of the pipe into the eventChannel, this closes the pipe when the function exits + var pipeGroup sync.WaitGroup + streamPipe := func(pipe io.ReadCloser) { - // we need a wait group because otherwise the function *could* exit before the pipe is closed - // and wreck havoc on every future request - wg.Add(1) - defer wg.Done() + pipeGroup.Add(1) + defer pipeGroup.Done() scanner := bufio.NewScanner(pipe) for scanner.Scan() { @@ -241,11 +242,11 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { Stage: "error", Message: fmt.Sprintf("Failed to read pipe: %s", err), } - log.Printf("Error reading pipe: %s\n", err) + logger.Errorw("Error reading pipe", zap.Error(err)) } } - log.Printf("Preparing project %s...\n", projectConfig.Name) + logger.Debugw("Preparing project", zap.String("name", projectConfig.Name)) eventChannel <- DeploymentEvent{ Stage: "preparing", Message: "Preparing project", @@ -255,7 +256,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { prepareCmd.Dir = projectPath cmdOut, err := prepareCmd.StdoutPipe() if err != nil { - log.Printf("Failed to get stdout pipe: %v\n", err) + logger.Errorw("Failed to get stdout pipe", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to get stdout pipe: %s", err), @@ -266,7 +267,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } cmdErr, err := prepareCmd.StderrPipe() if err != nil { - log.Printf("Failed to get stderr pipe: %v\n", err) + logger.Errorw("Failed to get stderr pipe", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to get stderr pipe: %s", err), @@ -275,12 +276,26 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } + err = prepareCmd.Start() + if err != nil { + logger.Errorw("Failed to prepare project", zap.Error(err)) + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to prepare project: %s", err), + StatusCode: http.StatusInternalServerError, + } + + return + } + go streamPipe(cmdOut) go streamPipe(cmdErr) - err = prepareCmd.Run() + pipeGroup.Wait() + + err = prepareCmd.Wait() if err != nil { - log.Printf("Failed to prepare project: %s\n", err) + logger.Errorw("Failed to prepare project", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to prepare project: %s", err), @@ -295,13 +310,13 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { Message: "Building project image", } - log.Printf("Building image for project %s...\n", projectConfig.Name) + logger.Debugw("Building image for project", zap.String("name", projectConfig.Name)) imageName := fmt.Sprintf("flux_%s-image", projectConfig.Name) buildCmd := exec.Command("pack", "build", imageName, "--builder", s.config.Builder) buildCmd.Dir = projectPath cmdOut, err = buildCmd.StdoutPipe() if err != nil { - log.Printf("Failed to get stdout pipe: %v\n", err) + logger.Errorw("Failed to get stdout pipe", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to get stdout pipe: %s", err), @@ -312,7 +327,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } cmdErr, err = buildCmd.StderrPipe() if err != nil { - log.Printf("Failed to get stderr pipe: %v\n", err) + logger.Errorw("Failed to get stderr pipe", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to get stderr pipe: %s", err), @@ -322,12 +337,26 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } + err = buildCmd.Start() + if err != nil { + logger.Errorw("Failed to build image", zap.Error(err)) + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to build image: %s", err), + StatusCode: http.StatusInternalServerError, + } + + return + } + go streamPipe(cmdOut) go streamPipe(cmdErr) - err = buildCmd.Run() + pipeGroup.Wait() + + err = buildCmd.Wait() if err != nil { - log.Printf("Failed to build image: %s\n", err) + logger.Errorw("Failed to build image", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to build image: %s", err), @@ -347,7 +376,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { if app == nil { app, err = CreateApp(ctx, imageName, projectPath, projectConfig) if err != nil { - log.Printf("Failed to create app: %v", err) + logger.Errorw("Failed to create app", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to create app: %s", err), @@ -359,7 +388,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } else { err = app.Upgrade(ctx, projectConfig, imageName, projectPath) if err != nil { - log.Printf("Failed to upgrade app: %v", err) + logger.Errorw("Failed to upgrade app", zap.Error(err)) eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to upgrade app: %s", err), @@ -370,26 +399,12 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } } - responseJSON, err := json.Marshal(DeployResponse{ - App: *app, - }) - if err != nil { - log.Printf("Failed to marshal deploy response: %v\n", err) - eventChannel <- DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to marshal deploy response: %s", err), - StatusCode: http.StatusInternalServerError, - } - - return - } - eventChannel <- DeploymentEvent{ Stage: "complete", - Message: string(responseJSON), + Message: app, } - log.Printf("App %s deployed successfully!\n", app.Name) + logger.Infow("App deployed successfully", zap.String("name", app.Name)) } func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) { @@ -457,12 +472,12 @@ func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) { func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") - log.Printf("Deleting deployment %s...\n", name) + logger.Debugw("Deleting deployment", zap.String("name", name)) err := Flux.appManager.DeleteApp(name) if err != nil { - log.Printf("Failed to delete app: %v\n", err) + logger.Errorw("Failed to delete app", zap.Error(err)) http.Error(w, err.Error(), http.StatusNotFound) return } @@ -472,9 +487,9 @@ func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) func (s *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) { for _, app := range Flux.appManager.GetAllApps() { - err := app.Remove(r.Context()) + err := Flux.appManager.DeleteApp(app.Name) if err != nil { - log.Printf("Failed to remove app: %v\n", err) + logger.Errorw("Failed to remove app", zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -485,12 +500,12 @@ func (s *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http. func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) { // for each app, get the deployment status - var apps []*pkg.App + var apps []pkg.App for _, app := range Flux.appManager.GetAllApps() { var extApp pkg.App deploymentStatus, err := app.Deployment.Status(r.Context()) if err != nil { - log.Printf("Failed to get deployment status: %v\n", err) + logger.Errorw("Failed to get deployment status", zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -499,7 +514,7 @@ func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) { extApp.Name = app.Name extApp.DeploymentID = app.DeploymentID extApp.DeploymentStatus = deploymentStatus - apps = append(apps, &extApp) + apps = append(apps, extApp) } w.Header().Set("Content-Type", "application/json") diff --git a/server/deployment.go b/server/deployment.go index 086571f..7da4468 100644 --- a/server/deployment.go +++ b/server/deployment.go @@ -4,9 +4,9 @@ import ( "context" "database/sql" "fmt" - "log" "github.com/juls0730/flux/pkg" + "go.uber.org/zap" ) var ( @@ -30,14 +30,14 @@ func CreateDeployment(port uint16, appUrl string, db *sql.DB) (*Deployment, erro if deploymentInsertStmt == nil { deploymentInsertStmt, err = db.Prepare("INSERT INTO deployments (url, port) VALUES ($1, $2) RETURNING id, url, port") if err != nil { - log.Printf("Failed to prepare statement: %v\n", err) + logger.Errorw("Failed to prepare statement", zap.Error(err)) return nil, err } } err = deploymentInsertStmt.QueryRow(appUrl, port).Scan(&deployment.ID, &deployment.URL, &deployment.Port) if err != nil { - log.Printf("Failed to insert deployment: %v\n", err) + logger.Errorw("Failed to insert deployment", zap.Error(err)) return nil, err } @@ -52,7 +52,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro container, err := deployment.Head.Upgrade(ctx, imageName, projectPath, projectConfig) if err != nil { - log.Printf("Failed to upgrade container: %v\n", err) + logger.Errorw("Failed to upgrade container", zap.Error(err)) return err } @@ -60,20 +60,20 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro deployment.Head = container deployment.Containers = append(deployment.Containers, container) - log.Printf("Starting container %s...\n", container.ContainerID[:12]) + logger.Debugw("Starting container", zap.ByteString("container_id", container.ContainerID[:12])) err = container.Start(ctx) if err != nil { - log.Printf("Failed to start container: %v\n", err) + logger.Errorw("Failed to start container", zap.Error(err)) return err } if err := container.Wait(ctx, projectConfig.Port); err != nil { - log.Printf("Failed to wait for container: %v\n", err) + logger.Errorw("Failed to wait for container", zap.Error(err)) return err } if _, err := Flux.db.Exec("UPDATE deployments SET url = ?, port = ? WHERE id = ?", projectConfig.Url, projectConfig.Port, deployment.ID); err != nil { - log.Printf("Failed to update deployment: %v\n", err) + logger.Errorw("Failed to update deployment", zap.Error(err)) return err } @@ -81,13 +81,13 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro oldProxy := deployment.Proxy deployment.Proxy, err = deployment.NewDeploymentProxy() if err != nil { - log.Printf("Failed to create deployment proxy: %v\n", err) + logger.Errorw("Failed to create deployment proxy", zap.Error(err)) return err } tx, err := Flux.db.Begin() if err != nil { - log.Printf("Failed to begin transaction: %v\n", err) + logger.Errorw("Failed to begin transaction", zap.Error(err)) return err } @@ -95,13 +95,13 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro var oldContainers []*Container for _, container := range deployment.Containers { if existingContainers[string(container.ContainerID[:])] { - log.Printf("Deleting container from db: %s\n", container.ContainerID[:12]) + logger.Debugw("Deleting container from db", zap.ByteString("container_id", container.ContainerID[:12])) _, err = tx.Exec("DELETE FROM containers WHERE id = ?", container.ID) oldContainers = append(oldContainers, container) if err != nil { - log.Printf("Failed to delete container: %v\n", err) + logger.Errorw("Failed to delete container", zap.Error(err)) tx.Rollback() return err } @@ -113,7 +113,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro } if err := tx.Commit(); err != nil { - log.Printf("Failed to commit transaction: %v\n", err) + logger.Errorw("Failed to commit transaction", zap.Error(err)) return err } @@ -123,7 +123,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro for _, container := range oldContainers { err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:])) if err != nil { - log.Printf("Failed to remove container: %v\n", err) + logger.Errorw("Failed to remove container", zap.Error(err)) } } } @@ -136,14 +136,16 @@ func (d *Deployment) Remove(ctx context.Context) error { for _, container := range d.Containers { err := container.Remove(ctx) if err != nil { - log.Printf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err) + logger.Errorf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err) return err } } + Flux.proxy.RemoveDeployment(d) + _, err := Flux.db.Exec("DELETE FROM deployments WHERE id = ?", d.ID) if err != nil { - log.Printf("Failed to delete deployment: %v\n", err) + logger.Errorw("Failed to delete deployment", zap.Error(err)) return err } @@ -154,7 +156,7 @@ func (d *Deployment) Start(ctx context.Context) error { for _, container := range d.Containers { err := container.Start(ctx) if err != nil { - log.Printf("Failed to start container: %v\n", err) + logger.Errorf("Failed to start container (%s): %v\n", container.ContainerID[:12], err) return err } } @@ -171,7 +173,7 @@ func (d *Deployment) Stop(ctx context.Context) error { for _, container := range d.Containers { err := container.Stop(ctx) if err != nil { - log.Printf("Failed to start container: %v\n", err) + logger.Errorf("Failed to start container (%s): %v\n", container.ContainerID[:12], err) return err } } @@ -195,7 +197,7 @@ func (d *Deployment) Status(ctx context.Context) (string, error) { for _, container := range d.Containers { containerStatus, err := container.Status(ctx) if err != nil { - log.Printf("Failed to get container status: %v\n", err) + logger.Errorw("Failed to get container status", zap.Error(err)) return "", err } diff --git a/server/proxy.go b/server/proxy.go index ed5c75c..f66ed09 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -3,13 +3,14 @@ package server import ( "context" "fmt" - "log" "net/http" "net/http/httputil" "net/url" "sync" "sync/atomic" "time" + + "go.uber.org/zap" ) type Proxy struct { @@ -21,11 +22,7 @@ func (p *Proxy) RemoveDeployment(deployment *Deployment) { } func (p *Proxy) AddDeployment(deployment *Deployment) { - if deployment.Containers == nil { - panic("containers is nil") - } - - log.Printf("Adding deployment %s\n", deployment.URL) + logger.Debugw("Adding deployment", zap.String("url", deployment.URL)) p.deployments.Store(deployment.URL, deployment) } @@ -114,7 +111,7 @@ func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) { for _, container := range oldContainers { err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:])) if err != nil { - log.Printf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err) + logger.Errorw("Failed to remove container", zap.Error(err)) } } } diff --git a/server/server.go b/server/server.go index c12c2e8..641c733 100644 --- a/server/server.go +++ b/server/server.go @@ -8,10 +8,10 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "os" "path/filepath" + "strconv" _ "embed" @@ -19,6 +19,8 @@ import ( "github.com/docker/docker/client" "github.com/juls0730/flux/pkg" _ "github.com/mattn/go-sqlite3" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) var ( @@ -31,7 +33,8 @@ var ( Level: 0, }, } - Flux *FluxServer + Flux *FluxServer + logger *zap.SugaredLogger ) type FluxServerConfig struct { @@ -46,92 +49,125 @@ type FluxServer struct { rootDir string appManager *AppManager dockerClient *client.Client + Logger *zap.SugaredLogger +} + +func NewFluxServer() *FluxServer { + dockerClient, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + logger.Fatalw("Failed to create docker client", zap.Error(err)) + } + + rootDir := os.Getenv("FLUXD_ROOT_DIR") + if rootDir == "" { + rootDir = "/var/fluxd" + } + + db, err := sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db")) + if err != nil { + logger.Fatalw("Failed to open database", zap.Error(err)) + } + + _, err = db.Exec(string(schemaBytes)) + if err != nil { + logger.Fatalw("Failed to create database schema", zap.Error(err)) + } + + return &FluxServer{ + db: db, + proxy: &Proxy{}, + appManager: new(AppManager), + rootDir: rootDir, + dockerClient: dockerClient, + } +} + +func (s *FluxServer) Stop() { + s.Logger.Sync() } func NewServer() *FluxServer { - Flux = new(FluxServer) + verbosity, err := strconv.Atoi(os.Getenv("FLUXD_VERBOSITY")) + if err != nil { + verbosity = 0 + } + + config := zap.NewProductionConfig() + + if os.Getenv("DEBUG") == "true" { + config = zap.NewDevelopmentConfig() + verbosity = -1 + } + + config.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity)) + + lameLogger, err := config.Build() + logger = lameLogger.Sugar() + + if err != nil { + logger.Fatalw("Failed to create logger", zap.Error(err)) + } + + Flux = NewFluxServer() + Flux.Logger = logger var serverConfig FluxServerConfig - Flux.rootDir = os.Getenv("FLUXD_ROOT_DIR") - if Flux.rootDir == "" { - Flux.rootDir = "/var/fluxd" - } - // parse config, if it doesnt exist, create it and use the default config configPath := filepath.Join(Flux.rootDir, "config.json") if _, err := os.Stat(configPath); err != nil { if err := os.MkdirAll(Flux.rootDir, 0755); err != nil { - log.Fatalf("Failed to create fluxd directory: %v\n", err) + logger.Fatalw("Failed to create fluxd directory", zap.Error(err)) } configBytes, err := json.Marshal(DefaultConfig) if err != nil { - log.Fatalf("Failed to marshal default config: %v\n", err) + logger.Fatalw("Failed to marshal default config", zap.Error(err)) } - log.Printf("Config file not found, creating default config file at %s\n", configPath) + logger.Debugw("Config file not found creating default config file at", zap.String("path", configPath)) if err := os.WriteFile(configPath, configBytes, 0644); err != nil { - log.Fatalf("Failed to write config file: %v\n", err) + logger.Fatalw("Failed to write config file", zap.Error(err)) } } configFile, err := os.ReadFile(configPath) if err != nil { - log.Fatalf("Failed to read config file: %v\n", err) + logger.Fatalw("Failed to read config file", zap.Error(err)) } if err := json.Unmarshal(configFile, &serverConfig); err != nil { - log.Fatalf("Failed to parse config file: %v\n", err) + logger.Fatalw("Failed to parse config file", zap.Error(err)) } Flux.config = serverConfig - Flux.dockerClient, err = client.NewClientWithOpts(client.FromEnv) - if err != nil { - log.Fatalf("Failed to create docker client: %v\n", err) - } - - log.Printf("Pulling builder image %s, this may take a while...\n", serverConfig.Builder) - + logger.Infof("Pulling builder image %s this may take a while...", serverConfig.Builder) events, err := Flux.dockerClient.ImagePull(context.Background(), fmt.Sprintf("%s:latest", serverConfig.Builder), image.PullOptions{}) if err != nil { - log.Fatalf("Failed to pull builder image: %v\n", err) + logger.Fatalw("Failed to pull builder image", zap.Error(err)) } - // wait for the iamge to be pulled + // blocking wait for the iamge to be pulled io.Copy(io.Discard, events) - log.Printf("Successfully pulled builder image %s\n", serverConfig.Builder) + logger.Infow("Successfully pulled builder image", zap.String("image", serverConfig.Builder)) if err := os.MkdirAll(filepath.Join(Flux.rootDir, "apps"), 0755); err != nil { - log.Fatalf("Failed to create apps directory: %v\n", err) + logger.Fatalw("Failed to create apps directory", zap.Error(err)) } - Flux.db, err = sql.Open("sqlite3", filepath.Join(Flux.rootDir, "fluxd.db")) - if err != nil { - log.Fatalf("Failed to open database: %v\n", err) - } - - _, err = Flux.db.Exec(string(schemaBytes)) - if err != nil { - log.Fatalf("Failed to create database schema: %v\n", err) - } - - Flux.appManager = new(AppManager) Flux.appManager.Init() - Flux.proxy = &Proxy{} - port := os.Getenv("FLUXD_PROXY_PORT") if port == "" { port = "7465" } go func() { - log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port) + logger.Infof("Proxy server starting on http://127.0.0.1:%s", port) if err := http.ListenAndServe(fmt.Sprintf(":%s", port), Flux.proxy); err != nil && err != http.ErrServerClosed { - log.Fatalf("Proxy server error: %v", err) + logger.Fatalw("Proxy server error", zap.Error(err)) } }() @@ -142,7 +178,7 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf var err error projectPath := filepath.Join(s.rootDir, "apps", projectConfig.Name) if err = os.MkdirAll(projectPath, 0755); err != nil { - log.Printf("Failed to create project directory: %v\n", err) + logger.Errorw("Failed to create project directory", zap.Error(err)) return "", err } @@ -156,7 +192,7 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf if s.config.Compression.Enabled { gzReader, err = gzip.NewReader(code) if err != nil { - log.Printf("Failed to create gzip reader: %v\n", err) + logger.Infow("Failed to create gzip reader", zap.Error(err)) return "", err } } @@ -168,14 +204,14 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf tarReader = tar.NewReader(code) } - log.Printf("Extracting files for %s...\n", projectPath) + logger.Infow("Extracting files for project", zap.String("project", projectPath)) for { header, err := tarReader.Next() if err == io.EOF { break } if err != nil { - log.Printf("Failed to read tar header: %v\n", err) + logger.Debugw("Failed to read tar header", zap.Error(err)) return "", err } @@ -186,24 +222,24 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf switch header.Typeflag { case tar.TypeDir: if err = os.MkdirAll(path, 0755); err != nil { - log.Printf("Failed to extract directory: %v\n", err) + logger.Debugw("Failed to extract directory", zap.Error(err)) return "", err } case tar.TypeReg: if err = os.MkdirAll(filepath.Dir(path), 0755); err != nil { - log.Printf("Failed to extract directory: %v\n", err) + logger.Debugw("Failed to extract directory", zap.Error(err)) return "", err } outFile, err := os.Create(path) if err != nil { - log.Printf("Failed to extract file: %v\n", err) + logger.Debugw("Failed to extract file", zap.Error(err)) return "", err } defer outFile.Close() if _, err = io.Copy(outFile, tarReader); err != nil { - log.Printf("Failed to copy file during extraction: %v\n", err) + logger.Debugw("Failed to copy file during extraction", zap.Error(err)) return "", err } }