From c51eca5dab235e970216e6a5f871fd046d021ed1 Mon Sep 17 00:00:00 2001 From: Zoe <62722391+juls0730@users.noreply.github.com> Date: Thu, 8 May 2025 09:53:41 -0500 Subject: [PATCH] Expand logging, and daemonless command support. This adds more logging in certain places, and adds logging to the CLI. It also allows for certain commands in the CLI to be used without a daemon connection, namely `init`, which previously required the daemon to be connected, but now does not since it doesnt need it. --- README.md | 2 +- cmd/cli/commands/command.go | 4 +- cmd/cli/commands/delete.go | 6 +- cmd/cli/commands/list.go | 2 +- cmd/cli/commands/start.go | 4 +- cmd/cli/commands/stop.go | 4 +- cmd/cli/main.go | 152 +++++++----- cmd/daemon/main.go | 1 + internal/docker/container.go | 6 +- internal/handlers/app.go | 374 ------------------------------ internal/handlers/deploy.go | 385 +++++++++++++++++++++++++++++++ internal/models/deployment.go | 3 +- internal/services/proxy/proxy.go | 31 +-- internal/util/cli/project.go | 7 +- internal/util/cli/request.go | 25 +- pkg/version.go | 2 +- 16 files changed, 538 insertions(+), 470 deletions(-) create mode 100644 internal/handlers/deploy.go diff --git a/README.md b/README.md index 0c3d85b..188a55d 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Flux is a lightweight self-hosted micro-PaaS for hosting Golang web apps with ea ## Dependencies - [Go](https://golang.org/dl/) -- [ZQDGR](https://github.com/juls0730/zqdgr) +- [ZQDGR](https://github.com/juls0730/zqdgr) (development only) - [Buildpacks](https://buildpacks.io/) (daemon only) - [Docker](https://docs.docker.com/get-docker/) (daemon only) diff --git a/cmd/cli/commands/command.go b/cmd/cli/commands/command.go index a91fef1..6e6ab52 100644 --- a/cmd/cli/commands/command.go +++ b/cmd/cli/commands/command.go @@ -3,11 +3,13 @@ package commands import ( "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" + "go.uber.org/zap" ) type CommandCtx struct { Config pkg.CLIConfig - Info API.Info + Logger *zap.SugaredLogger + Info *API.Info Interactive bool } diff --git a/cmd/cli/commands/delete.go b/cmd/cli/commands/delete.go index fff9c87..4145a0f 100644 --- a/cmd/cli/commands/delete.go +++ b/cmd/cli/commands/delete.go @@ -49,7 +49,7 @@ func deleteAll(ctx CommandCtx, noConfirm *bool) error { } } - util.DeleteRequest(ctx.Config.DaemonURL + "/deployments") + util.DeleteRequest(ctx.Config.DaemonURL+"/deployments", ctx.Logger) fmt.Printf("Successfully deleted all projects\n") return nil @@ -80,7 +80,7 @@ func DeleteCommand(ctx CommandCtx, args []string) error { return deleteAll(ctx, noConfirm) } - project, err := util.GetProject("delete", args, ctx.Config) + project, err := util.GetProject("delete", args, ctx.Config, ctx.Logger) if err != nil { return fmt.Errorf("\tfailed to get project name: %v.\n\tSee flux delete -help for more information", err) } @@ -101,7 +101,7 @@ func DeleteCommand(ctx CommandCtx, args []string) error { } } - err = util.DeleteRequest(ctx.Config.DaemonURL + "/app/" + project.Id) + err = util.DeleteRequest(ctx.Config.DaemonURL+"/app/"+project.Id, ctx.Logger) if err != nil { return fmt.Errorf("failed to delete project: %v", err) } diff --git a/cmd/cli/commands/list.go b/cmd/cli/commands/list.go index f1007af..c0ac174 100644 --- a/cmd/cli/commands/list.go +++ b/cmd/cli/commands/list.go @@ -8,7 +8,7 @@ import ( ) func ListCommand(ctx CommandCtx, args []string) error { - apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL + "/apps") + apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL+"/apps", ctx.Logger) if err != nil { return fmt.Errorf("failed to get apps: %v", err) } diff --git a/cmd/cli/commands/start.go b/cmd/cli/commands/start.go index ff1c14c..c20e1ab 100644 --- a/cmd/cli/commands/start.go +++ b/cmd/cli/commands/start.go @@ -7,14 +7,14 @@ import ( ) func StartCommand(ctx CommandCtx, args []string) error { - projectName, err := util.GetProject("start", args, ctx.Config) + projectName, err := util.GetProject("start", args, ctx.Config, ctx.Logger) if err != nil { return err } // Put request to start the project, since the start endpoint is idempotent. // If the project is already running, this will return a 304 Not Modified - err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil) + err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil, ctx.Logger) if err != nil { return fmt.Errorf("failed to start %s: %v", projectName.Name, err) } diff --git a/cmd/cli/commands/stop.go b/cmd/cli/commands/stop.go index 6649b19..723b7d9 100644 --- a/cmd/cli/commands/stop.go +++ b/cmd/cli/commands/stop.go @@ -7,12 +7,12 @@ import ( ) func StopCommand(ctx CommandCtx, args []string) error { - projectName, err := util.GetProject("stop", args, ctx.Config) + projectName, err := util.GetProject("stop", args, ctx.Config, ctx.Logger) if err != nil { return err } - err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil) + err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil, ctx.Logger) if err != nil { return fmt.Errorf("failed to stop %s: %v", projectName.Name, err) } diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 7d73c78..9c09c1c 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" "github.com/agnivade/levenshtein" @@ -15,6 +16,8 @@ import ( "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" "github.com/mattn/go-isatty" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) func isInteractive() bool { @@ -43,8 +46,9 @@ Use "flux -help" for more information about a command. var maxDistance = 3 type Command struct { - Help string - HandlerFunc commands.CommandFunc + Help string + DaemonConnected bool + HandlerFunc commands.CommandFunc } type CommandHandler struct { @@ -59,10 +63,11 @@ func NewCommandHandler() CommandHandler { } } -func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, help string) { +func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, daemonConnected bool, help string) { coomand := Command{ - Help: help, - HandlerFunc: handler, + Help: help, + DaemonConnected: daemonConnected, + HandlerFunc: handler, } h.commands[name] = coomand @@ -108,51 +113,35 @@ func (h *CommandHandler) GetHelpCmd(commands.CommandCtx, []string) error { return nil } -func runCommand(command string, args []string, config pkg.CLIConfig, info API.Info, cmdHandler CommandHandler) error { - commandCtx := commands.CommandCtx{ - Config: config, - Info: info, - Interactive: isInteractive(), - } - +func runCommand(command string, args []string, config pkg.CLIConfig, cmdHandler CommandHandler, logger *zap.SugaredLogger) error { commandStruct, ok := cmdHandler.commands[command] - if ok { - return commandStruct.HandlerFunc(commandCtx, args) + if !ok { + panic("runCommand was passed an invalid command name") } - // diff the command against the list of commands and if we find a command that is more than 80% similar, ask if that's what the user meant - var closestMatch struct { - name string - score int - } - for cmdName := range cmdHandler.commands { - distance := levenshtein.ComputeDistance(cmdName, command) + var info *API.Info = nil - if distance <= maxDistance { - if closestMatch.name == "" || distance < closestMatch.score { - closestMatch.name = cmdName - closestMatch.score = distance - } + if commandStruct.DaemonConnected { + info, err := util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger) + if err != nil { + fmt.Printf("Failed to connect to daemon\n") + os.Exit(1) + } + + if info.Version != version { + fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version) + os.Exit(1) } } - if closestMatch.name == "" { - return fmt.Errorf("unknown command: %s", command) + commandCtx := commands.CommandCtx{ + Config: config, + Info: info, + Logger: logger, + Interactive: isInteractive(), } - var response string - // new line ommitted because it will be produced when the user presses enter to submit their response - fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name) - fmt.Scanln(&response) - - if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" { - command = closestMatch.name - } else { - return nil - } - - // re-run command after accepting the suggestion - return runCommand(command, args, config, info, cmdHandler) + return commandStruct.HandlerFunc(commandCtx, args) } func main() { @@ -160,21 +149,44 @@ func main() { fmt.Printf("Flux is being run non-interactively\n") } + zapConfig := zap.NewDevelopmentConfig() + verbosity := 0 + + debug, err := strconv.ParseBool(os.Getenv("DEBUG")) + if err != nil { + debug = false + } + + if debug { + zapConfig = zap.NewDevelopmentConfig() + verbosity = -1 + } + + zapConfig.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity)) + + lameLogger, err := zapConfig.Build() + if err != nil { + fmt.Printf("Failed to create logger: %v\n", err) + os.Exit(1) + } + + logger := lameLogger.Sugar() + cmdHandler := NewCommandHandler() - cmdHandler.RegisterCmd("init", commands.InitCommand, "Initialize a new project") - cmdHandler.RegisterCmd("deploy", commands.DeployCommand, "Deploy a new version of the app") - cmdHandler.RegisterCmd("start", commands.StartCommand, "Start the app") - cmdHandler.RegisterCmd("stop", commands.StopCommand, "Stop the app") - cmdHandler.RegisterCmd("list", commands.ListCommand, "List all the apps") - cmdHandler.RegisterCmd("delete", commands.DeleteCommand, "Delete the app") + cmdHandler.RegisterCmd("init", commands.InitCommand, false, "Initialize a new project") + cmdHandler.RegisterCmd("deploy", commands.DeployCommand, true, "Deploy a new version of the app") + cmdHandler.RegisterCmd("start", commands.StartCommand, true, "Start the app") + cmdHandler.RegisterCmd("stop", commands.StopCommand, true, "Stop the app") + cmdHandler.RegisterCmd("list", commands.ListCommand, true, "List all the apps") + cmdHandler.RegisterCmd("delete", commands.DeleteCommand, true, "Delete the app") fs := flag.NewFlagSet("flux", flag.ExitOnError) fs.Usage = func() { cmdHandler.GetHelp() } - err := fs.Parse(os.Args[1:]) + err = fs.Parse(os.Args[1:]) if err != nil { fmt.Println(err) os.Exit(1) @@ -214,18 +226,42 @@ func main() { os.Exit(1) } - info, err := util.GetRequest[API.Info](config.DaemonURL + "/heartbeat") - if err != nil { - fmt.Printf("Failed to connect to daemon\n") - os.Exit(1) + command := os.Args[1] + + if _, ok := cmdHandler.commands[command]; !ok { + var closestMatch struct { + name string + score int + } + for cmdName := range cmdHandler.commands { + distance := levenshtein.ComputeDistance(cmdName, command) + + if distance <= maxDistance { + if closestMatch.name == "" || distance < closestMatch.score { + closestMatch.name = cmdName + closestMatch.score = distance + } + } + } + + if closestMatch.name == "" { + fmt.Printf("unknown command: %s", command) + os.Exit(1) + } + + var response string + // new line ommitted because it will be produced when the user presses enter to submit their response + fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name) + fmt.Scanln(&response) + + if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" { + command = closestMatch.name + } else { + os.Exit(0) + } } - if info.Version != version { - fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version) - os.Exit(1) - } - - err = runCommand(os.Args[1], fs.Args()[1:], config, *info, cmdHandler) + err = runCommand(command, fs.Args()[1:], config, cmdHandler, logger) if err != nil { fmt.Printf("Error: %v\n", err) os.Exit(1) diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index f76943c..be0e876 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -19,6 +19,7 @@ func main() { http.HandleFunc("GET /app/by-name/{name}", fluxServer.GetAppByName) http.HandleFunc("GET /app/by-id/{id}", fluxServer.GetAppById) + // a PUT request is the proper type to use since these endpoints are idempotent http.HandleFunc("PUT /app/{id}/start", fluxServer.StartApp) http.HandleFunc("PUT /app/{id}/stop", fluxServer.StopApp) diff --git a/internal/docker/container.go b/internal/docker/container.go index a6024b4..aeaf73a 100644 --- a/internal/docker/container.go +++ b/internal/docker/container.go @@ -87,9 +87,11 @@ func (d *DockerClient) StartContainer(ctx context.Context, containerID DockerID) return d.client.ContainerStart(ctx, string(containerID), container.StartOptions{}) } -// blocks until the container returns a 200 status code +const CONTAINER_START_TIMEOUT = 30 * time.Second + +// blocks until the container returns a 200 status code for a max of CONTAINER_START_TIMEOUT (30 seconds) func (d *DockerClient) ContainerWait(ctx context.Context, containerID DockerID, port uint16) error { - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(ctx, CONTAINER_START_TIMEOUT) defer cancel() for { diff --git a/internal/handlers/app.go b/internal/handlers/app.go index 5dccf1d..547c541 100644 --- a/internal/handlers/app.go +++ b/internal/handlers/app.go @@ -1,389 +1,15 @@ package handlers import ( - "bufio" "encoding/json" - "fmt" - "io" "net/http" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" - "github.com/docker/docker/pkg/namesgenerator" "github.com/google/uuid" - "github.com/joho/godotenv" proxyManagerService "github.com/juls0730/flux/internal/services/proxy" - "github.com/juls0730/flux/internal/util" - "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" "go.uber.org/zap" ) -var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]() - -func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "test/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - - err := r.ParseMultipartForm(10 << 32) // 10 GiB - if err != nil { - flux.logger.Errorw("Failed to parse multipart form", zap.Error(err)) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - var deployRequest API.DeployRequest - projectConfig := new(pkg.ProjectConfig) - if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil { - flux.logger.Errorw("Failed to decode config", zap.Error(err)) - - http.Error(w, "Invalid flux.json", http.StatusBadRequest) - return - } - - deployRequest.Config = *projectConfig - idStr := r.FormValue("id") - - if idStr == "" { - id, err := uuid.NewRandom() - if err != nil { - flux.logger.Errorw("Failed to generate uuid", zap.Error(err)) - http.Error(w, "Failed to generate uuid", http.StatusInternalServerError) - return - } - - deployRequest.Id = id - } else { - deployRequest.Id, err = uuid.Parse(idStr) - if err != nil { - flux.logger.Errorw("Failed to parse uuid", zap.Error(err)) - http.Error(w, "Failed to parse uuid", http.StatusBadRequest) - return - } - - // make sure the id exists in the database - app := flux.appManager.GetApp(deployRequest.Id) - if app == nil { - http.Error(w, "App not found", http.StatusNotFound) - return - } - } - - ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context()) - if err != nil && err == util.ErrLocked { - // This will happen if the app is already being deployed - http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict) - return - } - - go func() { - <-ctx.Done() - deploymentLock.Unlock(deployRequest.Id) - }() - - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusMultiStatus) - - eventChannel := make(chan API.DeploymentEvent, 10) - defer close(eventChannel) - - var wg sync.WaitGroup - // make sure the connection doesnt close while there are SSE events being sent - defer wg.Wait() - - wg.Add(1) - go func(w http.ResponseWriter, flusher http.Flusher) { - defer wg.Done() - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-eventChannel: - if !ok { - return - } - - ev := API.DeploymentEvent{ - Message: event.Message, - } - - eventJSON, err := json.Marshal(ev) - if err != nil { - // Write error directly to ResponseWriter - jsonErr := json.NewEncoder(w).Encode(err) - if jsonErr != nil { - fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n") - return - } - - fmt.Fprintf(w, "data: %s\n\n", err.Error()) - if flusher != nil { - flusher.Flush() - } - return - } - - fmt.Fprintf(w, "event: %s\n", event.Stage) - fmt.Fprintf(w, "data: %s\n\n", eventJSON) - if flusher != nil { - flusher.Flush() - } - - if event.Stage == "error" || event.Stage == "complete" { - return - } - } - } - }(w, flusher) - - eventChannel <- API.DeploymentEvent{ - Stage: "start", - Message: "Uploading code", - } - - deployRequest.Code, _, err = r.FormFile("code") - if err != nil { - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: "No code archive found", - } - return - } - defer deployRequest.Code.Close() - - if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 { - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: "Invalid flux.json, a name, url, and port must be specified", - } - return - } - - if projectConfig.Name == "all" { - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: "Reserved name 'all' is not allowed", - } - return - } - - flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String())) - - projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id) - if err != nil { - flux.logger.Infow("Failed to upload code", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to upload code: %s", err), - } - return - } - - if projectConfig.EnvFile != "" { - envPath := filepath.Join(projectPath, projectConfig.EnvFile) - // prevent path traversal - realEnvPath, err := filepath.EvalSymlinks(envPath) - if err != nil { - flux.logger.Errorw("Failed to eval symlinks", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to eval symlinks: %s", err), - } - return - } - - if !strings.HasPrefix(realEnvPath, projectPath) { - flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile), - } - return - } - - envBytes, err := os.Open(realEnvPath) - if err != nil { - flux.logger.Errorw("Failed to open env file", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to open env file: %v", err), - } - return - } - defer envBytes.Close() - - envVars, err := godotenv.Parse(envBytes) - if err != nil { - flux.logger.Errorw("Failed to parse env file", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to parse env file: %v", err), - } - return - } - - for key, value := range envVars { - projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value)) - } - } - - // pipe the output of the build process to the event channel - pipeGroup := sync.WaitGroup{} - streamPipe := func(pipe io.ReadCloser) { - pipeGroup.Add(1) - defer pipeGroup.Done() - defer pipe.Close() - - scanner := bufio.NewScanner(pipe) - for scanner.Scan() { - line := scanner.Text() - eventChannel <- API.DeploymentEvent{ - Stage: "cmd_output", - Message: line, - } - } - - if err := scanner.Err(); err != nil { - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to read pipe: %s", err), - } - flux.logger.Errorw("Error reading pipe", zap.Error(err)) - } - } - - flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String())) - eventChannel <- API.DeploymentEvent{ - Stage: "preparing", - Message: "Preparing project", - } - - // redirect stdout and stderr to the event channel - reader, writer := io.Pipe() - prepareCmd := exec.Command("go", "generate") - prepareCmd.Dir = projectPath - prepareCmd.Stdout = writer - prepareCmd.Stderr = writer - - err = prepareCmd.Start() - if err != nil { - flux.logger.Errorw("Failed to prepare project", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to prepare project: %s", err), - } - - return - } - - go streamPipe(reader) - - pipeGroup.Wait() - - err = prepareCmd.Wait() - if err != nil { - flux.logger.Errorw("Failed to prepare project", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to prepare project: %s", err), - } - - return - } - - writer.Close() - - eventChannel <- API.DeploymentEvent{ - Stage: "building", - Message: "Building project image", - } - - reader, writer = io.Pipe() - flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name)) - imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0)) - buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder) - buildCmd.Dir = projectPath - buildCmd.Stdout = writer - buildCmd.Stderr = writer - - err = buildCmd.Start() - if err != nil { - flux.logger.Errorw("Failed to build image", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to build image: %s", err), - } - - return - } - - go streamPipe(reader) - - pipeGroup.Wait() - - err = buildCmd.Wait() - if err != nil { - flux.logger.Errorw("Failed to build image", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to build image: %s", err), - } - - return - } - - app := flux.appManager.GetApp(deployRequest.Id) - - if app == nil { - eventChannel <- API.DeploymentEvent{ - Stage: "creating", - Message: "Creating app, this might take a while...", - } - - app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id) - } else { - eventChannel <- API.DeploymentEvent{ - Stage: "upgrading", - Message: "Upgrading app, this might take a while...", - } - - // we dont need to change `app` since this upgrade will use the same app and update it in place - err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig) - } - - if err != nil { - flux.logger.Errorw("Failed to deploy app", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to upgrade app: %s", err), - } - - return - } - - var extApp API.App - extApp.Id = app.Id - extApp.Name = app.Name - extApp.DeploymentID = app.DeploymentID - - eventChannel <- API.DeploymentEvent{ - Stage: "complete", - Message: extApp, - } - - flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String())) -} - func (flux *FluxServer) GetAllApps(w http.ResponseWriter, r *http.Request) { var apps []API.App for _, app := range flux.appManager.GetAllApps() { diff --git a/internal/handlers/deploy.go b/internal/handlers/deploy.go new file mode 100644 index 0000000..511b350 --- /dev/null +++ b/internal/handlers/deploy.go @@ -0,0 +1,385 @@ +package handlers + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + + "github.com/docker/docker/pkg/namesgenerator" + "github.com/google/uuid" + "github.com/joho/godotenv" + "github.com/juls0730/flux/internal/util" + "github.com/juls0730/flux/pkg" + "github.com/juls0730/flux/pkg/API" + "go.uber.org/zap" +) + +var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]() + +func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "test/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + err := r.ParseMultipartForm(10 << 32) // 10 GiB + if err != nil { + flux.logger.Errorw("Failed to parse multipart form", zap.Error(err)) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var deployRequest API.DeployRequest + projectConfig := new(pkg.ProjectConfig) + if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil { + flux.logger.Errorw("Failed to decode config", zap.Error(err)) + + http.Error(w, "Invalid flux.json", http.StatusBadRequest) + return + } + + deployRequest.Config = *projectConfig + idStr := r.FormValue("id") + + if idStr == "" { + id, err := uuid.NewRandom() + if err != nil { + flux.logger.Errorw("Failed to generate uuid", zap.Error(err)) + http.Error(w, "Failed to generate uuid", http.StatusInternalServerError) + return + } + + deployRequest.Id = id + } else { + deployRequest.Id, err = uuid.Parse(idStr) + if err != nil { + flux.logger.Errorw("Failed to parse uuid", zap.Error(err)) + http.Error(w, "Failed to parse uuid", http.StatusBadRequest) + return + } + + // make sure the id exists in the database + app := flux.appManager.GetApp(deployRequest.Id) + if app == nil { + http.Error(w, "App not found", http.StatusNotFound) + return + } + } + + ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context()) + if err != nil && err == util.ErrLocked { + // This will happen if the app is already being deployed + http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict) + return + } + + go func() { + <-ctx.Done() + deploymentLock.Unlock(deployRequest.Id) + }() + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + return + } + + // Send 200 to indicate a successful SSE connection to the client + w.WriteHeader(http.StatusOK) + + eventChannel := make(chan API.DeploymentEvent, 10) + defer close(eventChannel) + + var wg sync.WaitGroup + // make sure the connection doesnt close while there are SSE events being sent + defer wg.Wait() + + wg.Add(1) + go func(w http.ResponseWriter, flusher http.Flusher) { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case event, ok := <-eventChannel: + if !ok { + return + } + + ev := API.DeploymentEvent{ + Message: event.Message, + } + + eventJSON, err := json.Marshal(ev) + if err != nil { + // Write error directly to ResponseWriter + jsonErr := json.NewEncoder(w).Encode(err) + if jsonErr != nil { + fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n") + return + } + + fmt.Fprintf(w, "data: %s\n\n", err.Error()) + if flusher != nil { + flusher.Flush() + } + return + } + + fmt.Fprintf(w, "event: %s\n", event.Stage) + fmt.Fprintf(w, "data: %s\n\n", eventJSON) + if flusher != nil { + flusher.Flush() + } + + if event.Stage == "error" || event.Stage == "complete" { + return + } + } + } + }(w, flusher) + + eventChannel <- API.DeploymentEvent{ + Stage: "start", + Message: "Uploading code", + } + + deployRequest.Code, _, err = r.FormFile("code") + if err != nil { + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: "No code archive found", + } + return + } + defer deployRequest.Code.Close() + + if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 { + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: "Invalid flux.json, a name, url, and port must be specified", + } + return + } + + if projectConfig.Name == "all" { + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: "Reserved name 'all' is not allowed", + } + return + } + + flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String())) + + projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id) + if err != nil { + flux.logger.Infow("Failed to upload code", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to upload code: %s", err), + } + return + } + + if projectConfig.EnvFile != "" { + envPath := filepath.Join(projectPath, projectConfig.EnvFile) + // prevent path traversal + realEnvPath, err := filepath.EvalSymlinks(envPath) + if err != nil { + flux.logger.Errorw("Failed to eval symlinks", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to eval symlinks: %s", err), + } + return + } + + if !strings.HasPrefix(realEnvPath, projectPath) { + flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile), + } + return + } + + envBytes, err := os.Open(realEnvPath) + if err != nil { + flux.logger.Errorw("Failed to open env file", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to open env file: %v", err), + } + return + } + defer envBytes.Close() + + envVars, err := godotenv.Parse(envBytes) + if err != nil { + flux.logger.Errorw("Failed to parse env file", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to parse env file: %v", err), + } + return + } + + for key, value := range envVars { + projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value)) + } + } + + // pipe the output of the build process to the event channel + pipeGroup := sync.WaitGroup{} + streamPipe := func(pipe io.ReadCloser) { + pipeGroup.Add(1) + defer pipeGroup.Done() + defer pipe.Close() + + scanner := bufio.NewScanner(pipe) + for scanner.Scan() { + line := scanner.Text() + eventChannel <- API.DeploymentEvent{ + Stage: "cmd_output", + Message: line, + } + } + + if err := scanner.Err(); err != nil { + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to read pipe: %s", err), + } + flux.logger.Errorw("Error reading pipe", zap.Error(err)) + } + } + + flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String())) + eventChannel <- API.DeploymentEvent{ + Stage: "preparing", + Message: "Preparing project", + } + + // redirect stdout and stderr to the event channel + reader, writer := io.Pipe() + prepareCmd := exec.Command("go", "generate") + prepareCmd.Dir = projectPath + prepareCmd.Stdout = writer + prepareCmd.Stderr = writer + + err = prepareCmd.Start() + if err != nil { + flux.logger.Errorw("Failed to prepare project", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to prepare project: %s", err), + } + + return + } + + go streamPipe(reader) + + pipeGroup.Wait() + + err = prepareCmd.Wait() + if err != nil { + flux.logger.Errorw("Failed to prepare project", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to prepare project: %s", err), + } + + return + } + + writer.Close() + + eventChannel <- API.DeploymentEvent{ + Stage: "building", + Message: "Building project image", + } + + reader, writer = io.Pipe() + flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name)) + imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0)) + buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder) + buildCmd.Dir = projectPath + buildCmd.Stdout = writer + buildCmd.Stderr = writer + + err = buildCmd.Start() + if err != nil { + flux.logger.Errorw("Failed to build image", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to build image: %s", err), + } + + return + } + + go streamPipe(reader) + + pipeGroup.Wait() + + err = buildCmd.Wait() + if err != nil { + flux.logger.Errorw("Failed to build image", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to build image: %s", err), + } + + return + } + + app := flux.appManager.GetApp(deployRequest.Id) + + if app == nil { + eventChannel <- API.DeploymentEvent{ + Stage: "creating", + Message: "Creating app, this might take a while...", + } + + app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id) + } else { + eventChannel <- API.DeploymentEvent{ + Stage: "upgrading", + Message: "Upgrading app, this might take a while...", + } + + // we dont need to change `app` since this upgrade will use the same app and update it in place + err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig) + } + + if err != nil { + flux.logger.Errorw("Failed to deploy app", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to upgrade app: %s", err), + } + + return + } + + var extApp API.App + extApp.Id = app.Id + extApp.Name = app.Name + extApp.DeploymentID = app.DeploymentID + + eventChannel <- API.DeploymentEvent{ + Stage: "complete", + Message: extApp, + } + + flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String())) +} diff --git a/internal/models/deployment.go b/internal/models/deployment.go index 1922904..9b3fa07 100644 --- a/internal/models/deployment.go +++ b/internal/models/deployment.go @@ -190,6 +190,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr return err } + logger.Debugw("Waiting for container to start", zap.String("container_id", string(newHeadContainer.ContainerID))) if err := newHeadContainer.Wait(ctx, projectConfig.Port, dockerClient); err != nil { logger.Errorw("Failed to wait for container", zap.Error(err)) return err @@ -220,7 +221,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr // gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers if ok { - go oldProxy.GracefulShutdown(func() { + go oldProxy.GracefulShutdown(logger, func() { err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID) if err != nil { logger.Errorw("Failed to stop container", zap.Error(err)) diff --git a/internal/services/proxy/proxy.go b/internal/services/proxy/proxy.go index ae3f06f..69e701b 100644 --- a/internal/services/proxy/proxy.go +++ b/internal/services/proxy/proxy.go @@ -41,6 +41,7 @@ func (proxyManager *ProxyManager) ListenAndServe(host string) error { // Stops forwarding traffic to a deployment func (proxyManager *ProxyManager) RemoveDeployment(host string) { + proxyManager.logger.Debugw("Removing proxy", zap.String("host", host)) proxyManager.Delete(host) } @@ -65,25 +66,20 @@ func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Reque } type Proxy struct { - forwardingFor url.URL - proxyFunc *httputil.ReverseProxy - gracePeriod time.Duration - activeRequests int64 + forwardingFor url.URL + proxyFunc *httputil.ReverseProxy + shutdownTimeout time.Duration + activeRequests int64 } -// type DeploymentProxy struct { -// deployment *models.Deployment -// proxy *httputil.ReverseProxy -// gracePeriod time.Duration -// activeRequests int64 -// } +const PROXY_SHUTDOWN_TIMEOUT = 30 * time.Second // Creates a proxy for a given deployment func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) { proxy := &Proxy{ - forwardingFor: forwardingFor, - gracePeriod: time.Second * 30, - activeRequests: 0, + forwardingFor: forwardingFor, + shutdownTimeout: PROXY_SHUTDOWN_TIMEOUT, + activeRequests: 0, } proxy.proxyFunc = &httputil.ReverseProxy{ @@ -111,17 +107,22 @@ func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) { } // Drains connections from a proxy -func (p *Proxy) GracefulShutdown(shutdownFunc func()) { - ctx, cancel := context.WithTimeout(context.Background(), p.gracePeriod) +func (p *Proxy) GracefulShutdown(logger *zap.SugaredLogger, shutdownFunc func()) { + logger.Debugw("Shutting down proxy", zap.String("host", p.forwardingFor.Host)) + + ctx, cancel := context.WithTimeout(context.Background(), p.shutdownTimeout) defer cancel() done := false for !done { select { case <-ctx.Done(): + logger.Debugw("Proxy shutdown timed out", zap.String("host", p.forwardingFor.Host)) + done = true default: if atomic.LoadInt64(&p.activeRequests) == 0 { + logger.Debugw("Proxy shutdown completed successfully", zap.String("host", p.forwardingFor.Host)) done = true } diff --git a/internal/util/cli/project.go b/internal/util/cli/project.go index d9608a5..58b85d7 100644 --- a/internal/util/cli/project.go +++ b/internal/util/cli/project.go @@ -7,6 +7,7 @@ import ( "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" + "go.uber.org/zap" ) type Project struct { @@ -14,7 +15,7 @@ type Project struct { Name string `json:"name"` } -func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, error) { +func GetProject(command string, args []string, config pkg.CLIConfig, logger *zap.SugaredLogger) (*Project, error) { var projectName string // we are in a project directory and the project is deployed @@ -24,7 +25,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, return nil, fmt.Errorf("failed to read .fluxid: %v", err) } - app, err := GetRequest[API.App](config.DaemonURL + "/app/by-id/" + string(id)) + app, err := GetRequest[API.App](config.DaemonURL+"/app/by-id/"+string(id), logger) if err != nil { return nil, fmt.Errorf("failed to get app: %v", err) } @@ -58,7 +59,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, } // we are calling flux with a project name (ie `flux start my-project`) - app, err := GetRequest[API.App](config.DaemonURL + "/app/by-name/" + projectName) + app, err := GetRequest[API.App](config.DaemonURL+"/app/by-name/"+projectName, logger) if err != nil { return nil, fmt.Errorf("failed to get app: %v", err) } diff --git a/internal/util/cli/request.go b/internal/util/cli/request.go index c4a262f..b437c48 100644 --- a/internal/util/cli/request.go +++ b/internal/util/cli/request.go @@ -6,17 +6,26 @@ import ( "io" "net/http" "strings" + + "go.uber.org/zap" ) +func isOk(statusCode int) bool { + return statusCode >= 200 && statusCode < 300 +} + // make a function that makes an http GET request to the daemon and returns data of type T -func GetRequest[T any](url string) (*T, error) { +func GetRequest[T any](url string, logger *zap.SugaredLogger) (*T, error) { resp, err := http.Get(url) if err != nil { + logger.Debugw("GET request failed", zap.String("url", url), zap.Error(err)) return nil, fmt.Errorf("http get request failed: %v", err) } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { + logger.Debugw("GET request", zap.String("url", url), resp) + + if !isOk(resp.StatusCode) { responseBody, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("error reading response body: %v", err) @@ -35,7 +44,7 @@ func GetRequest[T any](url string) (*T, error) { return &data, nil } -func DeleteRequest(url string) error { +func DeleteRequest(url string, logger *zap.SugaredLogger) error { req, err := http.NewRequest("DELETE", url, nil) if err != nil { return fmt.Errorf("failed to delete: %v", err) @@ -46,7 +55,9 @@ func DeleteRequest(url string) error { } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { + logger.Debugw("DELETE request", zap.String("url", url), resp) + + if !isOk(resp.StatusCode) { responseBody, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("error reading response body: %v", err) @@ -60,7 +71,7 @@ func DeleteRequest(url string) error { return nil } -func PutRequest(url string, data io.Reader) error { +func PutRequest(url string, data io.Reader, logger *zap.SugaredLogger) error { req, err := http.NewRequest("PUT", url, data) if err != nil { return fmt.Errorf("failed to put: %v", err) @@ -71,7 +82,9 @@ func PutRequest(url string, data io.Reader) error { } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { + logger.Debugw("PUT request", zap.String("url", url), resp) + + if !isOk(resp.StatusCode) { responseBody, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("error reading response body: %v", err) diff --git a/pkg/version.go b/pkg/version.go index 427b3b6..4ceacaf 100644 --- a/pkg/version.go +++ b/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "2025.05.06-16" +const Version = "2025.05.08-14"