diff --git a/README.md b/README.md index 0c3d85b..188a55d 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Flux is a lightweight self-hosted micro-PaaS for hosting Golang web apps with ea ## Dependencies - [Go](https://golang.org/dl/) -- [ZQDGR](https://github.com/juls0730/zqdgr) +- [ZQDGR](https://github.com/juls0730/zqdgr) (development only) - [Buildpacks](https://buildpacks.io/) (daemon only) - [Docker](https://docs.docker.com/get-docker/) (daemon only) diff --git a/cmd/cli/commands/command.go b/cmd/cli/commands/command.go index a91fef1..6e6ab52 100644 --- a/cmd/cli/commands/command.go +++ b/cmd/cli/commands/command.go @@ -3,11 +3,13 @@ package commands import ( "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" + "go.uber.org/zap" ) type CommandCtx struct { Config pkg.CLIConfig - Info API.Info + Logger *zap.SugaredLogger + Info *API.Info Interactive bool } diff --git a/cmd/cli/commands/delete.go b/cmd/cli/commands/delete.go index fff9c87..4145a0f 100644 --- a/cmd/cli/commands/delete.go +++ b/cmd/cli/commands/delete.go @@ -49,7 +49,7 @@ func deleteAll(ctx CommandCtx, noConfirm *bool) error { } } - util.DeleteRequest(ctx.Config.DaemonURL + "/deployments") + util.DeleteRequest(ctx.Config.DaemonURL+"/deployments", ctx.Logger) fmt.Printf("Successfully deleted all projects\n") return nil @@ -80,7 +80,7 @@ func DeleteCommand(ctx CommandCtx, args []string) error { return deleteAll(ctx, noConfirm) } - project, err := util.GetProject("delete", args, ctx.Config) + project, err := util.GetProject("delete", args, ctx.Config, ctx.Logger) if err != nil { return fmt.Errorf("\tfailed to get project name: %v.\n\tSee flux delete -help for more information", err) } @@ -101,7 +101,7 @@ func DeleteCommand(ctx CommandCtx, args []string) error { } } - err = util.DeleteRequest(ctx.Config.DaemonURL + "/app/" + project.Id) + err = util.DeleteRequest(ctx.Config.DaemonURL+"/app/"+project.Id, ctx.Logger) if err != nil { return fmt.Errorf("failed to delete project: %v", err) } diff --git a/cmd/cli/commands/list.go b/cmd/cli/commands/list.go index f1007af..c0ac174 100644 --- a/cmd/cli/commands/list.go +++ b/cmd/cli/commands/list.go @@ -8,7 +8,7 @@ import ( ) func ListCommand(ctx CommandCtx, args []string) error { - apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL + "/apps") + apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL+"/apps", ctx.Logger) if err != nil { return fmt.Errorf("failed to get apps: %v", err) } diff --git a/cmd/cli/commands/start.go b/cmd/cli/commands/start.go index ff1c14c..c20e1ab 100644 --- a/cmd/cli/commands/start.go +++ b/cmd/cli/commands/start.go @@ -7,14 +7,14 @@ import ( ) func StartCommand(ctx CommandCtx, args []string) error { - projectName, err := util.GetProject("start", args, ctx.Config) + projectName, err := util.GetProject("start", args, ctx.Config, ctx.Logger) if err != nil { return err } // Put request to start the project, since the start endpoint is idempotent. // If the project is already running, this will return a 304 Not Modified - err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil) + err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil, ctx.Logger) if err != nil { return fmt.Errorf("failed to start %s: %v", projectName.Name, err) } diff --git a/cmd/cli/commands/stop.go b/cmd/cli/commands/stop.go index 6649b19..723b7d9 100644 --- a/cmd/cli/commands/stop.go +++ b/cmd/cli/commands/stop.go @@ -7,12 +7,12 @@ import ( ) func StopCommand(ctx CommandCtx, args []string) error { - projectName, err := util.GetProject("stop", args, ctx.Config) + projectName, err := util.GetProject("stop", args, ctx.Config, ctx.Logger) if err != nil { return err } - err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil) + err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil, ctx.Logger) if err != nil { return fmt.Errorf("failed to stop %s: %v", projectName.Name, err) } diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 7d73c78..9c09c1c 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" "github.com/agnivade/levenshtein" @@ -15,6 +16,8 @@ import ( "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" "github.com/mattn/go-isatty" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) func isInteractive() bool { @@ -43,8 +46,9 @@ Use "flux -help" for more information about a command. var maxDistance = 3 type Command struct { - Help string - HandlerFunc commands.CommandFunc + Help string + DaemonConnected bool + HandlerFunc commands.CommandFunc } type CommandHandler struct { @@ -59,10 +63,11 @@ func NewCommandHandler() CommandHandler { } } -func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, help string) { +func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, daemonConnected bool, help string) { coomand := Command{ - Help: help, - HandlerFunc: handler, + Help: help, + DaemonConnected: daemonConnected, + HandlerFunc: handler, } h.commands[name] = coomand @@ -108,51 +113,35 @@ func (h *CommandHandler) GetHelpCmd(commands.CommandCtx, []string) error { return nil } -func runCommand(command string, args []string, config pkg.CLIConfig, info API.Info, cmdHandler CommandHandler) error { - commandCtx := commands.CommandCtx{ - Config: config, - Info: info, - Interactive: isInteractive(), - } - +func runCommand(command string, args []string, config pkg.CLIConfig, cmdHandler CommandHandler, logger *zap.SugaredLogger) error { commandStruct, ok := cmdHandler.commands[command] - if ok { - return commandStruct.HandlerFunc(commandCtx, args) + if !ok { + panic("runCommand was passed an invalid command name") } - // diff the command against the list of commands and if we find a command that is more than 80% similar, ask if that's what the user meant - var closestMatch struct { - name string - score int - } - for cmdName := range cmdHandler.commands { - distance := levenshtein.ComputeDistance(cmdName, command) + var info *API.Info = nil - if distance <= maxDistance { - if closestMatch.name == "" || distance < closestMatch.score { - closestMatch.name = cmdName - closestMatch.score = distance - } + if commandStruct.DaemonConnected { + info, err := util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger) + if err != nil { + fmt.Printf("Failed to connect to daemon\n") + os.Exit(1) + } + + if info.Version != version { + fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version) + os.Exit(1) } } - if closestMatch.name == "" { - return fmt.Errorf("unknown command: %s", command) + commandCtx := commands.CommandCtx{ + Config: config, + Info: info, + Logger: logger, + Interactive: isInteractive(), } - var response string - // new line ommitted because it will be produced when the user presses enter to submit their response - fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name) - fmt.Scanln(&response) - - if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" { - command = closestMatch.name - } else { - return nil - } - - // re-run command after accepting the suggestion - return runCommand(command, args, config, info, cmdHandler) + return commandStruct.HandlerFunc(commandCtx, args) } func main() { @@ -160,21 +149,44 @@ func main() { fmt.Printf("Flux is being run non-interactively\n") } + zapConfig := zap.NewDevelopmentConfig() + verbosity := 0 + + debug, err := strconv.ParseBool(os.Getenv("DEBUG")) + if err != nil { + debug = false + } + + if debug { + zapConfig = zap.NewDevelopmentConfig() + verbosity = -1 + } + + zapConfig.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity)) + + lameLogger, err := zapConfig.Build() + if err != nil { + fmt.Printf("Failed to create logger: %v\n", err) + os.Exit(1) + } + + logger := lameLogger.Sugar() + cmdHandler := NewCommandHandler() - cmdHandler.RegisterCmd("init", commands.InitCommand, "Initialize a new project") - cmdHandler.RegisterCmd("deploy", commands.DeployCommand, "Deploy a new version of the app") - cmdHandler.RegisterCmd("start", commands.StartCommand, "Start the app") - cmdHandler.RegisterCmd("stop", commands.StopCommand, "Stop the app") - cmdHandler.RegisterCmd("list", commands.ListCommand, "List all the apps") - cmdHandler.RegisterCmd("delete", commands.DeleteCommand, "Delete the app") + cmdHandler.RegisterCmd("init", commands.InitCommand, false, "Initialize a new project") + cmdHandler.RegisterCmd("deploy", commands.DeployCommand, true, "Deploy a new version of the app") + cmdHandler.RegisterCmd("start", commands.StartCommand, true, "Start the app") + cmdHandler.RegisterCmd("stop", commands.StopCommand, true, "Stop the app") + cmdHandler.RegisterCmd("list", commands.ListCommand, true, "List all the apps") + cmdHandler.RegisterCmd("delete", commands.DeleteCommand, true, "Delete the app") fs := flag.NewFlagSet("flux", flag.ExitOnError) fs.Usage = func() { cmdHandler.GetHelp() } - err := fs.Parse(os.Args[1:]) + err = fs.Parse(os.Args[1:]) if err != nil { fmt.Println(err) os.Exit(1) @@ -214,18 +226,42 @@ func main() { os.Exit(1) } - info, err := util.GetRequest[API.Info](config.DaemonURL + "/heartbeat") - if err != nil { - fmt.Printf("Failed to connect to daemon\n") - os.Exit(1) + command := os.Args[1] + + if _, ok := cmdHandler.commands[command]; !ok { + var closestMatch struct { + name string + score int + } + for cmdName := range cmdHandler.commands { + distance := levenshtein.ComputeDistance(cmdName, command) + + if distance <= maxDistance { + if closestMatch.name == "" || distance < closestMatch.score { + closestMatch.name = cmdName + closestMatch.score = distance + } + } + } + + if closestMatch.name == "" { + fmt.Printf("unknown command: %s", command) + os.Exit(1) + } + + var response string + // new line ommitted because it will be produced when the user presses enter to submit their response + fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name) + fmt.Scanln(&response) + + if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" { + command = closestMatch.name + } else { + os.Exit(0) + } } - if info.Version != version { - fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version) - os.Exit(1) - } - - err = runCommand(os.Args[1], fs.Args()[1:], config, *info, cmdHandler) + err = runCommand(command, fs.Args()[1:], config, cmdHandler, logger) if err != nil { fmt.Printf("Error: %v\n", err) os.Exit(1) diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index f76943c..be0e876 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -19,6 +19,7 @@ func main() { http.HandleFunc("GET /app/by-name/{name}", fluxServer.GetAppByName) http.HandleFunc("GET /app/by-id/{id}", fluxServer.GetAppById) + // a PUT request is the proper type to use since these endpoints are idempotent http.HandleFunc("PUT /app/{id}/start", fluxServer.StartApp) http.HandleFunc("PUT /app/{id}/stop", fluxServer.StopApp) diff --git a/internal/docker/container.go b/internal/docker/container.go index a6024b4..aeaf73a 100644 --- a/internal/docker/container.go +++ b/internal/docker/container.go @@ -87,9 +87,11 @@ func (d *DockerClient) StartContainer(ctx context.Context, containerID DockerID) return d.client.ContainerStart(ctx, string(containerID), container.StartOptions{}) } -// blocks until the container returns a 200 status code +const CONTAINER_START_TIMEOUT = 30 * time.Second + +// blocks until the container returns a 200 status code for a max of CONTAINER_START_TIMEOUT (30 seconds) func (d *DockerClient) ContainerWait(ctx context.Context, containerID DockerID, port uint16) error { - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(ctx, CONTAINER_START_TIMEOUT) defer cancel() for { diff --git a/internal/handlers/app.go b/internal/handlers/app.go index 5dccf1d..547c541 100644 --- a/internal/handlers/app.go +++ b/internal/handlers/app.go @@ -1,389 +1,15 @@ package handlers import ( - "bufio" "encoding/json" - "fmt" - "io" "net/http" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" - "github.com/docker/docker/pkg/namesgenerator" "github.com/google/uuid" - "github.com/joho/godotenv" proxyManagerService "github.com/juls0730/flux/internal/services/proxy" - "github.com/juls0730/flux/internal/util" - "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" "go.uber.org/zap" ) -var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]() - -func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "test/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - - err := r.ParseMultipartForm(10 << 32) // 10 GiB - if err != nil { - flux.logger.Errorw("Failed to parse multipart form", zap.Error(err)) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - var deployRequest API.DeployRequest - projectConfig := new(pkg.ProjectConfig) - if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil { - flux.logger.Errorw("Failed to decode config", zap.Error(err)) - - http.Error(w, "Invalid flux.json", http.StatusBadRequest) - return - } - - deployRequest.Config = *projectConfig - idStr := r.FormValue("id") - - if idStr == "" { - id, err := uuid.NewRandom() - if err != nil { - flux.logger.Errorw("Failed to generate uuid", zap.Error(err)) - http.Error(w, "Failed to generate uuid", http.StatusInternalServerError) - return - } - - deployRequest.Id = id - } else { - deployRequest.Id, err = uuid.Parse(idStr) - if err != nil { - flux.logger.Errorw("Failed to parse uuid", zap.Error(err)) - http.Error(w, "Failed to parse uuid", http.StatusBadRequest) - return - } - - // make sure the id exists in the database - app := flux.appManager.GetApp(deployRequest.Id) - if app == nil { - http.Error(w, "App not found", http.StatusNotFound) - return - } - } - - ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context()) - if err != nil && err == util.ErrLocked { - // This will happen if the app is already being deployed - http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict) - return - } - - go func() { - <-ctx.Done() - deploymentLock.Unlock(deployRequest.Id) - }() - - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusMultiStatus) - - eventChannel := make(chan API.DeploymentEvent, 10) - defer close(eventChannel) - - var wg sync.WaitGroup - // make sure the connection doesnt close while there are SSE events being sent - defer wg.Wait() - - wg.Add(1) - go func(w http.ResponseWriter, flusher http.Flusher) { - defer wg.Done() - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-eventChannel: - if !ok { - return - } - - ev := API.DeploymentEvent{ - Message: event.Message, - } - - eventJSON, err := json.Marshal(ev) - if err != nil { - // Write error directly to ResponseWriter - jsonErr := json.NewEncoder(w).Encode(err) - if jsonErr != nil { - fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n") - return - } - - fmt.Fprintf(w, "data: %s\n\n", err.Error()) - if flusher != nil { - flusher.Flush() - } - return - } - - fmt.Fprintf(w, "event: %s\n", event.Stage) - fmt.Fprintf(w, "data: %s\n\n", eventJSON) - if flusher != nil { - flusher.Flush() - } - - if event.Stage == "error" || event.Stage == "complete" { - return - } - } - } - }(w, flusher) - - eventChannel <- API.DeploymentEvent{ - Stage: "start", - Message: "Uploading code", - } - - deployRequest.Code, _, err = r.FormFile("code") - if err != nil { - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: "No code archive found", - } - return - } - defer deployRequest.Code.Close() - - if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 { - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: "Invalid flux.json, a name, url, and port must be specified", - } - return - } - - if projectConfig.Name == "all" { - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: "Reserved name 'all' is not allowed", - } - return - } - - flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String())) - - projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id) - if err != nil { - flux.logger.Infow("Failed to upload code", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to upload code: %s", err), - } - return - } - - if projectConfig.EnvFile != "" { - envPath := filepath.Join(projectPath, projectConfig.EnvFile) - // prevent path traversal - realEnvPath, err := filepath.EvalSymlinks(envPath) - if err != nil { - flux.logger.Errorw("Failed to eval symlinks", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to eval symlinks: %s", err), - } - return - } - - if !strings.HasPrefix(realEnvPath, projectPath) { - flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile), - } - return - } - - envBytes, err := os.Open(realEnvPath) - if err != nil { - flux.logger.Errorw("Failed to open env file", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to open env file: %v", err), - } - return - } - defer envBytes.Close() - - envVars, err := godotenv.Parse(envBytes) - if err != nil { - flux.logger.Errorw("Failed to parse env file", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to parse env file: %v", err), - } - return - } - - for key, value := range envVars { - projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value)) - } - } - - // pipe the output of the build process to the event channel - pipeGroup := sync.WaitGroup{} - streamPipe := func(pipe io.ReadCloser) { - pipeGroup.Add(1) - defer pipeGroup.Done() - defer pipe.Close() - - scanner := bufio.NewScanner(pipe) - for scanner.Scan() { - line := scanner.Text() - eventChannel <- API.DeploymentEvent{ - Stage: "cmd_output", - Message: line, - } - } - - if err := scanner.Err(); err != nil { - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to read pipe: %s", err), - } - flux.logger.Errorw("Error reading pipe", zap.Error(err)) - } - } - - flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String())) - eventChannel <- API.DeploymentEvent{ - Stage: "preparing", - Message: "Preparing project", - } - - // redirect stdout and stderr to the event channel - reader, writer := io.Pipe() - prepareCmd := exec.Command("go", "generate") - prepareCmd.Dir = projectPath - prepareCmd.Stdout = writer - prepareCmd.Stderr = writer - - err = prepareCmd.Start() - if err != nil { - flux.logger.Errorw("Failed to prepare project", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to prepare project: %s", err), - } - - return - } - - go streamPipe(reader) - - pipeGroup.Wait() - - err = prepareCmd.Wait() - if err != nil { - flux.logger.Errorw("Failed to prepare project", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to prepare project: %s", err), - } - - return - } - - writer.Close() - - eventChannel <- API.DeploymentEvent{ - Stage: "building", - Message: "Building project image", - } - - reader, writer = io.Pipe() - flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name)) - imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0)) - buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder) - buildCmd.Dir = projectPath - buildCmd.Stdout = writer - buildCmd.Stderr = writer - - err = buildCmd.Start() - if err != nil { - flux.logger.Errorw("Failed to build image", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to build image: %s", err), - } - - return - } - - go streamPipe(reader) - - pipeGroup.Wait() - - err = buildCmd.Wait() - if err != nil { - flux.logger.Errorw("Failed to build image", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to build image: %s", err), - } - - return - } - - app := flux.appManager.GetApp(deployRequest.Id) - - if app == nil { - eventChannel <- API.DeploymentEvent{ - Stage: "creating", - Message: "Creating app, this might take a while...", - } - - app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id) - } else { - eventChannel <- API.DeploymentEvent{ - Stage: "upgrading", - Message: "Upgrading app, this might take a while...", - } - - // we dont need to change `app` since this upgrade will use the same app and update it in place - err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig) - } - - if err != nil { - flux.logger.Errorw("Failed to deploy app", zap.Error(err)) - eventChannel <- API.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to upgrade app: %s", err), - } - - return - } - - var extApp API.App - extApp.Id = app.Id - extApp.Name = app.Name - extApp.DeploymentID = app.DeploymentID - - eventChannel <- API.DeploymentEvent{ - Stage: "complete", - Message: extApp, - } - - flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String())) -} - func (flux *FluxServer) GetAllApps(w http.ResponseWriter, r *http.Request) { var apps []API.App for _, app := range flux.appManager.GetAllApps() { diff --git a/internal/handlers/deploy.go b/internal/handlers/deploy.go new file mode 100644 index 0000000..511b350 --- /dev/null +++ b/internal/handlers/deploy.go @@ -0,0 +1,385 @@ +package handlers + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + + "github.com/docker/docker/pkg/namesgenerator" + "github.com/google/uuid" + "github.com/joho/godotenv" + "github.com/juls0730/flux/internal/util" + "github.com/juls0730/flux/pkg" + "github.com/juls0730/flux/pkg/API" + "go.uber.org/zap" +) + +var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]() + +func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "test/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + err := r.ParseMultipartForm(10 << 32) // 10 GiB + if err != nil { + flux.logger.Errorw("Failed to parse multipart form", zap.Error(err)) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var deployRequest API.DeployRequest + projectConfig := new(pkg.ProjectConfig) + if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil { + flux.logger.Errorw("Failed to decode config", zap.Error(err)) + + http.Error(w, "Invalid flux.json", http.StatusBadRequest) + return + } + + deployRequest.Config = *projectConfig + idStr := r.FormValue("id") + + if idStr == "" { + id, err := uuid.NewRandom() + if err != nil { + flux.logger.Errorw("Failed to generate uuid", zap.Error(err)) + http.Error(w, "Failed to generate uuid", http.StatusInternalServerError) + return + } + + deployRequest.Id = id + } else { + deployRequest.Id, err = uuid.Parse(idStr) + if err != nil { + flux.logger.Errorw("Failed to parse uuid", zap.Error(err)) + http.Error(w, "Failed to parse uuid", http.StatusBadRequest) + return + } + + // make sure the id exists in the database + app := flux.appManager.GetApp(deployRequest.Id) + if app == nil { + http.Error(w, "App not found", http.StatusNotFound) + return + } + } + + ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context()) + if err != nil && err == util.ErrLocked { + // This will happen if the app is already being deployed + http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict) + return + } + + go func() { + <-ctx.Done() + deploymentLock.Unlock(deployRequest.Id) + }() + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + return + } + + // Send 200 to indicate a successful SSE connection to the client + w.WriteHeader(http.StatusOK) + + eventChannel := make(chan API.DeploymentEvent, 10) + defer close(eventChannel) + + var wg sync.WaitGroup + // make sure the connection doesnt close while there are SSE events being sent + defer wg.Wait() + + wg.Add(1) + go func(w http.ResponseWriter, flusher http.Flusher) { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case event, ok := <-eventChannel: + if !ok { + return + } + + ev := API.DeploymentEvent{ + Message: event.Message, + } + + eventJSON, err := json.Marshal(ev) + if err != nil { + // Write error directly to ResponseWriter + jsonErr := json.NewEncoder(w).Encode(err) + if jsonErr != nil { + fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n") + return + } + + fmt.Fprintf(w, "data: %s\n\n", err.Error()) + if flusher != nil { + flusher.Flush() + } + return + } + + fmt.Fprintf(w, "event: %s\n", event.Stage) + fmt.Fprintf(w, "data: %s\n\n", eventJSON) + if flusher != nil { + flusher.Flush() + } + + if event.Stage == "error" || event.Stage == "complete" { + return + } + } + } + }(w, flusher) + + eventChannel <- API.DeploymentEvent{ + Stage: "start", + Message: "Uploading code", + } + + deployRequest.Code, _, err = r.FormFile("code") + if err != nil { + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: "No code archive found", + } + return + } + defer deployRequest.Code.Close() + + if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 { + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: "Invalid flux.json, a name, url, and port must be specified", + } + return + } + + if projectConfig.Name == "all" { + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: "Reserved name 'all' is not allowed", + } + return + } + + flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String())) + + projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id) + if err != nil { + flux.logger.Infow("Failed to upload code", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to upload code: %s", err), + } + return + } + + if projectConfig.EnvFile != "" { + envPath := filepath.Join(projectPath, projectConfig.EnvFile) + // prevent path traversal + realEnvPath, err := filepath.EvalSymlinks(envPath) + if err != nil { + flux.logger.Errorw("Failed to eval symlinks", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to eval symlinks: %s", err), + } + return + } + + if !strings.HasPrefix(realEnvPath, projectPath) { + flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile), + } + return + } + + envBytes, err := os.Open(realEnvPath) + if err != nil { + flux.logger.Errorw("Failed to open env file", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to open env file: %v", err), + } + return + } + defer envBytes.Close() + + envVars, err := godotenv.Parse(envBytes) + if err != nil { + flux.logger.Errorw("Failed to parse env file", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to parse env file: %v", err), + } + return + } + + for key, value := range envVars { + projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value)) + } + } + + // pipe the output of the build process to the event channel + pipeGroup := sync.WaitGroup{} + streamPipe := func(pipe io.ReadCloser) { + pipeGroup.Add(1) + defer pipeGroup.Done() + defer pipe.Close() + + scanner := bufio.NewScanner(pipe) + for scanner.Scan() { + line := scanner.Text() + eventChannel <- API.DeploymentEvent{ + Stage: "cmd_output", + Message: line, + } + } + + if err := scanner.Err(); err != nil { + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to read pipe: %s", err), + } + flux.logger.Errorw("Error reading pipe", zap.Error(err)) + } + } + + flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String())) + eventChannel <- API.DeploymentEvent{ + Stage: "preparing", + Message: "Preparing project", + } + + // redirect stdout and stderr to the event channel + reader, writer := io.Pipe() + prepareCmd := exec.Command("go", "generate") + prepareCmd.Dir = projectPath + prepareCmd.Stdout = writer + prepareCmd.Stderr = writer + + err = prepareCmd.Start() + if err != nil { + flux.logger.Errorw("Failed to prepare project", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to prepare project: %s", err), + } + + return + } + + go streamPipe(reader) + + pipeGroup.Wait() + + err = prepareCmd.Wait() + if err != nil { + flux.logger.Errorw("Failed to prepare project", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to prepare project: %s", err), + } + + return + } + + writer.Close() + + eventChannel <- API.DeploymentEvent{ + Stage: "building", + Message: "Building project image", + } + + reader, writer = io.Pipe() + flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name)) + imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0)) + buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder) + buildCmd.Dir = projectPath + buildCmd.Stdout = writer + buildCmd.Stderr = writer + + err = buildCmd.Start() + if err != nil { + flux.logger.Errorw("Failed to build image", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to build image: %s", err), + } + + return + } + + go streamPipe(reader) + + pipeGroup.Wait() + + err = buildCmd.Wait() + if err != nil { + flux.logger.Errorw("Failed to build image", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to build image: %s", err), + } + + return + } + + app := flux.appManager.GetApp(deployRequest.Id) + + if app == nil { + eventChannel <- API.DeploymentEvent{ + Stage: "creating", + Message: "Creating app, this might take a while...", + } + + app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id) + } else { + eventChannel <- API.DeploymentEvent{ + Stage: "upgrading", + Message: "Upgrading app, this might take a while...", + } + + // we dont need to change `app` since this upgrade will use the same app and update it in place + err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig) + } + + if err != nil { + flux.logger.Errorw("Failed to deploy app", zap.Error(err)) + eventChannel <- API.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to upgrade app: %s", err), + } + + return + } + + var extApp API.App + extApp.Id = app.Id + extApp.Name = app.Name + extApp.DeploymentID = app.DeploymentID + + eventChannel <- API.DeploymentEvent{ + Stage: "complete", + Message: extApp, + } + + flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String())) +} diff --git a/internal/models/deployment.go b/internal/models/deployment.go index 1922904..9b3fa07 100644 --- a/internal/models/deployment.go +++ b/internal/models/deployment.go @@ -190,6 +190,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr return err } + logger.Debugw("Waiting for container to start", zap.String("container_id", string(newHeadContainer.ContainerID))) if err := newHeadContainer.Wait(ctx, projectConfig.Port, dockerClient); err != nil { logger.Errorw("Failed to wait for container", zap.Error(err)) return err @@ -220,7 +221,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr // gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers if ok { - go oldProxy.GracefulShutdown(func() { + go oldProxy.GracefulShutdown(logger, func() { err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID) if err != nil { logger.Errorw("Failed to stop container", zap.Error(err)) diff --git a/internal/services/proxy/proxy.go b/internal/services/proxy/proxy.go index ae3f06f..69e701b 100644 --- a/internal/services/proxy/proxy.go +++ b/internal/services/proxy/proxy.go @@ -41,6 +41,7 @@ func (proxyManager *ProxyManager) ListenAndServe(host string) error { // Stops forwarding traffic to a deployment func (proxyManager *ProxyManager) RemoveDeployment(host string) { + proxyManager.logger.Debugw("Removing proxy", zap.String("host", host)) proxyManager.Delete(host) } @@ -65,25 +66,20 @@ func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Reque } type Proxy struct { - forwardingFor url.URL - proxyFunc *httputil.ReverseProxy - gracePeriod time.Duration - activeRequests int64 + forwardingFor url.URL + proxyFunc *httputil.ReverseProxy + shutdownTimeout time.Duration + activeRequests int64 } -// type DeploymentProxy struct { -// deployment *models.Deployment -// proxy *httputil.ReverseProxy -// gracePeriod time.Duration -// activeRequests int64 -// } +const PROXY_SHUTDOWN_TIMEOUT = 30 * time.Second // Creates a proxy for a given deployment func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) { proxy := &Proxy{ - forwardingFor: forwardingFor, - gracePeriod: time.Second * 30, - activeRequests: 0, + forwardingFor: forwardingFor, + shutdownTimeout: PROXY_SHUTDOWN_TIMEOUT, + activeRequests: 0, } proxy.proxyFunc = &httputil.ReverseProxy{ @@ -111,17 +107,22 @@ func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) { } // Drains connections from a proxy -func (p *Proxy) GracefulShutdown(shutdownFunc func()) { - ctx, cancel := context.WithTimeout(context.Background(), p.gracePeriod) +func (p *Proxy) GracefulShutdown(logger *zap.SugaredLogger, shutdownFunc func()) { + logger.Debugw("Shutting down proxy", zap.String("host", p.forwardingFor.Host)) + + ctx, cancel := context.WithTimeout(context.Background(), p.shutdownTimeout) defer cancel() done := false for !done { select { case <-ctx.Done(): + logger.Debugw("Proxy shutdown timed out", zap.String("host", p.forwardingFor.Host)) + done = true default: if atomic.LoadInt64(&p.activeRequests) == 0 { + logger.Debugw("Proxy shutdown completed successfully", zap.String("host", p.forwardingFor.Host)) done = true } diff --git a/internal/util/cli/project.go b/internal/util/cli/project.go index d9608a5..58b85d7 100644 --- a/internal/util/cli/project.go +++ b/internal/util/cli/project.go @@ -7,6 +7,7 @@ import ( "github.com/juls0730/flux/pkg" "github.com/juls0730/flux/pkg/API" + "go.uber.org/zap" ) type Project struct { @@ -14,7 +15,7 @@ type Project struct { Name string `json:"name"` } -func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, error) { +func GetProject(command string, args []string, config pkg.CLIConfig, logger *zap.SugaredLogger) (*Project, error) { var projectName string // we are in a project directory and the project is deployed @@ -24,7 +25,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, return nil, fmt.Errorf("failed to read .fluxid: %v", err) } - app, err := GetRequest[API.App](config.DaemonURL + "/app/by-id/" + string(id)) + app, err := GetRequest[API.App](config.DaemonURL+"/app/by-id/"+string(id), logger) if err != nil { return nil, fmt.Errorf("failed to get app: %v", err) } @@ -58,7 +59,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, } // we are calling flux with a project name (ie `flux start my-project`) - app, err := GetRequest[API.App](config.DaemonURL + "/app/by-name/" + projectName) + app, err := GetRequest[API.App](config.DaemonURL+"/app/by-name/"+projectName, logger) if err != nil { return nil, fmt.Errorf("failed to get app: %v", err) } diff --git a/internal/util/cli/request.go b/internal/util/cli/request.go index c4a262f..b437c48 100644 --- a/internal/util/cli/request.go +++ b/internal/util/cli/request.go @@ -6,17 +6,26 @@ import ( "io" "net/http" "strings" + + "go.uber.org/zap" ) +func isOk(statusCode int) bool { + return statusCode >= 200 && statusCode < 300 +} + // make a function that makes an http GET request to the daemon and returns data of type T -func GetRequest[T any](url string) (*T, error) { +func GetRequest[T any](url string, logger *zap.SugaredLogger) (*T, error) { resp, err := http.Get(url) if err != nil { + logger.Debugw("GET request failed", zap.String("url", url), zap.Error(err)) return nil, fmt.Errorf("http get request failed: %v", err) } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { + logger.Debugw("GET request", zap.String("url", url), resp) + + if !isOk(resp.StatusCode) { responseBody, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("error reading response body: %v", err) @@ -35,7 +44,7 @@ func GetRequest[T any](url string) (*T, error) { return &data, nil } -func DeleteRequest(url string) error { +func DeleteRequest(url string, logger *zap.SugaredLogger) error { req, err := http.NewRequest("DELETE", url, nil) if err != nil { return fmt.Errorf("failed to delete: %v", err) @@ -46,7 +55,9 @@ func DeleteRequest(url string) error { } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { + logger.Debugw("DELETE request", zap.String("url", url), resp) + + if !isOk(resp.StatusCode) { responseBody, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("error reading response body: %v", err) @@ -60,7 +71,7 @@ func DeleteRequest(url string) error { return nil } -func PutRequest(url string, data io.Reader) error { +func PutRequest(url string, data io.Reader, logger *zap.SugaredLogger) error { req, err := http.NewRequest("PUT", url, data) if err != nil { return fmt.Errorf("failed to put: %v", err) @@ -71,7 +82,9 @@ func PutRequest(url string, data io.Reader) error { } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { + logger.Debugw("PUT request", zap.String("url", url), resp) + + if !isOk(resp.StatusCode) { responseBody, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("error reading response body: %v", err) diff --git a/pkg/version.go b/pkg/version.go index 427b3b6..4ceacaf 100644 --- a/pkg/version.go +++ b/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "2025.05.06-16" +const Version = "2025.05.08-14"