3 Commits

Author SHA1 Message Date
Zoe
fb2588cc3a use the proxy as a package 2025-05-15 18:33:02 +00:00
Zoe
c51eca5dab Expand logging, and daemonless command support.
This adds more logging in certain places, and adds logging to the CLI.
It also allows for certain commands in the CLI to be used without a
daemon connection, namely `init`, which previously required the daemon
to be connected, but now does not since it doesnt need it.
2025-05-08 09:53:41 -05:00
Zoe
5bb696052a add tests, fix bugs, and make cli usable without interactivity 2025-05-06 11:04:26 -05:00
26 changed files with 795 additions and 638 deletions

View File

@@ -26,7 +26,7 @@ Flux is a lightweight self-hosted micro-PaaS for hosting Golang web apps with ea
## Dependencies
- [Go](https://golang.org/dl/)
- [ZQDGR](https://github.com/juls0730/zqdgr)
- [ZQDGR](https://github.com/juls0730/zqdgr) (development only)
- [Buildpacks](https://buildpacks.io/) (daemon only)
- [Docker](https://docs.docker.com/get-docker/) (daemon only)

View File

@@ -3,11 +3,13 @@ package commands
import (
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"go.uber.org/zap"
)
type CommandCtx struct {
Config pkg.CLIConfig
Info API.Info
Logger *zap.SugaredLogger
Info *API.Info
Interactive bool
}

View File

@@ -49,7 +49,7 @@ func deleteAll(ctx CommandCtx, noConfirm *bool) error {
}
}
util.DeleteRequest(ctx.Config.DaemonURL + "/deployments")
util.DeleteRequest(ctx.Config.DaemonURL+"/deployments", ctx.Logger)
fmt.Printf("Successfully deleted all projects\n")
return nil
@@ -80,9 +80,9 @@ func DeleteCommand(ctx CommandCtx, args []string) error {
return deleteAll(ctx, noConfirm)
}
project, err := util.GetProject("delete", args, ctx.Config)
project, err := util.GetProject("delete", args, ctx.Config, ctx.Logger)
if err != nil {
return fmt.Errorf("\tfailed to get project name: %v.\n\tSee flux delete --help for more information", err)
return fmt.Errorf("\tfailed to get project name: %v.\n\tSee flux delete -help for more information", err)
}
// ask for confirmation if not --no-confirm
@@ -101,7 +101,7 @@ func DeleteCommand(ctx CommandCtx, args []string) error {
}
}
err = util.DeleteRequest(ctx.Config.DaemonURL + "/app/" + project.Id)
err = util.DeleteRequest(ctx.Config.DaemonURL+"/app/"+project.Id, ctx.Logger)
if err != nil {
return fmt.Errorf("failed to delete project: %v", err)
}

View File

@@ -6,6 +6,7 @@ import (
"bytes"
"compress/gzip"
"encoding/json"
"flag"
"fmt"
"io"
"mime/multipart"
@@ -175,11 +176,38 @@ func preprocessEnvFile(envFile string, target *[]string) error {
return nil
}
var deployUsage = `Usage:
flux deploy [flags]
Flags:
-help, -h: Show this help message
%s
Flux will deploy or redeploy the app in the current directory.
`
func DeployCommand(ctx CommandCtx, args []string) error {
if _, err := os.Stat("flux.json"); err != nil {
return fmt.Errorf("no flux.json found, please run flux init first")
}
fs := flag.NewFlagSet("deploy", flag.ExitOnError)
fs.Usage = func() {
var buf bytes.Buffer
// Redirect flagset to print to buffer instead of stdout
fs.SetOutput(&buf)
fs.PrintDefaults()
fmt.Println(deployUsage, strings.TrimRight(buf.String(), "\n"))
}
quiet := fs.Bool("q", false, "Don't print the deployment logs")
err := fs.Parse(args)
if err != nil {
return err
}
spinnerWriter := util.NewCustomSpinnerWriter()
loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(spinnerWriter))
@@ -353,7 +381,10 @@ func DeployCommand(ctx CommandCtx, args []string) error {
}
return nil
case "cmd_output":
// suppress the command output if the quiet flag is set
if quiet == nil || !*quiet {
customWriter.Printf("... %s\n", data.Message)
}
case "error":
loadingSpinner.Stop()
return fmt.Errorf("deployment failed: %s", data.Message)

View File

@@ -13,17 +13,20 @@ import (
)
var initUsage = `Usage:
flux init [project-name]
flux init [flags] [project-name]
Options:
project-name: The name of the project to initialize
Flux will initialize a new project in the current directory or the specified project.`
Flags:
-help, -h: Show this help message
%s
Flux will initialize a new project in the current directory or the specified project.
`
func InitCommand(ctx CommandCtx, args []string) error {
if !ctx.Interactive {
return fmt.Errorf("init command can only be run in interactive mode")
}
var projectConfig pkg.ProjectConfig
fs := flag.NewFlagSet("init", flag.ExitOnError)
fs.Usage = func() {
@@ -32,8 +35,10 @@ func InitCommand(ctx CommandCtx, args []string) error {
fs.SetOutput(&buf)
fs.PrintDefaults()
fmt.Println(initUsage)
fmt.Printf(initUsage, strings.TrimRight(buf.String(), "\n"))
}
hostUrl := fs.String("host-url", "", "The URL of the host")
projectPort := fs.Uint("project-port", 0, "The port of the host")
err := fs.Parse(args)
if err != nil {
@@ -43,10 +48,22 @@ func InitCommand(ctx CommandCtx, args []string) error {
args = fs.Args()
var projectConfig pkg.ProjectConfig
if !ctx.Interactive {
if hostUrl == nil || *hostUrl == "" {
return fmt.Errorf("host-url is required when not in interactive mode")
}
if projectPort == nil || *projectPort == 0 {
return fmt.Errorf("project-port is required when not in interactive mode")
}
if len(args) < 1 {
return fmt.Errorf("project-name is required when not in interactive mode")
}
}
var response string
if len(args) > 1 {
if len(args) > 0 {
response = args[0]
} else {
fmt.Println("What is the name of your project?")
@@ -55,6 +72,16 @@ func InitCommand(ctx CommandCtx, args []string) error {
projectConfig.Name = response
if hostUrl != nil && *hostUrl != "" {
if strings.HasPrefix(*hostUrl, "http") {
*hostUrl = strings.TrimPrefix(*hostUrl, "http://")
*hostUrl = strings.TrimPrefix(*hostUrl, "https://")
}
*hostUrl = strings.Split(*hostUrl, "/")[0]
projectConfig.Url = *hostUrl
} else {
fmt.Println("What URL should your project listen to?")
fmt.Scanln(&response)
if strings.HasPrefix(response, "http") {
@@ -65,7 +92,15 @@ func InitCommand(ctx CommandCtx, args []string) error {
response = strings.Split(response, "/")[0]
projectConfig.Url = response
}
if projectPort != nil && *projectPort != 0 {
if *projectPort < 1024 || *projectPort > 65535 {
return fmt.Errorf("project-port must be between 1024 and 65535")
}
projectConfig.Port = uint16(*projectPort)
} else {
fmt.Println("What port does your project listen to?")
fmt.Scanln(&response)
port, err := strconv.ParseUint(response, 10, 16)
@@ -78,6 +113,7 @@ func InitCommand(ctx CommandCtx, args []string) error {
if err != nil || projectConfig.Port < 1024 {
return portErr
}
}
configBytes, err := json.MarshalIndent(projectConfig, "", " ")
if err != nil {

View File

@@ -8,7 +8,7 @@ import (
)
func ListCommand(ctx CommandCtx, args []string) error {
apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL + "/apps")
apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL+"/apps", ctx.Logger)
if err != nil {
return fmt.Errorf("failed to get apps: %v", err)
}

View File

@@ -7,14 +7,14 @@ import (
)
func StartCommand(ctx CommandCtx, args []string) error {
projectName, err := util.GetProject("start", args, ctx.Config)
projectName, err := util.GetProject("start", args, ctx.Config, ctx.Logger)
if err != nil {
return err
}
// Put request to start the project, since the start endpoint is idempotent.
// If the project is already running, this will return a 304 Not Modified
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil)
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil, ctx.Logger)
if err != nil {
return fmt.Errorf("failed to start %s: %v", projectName.Name, err)
}

View File

@@ -7,12 +7,12 @@ import (
)
func StopCommand(ctx CommandCtx, args []string) error {
projectName, err := util.GetProject("stop", args, ctx.Config)
projectName, err := util.GetProject("stop", args, ctx.Config, ctx.Logger)
if err != nil {
return err
}
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil)
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil, ctx.Logger)
if err != nil {
return fmt.Errorf("failed to stop %s: %v", projectName.Name, err)
}

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/agnivade/levenshtein"
@@ -15,6 +16,8 @@ import (
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"github.com/mattn/go-isatty"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func isInteractive() bool {
@@ -35,15 +38,16 @@ Available Commands:
%s
Available Flags:
--help, -h: Show this help message
-help, -h: Show this help message
Use "flux <command> --help" for more information about a command.
Use "flux <command> -help" for more information about a command.
`
var maxDistance = 3
type Command struct {
Help string
DaemonConnected bool
HandlerFunc commands.CommandFunc
}
@@ -59,9 +63,10 @@ func NewCommandHandler() CommandHandler {
}
}
func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, help string) {
func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, daemonConnected bool, help string) {
coomand := Command{
Help: help,
DaemonConnected: daemonConnected,
HandlerFunc: handler,
}
@@ -108,51 +113,36 @@ func (h *CommandHandler) GetHelpCmd(commands.CommandCtx, []string) error {
return nil
}
func runCommand(command string, args []string, config pkg.CLIConfig, info API.Info, cmdHandler CommandHandler) error {
func runCommand(command string, args []string, config pkg.CLIConfig, cmdHandler CommandHandler, logger *zap.SugaredLogger) error {
commandStruct, ok := cmdHandler.commands[command]
if !ok {
panic("runCommand was passed an invalid command name")
}
var info *API.Info = nil
if commandStruct.DaemonConnected {
var err error
info, err = util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger)
if err != nil {
fmt.Printf("Failed to connect to daemon\n")
os.Exit(1)
}
if info.Version != version {
fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version)
os.Exit(1)
}
}
commandCtx := commands.CommandCtx{
Config: config,
Info: info,
Logger: logger,
Interactive: isInteractive(),
}
commandStruct, ok := cmdHandler.commands[command]
if ok {
return commandStruct.HandlerFunc(commandCtx, args)
}
// diff the command against the list of commands and if we find a command that is more than 80% similar, ask if that's what the user meant
var closestMatch struct {
name string
score int
}
for cmdName := range cmdHandler.commands {
distance := levenshtein.ComputeDistance(cmdName, command)
if distance <= maxDistance {
if closestMatch.name == "" || distance < closestMatch.score {
closestMatch.name = cmdName
closestMatch.score = distance
}
}
}
if closestMatch.name == "" {
return fmt.Errorf("unknown command: %s", command)
}
var response string
// new line ommitted because it will be produced when the user presses enter to submit their response
fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name)
fmt.Scanln(&response)
if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" {
command = closestMatch.name
} else {
return nil
}
// re-run command after accepting the suggestion
return runCommand(command, args, config, info, cmdHandler)
}
func main() {
@@ -160,21 +150,44 @@ func main() {
fmt.Printf("Flux is being run non-interactively\n")
}
zapConfig := zap.NewDevelopmentConfig()
verbosity := 0
debug, err := strconv.ParseBool(os.Getenv("DEBUG"))
if err != nil {
debug = false
}
if debug {
zapConfig = zap.NewDevelopmentConfig()
verbosity = -1
}
zapConfig.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity))
lameLogger, err := zapConfig.Build()
if err != nil {
fmt.Printf("Failed to create logger: %v\n", err)
os.Exit(1)
}
logger := lameLogger.Sugar()
cmdHandler := NewCommandHandler()
cmdHandler.RegisterCmd("init", commands.InitCommand, "Initialize a new project")
cmdHandler.RegisterCmd("deploy", commands.DeployCommand, "Deploy a new version of the app")
cmdHandler.RegisterCmd("start", commands.StartCommand, "Start the app")
cmdHandler.RegisterCmd("stop", commands.StopCommand, "Stop the app")
cmdHandler.RegisterCmd("list", commands.ListCommand, "List all the apps")
cmdHandler.RegisterCmd("delete", commands.DeleteCommand, "Delete the app")
cmdHandler.RegisterCmd("init", commands.InitCommand, false, "Initialize a new project")
cmdHandler.RegisterCmd("deploy", commands.DeployCommand, true, "Deploy a new version of the app")
cmdHandler.RegisterCmd("start", commands.StartCommand, true, "Start the app")
cmdHandler.RegisterCmd("stop", commands.StopCommand, true, "Stop the app")
cmdHandler.RegisterCmd("list", commands.ListCommand, true, "List all the apps")
cmdHandler.RegisterCmd("delete", commands.DeleteCommand, true, "Delete the app")
fs := flag.NewFlagSet("flux", flag.ExitOnError)
fs.Usage = func() {
cmdHandler.GetHelp()
}
err := fs.Parse(os.Args[1:])
err = fs.Parse(os.Args[1:])
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -214,18 +227,42 @@ func main() {
os.Exit(1)
}
info, err := util.GetRequest[API.Info](config.DaemonURL + "/heartbeat")
if err != nil {
fmt.Printf("Failed to connect to daemon\n")
command := os.Args[1]
if _, ok := cmdHandler.commands[command]; !ok {
var closestMatch struct {
name string
score int
}
for cmdName := range cmdHandler.commands {
distance := levenshtein.ComputeDistance(cmdName, command)
if distance <= maxDistance {
if closestMatch.name == "" || distance < closestMatch.score {
closestMatch.name = cmdName
closestMatch.score = distance
}
}
}
if closestMatch.name == "" {
fmt.Printf("unknown command: %s", command)
os.Exit(1)
}
if info.Version != version {
fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version)
os.Exit(1)
var response string
// new line ommitted because it will be produced when the user presses enter to submit their response
fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name)
fmt.Scanln(&response)
if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" {
command = closestMatch.name
} else {
os.Exit(0)
}
}
err = runCommand(os.Args[1], fs.Args()[1:], config, *info, cmdHandler)
err = runCommand(command, fs.Args()[1:], config, cmdHandler, logger)
if err != nil {
fmt.Printf("Error: %v\n", err)
os.Exit(1)

View File

@@ -19,6 +19,7 @@ func main() {
http.HandleFunc("GET /app/by-name/{name}", fluxServer.GetAppByName)
http.HandleFunc("GET /app/by-id/{id}", fluxServer.GetAppById)
// a PUT request is the proper type to use since these endpoints are idempotent
http.HandleFunc("PUT /app/{id}/start", fluxServer.StartApp)
http.HandleFunc("PUT /app/{id}/stop", fluxServer.StopApp)

8
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/juls0730/flux
go 1.23.3
go 1.24.2
require (
github.com/briandowns/spinner v1.23.1
@@ -10,7 +10,11 @@ require (
github.com/mattn/go-sqlite3 v1.14.24
)
require go.uber.org/multierr v1.10.0 // indirect
require (
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/sync v0.14.0 // indirect
)
require (
github.com/Microsoft/go-winio v0.4.14 // indirect

4
go.sum
View File

@@ -43,6 +43,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd h1:JNazPdlAs307Gtaqmb+wfCjcOzO3MRXxg9nf0bAKAnc=
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd/go.mod h1:CnRvcleiS2kvK1N2PeQmeoRP5EXpBDpHPkg72vAUaSg=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -113,6 +115,8 @@ golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@@ -87,9 +87,11 @@ func (d *DockerClient) StartContainer(ctx context.Context, containerID DockerID)
return d.client.ContainerStart(ctx, string(containerID), container.StartOptions{})
}
// blocks until the container returns a 200 status code
const CONTAINER_START_TIMEOUT = 30 * time.Second
// blocks until the container returns a 200 status code for a max of CONTAINER_START_TIMEOUT (30 seconds)
func (d *DockerClient) ContainerWait(ctx context.Context, containerID DockerID, port uint16) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
ctx, cancel := context.WithTimeout(ctx, CONTAINER_START_TIMEOUT)
defer cancel()
for {
@@ -152,7 +154,7 @@ func (d *DockerClient) GetContainerStatus(containerID DockerID) (*ContainerStatu
}
func (d *DockerClient) StopContainer(ctx context.Context, containerID DockerID) error {
d.logger.Debugw("Stopping container", zap.String("container_id", string(containerID[:12])))
d.logger.Debugw("Stopping container", zap.String("container_id", string(containerID)))
return d.client.ContainerStop(ctx, string(containerID), container.StopOptions{})
}

View File

@@ -1,389 +1,16 @@
package handlers
import (
"bufio"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/google/uuid"
"github.com/joho/godotenv"
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/flux/internal/util"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"github.com/juls0730/sentinel"
"go.uber.org/zap"
)
var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]()
func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "test/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
err := r.ParseMultipartForm(10 << 32) // 10 GiB
if err != nil {
flux.logger.Errorw("Failed to parse multipart form", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var deployRequest API.DeployRequest
projectConfig := new(pkg.ProjectConfig)
if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil {
flux.logger.Errorw("Failed to decode config", zap.Error(err))
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
return
}
deployRequest.Config = *projectConfig
idStr := r.FormValue("id")
if idStr == "" {
id, err := uuid.NewRandom()
if err != nil {
flux.logger.Errorw("Failed to generate uuid", zap.Error(err))
http.Error(w, "Failed to generate uuid", http.StatusInternalServerError)
return
}
deployRequest.Id = id
} else {
deployRequest.Id, err = uuid.Parse(idStr)
if err != nil {
flux.logger.Errorw("Failed to parse uuid", zap.Error(err))
http.Error(w, "Failed to parse uuid", http.StatusBadRequest)
return
}
// make sure the id exists in the database
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
}
ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context())
if err != nil && err == util.ErrLocked {
// This will happen if the app is already being deployed
http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict)
return
}
go func() {
<-ctx.Done()
deploymentLock.Unlock(deployRequest.Id)
}()
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusMultiStatus)
eventChannel := make(chan API.DeploymentEvent, 10)
defer close(eventChannel)
var wg sync.WaitGroup
// make sure the connection doesnt close while there are SSE events being sent
defer wg.Wait()
wg.Add(1)
go func(w http.ResponseWriter, flusher http.Flusher) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case event, ok := <-eventChannel:
if !ok {
return
}
ev := API.DeploymentEvent{
Message: event.Message,
}
eventJSON, err := json.Marshal(ev)
if err != nil {
// Write error directly to ResponseWriter
jsonErr := json.NewEncoder(w).Encode(err)
if jsonErr != nil {
fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n")
return
}
fmt.Fprintf(w, "data: %s\n\n", err.Error())
if flusher != nil {
flusher.Flush()
}
return
}
fmt.Fprintf(w, "event: %s\n", event.Stage)
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
if flusher != nil {
flusher.Flush()
}
if event.Stage == "error" || event.Stage == "complete" {
return
}
}
}
}(w, flusher)
eventChannel <- API.DeploymentEvent{
Stage: "start",
Message: "Uploading code",
}
deployRequest.Code, _, err = r.FormFile("code")
if err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "No code archive found",
}
return
}
defer deployRequest.Code.Close()
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Invalid flux.json, a name, url, and port must be specified",
}
return
}
if projectConfig.Name == "all" {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Reserved name 'all' is not allowed",
}
return
}
flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String()))
projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id)
if err != nil {
flux.logger.Infow("Failed to upload code", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upload code: %s", err),
}
return
}
if projectConfig.EnvFile != "" {
envPath := filepath.Join(projectPath, projectConfig.EnvFile)
// prevent path traversal
realEnvPath, err := filepath.EvalSymlinks(envPath)
if err != nil {
flux.logger.Errorw("Failed to eval symlinks", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to eval symlinks: %s", err),
}
return
}
if !strings.HasPrefix(realEnvPath, projectPath) {
flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile),
}
return
}
envBytes, err := os.Open(realEnvPath)
if err != nil {
flux.logger.Errorw("Failed to open env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to open env file: %v", err),
}
return
}
defer envBytes.Close()
envVars, err := godotenv.Parse(envBytes)
if err != nil {
flux.logger.Errorw("Failed to parse env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to parse env file: %v", err),
}
return
}
for key, value := range envVars {
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
}
}
// pipe the output of the build process to the event channel
pipeGroup := sync.WaitGroup{}
streamPipe := func(pipe io.ReadCloser) {
pipeGroup.Add(1)
defer pipeGroup.Done()
defer pipe.Close()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
line := scanner.Text()
eventChannel <- API.DeploymentEvent{
Stage: "cmd_output",
Message: line,
}
}
if err := scanner.Err(); err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to read pipe: %s", err),
}
flux.logger.Errorw("Error reading pipe", zap.Error(err))
}
}
flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String()))
eventChannel <- API.DeploymentEvent{
Stage: "preparing",
Message: "Preparing project",
}
// redirect stdout and stderr to the event channel
reader, writer := io.Pipe()
prepareCmd := exec.Command("go", "generate")
prepareCmd.Dir = projectPath
prepareCmd.Stdout = writer
prepareCmd.Stderr = writer
err = prepareCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
go streamPipe(reader)
pipeGroup.Wait()
err = prepareCmd.Wait()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
writer.Close()
eventChannel <- API.DeploymentEvent{
Stage: "building",
Message: "Building project image",
}
reader, writer = io.Pipe()
flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0))
buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder)
buildCmd.Dir = projectPath
buildCmd.Stdout = writer
buildCmd.Stderr = writer
err = buildCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
go streamPipe(reader)
pipeGroup.Wait()
err = buildCmd.Wait()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
eventChannel <- API.DeploymentEvent{
Stage: "creating",
Message: "Creating app, this might take a while...",
}
app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id)
} else {
eventChannel <- API.DeploymentEvent{
Stage: "upgrading",
Message: "Upgrading app, this might take a while...",
}
// we dont need to change `app` since this upgrade will use the same app and update it in place
err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig)
}
if err != nil {
flux.logger.Errorw("Failed to deploy app", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
}
return
}
var extApp API.App
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
eventChannel <- API.DeploymentEvent{
Stage: "complete",
Message: extApp,
}
flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String()))
}
func (flux *FluxServer) GetAllApps(w http.ResponseWriter, r *http.Request) {
var apps []API.App
for _, app := range flux.appManager.GetAllApps() {
@@ -502,7 +129,7 @@ func (flux *FluxServer) StartApp(w http.ResponseWriter, r *http.Request) {
return
}
newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl)
newProxy, err := sentinel.NewDeploymentProxy(deploymentInternalUrl.String(), proxyManagerService.GetTransport)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -557,6 +184,11 @@ func (flux *FluxServer) StopApp(w http.ResponseWriter, r *http.Request) {
}
func (flux *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) {
if flux.config.DisableDeleteAll {
http.Error(w, "Delete all deployments is disabled", http.StatusForbidden)
return
}
apps := flux.appManager.GetAllApps()
for _, app := range apps {
err := flux.appManager.DeleteApp(app.Id)
@@ -582,13 +214,23 @@ func (flux *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Reque
return
}
status, err := app.Deployment.Status(r.Context(), flux.docker, flux.logger)
if err != nil {
flux.logger.Errorw("Failed to get deployment status", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if status != "stopped" {
app.Deployment.Stop(r.Context(), flux.docker)
flux.proxy.RemoveDeployment(app.Deployment.URL)
}
err = flux.appManager.DeleteApp(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
flux.proxy.RemoveDeployment(app.Deployment.URL)
w.WriteHeader(http.StatusOK)
}

385
internal/handlers/deploy.go Normal file
View File

@@ -0,0 +1,385 @@
package handlers
import (
"bufio"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/google/uuid"
"github.com/joho/godotenv"
"github.com/juls0730/flux/internal/util"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"go.uber.org/zap"
)
var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]()
func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "test/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
err := r.ParseMultipartForm(10 << 32) // 10 GiB
if err != nil {
flux.logger.Errorw("Failed to parse multipart form", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var deployRequest API.DeployRequest
projectConfig := new(pkg.ProjectConfig)
if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil {
flux.logger.Errorw("Failed to decode config", zap.Error(err))
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
return
}
deployRequest.Config = *projectConfig
idStr := r.FormValue("id")
if idStr == "" {
id, err := uuid.NewRandom()
if err != nil {
flux.logger.Errorw("Failed to generate uuid", zap.Error(err))
http.Error(w, "Failed to generate uuid", http.StatusInternalServerError)
return
}
deployRequest.Id = id
} else {
deployRequest.Id, err = uuid.Parse(idStr)
if err != nil {
flux.logger.Errorw("Failed to parse uuid", zap.Error(err))
http.Error(w, "Failed to parse uuid", http.StatusBadRequest)
return
}
// make sure the id exists in the database
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
}
ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context())
if err != nil && err == util.ErrLocked {
// This will happen if the app is already being deployed
http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict)
return
}
go func() {
<-ctx.Done()
deploymentLock.Unlock(deployRequest.Id)
}()
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// Send 200 to indicate a successful SSE connection to the client
w.WriteHeader(http.StatusOK)
eventChannel := make(chan API.DeploymentEvent, 10)
defer close(eventChannel)
var wg sync.WaitGroup
// make sure the connection doesnt close while there are SSE events being sent
defer wg.Wait()
wg.Add(1)
go func(w http.ResponseWriter, flusher http.Flusher) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case event, ok := <-eventChannel:
if !ok {
return
}
ev := API.DeploymentEvent{
Message: event.Message,
}
eventJSON, err := json.Marshal(ev)
if err != nil {
// Write error directly to ResponseWriter
jsonErr := json.NewEncoder(w).Encode(err)
if jsonErr != nil {
fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n")
return
}
fmt.Fprintf(w, "data: %s\n\n", err.Error())
if flusher != nil {
flusher.Flush()
}
return
}
fmt.Fprintf(w, "event: %s\n", event.Stage)
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
if flusher != nil {
flusher.Flush()
}
if event.Stage == "error" || event.Stage == "complete" {
return
}
}
}
}(w, flusher)
eventChannel <- API.DeploymentEvent{
Stage: "start",
Message: "Uploading code",
}
deployRequest.Code, _, err = r.FormFile("code")
if err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "No code archive found",
}
return
}
defer deployRequest.Code.Close()
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Invalid flux.json, a name, url, and port must be specified",
}
return
}
if projectConfig.Name == "all" {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Reserved name 'all' is not allowed",
}
return
}
flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String()))
projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id)
if err != nil {
flux.logger.Infow("Failed to upload code", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upload code: %s", err),
}
return
}
if projectConfig.EnvFile != "" {
envPath := filepath.Join(projectPath, projectConfig.EnvFile)
// prevent path traversal
realEnvPath, err := filepath.EvalSymlinks(envPath)
if err != nil {
flux.logger.Errorw("Failed to eval symlinks", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to eval symlinks: %s", err),
}
return
}
if !strings.HasPrefix(realEnvPath, projectPath) {
flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile),
}
return
}
envBytes, err := os.Open(realEnvPath)
if err != nil {
flux.logger.Errorw("Failed to open env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to open env file: %v", err),
}
return
}
defer envBytes.Close()
envVars, err := godotenv.Parse(envBytes)
if err != nil {
flux.logger.Errorw("Failed to parse env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to parse env file: %v", err),
}
return
}
for key, value := range envVars {
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
}
}
// pipe the output of the build process to the event channel
pipeGroup := sync.WaitGroup{}
streamPipe := func(pipe io.ReadCloser) {
pipeGroup.Add(1)
defer pipeGroup.Done()
defer pipe.Close()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
line := scanner.Text()
eventChannel <- API.DeploymentEvent{
Stage: "cmd_output",
Message: line,
}
}
if err := scanner.Err(); err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to read pipe: %s", err),
}
flux.logger.Errorw("Error reading pipe", zap.Error(err))
}
}
flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String()))
eventChannel <- API.DeploymentEvent{
Stage: "preparing",
Message: "Preparing project",
}
// redirect stdout and stderr to the event channel
reader, writer := io.Pipe()
prepareCmd := exec.Command("go", "generate")
prepareCmd.Dir = projectPath
prepareCmd.Stdout = writer
prepareCmd.Stderr = writer
err = prepareCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
go streamPipe(reader)
pipeGroup.Wait()
err = prepareCmd.Wait()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
writer.Close()
eventChannel <- API.DeploymentEvent{
Stage: "building",
Message: "Building project image",
}
reader, writer = io.Pipe()
flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0))
buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder)
buildCmd.Dir = projectPath
buildCmd.Stdout = writer
buildCmd.Stderr = writer
err = buildCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
go streamPipe(reader)
pipeGroup.Wait()
err = buildCmd.Wait()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
eventChannel <- API.DeploymentEvent{
Stage: "creating",
Message: "Creating app, this might take a while...",
}
app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id)
} else {
eventChannel <- API.DeploymentEvent{
Stage: "upgrading",
Message: "Upgrading app, this might take a while...",
}
// we dont need to change `app` since this upgrade will use the same app and update it in place
err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig)
}
if err != nil {
flux.logger.Errorw("Failed to deploy app", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
}
return
}
var extApp API.App
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
eventChannel <- API.DeploymentEvent{
Stage: "complete",
Message: extApp,
}
flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String()))
}

