Expand logging, and daemonless command support.

This adds more logging in certain places, and adds logging to the CLI.
It also allows for certain commands in the CLI to be used without a
daemon connection, namely `init`, which previously required the daemon
to be connected, but now does not since it doesnt need it.
This commit is contained in:
Zoe
2025-05-08 09:53:41 -05:00
parent 5bb696052a
commit c51eca5dab
16 changed files with 538 additions and 470 deletions

View File

@@ -26,7 +26,7 @@ Flux is a lightweight self-hosted micro-PaaS for hosting Golang web apps with ea
## Dependencies
- [Go](https://golang.org/dl/)
- [ZQDGR](https://github.com/juls0730/zqdgr)
- [ZQDGR](https://github.com/juls0730/zqdgr) (development only)
- [Buildpacks](https://buildpacks.io/) (daemon only)
- [Docker](https://docs.docker.com/get-docker/) (daemon only)

View File

@@ -3,11 +3,13 @@ package commands
import (
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"go.uber.org/zap"
)
type CommandCtx struct {
Config pkg.CLIConfig
Info API.Info
Logger *zap.SugaredLogger
Info *API.Info
Interactive bool
}

View File

@@ -49,7 +49,7 @@ func deleteAll(ctx CommandCtx, noConfirm *bool) error {
}
}
util.DeleteRequest(ctx.Config.DaemonURL + "/deployments")
util.DeleteRequest(ctx.Config.DaemonURL+"/deployments", ctx.Logger)
fmt.Printf("Successfully deleted all projects\n")
return nil
@@ -80,7 +80,7 @@ func DeleteCommand(ctx CommandCtx, args []string) error {
return deleteAll(ctx, noConfirm)
}
project, err := util.GetProject("delete", args, ctx.Config)
project, err := util.GetProject("delete", args, ctx.Config, ctx.Logger)
if err != nil {
return fmt.Errorf("\tfailed to get project name: %v.\n\tSee flux delete -help for more information", err)
}
@@ -101,7 +101,7 @@ func DeleteCommand(ctx CommandCtx, args []string) error {
}
}
err = util.DeleteRequest(ctx.Config.DaemonURL + "/app/" + project.Id)
err = util.DeleteRequest(ctx.Config.DaemonURL+"/app/"+project.Id, ctx.Logger)
if err != nil {
return fmt.Errorf("failed to delete project: %v", err)
}

View File

@@ -8,7 +8,7 @@ import (
)
func ListCommand(ctx CommandCtx, args []string) error {
apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL + "/apps")
apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL+"/apps", ctx.Logger)
if err != nil {
return fmt.Errorf("failed to get apps: %v", err)
}

View File

@@ -7,14 +7,14 @@ import (
)
func StartCommand(ctx CommandCtx, args []string) error {
projectName, err := util.GetProject("start", args, ctx.Config)
projectName, err := util.GetProject("start", args, ctx.Config, ctx.Logger)
if err != nil {
return err
}
// Put request to start the project, since the start endpoint is idempotent.
// If the project is already running, this will return a 304 Not Modified
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil)
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil, ctx.Logger)
if err != nil {
return fmt.Errorf("failed to start %s: %v", projectName.Name, err)
}

View File

@@ -7,12 +7,12 @@ import (
)
func StopCommand(ctx CommandCtx, args []string) error {
projectName, err := util.GetProject("stop", args, ctx.Config)
projectName, err := util.GetProject("stop", args, ctx.Config, ctx.Logger)
if err != nil {
return err
}
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil)
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil, ctx.Logger)
if err != nil {
return fmt.Errorf("failed to stop %s: %v", projectName.Name, err)
}

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/agnivade/levenshtein"
@@ -15,6 +16,8 @@ import (
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"github.com/mattn/go-isatty"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func isInteractive() bool {
@@ -44,6 +47,7 @@ var maxDistance = 3
type Command struct {
Help string
DaemonConnected bool
HandlerFunc commands.CommandFunc
}
@@ -59,9 +63,10 @@ func NewCommandHandler() CommandHandler {
}
}
func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, help string) {
func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, daemonConnected bool, help string) {
coomand := Command{
Help: help,
DaemonConnected: daemonConnected,
HandlerFunc: handler,
}
@@ -108,73 +113,80 @@ func (h *CommandHandler) GetHelpCmd(commands.CommandCtx, []string) error {
return nil
}
func runCommand(command string, args []string, config pkg.CLIConfig, info API.Info, cmdHandler CommandHandler) error {
func runCommand(command string, args []string, config pkg.CLIConfig, cmdHandler CommandHandler, logger *zap.SugaredLogger) error {
commandStruct, ok := cmdHandler.commands[command]
if !ok {
panic("runCommand was passed an invalid command name")
}
var info *API.Info = nil
if commandStruct.DaemonConnected {
info, err := util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger)
if err != nil {
fmt.Printf("Failed to connect to daemon\n")
os.Exit(1)
}
if info.Version != version {
fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version)
os.Exit(1)
}
}
commandCtx := commands.CommandCtx{
Config: config,
Info: info,
Logger: logger,
Interactive: isInteractive(),
}
commandStruct, ok := cmdHandler.commands[command]
if ok {
return commandStruct.HandlerFunc(commandCtx, args)
}
// diff the command against the list of commands and if we find a command that is more than 80% similar, ask if that's what the user meant
var closestMatch struct {
name string
score int
}
for cmdName := range cmdHandler.commands {
distance := levenshtein.ComputeDistance(cmdName, command)
if distance <= maxDistance {
if closestMatch.name == "" || distance < closestMatch.score {
closestMatch.name = cmdName
closestMatch.score = distance
}
}
}
if closestMatch.name == "" {
return fmt.Errorf("unknown command: %s", command)
}
var response string
// new line ommitted because it will be produced when the user presses enter to submit their response
fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name)
fmt.Scanln(&response)
if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" {
command = closestMatch.name
} else {
return nil
}
// re-run command after accepting the suggestion
return runCommand(command, args, config, info, cmdHandler)
}
func main() {
if !isInteractive() {
fmt.Printf("Flux is being run non-interactively\n")
}
zapConfig := zap.NewDevelopmentConfig()
verbosity := 0
debug, err := strconv.ParseBool(os.Getenv("DEBUG"))
if err != nil {
debug = false
}
if debug {
zapConfig = zap.NewDevelopmentConfig()
verbosity = -1
}
zapConfig.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity))
lameLogger, err := zapConfig.Build()
if err != nil {
fmt.Printf("Failed to create logger: %v\n", err)
os.Exit(1)
}
logger := lameLogger.Sugar()
cmdHandler := NewCommandHandler()
cmdHandler.RegisterCmd("init", commands.InitCommand, "Initialize a new project")
cmdHandler.RegisterCmd("deploy", commands.DeployCommand, "Deploy a new version of the app")
cmdHandler.RegisterCmd("start", commands.StartCommand, "Start the app")
cmdHandler.RegisterCmd("stop", commands.StopCommand, "Stop the app")
cmdHandler.RegisterCmd("list", commands.ListCommand, "List all the apps")
cmdHandler.RegisterCmd("delete", commands.DeleteCommand, "Delete the app")
cmdHandler.RegisterCmd("init", commands.InitCommand, false, "Initialize a new project")
cmdHandler.RegisterCmd("deploy", commands.DeployCommand, true, "Deploy a new version of the app")
cmdHandler.RegisterCmd("start", commands.StartCommand, true, "Start the app")
cmdHandler.RegisterCmd("stop", commands.StopCommand, true, "Stop the app")
cmdHandler.RegisterCmd("list", commands.ListCommand, true, "List all the apps")
cmdHandler.RegisterCmd("delete", commands.DeleteCommand, true, "Delete the app")
fs := flag.NewFlagSet("flux", flag.ExitOnError)
fs.Usage = func() {
cmdHandler.GetHelp()
}
err := fs.Parse(os.Args[1:])
err = fs.Parse(os.Args[1:])
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -214,18 +226,42 @@ func main() {
os.Exit(1)
}
info, err := util.GetRequest[API.Info](config.DaemonURL + "/heartbeat")
if err != nil {
fmt.Printf("Failed to connect to daemon\n")
command := os.Args[1]
if _, ok := cmdHandler.commands[command]; !ok {
var closestMatch struct {
name string
score int
}
for cmdName := range cmdHandler.commands {
distance := levenshtein.ComputeDistance(cmdName, command)
if distance <= maxDistance {
if closestMatch.name == "" || distance < closestMatch.score {
closestMatch.name = cmdName
closestMatch.score = distance
}
}
}
if closestMatch.name == "" {
fmt.Printf("unknown command: %s", command)
os.Exit(1)
}
if info.Version != version {
fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version)
os.Exit(1)
var response string
// new line ommitted because it will be produced when the user presses enter to submit their response
fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name)
fmt.Scanln(&response)
if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" {
command = closestMatch.name
} else {
os.Exit(0)
}
}
err = runCommand(os.Args[1], fs.Args()[1:], config, *info, cmdHandler)
err = runCommand(command, fs.Args()[1:], config, cmdHandler, logger)
if err != nil {
fmt.Printf("Error: %v\n", err)
os.Exit(1)

View File

@@ -19,6 +19,7 @@ func main() {
http.HandleFunc("GET /app/by-name/{name}", fluxServer.GetAppByName)
http.HandleFunc("GET /app/by-id/{id}", fluxServer.GetAppById)
// a PUT request is the proper type to use since these endpoints are idempotent
http.HandleFunc("PUT /app/{id}/start", fluxServer.StartApp)
http.HandleFunc("PUT /app/{id}/stop", fluxServer.StopApp)

View File

@@ -87,9 +87,11 @@ func (d *DockerClient) StartContainer(ctx context.Context, containerID DockerID)
return d.client.ContainerStart(ctx, string(containerID), container.StartOptions{})
}
// blocks until the container returns a 200 status code
const CONTAINER_START_TIMEOUT = 30 * time.Second
// blocks until the container returns a 200 status code for a max of CONTAINER_START_TIMEOUT (30 seconds)
func (d *DockerClient) ContainerWait(ctx context.Context, containerID DockerID, port uint16) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
ctx, cancel := context.WithTimeout(ctx, CONTAINER_START_TIMEOUT)
defer cancel()
for {

View File

@@ -1,389 +1,15 @@
package handlers
import (
"bufio"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/google/uuid"
"github.com/joho/godotenv"
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/flux/internal/util"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"go.uber.org/zap"
)
var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]()
func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "test/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
err := r.ParseMultipartForm(10 << 32) // 10 GiB
if err != nil {
flux.logger.Errorw("Failed to parse multipart form", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var deployRequest API.DeployRequest
projectConfig := new(pkg.ProjectConfig)
if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil {
flux.logger.Errorw("Failed to decode config", zap.Error(err))
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
return
}
deployRequest.Config = *projectConfig
idStr := r.FormValue("id")
if idStr == "" {
id, err := uuid.NewRandom()
if err != nil {
flux.logger.Errorw("Failed to generate uuid", zap.Error(err))
http.Error(w, "Failed to generate uuid", http.StatusInternalServerError)
return
}
deployRequest.Id = id
} else {
deployRequest.Id, err = uuid.Parse(idStr)
if err != nil {
flux.logger.Errorw("Failed to parse uuid", zap.Error(err))
http.Error(w, "Failed to parse uuid", http.StatusBadRequest)
return
}
// make sure the id exists in the database
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
}
ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context())
if err != nil && err == util.ErrLocked {
// This will happen if the app is already being deployed
http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict)
return
}
go func() {
<-ctx.Done()
deploymentLock.Unlock(deployRequest.Id)
}()
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusMultiStatus)
eventChannel := make(chan API.DeploymentEvent, 10)
defer close(eventChannel)
var wg sync.WaitGroup
// make sure the connection doesnt close while there are SSE events being sent
defer wg.Wait()
wg.Add(1)
go func(w http.ResponseWriter, flusher http.Flusher) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case event, ok := <-eventChannel:
if !ok {
return
}
ev := API.DeploymentEvent{
Message: event.Message,
}
eventJSON, err := json.Marshal(ev)
if err != nil {
// Write error directly to ResponseWriter
jsonErr := json.NewEncoder(w).Encode(err)
if jsonErr != nil {
fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n")
return
}
fmt.Fprintf(w, "data: %s\n\n", err.Error())
if flusher != nil {
flusher.Flush()
}
return
}
fmt.Fprintf(w, "event: %s\n", event.Stage)
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
if flusher != nil {
flusher.Flush()
}
if event.Stage == "error" || event.Stage == "complete" {
return
}
}
}
}(w, flusher)
eventChannel <- API.DeploymentEvent{
Stage: "start",
Message: "Uploading code",
}
deployRequest.Code, _, err = r.FormFile("code")
if err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "No code archive found",
}
return
}
defer deployRequest.Code.Close()
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Invalid flux.json, a name, url, and port must be specified",
}
return
}
if projectConfig.Name == "all" {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Reserved name 'all' is not allowed",
}
return
}
flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String()))
projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id)
if err != nil {
flux.logger.Infow("Failed to upload code", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upload code: %s", err),
}
return
}
if projectConfig.EnvFile != "" {
envPath := filepath.Join(projectPath, projectConfig.EnvFile)
// prevent path traversal
realEnvPath, err := filepath.EvalSymlinks(envPath)
if err != nil {
flux.logger.Errorw("Failed to eval symlinks", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to eval symlinks: %s", err),
}
return
}
if !strings.HasPrefix(realEnvPath, projectPath) {
flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile),
}
return
}
envBytes, err := os.Open(realEnvPath)
if err != nil {
flux.logger.Errorw("Failed to open env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to open env file: %v", err),
}
return
}
defer envBytes.Close()
envVars, err := godotenv.Parse(envBytes)
if err != nil {
flux.logger.Errorw("Failed to parse env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to parse env file: %v", err),
}
return
}
for key, value := range envVars {
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
}
}
// pipe the output of the build process to the event channel
pipeGroup := sync.WaitGroup{}
streamPipe := func(pipe io.ReadCloser) {
pipeGroup.Add(1)
defer pipeGroup.Done()
defer pipe.Close()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
line := scanner.Text()
eventChannel <- API.DeploymentEvent{
Stage: "cmd_output",
Message: line,
}
}
if err := scanner.Err(); err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to read pipe: %s", err),
}
flux.logger.Errorw("Error reading pipe", zap.Error(err))
}
}
flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String()))
eventChannel <- API.DeploymentEvent{
Stage: "preparing",
Message: "Preparing project",
}
// redirect stdout and stderr to the event channel
reader, writer := io.Pipe()
prepareCmd := exec.Command("go", "generate")
prepareCmd.Dir = projectPath
prepareCmd.Stdout = writer
prepareCmd.Stderr = writer
err = prepareCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
go streamPipe(reader)
pipeGroup.Wait()
err = prepareCmd.Wait()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
writer.Close()
eventChannel <- API.DeploymentEvent{
Stage: "building",
Message: "Building project image",
}
reader, writer = io.Pipe()
flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0))
buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder)
buildCmd.Dir = projectPath
buildCmd.Stdout = writer
buildCmd.Stderr = writer
err = buildCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
go streamPipe(reader)
pipeGroup.Wait()
err = buildCmd.Wait()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
eventChannel <- API.DeploymentEvent{
Stage: "creating",
Message: "Creating app, this might take a while...",
}
app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id)
} else {
eventChannel <- API.DeploymentEvent{
Stage: "upgrading",
Message: "Upgrading app, this might take a while...",
}
// we dont need to change `app` since this upgrade will use the same app and update it in place
err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig)
}
if err != nil {
flux.logger.Errorw("Failed to deploy app", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
}
return
}
var extApp API.App
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
eventChannel <- API.DeploymentEvent{
Stage: "complete",
Message: extApp,
}
flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String()))
}
func (flux *FluxServer) GetAllApps(w http.ResponseWriter, r *http.Request) {
var apps []API.App
for _, app := range flux.appManager.GetAllApps() {

385
internal/handlers/deploy.go Normal file
View File

@@ -0,0 +1,385 @@
package handlers
import (
"bufio"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/google/uuid"
"github.com/joho/godotenv"
"github.com/juls0730/flux/internal/util"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"go.uber.org/zap"
)
var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]()
func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "test/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
err := r.ParseMultipartForm(10 << 32) // 10 GiB
if err != nil {
flux.logger.Errorw("Failed to parse multipart form", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var deployRequest API.DeployRequest
projectConfig := new(pkg.ProjectConfig)
if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil {
flux.logger.Errorw("Failed to decode config", zap.Error(err))
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
return
}
deployRequest.Config = *projectConfig
idStr := r.FormValue("id")
if idStr == "" {
id, err := uuid.NewRandom()
if err != nil {
flux.logger.Errorw("Failed to generate uuid", zap.Error(err))
http.Error(w, "Failed to generate uuid", http.StatusInternalServerError)
return
}
deployRequest.Id = id
} else {
deployRequest.Id, err = uuid.Parse(idStr)
if err != nil {
flux.logger.Errorw("Failed to parse uuid", zap.Error(err))
http.Error(w, "Failed to parse uuid", http.StatusBadRequest)
return
}
// make sure the id exists in the database
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
}
ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context())
if err != nil && err == util.ErrLocked {
// This will happen if the app is already being deployed
http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict)
return
}
go func() {
<-ctx.Done()
deploymentLock.Unlock(deployRequest.Id)
}()
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// Send 200 to indicate a successful SSE connection to the client
w.WriteHeader(http.StatusOK)
eventChannel := make(chan API.DeploymentEvent, 10)
defer close(eventChannel)
var wg sync.WaitGroup
// make sure the connection doesnt close while there are SSE events being sent
defer wg.Wait()
wg.Add(1)
go func(w http.ResponseWriter, flusher http.Flusher) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case event, ok := <-eventChannel:
if !ok {
return
}
ev := API.DeploymentEvent{
Message: event.Message,
}
eventJSON, err := json.Marshal(ev)
if err != nil {
// Write error directly to ResponseWriter
jsonErr := json.NewEncoder(w).Encode(err)
if jsonErr != nil {
fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n")
return
}
fmt.Fprintf(w, "data: %s\n\n", err.Error())
if flusher != nil {
flusher.Flush()
}
return
}
fmt.Fprintf(w, "event: %s\n", event.Stage)
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
if flusher != nil {
flusher.Flush()
}
if event.Stage == "error" || event.Stage == "complete" {
return
}
}
}
}(w, flusher)
eventChannel <- API.DeploymentEvent{
Stage: "start",
Message: "Uploading code",
}
deployRequest.Code, _, err = r.FormFile("code")
if err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "No code archive found",
}
return
}
defer deployRequest.Code.Close()
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Invalid flux.json, a name, url, and port must be specified",
}
return
}
if projectConfig.Name == "all" {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Reserved name 'all' is not allowed",
}
return
}
flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String()))
projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id)
if err != nil {
flux.logger.Infow("Failed to upload code", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upload code: %s", err),
}
return
}
if projectConfig.EnvFile != "" {
envPath := filepath.Join(projectPath, projectConfig.EnvFile)
// prevent path traversal
realEnvPath, err := filepath.EvalSymlinks(envPath)
if err != nil {
flux.logger.Errorw("Failed to eval symlinks", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to eval symlinks: %s", err),
}
return
}
if !strings.HasPrefix(realEnvPath, projectPath) {
flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile),
}
return
}
envBytes, err := os.Open(realEnvPath)
if err != nil {
flux.logger.Errorw("Failed to open env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to open env file: %v", err),
}
return
}
defer envBytes.Close()
envVars, err := godotenv.Parse(envBytes)
if err != nil {
flux.logger.Errorw("Failed to parse env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to parse env file: %v", err),
}
return
}
for key, value := range envVars {
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
}
}
// pipe the output of the build process to the event channel
pipeGroup := sync.WaitGroup{}
streamPipe := func(pipe io.ReadCloser) {
pipeGroup.Add(1)
defer pipeGroup.Done()
defer pipe.Close()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
line := scanner.Text()
eventChannel <- API.DeploymentEvent{
Stage: "cmd_output",
Message: line,
}
}
if err := scanner.Err(); err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to read pipe: %s", err),
}
flux.logger.Errorw("Error reading pipe", zap.Error(err))
}
}
flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String()))
eventChannel <- API.DeploymentEvent{
Stage: "preparing",
Message: "Preparing project",
}
// redirect stdout and stderr to the event channel
reader, writer := io.Pipe()
prepareCmd := exec.Command("go", "generate")
prepareCmd.Dir = projectPath
prepareCmd.Stdout = writer
prepareCmd.Stderr = writer
err = prepareCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
go streamPipe(reader)
pipeGroup.Wait()
err = prepareCmd.Wait()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
writer.Close()
eventChannel <- API.DeploymentEvent{
Stage: "building",
Message: "Building project image",
}
reader, writer = io.Pipe()
flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0))
buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder)
buildCmd.Dir = projectPath
buildCmd.Stdout = writer
buildCmd.Stderr = writer
err = buildCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
go streamPipe(reader)
pipeGroup.Wait()
err = buildCmd.Wait()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
eventChannel <- API.DeploymentEvent{
Stage: "creating",
Message: "Creating app, this might take a while...",
}
app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id)
} else {
eventChannel <- API.DeploymentEvent{
Stage: "upgrading",
Message: "Upgrading app, this might take a while...",
}
// we dont need to change `app` since this upgrade will use the same app and update it in place
err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig)
}
if err != nil {
flux.logger.Errorw("Failed to deploy app", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
}
return
}
var extApp API.App
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
eventChannel <- API.DeploymentEvent{
Stage: "complete",
Message: extApp,
}
flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String()))
}