View File

@@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"strconv"
"time"
_ "embed"
@@ -19,7 +20,7 @@ import (
"github.com/google/uuid"
"github.com/juls0730/flux/internal/docker"
"github.com/juls0730/flux/internal/services/appManagerService"
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/sentinel"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
@@ -44,7 +45,7 @@ type FluxServer struct {
docker *docker.DockerClient
proxy *proxyManagerService.ProxyManager
proxy *sentinel.ProxyManager
appManager *appManagerService.AppManager
rootDir string
@@ -62,7 +63,12 @@ func NewServer() *FluxServer {
config := zap.NewProductionConfig()
if os.Getenv("DEBUG") == "true" {
debug, err := strconv.ParseBool(os.Getenv("DEBUG"))
if err != nil {
debug = false
}
if debug {
config = zap.NewDevelopmentConfig()
verbosity = -1
}
@@ -144,7 +150,8 @@ func NewServer() *FluxServer {
// blocking until the iamge is pulled
io.Copy(io.Discard, events)
flux.proxy = proxyManagerService.NewProxyManager(flux.logger)
requestLogger := &RequestLogger{logger: flux.logger}
flux.proxy = sentinel.NewProxyManager(requestLogger)
flux.appManager = appManagerService.NewAppManager(flux.db, flux.docker, flux.proxy, flux.logger)
err = flux.appManager.Init()
@@ -162,7 +169,12 @@ func (s *FluxServer) Stop() {
func (s *FluxServer) ListenAndServe() error {
s.logger.Infow("Starting server", zap.String("daemon_host", s.config.DaemonHost), zap.String("proxy_host", s.config.ProxyHost))
go s.proxy.ListenAndServe(s.config.ProxyHost)
go func() {
err := s.proxy.ListenAndServe(s.config.ProxyHost)
if err != nil {
s.logger.Errorw("Failed to start proxy", zap.Error(err))
}
}()
return http.ListenAndServe(s.config.DaemonHost, nil)
}
@@ -248,3 +260,11 @@ func (s *FluxServer) UploadAppCode(code io.Reader, appId uuid.UUID) (string, err
return outputPath, nil
}
type RequestLogger struct {
logger *zap.SugaredLogger
}
func (r *RequestLogger) LogRequest(host string, status int, latency time.Duration, ip, method, path string) {
r.logger.Infow("Request", zap.String("host", host), zap.Int("status", status), zap.Duration("latency", latency), zap.String("ip", ip), zap.String("method", method), zap.String("path", path))
}

View File

@@ -176,9 +176,9 @@ func CreateContainer(ctx context.Context, imageName string, friendlyName string,
// Updates Container in place
func (c *Container) Upgrade(ctx context.Context, imageName string, environment []string, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error {
// Create new container with new image
logger.Debugw("Upgrading container", zap.String("container_id", string(c.ContainerID[:12])))
logger.Debugw("Upgrading container", zap.String("container_id", string(c.ContainerID)))
if c.Volumes == nil {
return fmt.Errorf("no volumes found for container %s", c.ContainerID[:12])
return fmt.Errorf("no volumes found for container %s", c.ContainerID)
}
containerJSON, err := dockerClient.ContainerInspect(context.Background(), c.ContainerID)
@@ -269,7 +269,7 @@ func (c *Container) Remove(ctx context.Context, dockerClient *docker.DockerClien
func (c *Container) Start(ctx context.Context, initial bool, db *sql.DB, dockerClient *docker.DockerClient, logger *zap.SugaredLogger) error {
logger.Debugf("Starting container %+v", c)
logger.Info("Starting container", zap.String("container_id", string(c.ContainerID)[:12]))
logger.Infow("Starting container", zap.String("container_id", string(c.ContainerID)))
if !initial && c.Head {
logger.Debug("Starting and repairing head container")
@@ -330,7 +330,7 @@ func (c *Container) Wait(ctx context.Context, port uint16, dockerClient *docker.
func (c *Container) GetIp(dockerClient *docker.DockerClient, logger *zap.SugaredLogger) (string, error) {
containerJSON, err := dockerClient.ContainerInspect(context.Background(), c.ContainerID)
if err != nil {
logger.Errorw("Failed to inspect container", zap.Error(err), zap.String("container_id", string(c.ContainerID[:12])))
logger.Errorw("Failed to inspect container", zap.Error(err), zap.String("container_id", string(c.ContainerID)))
return "", err
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/juls0730/flux/internal/docker"
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/sentinel"
"go.uber.org/zap"
)
@@ -77,7 +78,7 @@ func (d *Deployment) Start(ctx context.Context, dockerClient *docker.DockerClien
for _, container := range d.containers {
err := dockerClient.StartContainer(ctx, container.ContainerID)
if err != nil {
return fmt.Errorf("failed to start container (%s): %v", container.ContainerID[:12], err)
return fmt.Errorf("failed to start container (%s): %v", container.ContainerID, err)
}
}
@@ -91,7 +92,7 @@ func (d *Deployment) GetInternalUrl(dockerClient *docker.DockerClient) (*url.URL
}
if containerJSON.NetworkSettings.IPAddress == "" {
return nil, fmt.Errorf("no IP address found for container %s", d.Head().ContainerID[:12])
return nil, fmt.Errorf("no IP address found for container %s", d.Head().ContainerID)
}
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, d.Port))
@@ -106,7 +107,7 @@ func (d *Deployment) Stop(ctx context.Context, dockerClient *docker.DockerClient
for _, container := range d.containers {
err := dockerClient.StopContainer(ctx, container.ContainerID)
if err != nil {
return fmt.Errorf("failed to stop container (%s): %v", container.ContainerID[:12], err)
return fmt.Errorf("failed to stop container (%s): %v", container.ContainerID, err)
}
}
return nil
@@ -141,7 +142,7 @@ func (deployment *Deployment) Status(ctx context.Context, dockerClient *docker.D
// if the head is running, but the supplemental container is stopped, return "failed"
if headStatus.Status == "running" && containerStatus.Status != "running" {
logger.Debugw("Supplemental container is not running but head is, returning to failed state", zap.String("container_id", string(container.ContainerID[:12])))
logger.Debugw("Supplemental container is not running but head is, returning to failed state", zap.String("container_id", string(container.ContainerID)))
for _, supplementalContainer := range deployment.containers {
err := dockerClient.StopContainer(ctx, supplementalContainer.ContainerID)
if err != nil {
@@ -169,7 +170,7 @@ func (deployment *Deployment) Status(ctx context.Context, dockerClient *docker.D
}
// Takes an existing deployment, and gracefully upgrades the app to a new image
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error {
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *sentinel.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error {
// copy the old head container since Upgrade updates the container in place
oldHeadContainer := *deployment.Head()
@@ -183,13 +184,14 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
db.Exec("DELETE FROM containers WHERE id = ?", oldHeadContainer.ID)
newHeadContainer := deployment.Head()
logger.Debugw("Starting container", zap.String("container_id", string(newHeadContainer.ContainerID[:12])))
logger.Debugw("Starting container", zap.String("container_id", string(newHeadContainer.ContainerID)))
err = newHeadContainer.Start(ctx, true, db, dockerClient, logger)
if err != nil {
logger.Errorw("Failed to start container", zap.Error(err))
return err
}
logger.Debugw("Waiting for container to start", zap.String("container_id", string(newHeadContainer.ContainerID)))
if err := newHeadContainer.Wait(ctx, projectConfig.Port, dockerClient); err != nil {
logger.Errorw("Failed to wait for container", zap.Error(err))
return err
@@ -201,14 +203,18 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
}
// Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully drained of connections
oldProxy, ok := proxyManager.Load(deployment.URL)
var oldProxy *sentinel.Proxy
var ok bool = false
if value, exists := proxyManager.Load(deployment.URL); exists {
oldProxy, ok = value.(*sentinel.Proxy)
}
newDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient)
if err != nil {
logger.Errorw("Failed to get internal url", zap.Error(err))
return err
}
newProxy, err := proxyManagerService.NewDeploymentProxy(*newDeploymentInternalUrl)
newProxy, err := sentinel.NewDeploymentProxy(newDeploymentInternalUrl.String(), proxyManagerService.GetTransport)
if err != nil {
logger.Errorw("Failed to create deployment proxy", zap.Error(err))
return err
@@ -221,13 +227,24 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
// gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers
if ok {
go oldProxy.GracefulShutdown(func() {
err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID)
if err != nil {
logger.Errorw("Failed to stop container", zap.Error(err))
return
}
err = dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
if err != nil {
logger.Errorw("Failed to remove container", zap.Error(err))
}
})
} else {
err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID)
if err != nil {
logger.Errorw("Failed to stop container", zap.Error(err))
}
err = dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
if err != nil {
logger.Errorw("Failed to remove container", zap.Error(err))
}

View File

@@ -11,6 +11,7 @@ import (
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/flux/internal/util"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/sentinel"
"go.uber.org/zap"
)
@@ -18,12 +19,12 @@ type AppManager struct {
util.TypedMap[uuid.UUID, *models.App]
nameIndex util.TypedMap[string, uuid.UUID]
logger *zap.SugaredLogger
proxyManager *proxyManagerService.ProxyManager
proxyManager *sentinel.ProxyManager
dockerClient *docker.DockerClient
db *sql.DB
}
func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, logger *zap.SugaredLogger) *AppManager {
func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *sentinel.ProxyManager, logger *zap.SugaredLogger) *AppManager {
return &AppManager{
db: db,
dockerClient: dockerClient,
@@ -87,7 +88,7 @@ func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, p
return nil, fmt.Errorf("failed to get internal url: %v", err)
}
newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl)
newProxy, err := sentinel.NewDeploymentProxy(deploymentInternalUrl.String(), proxyManagerService.GetTransport)
if err != nil {
appManager.logger.Errorw("Failed to create deployment proxy", zap.Error(err))
return nil, fmt.Errorf("failed to create deployment proxy: %v", err)
@@ -293,7 +294,7 @@ func (am *AppManager) Init() error {
continue
}
proxy, err := proxyManagerService.NewDeploymentProxy(*proxyURL)
proxy, err := sentinel.NewDeploymentProxy(proxyURL.String(), proxyManagerService.GetTransport)
if err != nil {
am.logger.Errorw("Failed to create proxy", zap.Error(err))
continue

View File

@@ -1,133 +1,18 @@
package proxyManagerService
import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"sync/atomic"
"time"
"github.com/juls0730/flux/internal/util"
"go.uber.org/zap"
)
type DeploymentId int64
// this is the object that oversees the proxying of requests to the correct deployment
type ProxyManager struct {
util.TypedMap[string, *Proxy]
logger *zap.SugaredLogger
}
func NewProxyManager(logger *zap.SugaredLogger) *ProxyManager {
return &ProxyManager{
logger: logger,
}
}
func (proxyManager *ProxyManager) ListenAndServe(host string) error {
if host == "" {
host = "0.0.0.0:7465"
}
proxyManager.logger.Infof("Proxy server starting on http://%s", host)
if err := http.ListenAndServe(host, proxyManager); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("failed to start proxy server: %v", err)
}
return nil
}
// Stops forwarding traffic to a deployment
func (proxyManager *ProxyManager) RemoveDeployment(host string) {
proxyManager.Delete(host)
}
// Starts forwarding traffic to a deployment. The deployment must be ready to recieve requests before this is called.
func (proxyManager *ProxyManager) AddProxy(host string, proxy *Proxy) {
proxyManager.logger.Debugw("Adding proxy", zap.String("host", host))
proxyManager.Store(host, proxy)
}
// This function is responsible for taking an http request and forwarding it to the correct deployment
func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host := r.Host
proxyManager.logger.Debugw("Proxying request", zap.String("host", host))
proxy, ok := proxyManager.Load(host)
if !ok {
http.Error(w, "Not found", http.StatusNotFound)
return
}
proxy.proxyFunc.ServeHTTP(w, r)
}
type Proxy struct {
forwardingFor url.URL
proxyFunc *httputil.ReverseProxy
gracePeriod time.Duration
activeRequests int64
}
// type DeploymentProxy struct {
// deployment *models.Deployment
// proxy *httputil.ReverseProxy
// gracePeriod time.Duration
// activeRequests int64
// }
// Creates a proxy for a given deployment
func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) {
proxy := &Proxy{
forwardingFor: forwardingFor,
gracePeriod: time.Second * 30,
activeRequests: 0,
}
proxy.proxyFunc = &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL = &url.URL{
Scheme: forwardingFor.Scheme,
Host: forwardingFor.Host,
Path: req.URL.Path,
}
req.Host = forwardingFor.Host
atomic.AddInt64(&proxy.activeRequests, 1)
func GetTransport(target string) *http.Transport {
return &http.Transport{
Proxy: func(r *http.Request) (*url.URL, error) {
return url.Parse(target)
},
Transport: &http.Transport{
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 100,
},
ModifyResponse: func(resp *http.Response) error {
atomic.AddInt64(&proxy.activeRequests, -1)
return nil
},
}
return proxy, nil
}
// Drains connections from a proxy
func (p *Proxy) GracefulShutdown(shutdownFunc func()) {
ctx, cancel := context.WithTimeout(context.Background(), p.gracePeriod)
defer cancel()
done := false
for !done {
select {
case <-ctx.Done():
done = true
default:
if atomic.LoadInt64(&p.activeRequests) == 0 {
done = true
}
time.Sleep(time.Second)
}
}
shutdownFunc()
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"go.uber.org/zap"
)
type Project struct {
@@ -14,7 +15,7 @@ type Project struct {
Name string `json:"name"`
}
func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, error) {
func GetProject(command string, args []string, config pkg.CLIConfig, logger *zap.SugaredLogger) (*Project, error) {
var projectName string
// we are in a project directory and the project is deployed
@@ -24,7 +25,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project,
return nil, fmt.Errorf("failed to read .fluxid: %v", err)
}
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-id/" + string(id))
app, err := GetRequest[API.App](config.DaemonURL+"/app/by-id/"+string(id), logger)
if err != nil {
return nil, fmt.Errorf("failed to get app: %v", err)
}
@@ -58,7 +59,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project,
}
// we are calling flux with a project name (ie `flux start my-project`)
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-name/" + projectName)
app, err := GetRequest[API.App](config.DaemonURL+"/app/by-name/"+projectName, logger)
if err != nil {
return nil, fmt.Errorf("failed to get app: %v", err)
}

View File

@@ -6,17 +6,26 @@ import (
"io"
"net/http"
"strings"
"go.uber.org/zap"
)
func isOk(statusCode int) bool {
return statusCode >= 200 && statusCode < 300
}
// make a function that makes an http GET request to the daemon and returns data of type T
func GetRequest[T any](url string) (*T, error) {
func GetRequest[T any](url string, logger *zap.SugaredLogger) (*T, error) {
resp, err := http.Get(url)
if err != nil {
logger.Debugw("GET request failed", zap.String("url", url), zap.Error(err))
return nil, fmt.Errorf("http get request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Debugw("GET request", zap.String("url", url), resp)
if !isOk(resp.StatusCode) {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
@@ -35,7 +44,7 @@ func GetRequest[T any](url string) (*T, error) {
return &data, nil
}
func DeleteRequest(url string) error {
func DeleteRequest(url string, logger *zap.SugaredLogger) error {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return fmt.Errorf("failed to delete: %v", err)
@@ -46,7 +55,9 @@ func DeleteRequest(url string) error {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Debugw("DELETE request", zap.String("url", url), resp)
if !isOk(resp.StatusCode) {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading response body: %v", err)
@@ -60,7 +71,7 @@ func DeleteRequest(url string) error {
return nil
}
func PutRequest(url string, data io.Reader) error {
func PutRequest(url string, data io.Reader, logger *zap.SugaredLogger) error {
req, err := http.NewRequest("PUT", url, data)
if err != nil {
return fmt.Errorf("failed to put: %v", err)
@@ -71,7 +82,9 @@ func PutRequest(url string, data io.Reader) error {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Debugw("PUT request", zap.String("url", url), resp)
if !isOk(resp.StatusCode) {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading response body: %v", err)

View File

@@ -41,7 +41,7 @@ func (dt *MutexLock[T]) Unlock(id T) {
dt.mu.Lock()
defer dt.mu.Unlock()
// Remove the app from deployed tracking
// Remove the object from the map (functionally unlocking it)
if cancel, exists := dt.deployed[id]; exists {
// Cancel the context
cancel()

View File

@@ -1,3 +1,3 @@
package pkg
const Version = "2025.05.04-00"
const Version = "2025.05.15-18"

75
test.sh Executable file
View File

@@ -0,0 +1,75 @@
#!/usr/bin/env bash
# Basic test for flux
set -ex
Flux_Database_Dir=$(mktemp -d)c
zqdgr build:all
tmux new-session -d -s daemon
# start daemon
tmux send-keys -t daemon "export FLUXD_ROOT_DIR=$Flux_Database_Dir" C-m
tmux send-keys -t daemon "DEBUG=true zqdgr run:daemon" C-m
# test daemon with the cli
tmux split-window -h
export FLUX_CLI_PATH=$PWD/flux
tmux send-keys -t daemon:0.1 "cd \$(mktemp -d)" C-m
tmux send-keys -t daemon:0.1 "cat << EOF > test.sh
#!/usr/bin/env bash
set -xe
# wait for the daemon to initialize
sleep 2
go mod init testApp
$FLUX_CLI_PATH init --host-url testApp --project-port 8080 testApp
cat << ELOF > main.go
package main
import (
\"fmt\"
\"net/http\"
)
func main() {
http.HandleFunc(\"/\", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, \"Hello World\\n\")
})
http.ListenAndServe(\":8080\", nil)
}
ELOF
$FLUX_CLI_PATH deploy -q
curl -H \"Host: testApp\" localhost:7465
$FLUX_CLI_PATH stop
curl -H \"Host: testApp\" localhost:7465
$FLUX_CLI_PATH start
curl -H \"Host: testApp\" localhost:7465
sed -i 's/Hello World/Hello World 2/' main.go
$FLUX_CLI_PATH deploy -q
curl -H \"Host: testApp\" localhost:7465
$FLUX_CLI_PATH delete --no-confirm
EOF
"
tmux send-keys -t daemon:0.1 "chmod +x test.sh" C-m
tmux send-keys -t daemon:0.1 "./test.sh" C-m
tmux attach-session -d

View File

@@ -7,9 +7,10 @@
"scripts": {
"build:daemon": "go build -o fluxd cmd/daemon/main.go",
"build:cli": "go build -o flux cmd/cli/main.go",
"build:all": "go build -o fluxd cmd/daemon/main.go && go build -o flux cmd/cli/main.go",
"build:all": "zqdgr build:daemon && zqdgr build:cli",
"run:daemon": "go run cmd/daemon/main.go",
"run:cli": "go run cmd/flux/main.go"
"run:cli": "go run cmd/cli/main.go",
"test": "./test.sh"
},
"pattern": "**/*.go",
"excluded_dirs": []