View File

@@ -190,6 +190,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
return err
}
logger.Debugw("Waiting for container to start", zap.String("container_id", string(newHeadContainer.ContainerID)))
if err := newHeadContainer.Wait(ctx, projectConfig.Port, dockerClient); err != nil {
logger.Errorw("Failed to wait for container", zap.Error(err))
return err
@@ -220,7 +221,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
// gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers
if ok {
go oldProxy.GracefulShutdown(func() {
go oldProxy.GracefulShutdown(logger, func() {
err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID)
if err != nil {
logger.Errorw("Failed to stop container", zap.Error(err))

View File

@@ -41,6 +41,7 @@ func (proxyManager *ProxyManager) ListenAndServe(host string) error {
// Stops forwarding traffic to a deployment
func (proxyManager *ProxyManager) RemoveDeployment(host string) {
proxyManager.logger.Debugw("Removing proxy", zap.String("host", host))
proxyManager.Delete(host)
}
@@ -67,22 +68,17 @@ func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Reque
type Proxy struct {
forwardingFor url.URL
proxyFunc *httputil.ReverseProxy
gracePeriod time.Duration
shutdownTimeout time.Duration
activeRequests int64
}
// type DeploymentProxy struct {
// deployment *models.Deployment
// proxy *httputil.ReverseProxy
// gracePeriod time.Duration
// activeRequests int64
// }
const PROXY_SHUTDOWN_TIMEOUT = 30 * time.Second
// Creates a proxy for a given deployment
func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) {
proxy := &Proxy{
forwardingFor: forwardingFor,
gracePeriod: time.Second * 30,
shutdownTimeout: PROXY_SHUTDOWN_TIMEOUT,
activeRequests: 0,
}
@@ -111,17 +107,22 @@ func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) {
}
// Drains connections from a proxy
func (p *Proxy) GracefulShutdown(shutdownFunc func()) {
ctx, cancel := context.WithTimeout(context.Background(), p.gracePeriod)
func (p *Proxy) GracefulShutdown(logger *zap.SugaredLogger, shutdownFunc func()) {
logger.Debugw("Shutting down proxy", zap.String("host", p.forwardingFor.Host))
ctx, cancel := context.WithTimeout(context.Background(), p.shutdownTimeout)
defer cancel()
done := false
for !done {
select {
case <-ctx.Done():
logger.Debugw("Proxy shutdown timed out", zap.String("host", p.forwardingFor.Host))
done = true
default:
if atomic.LoadInt64(&p.activeRequests) == 0 {
logger.Debugw("Proxy shutdown completed successfully", zap.String("host", p.forwardingFor.Host))
done = true
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"go.uber.org/zap"
)
type Project struct {
@@ -14,7 +15,7 @@ type Project struct {
Name string `json:"name"`
}
func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, error) {
func GetProject(command string, args []string, config pkg.CLIConfig, logger *zap.SugaredLogger) (*Project, error) {
var projectName string
// we are in a project directory and the project is deployed
@@ -24,7 +25,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project,
return nil, fmt.Errorf("failed to read .fluxid: %v", err)
}
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-id/" + string(id))
app, err := GetRequest[API.App](config.DaemonURL+"/app/by-id/"+string(id), logger)
if err != nil {
return nil, fmt.Errorf("failed to get app: %v", err)
}
@@ -58,7 +59,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project,
}
// we are calling flux with a project name (ie `flux start my-project`)
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-name/" + projectName)
app, err := GetRequest[API.App](config.DaemonURL+"/app/by-name/"+projectName, logger)
if err != nil {
return nil, fmt.Errorf("failed to get app: %v", err)
}

View File

@@ -6,17 +6,26 @@ import (
"io"
"net/http"
"strings"
"go.uber.org/zap"
)
func isOk(statusCode int) bool {
return statusCode >= 200 && statusCode < 300
}
// make a function that makes an http GET request to the daemon and returns data of type T
func GetRequest[T any](url string) (*T, error) {
func GetRequest[T any](url string, logger *zap.SugaredLogger) (*T, error) {
resp, err := http.Get(url)
if err != nil {
logger.Debugw("GET request failed", zap.String("url", url), zap.Error(err))
return nil, fmt.Errorf("http get request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Debugw("GET request", zap.String("url", url), resp)
if !isOk(resp.StatusCode) {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
@@ -35,7 +44,7 @@ func GetRequest[T any](url string) (*T, error) {
return &data, nil
}
func DeleteRequest(url string) error {
func DeleteRequest(url string, logger *zap.SugaredLogger) error {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return fmt.Errorf("failed to delete: %v", err)
@@ -46,7 +55,9 @@ func DeleteRequest(url string) error {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Debugw("DELETE request", zap.String("url", url), resp)
if !isOk(resp.StatusCode) {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading response body: %v", err)
@@ -60,7 +71,7 @@ func DeleteRequest(url string) error {
return nil
}
func PutRequest(url string, data io.Reader) error {
func PutRequest(url string, data io.Reader, logger *zap.SugaredLogger) error {
req, err := http.NewRequest("PUT", url, data)
if err != nil {
return fmt.Errorf("failed to put: %v", err)
@@ -71,7 +82,9 @@ func PutRequest(url string, data io.Reader) error {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Debugw("PUT request", zap.String("url", url), resp)
if !isOk(resp.StatusCode) {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading response body: %v", err)

View File

@@ -1,3 +1,3 @@
package pkg
const Version = "2025.05.06-16"
const Version = "2025.05.08-14"