Compare commits
3 Commits
refactor/d
...
trunk
| Author | SHA1 | Date | |
|---|---|---|---|
|
fb2588cc3a
|
|||
| c51eca5dab | |||
| 5bb696052a |
@@ -26,7 +26,7 @@ Flux is a lightweight self-hosted micro-PaaS for hosting Golang web apps with ea
|
|||||||
## Dependencies
|
## Dependencies
|
||||||
|
|
||||||
- [Go](https://golang.org/dl/)
|
- [Go](https://golang.org/dl/)
|
||||||
- [ZQDGR](https://github.com/juls0730/zqdgr)
|
- [ZQDGR](https://github.com/juls0730/zqdgr) (development only)
|
||||||
- [Buildpacks](https://buildpacks.io/) (daemon only)
|
- [Buildpacks](https://buildpacks.io/) (daemon only)
|
||||||
- [Docker](https://docs.docker.com/get-docker/) (daemon only)
|
- [Docker](https://docs.docker.com/get-docker/) (daemon only)
|
||||||
|
|
||||||
|
|||||||
@@ -3,11 +3,13 @@ package commands
|
|||||||
import (
|
import (
|
||||||
"github.com/juls0730/flux/pkg"
|
"github.com/juls0730/flux/pkg"
|
||||||
"github.com/juls0730/flux/pkg/API"
|
"github.com/juls0730/flux/pkg/API"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CommandCtx struct {
|
type CommandCtx struct {
|
||||||
Config pkg.CLIConfig
|
Config pkg.CLIConfig
|
||||||
Info API.Info
|
Logger *zap.SugaredLogger
|
||||||
|
Info *API.Info
|
||||||
Interactive bool
|
Interactive bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ func deleteAll(ctx CommandCtx, noConfirm *bool) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
util.DeleteRequest(ctx.Config.DaemonURL + "/deployments")
|
util.DeleteRequest(ctx.Config.DaemonURL+"/deployments", ctx.Logger)
|
||||||
|
|
||||||
fmt.Printf("Successfully deleted all projects\n")
|
fmt.Printf("Successfully deleted all projects\n")
|
||||||
return nil
|
return nil
|
||||||
@@ -80,9 +80,9 @@ func DeleteCommand(ctx CommandCtx, args []string) error {
|
|||||||
return deleteAll(ctx, noConfirm)
|
return deleteAll(ctx, noConfirm)
|
||||||
}
|
}
|
||||||
|
|
||||||
project, err := util.GetProject("delete", args, ctx.Config)
|
project, err := util.GetProject("delete", args, ctx.Config, ctx.Logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("\tfailed to get project name: %v.\n\tSee flux delete --help for more information", err)
|
return fmt.Errorf("\tfailed to get project name: %v.\n\tSee flux delete -help for more information", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ask for confirmation if not --no-confirm
|
// ask for confirmation if not --no-confirm
|
||||||
@@ -101,7 +101,7 @@ func DeleteCommand(ctx CommandCtx, args []string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = util.DeleteRequest(ctx.Config.DaemonURL + "/app/" + project.Id)
|
err = util.DeleteRequest(ctx.Config.DaemonURL+"/app/"+project.Id, ctx.Logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to delete project: %v", err)
|
return fmt.Errorf("failed to delete project: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
@@ -175,11 +176,38 @@ func preprocessEnvFile(envFile string, target *[]string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var deployUsage = `Usage:
|
||||||
|
flux deploy [flags]
|
||||||
|
|
||||||
|
Flags:
|
||||||
|
-help, -h: Show this help message
|
||||||
|
%s
|
||||||
|
|
||||||
|
Flux will deploy or redeploy the app in the current directory.
|
||||||
|
`
|
||||||
|
|
||||||
func DeployCommand(ctx CommandCtx, args []string) error {
|
func DeployCommand(ctx CommandCtx, args []string) error {
|
||||||
if _, err := os.Stat("flux.json"); err != nil {
|
if _, err := os.Stat("flux.json"); err != nil {
|
||||||
return fmt.Errorf("no flux.json found, please run flux init first")
|
return fmt.Errorf("no flux.json found, please run flux init first")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fs := flag.NewFlagSet("deploy", flag.ExitOnError)
|
||||||
|
fs.Usage = func() {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
// Redirect flagset to print to buffer instead of stdout
|
||||||
|
fs.SetOutput(&buf)
|
||||||
|
fs.PrintDefaults()
|
||||||
|
|
||||||
|
fmt.Println(deployUsage, strings.TrimRight(buf.String(), "\n"))
|
||||||
|
}
|
||||||
|
|
||||||
|
quiet := fs.Bool("q", false, "Don't print the deployment logs")
|
||||||
|
|
||||||
|
err := fs.Parse(args)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
spinnerWriter := util.NewCustomSpinnerWriter()
|
spinnerWriter := util.NewCustomSpinnerWriter()
|
||||||
|
|
||||||
loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(spinnerWriter))
|
loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(spinnerWriter))
|
||||||
@@ -353,7 +381,10 @@ func DeployCommand(ctx CommandCtx, args []string) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
case "cmd_output":
|
case "cmd_output":
|
||||||
|
// suppress the command output if the quiet flag is set
|
||||||
|
if quiet == nil || !*quiet {
|
||||||
customWriter.Printf("... %s\n", data.Message)
|
customWriter.Printf("... %s\n", data.Message)
|
||||||
|
}
|
||||||
case "error":
|
case "error":
|
||||||
loadingSpinner.Stop()
|
loadingSpinner.Stop()
|
||||||
return fmt.Errorf("deployment failed: %s", data.Message)
|
return fmt.Errorf("deployment failed: %s", data.Message)
|
||||||
|
|||||||
@@ -13,17 +13,20 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var initUsage = `Usage:
|
var initUsage = `Usage:
|
||||||
flux init [project-name]
|
flux init [flags] [project-name]
|
||||||
|
|
||||||
Options:
|
Options:
|
||||||
project-name: The name of the project to initialize
|
project-name: The name of the project to initialize
|
||||||
|
|
||||||
Flux will initialize a new project in the current directory or the specified project.`
|
Flags:
|
||||||
|
-help, -h: Show this help message
|
||||||
|
%s
|
||||||
|
|
||||||
|
Flux will initialize a new project in the current directory or the specified project.
|
||||||
|
`
|
||||||
|
|
||||||
func InitCommand(ctx CommandCtx, args []string) error {
|
func InitCommand(ctx CommandCtx, args []string) error {
|
||||||
if !ctx.Interactive {
|
var projectConfig pkg.ProjectConfig
|
||||||
return fmt.Errorf("init command can only be run in interactive mode")
|
|
||||||
}
|
|
||||||
|
|
||||||
fs := flag.NewFlagSet("init", flag.ExitOnError)
|
fs := flag.NewFlagSet("init", flag.ExitOnError)
|
||||||
fs.Usage = func() {
|
fs.Usage = func() {
|
||||||
@@ -32,8 +35,10 @@ func InitCommand(ctx CommandCtx, args []string) error {
|
|||||||
fs.SetOutput(&buf)
|
fs.SetOutput(&buf)
|
||||||
fs.PrintDefaults()
|
fs.PrintDefaults()
|
||||||
|
|
||||||
fmt.Println(initUsage)
|
fmt.Printf(initUsage, strings.TrimRight(buf.String(), "\n"))
|
||||||
}
|
}
|
||||||
|
hostUrl := fs.String("host-url", "", "The URL of the host")
|
||||||
|
projectPort := fs.Uint("project-port", 0, "The port of the host")
|
||||||
|
|
||||||
err := fs.Parse(args)
|
err := fs.Parse(args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -43,10 +48,22 @@ func InitCommand(ctx CommandCtx, args []string) error {
|
|||||||
|
|
||||||
args = fs.Args()
|
args = fs.Args()
|
||||||
|
|
||||||
var projectConfig pkg.ProjectConfig
|
if !ctx.Interactive {
|
||||||
|
if hostUrl == nil || *hostUrl == "" {
|
||||||
|
return fmt.Errorf("host-url is required when not in interactive mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
if projectPort == nil || *projectPort == 0 {
|
||||||
|
return fmt.Errorf("project-port is required when not in interactive mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(args) < 1 {
|
||||||
|
return fmt.Errorf("project-name is required when not in interactive mode")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var response string
|
var response string
|
||||||
if len(args) > 1 {
|
if len(args) > 0 {
|
||||||
response = args[0]
|
response = args[0]
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("What is the name of your project?")
|
fmt.Println("What is the name of your project?")
|
||||||
@@ -55,6 +72,16 @@ func InitCommand(ctx CommandCtx, args []string) error {
|
|||||||
|
|
||||||
projectConfig.Name = response
|
projectConfig.Name = response
|
||||||
|
|
||||||
|
if hostUrl != nil && *hostUrl != "" {
|
||||||
|
if strings.HasPrefix(*hostUrl, "http") {
|
||||||
|
*hostUrl = strings.TrimPrefix(*hostUrl, "http://")
|
||||||
|
*hostUrl = strings.TrimPrefix(*hostUrl, "https://")
|
||||||
|
}
|
||||||
|
|
||||||
|
*hostUrl = strings.Split(*hostUrl, "/")[0]
|
||||||
|
|
||||||
|
projectConfig.Url = *hostUrl
|
||||||
|
} else {
|
||||||
fmt.Println("What URL should your project listen to?")
|
fmt.Println("What URL should your project listen to?")
|
||||||
fmt.Scanln(&response)
|
fmt.Scanln(&response)
|
||||||
if strings.HasPrefix(response, "http") {
|
if strings.HasPrefix(response, "http") {
|
||||||
@@ -65,7 +92,15 @@ func InitCommand(ctx CommandCtx, args []string) error {
|
|||||||
response = strings.Split(response, "/")[0]
|
response = strings.Split(response, "/")[0]
|
||||||
|
|
||||||
projectConfig.Url = response
|
projectConfig.Url = response
|
||||||
|
}
|
||||||
|
|
||||||
|
if projectPort != nil && *projectPort != 0 {
|
||||||
|
if *projectPort < 1024 || *projectPort > 65535 {
|
||||||
|
return fmt.Errorf("project-port must be between 1024 and 65535")
|
||||||
|
}
|
||||||
|
|
||||||
|
projectConfig.Port = uint16(*projectPort)
|
||||||
|
} else {
|
||||||
fmt.Println("What port does your project listen to?")
|
fmt.Println("What port does your project listen to?")
|
||||||
fmt.Scanln(&response)
|
fmt.Scanln(&response)
|
||||||
port, err := strconv.ParseUint(response, 10, 16)
|
port, err := strconv.ParseUint(response, 10, 16)
|
||||||
@@ -78,6 +113,7 @@ func InitCommand(ctx CommandCtx, args []string) error {
|
|||||||
if err != nil || projectConfig.Port < 1024 {
|
if err != nil || projectConfig.Port < 1024 {
|
||||||
return portErr
|
return portErr
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
configBytes, err := json.MarshalIndent(projectConfig, "", " ")
|
configBytes, err := json.MarshalIndent(projectConfig, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func ListCommand(ctx CommandCtx, args []string) error {
|
func ListCommand(ctx CommandCtx, args []string) error {
|
||||||
apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL + "/apps")
|
apps, err := util.GetRequest[[]API.App](ctx.Config.DaemonURL+"/apps", ctx.Logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get apps: %v", err)
|
return fmt.Errorf("failed to get apps: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,14 +7,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func StartCommand(ctx CommandCtx, args []string) error {
|
func StartCommand(ctx CommandCtx, args []string) error {
|
||||||
projectName, err := util.GetProject("start", args, ctx.Config)
|
projectName, err := util.GetProject("start", args, ctx.Config, ctx.Logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put request to start the project, since the start endpoint is idempotent.
|
// Put request to start the project, since the start endpoint is idempotent.
|
||||||
// If the project is already running, this will return a 304 Not Modified
|
// If the project is already running, this will return a 304 Not Modified
|
||||||
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil)
|
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil, ctx.Logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to start %s: %v", projectName.Name, err)
|
return fmt.Errorf("failed to start %s: %v", projectName.Name, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,12 +7,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func StopCommand(ctx CommandCtx, args []string) error {
|
func StopCommand(ctx CommandCtx, args []string) error {
|
||||||
projectName, err := util.GetProject("stop", args, ctx.Config)
|
projectName, err := util.GetProject("stop", args, ctx.Config, ctx.Logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil)
|
err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil, ctx.Logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to stop %s: %v", projectName.Name, err)
|
return fmt.Errorf("failed to stop %s: %v", projectName.Name, err)
|
||||||
}
|
}
|
||||||
|
|||||||
147
cmd/cli/main.go
147
cmd/cli/main.go
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/agnivade/levenshtein"
|
"github.com/agnivade/levenshtein"
|
||||||
@@ -15,6 +16,8 @@ import (
|
|||||||
"github.com/juls0730/flux/pkg"
|
"github.com/juls0730/flux/pkg"
|
||||||
"github.com/juls0730/flux/pkg/API"
|
"github.com/juls0730/flux/pkg/API"
|
||||||
"github.com/mattn/go-isatty"
|
"github.com/mattn/go-isatty"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
)
|
)
|
||||||
|
|
||||||
func isInteractive() bool {
|
func isInteractive() bool {
|
||||||
@@ -35,15 +38,16 @@ Available Commands:
|
|||||||
%s
|
%s
|
||||||
|
|
||||||
Available Flags:
|
Available Flags:
|
||||||
--help, -h: Show this help message
|
-help, -h: Show this help message
|
||||||
|
|
||||||
Use "flux <command> --help" for more information about a command.
|
Use "flux <command> -help" for more information about a command.
|
||||||
`
|
`
|
||||||
|
|
||||||
var maxDistance = 3
|
var maxDistance = 3
|
||||||
|
|
||||||
type Command struct {
|
type Command struct {
|
||||||
Help string
|
Help string
|
||||||
|
DaemonConnected bool
|
||||||
HandlerFunc commands.CommandFunc
|
HandlerFunc commands.CommandFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -59,9 +63,10 @@ func NewCommandHandler() CommandHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, help string) {
|
func (h *CommandHandler) RegisterCmd(name string, handler commands.CommandFunc, daemonConnected bool, help string) {
|
||||||
coomand := Command{
|
coomand := Command{
|
||||||
Help: help,
|
Help: help,
|
||||||
|
DaemonConnected: daemonConnected,
|
||||||
HandlerFunc: handler,
|
HandlerFunc: handler,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,51 +113,36 @@ func (h *CommandHandler) GetHelpCmd(commands.CommandCtx, []string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func runCommand(command string, args []string, config pkg.CLIConfig, info API.Info, cmdHandler CommandHandler) error {
|
func runCommand(command string, args []string, config pkg.CLIConfig, cmdHandler CommandHandler, logger *zap.SugaredLogger) error {
|
||||||
|
commandStruct, ok := cmdHandler.commands[command]
|
||||||
|
if !ok {
|
||||||
|
panic("runCommand was passed an invalid command name")
|
||||||
|
}
|
||||||
|
|
||||||
|
var info *API.Info = nil
|
||||||
|
|
||||||
|
if commandStruct.DaemonConnected {
|
||||||
|
var err error
|
||||||
|
info, err = util.GetRequest[API.Info](config.DaemonURL+"/heartbeat", logger)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to connect to daemon\n")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.Version != version {
|
||||||
|
fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
commandCtx := commands.CommandCtx{
|
commandCtx := commands.CommandCtx{
|
||||||
Config: config,
|
Config: config,
|
||||||
Info: info,
|
Info: info,
|
||||||
|
Logger: logger,
|
||||||
Interactive: isInteractive(),
|
Interactive: isInteractive(),
|
||||||
}
|
}
|
||||||
|
|
||||||
commandStruct, ok := cmdHandler.commands[command]
|
|
||||||
if ok {
|
|
||||||
return commandStruct.HandlerFunc(commandCtx, args)
|
return commandStruct.HandlerFunc(commandCtx, args)
|
||||||
}
|
|
||||||
|
|
||||||
// diff the command against the list of commands and if we find a command that is more than 80% similar, ask if that's what the user meant
|
|
||||||
var closestMatch struct {
|
|
||||||
name string
|
|
||||||
score int
|
|
||||||
}
|
|
||||||
for cmdName := range cmdHandler.commands {
|
|
||||||
distance := levenshtein.ComputeDistance(cmdName, command)
|
|
||||||
|
|
||||||
if distance <= maxDistance {
|
|
||||||
if closestMatch.name == "" || distance < closestMatch.score {
|
|
||||||
closestMatch.name = cmdName
|
|
||||||
closestMatch.score = distance
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if closestMatch.name == "" {
|
|
||||||
return fmt.Errorf("unknown command: %s", command)
|
|
||||||
}
|
|
||||||
|
|
||||||
var response string
|
|
||||||
// new line ommitted because it will be produced when the user presses enter to submit their response
|
|
||||||
fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name)
|
|
||||||
fmt.Scanln(&response)
|
|
||||||
|
|
||||||
if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" {
|
|
||||||
command = closestMatch.name
|
|
||||||
} else {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// re-run command after accepting the suggestion
|
|
||||||
return runCommand(command, args, config, info, cmdHandler)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -160,21 +150,44 @@ func main() {
|
|||||||
fmt.Printf("Flux is being run non-interactively\n")
|
fmt.Printf("Flux is being run non-interactively\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
zapConfig := zap.NewDevelopmentConfig()
|
||||||
|
verbosity := 0
|
||||||
|
|
||||||
|
debug, err := strconv.ParseBool(os.Getenv("DEBUG"))
|
||||||
|
if err != nil {
|
||||||
|
debug = false
|
||||||
|
}
|
||||||
|
|
||||||
|
if debug {
|
||||||
|
zapConfig = zap.NewDevelopmentConfig()
|
||||||
|
verbosity = -1
|
||||||
|
}
|
||||||
|
|
||||||
|
zapConfig.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity))
|
||||||
|
|
||||||
|
lameLogger, err := zapConfig.Build()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to create logger: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := lameLogger.Sugar()
|
||||||
|
|
||||||
cmdHandler := NewCommandHandler()
|
cmdHandler := NewCommandHandler()
|
||||||
|
|
||||||
cmdHandler.RegisterCmd("init", commands.InitCommand, "Initialize a new project")
|
cmdHandler.RegisterCmd("init", commands.InitCommand, false, "Initialize a new project")
|
||||||
cmdHandler.RegisterCmd("deploy", commands.DeployCommand, "Deploy a new version of the app")
|
cmdHandler.RegisterCmd("deploy", commands.DeployCommand, true, "Deploy a new version of the app")
|
||||||
cmdHandler.RegisterCmd("start", commands.StartCommand, "Start the app")
|
cmdHandler.RegisterCmd("start", commands.StartCommand, true, "Start the app")
|
||||||
cmdHandler.RegisterCmd("stop", commands.StopCommand, "Stop the app")
|
cmdHandler.RegisterCmd("stop", commands.StopCommand, true, "Stop the app")
|
||||||
cmdHandler.RegisterCmd("list", commands.ListCommand, "List all the apps")
|
cmdHandler.RegisterCmd("list", commands.ListCommand, true, "List all the apps")
|
||||||
cmdHandler.RegisterCmd("delete", commands.DeleteCommand, "Delete the app")
|
cmdHandler.RegisterCmd("delete", commands.DeleteCommand, true, "Delete the app")
|
||||||
|
|
||||||
fs := flag.NewFlagSet("flux", flag.ExitOnError)
|
fs := flag.NewFlagSet("flux", flag.ExitOnError)
|
||||||
fs.Usage = func() {
|
fs.Usage = func() {
|
||||||
cmdHandler.GetHelp()
|
cmdHandler.GetHelp()
|
||||||
}
|
}
|
||||||
|
|
||||||
err := fs.Parse(os.Args[1:])
|
err = fs.Parse(os.Args[1:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@@ -214,18 +227,42 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := util.GetRequest[API.Info](config.DaemonURL + "/heartbeat")
|
command := os.Args[1]
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Failed to connect to daemon\n")
|
if _, ok := cmdHandler.commands[command]; !ok {
|
||||||
|
var closestMatch struct {
|
||||||
|
name string
|
||||||
|
score int
|
||||||
|
}
|
||||||
|
for cmdName := range cmdHandler.commands {
|
||||||
|
distance := levenshtein.ComputeDistance(cmdName, command)
|
||||||
|
|
||||||
|
if distance <= maxDistance {
|
||||||
|
if closestMatch.name == "" || distance < closestMatch.score {
|
||||||
|
closestMatch.name = cmdName
|
||||||
|
closestMatch.score = distance
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if closestMatch.name == "" {
|
||||||
|
fmt.Printf("unknown command: %s", command)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if info.Version != version {
|
var response string
|
||||||
fmt.Printf("Version mismatch, daemon is running version %s, but you are running version %s\n", info.Version, version)
|
// new line ommitted because it will be produced when the user presses enter to submit their response
|
||||||
os.Exit(1)
|
fmt.Printf("No command found with the name '%s'. Did you mean '%s'? (y/N)", command, closestMatch.name)
|
||||||
|
fmt.Scanln(&response)
|
||||||
|
|
||||||
|
if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" {
|
||||||
|
command = closestMatch.name
|
||||||
|
} else {
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = runCommand(os.Args[1], fs.Args()[1:], config, *info, cmdHandler)
|
err = runCommand(command, fs.Args()[1:], config, cmdHandler, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error: %v\n", err)
|
fmt.Printf("Error: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ func main() {
|
|||||||
http.HandleFunc("GET /app/by-name/{name}", fluxServer.GetAppByName)
|
http.HandleFunc("GET /app/by-name/{name}", fluxServer.GetAppByName)
|
||||||
http.HandleFunc("GET /app/by-id/{id}", fluxServer.GetAppById)
|
http.HandleFunc("GET /app/by-id/{id}", fluxServer.GetAppById)
|
||||||
|
|
||||||
|
// a PUT request is the proper type to use since these endpoints are idempotent
|
||||||
http.HandleFunc("PUT /app/{id}/start", fluxServer.StartApp)
|
http.HandleFunc("PUT /app/{id}/start", fluxServer.StartApp)
|
||||||
http.HandleFunc("PUT /app/{id}/stop", fluxServer.StopApp)
|
http.HandleFunc("PUT /app/{id}/stop", fluxServer.StopApp)
|
||||||
|
|
||||||
|
|||||||
8
go.mod
8
go.mod
@@ -1,6 +1,6 @@
|
|||||||
module github.com/juls0730/flux
|
module github.com/juls0730/flux
|
||||||
|
|
||||||
go 1.23.3
|
go 1.24.2
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/briandowns/spinner v1.23.1
|
github.com/briandowns/spinner v1.23.1
|
||||||
@@ -10,7 +10,11 @@ require (
|
|||||||
github.com/mattn/go-sqlite3 v1.14.24
|
github.com/mattn/go-sqlite3 v1.14.24
|
||||||
)
|
)
|
||||||
|
|
||||||
require go.uber.org/multierr v1.10.0 // indirect
|
require (
|
||||||
|
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd // indirect
|
||||||
|
go.uber.org/multierr v1.10.0 // indirect
|
||||||
|
golang.org/x/sync v0.14.0 // indirect
|
||||||
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Microsoft/go-winio v0.4.14 // indirect
|
github.com/Microsoft/go-winio v0.4.14 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -43,6 +43,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN
|
|||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0=
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0=
|
||||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||||
|
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd h1:JNazPdlAs307Gtaqmb+wfCjcOzO3MRXxg9nf0bAKAnc=
|
||||||
|
github.com/juls0730/sentinel v0.0.0-20250515154110-2e7e6586cacd/go.mod h1:CnRvcleiS2kvK1N2PeQmeoRP5EXpBDpHPkg72vAUaSg=
|
||||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||||
@@ -113,6 +115,8 @@ golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
|
|||||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
|
||||||
|
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
|||||||
@@ -87,9 +87,11 @@ func (d *DockerClient) StartContainer(ctx context.Context, containerID DockerID)
|
|||||||
return d.client.ContainerStart(ctx, string(containerID), container.StartOptions{})
|
return d.client.ContainerStart(ctx, string(containerID), container.StartOptions{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// blocks until the container returns a 200 status code
|
const CONTAINER_START_TIMEOUT = 30 * time.Second
|
||||||
|
|
||||||
|
// blocks until the container returns a 200 status code for a max of CONTAINER_START_TIMEOUT (30 seconds)
|
||||||
func (d *DockerClient) ContainerWait(ctx context.Context, containerID DockerID, port uint16) error {
|
func (d *DockerClient) ContainerWait(ctx context.Context, containerID DockerID, port uint16) error {
|
||||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
ctx, cancel := context.WithTimeout(ctx, CONTAINER_START_TIMEOUT)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@@ -152,7 +154,7 @@ func (d *DockerClient) GetContainerStatus(containerID DockerID) (*ContainerStatu
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *DockerClient) StopContainer(ctx context.Context, containerID DockerID) error {
|
func (d *DockerClient) StopContainer(ctx context.Context, containerID DockerID) error {
|
||||||
d.logger.Debugw("Stopping container", zap.String("container_id", string(containerID[:12])))
|
d.logger.Debugw("Stopping container", zap.String("container_id", string(containerID)))
|
||||||
return d.client.ContainerStop(ctx, string(containerID), container.StopOptions{})
|
return d.client.ContainerStop(ctx, string(containerID), container.StopOptions{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,389 +1,16 @@
|
|||||||
package handlers
|
package handlers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/docker/docker/pkg/namesgenerator"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/joho/godotenv"
|
|
||||||
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
||||||
"github.com/juls0730/flux/internal/util"
|
|
||||||
"github.com/juls0730/flux/pkg"
|
|
||||||
"github.com/juls0730/flux/pkg/API"
|
"github.com/juls0730/flux/pkg/API"
|
||||||
|
"github.com/juls0730/sentinel"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]()
|
|
||||||
|
|
||||||
func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "test/event-stream")
|
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
|
||||||
w.Header().Set("Connection", "keep-alive")
|
|
||||||
|
|
||||||
err := r.ParseMultipartForm(10 << 32) // 10 GiB
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to parse multipart form", zap.Error(err))
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var deployRequest API.DeployRequest
|
|
||||||
projectConfig := new(pkg.ProjectConfig)
|
|
||||||
if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil {
|
|
||||||
flux.logger.Errorw("Failed to decode config", zap.Error(err))
|
|
||||||
|
|
||||||
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
deployRequest.Config = *projectConfig
|
|
||||||
idStr := r.FormValue("id")
|
|
||||||
|
|
||||||
if idStr == "" {
|
|
||||||
id, err := uuid.NewRandom()
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to generate uuid", zap.Error(err))
|
|
||||||
http.Error(w, "Failed to generate uuid", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
deployRequest.Id = id
|
|
||||||
} else {
|
|
||||||
deployRequest.Id, err = uuid.Parse(idStr)
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to parse uuid", zap.Error(err))
|
|
||||||
http.Error(w, "Failed to parse uuid", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure the id exists in the database
|
|
||||||
app := flux.appManager.GetApp(deployRequest.Id)
|
|
||||||
if app == nil {
|
|
||||||
http.Error(w, "App not found", http.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context())
|
|
||||||
if err != nil && err == util.ErrLocked {
|
|
||||||
// This will happen if the app is already being deployed
|
|
||||||
http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-ctx.Done()
|
|
||||||
deploymentLock.Unlock(deployRequest.Id)
|
|
||||||
}()
|
|
||||||
|
|
||||||
flusher, ok := w.(http.Flusher)
|
|
||||||
if !ok {
|
|
||||||
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusMultiStatus)
|
|
||||||
|
|
||||||
eventChannel := make(chan API.DeploymentEvent, 10)
|
|
||||||
defer close(eventChannel)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
// make sure the connection doesnt close while there are SSE events being sent
|
|
||||||
defer wg.Wait()
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func(w http.ResponseWriter, flusher http.Flusher) {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case event, ok := <-eventChannel:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ev := API.DeploymentEvent{
|
|
||||||
Message: event.Message,
|
|
||||||
}
|
|
||||||
|
|
||||||
eventJSON, err := json.Marshal(ev)
|
|
||||||
if err != nil {
|
|
||||||
// Write error directly to ResponseWriter
|
|
||||||
jsonErr := json.NewEncoder(w).Encode(err)
|
|
||||||
if jsonErr != nil {
|
|
||||||
fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Fprintf(w, "data: %s\n\n", err.Error())
|
|
||||||
if flusher != nil {
|
|
||||||
flusher.Flush()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Fprintf(w, "event: %s\n", event.Stage)
|
|
||||||
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
|
|
||||||
if flusher != nil {
|
|
||||||
flusher.Flush()
|
|
||||||
}
|
|
||||||
|
|
||||||
if event.Stage == "error" || event.Stage == "complete" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(w, flusher)
|
|
||||||
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "start",
|
|
||||||
Message: "Uploading code",
|
|
||||||
}
|
|
||||||
|
|
||||||
deployRequest.Code, _, err = r.FormFile("code")
|
|
||||||
if err != nil {
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: "No code archive found",
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer deployRequest.Code.Close()
|
|
||||||
|
|
||||||
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: "Invalid flux.json, a name, url, and port must be specified",
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if projectConfig.Name == "all" {
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: "Reserved name 'all' is not allowed",
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String()))
|
|
||||||
|
|
||||||
projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id)
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Infow("Failed to upload code", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to upload code: %s", err),
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if projectConfig.EnvFile != "" {
|
|
||||||
envPath := filepath.Join(projectPath, projectConfig.EnvFile)
|
|
||||||
// prevent path traversal
|
|
||||||
realEnvPath, err := filepath.EvalSymlinks(envPath)
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to eval symlinks", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to eval symlinks: %s", err),
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !strings.HasPrefix(realEnvPath, projectPath) {
|
|
||||||
flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile),
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
envBytes, err := os.Open(realEnvPath)
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to open env file", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to open env file: %v", err),
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer envBytes.Close()
|
|
||||||
|
|
||||||
envVars, err := godotenv.Parse(envBytes)
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to parse env file", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to parse env file: %v", err),
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for key, value := range envVars {
|
|
||||||
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// pipe the output of the build process to the event channel
|
|
||||||
pipeGroup := sync.WaitGroup{}
|
|
||||||
streamPipe := func(pipe io.ReadCloser) {
|
|
||||||
pipeGroup.Add(1)
|
|
||||||
defer pipeGroup.Done()
|
|
||||||
defer pipe.Close()
|
|
||||||
|
|
||||||
scanner := bufio.NewScanner(pipe)
|
|
||||||
for scanner.Scan() {
|
|
||||||
line := scanner.Text()
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "cmd_output",
|
|
||||||
Message: line,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := scanner.Err(); err != nil {
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to read pipe: %s", err),
|
|
||||||
}
|
|
||||||
flux.logger.Errorw("Error reading pipe", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String()))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "preparing",
|
|
||||||
Message: "Preparing project",
|
|
||||||
}
|
|
||||||
|
|
||||||
// redirect stdout and stderr to the event channel
|
|
||||||
reader, writer := io.Pipe()
|
|
||||||
prepareCmd := exec.Command("go", "generate")
|
|
||||||
prepareCmd.Dir = projectPath
|
|
||||||
prepareCmd.Stdout = writer
|
|
||||||
prepareCmd.Stderr = writer
|
|
||||||
|
|
||||||
err = prepareCmd.Start()
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to prepare project: %s", err),
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go streamPipe(reader)
|
|
||||||
|
|
||||||
pipeGroup.Wait()
|
|
||||||
|
|
||||||
err = prepareCmd.Wait()
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to prepare project: %s", err),
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
writer.Close()
|
|
||||||
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "building",
|
|
||||||
Message: "Building project image",
|
|
||||||
}
|
|
||||||
|
|
||||||
reader, writer = io.Pipe()
|
|
||||||
flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
|
|
||||||
imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0))
|
|
||||||
buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder)
|
|
||||||
buildCmd.Dir = projectPath
|
|
||||||
buildCmd.Stdout = writer
|
|
||||||
buildCmd.Stderr = writer
|
|
||||||
|
|
||||||
err = buildCmd.Start()
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to build image", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to build image: %s", err),
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go streamPipe(reader)
|
|
||||||
|
|
||||||
pipeGroup.Wait()
|
|
||||||
|
|
||||||
err = buildCmd.Wait()
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to build image", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to build image: %s", err),
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
app := flux.appManager.GetApp(deployRequest.Id)
|
|
||||||
|
|
||||||
if app == nil {
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "creating",
|
|
||||||
Message: "Creating app, this might take a while...",
|
|
||||||
}
|
|
||||||
|
|
||||||
app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id)
|
|
||||||
} else {
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "upgrading",
|
|
||||||
Message: "Upgrading app, this might take a while...",
|
|
||||||
}
|
|
||||||
|
|
||||||
// we dont need to change `app` since this upgrade will use the same app and update it in place
|
|
||||||
err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
flux.logger.Errorw("Failed to deploy app", zap.Error(err))
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "error",
|
|
||||||
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var extApp API.App
|
|
||||||
extApp.Id = app.Id
|
|
||||||
extApp.Name = app.Name
|
|
||||||
extApp.DeploymentID = app.DeploymentID
|
|
||||||
|
|
||||||
eventChannel <- API.DeploymentEvent{
|
|
||||||
Stage: "complete",
|
|
||||||
Message: extApp,
|
|
||||||
}
|
|
||||||
|
|
||||||
flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (flux *FluxServer) GetAllApps(w http.ResponseWriter, r *http.Request) {
|
func (flux *FluxServer) GetAllApps(w http.ResponseWriter, r *http.Request) {
|
||||||
var apps []API.App
|
var apps []API.App
|
||||||
for _, app := range flux.appManager.GetAllApps() {
|
for _, app := range flux.appManager.GetAllApps() {
|
||||||
@@ -502,7 +129,7 @@ func (flux *FluxServer) StartApp(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl)
|
newProxy, err := sentinel.NewDeploymentProxy(deploymentInternalUrl.String(), proxyManagerService.GetTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
@@ -557,6 +184,11 @@ func (flux *FluxServer) StopApp(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (flux *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) {
|
func (flux *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if flux.config.DisableDeleteAll {
|
||||||
|
http.Error(w, "Delete all deployments is disabled", http.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
apps := flux.appManager.GetAllApps()
|
apps := flux.appManager.GetAllApps()
|
||||||
for _, app := range apps {
|
for _, app := range apps {
|
||||||
err := flux.appManager.DeleteApp(app.Id)
|
err := flux.appManager.DeleteApp(app.Id)
|
||||||
@@ -582,13 +214,23 @@ func (flux *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
status, err := app.Deployment.Status(r.Context(), flux.docker, flux.logger)
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to get deployment status", zap.Error(err))
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if status != "stopped" {
|
||||||
|
app.Deployment.Stop(r.Context(), flux.docker)
|
||||||
|
flux.proxy.RemoveDeployment(app.Deployment.URL)
|
||||||
|
}
|
||||||
|
|
||||||
err = flux.appManager.DeleteApp(id)
|
err = flux.appManager.DeleteApp(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
flux.proxy.RemoveDeployment(app.Deployment.URL)
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|||||||
385
internal/handlers/deploy.go
Normal file
385
internal/handlers/deploy.go
Normal file
@@ -0,0 +1,385 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/docker/docker/pkg/namesgenerator"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/joho/godotenv"
|
||||||
|
"github.com/juls0730/flux/internal/util"
|
||||||
|
"github.com/juls0730/flux/pkg"
|
||||||
|
"github.com/juls0730/flux/pkg/API"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]()
|
||||||
|
|
||||||
|
func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "test/event-stream")
|
||||||
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
|
w.Header().Set("Connection", "keep-alive")
|
||||||
|
|
||||||
|
err := r.ParseMultipartForm(10 << 32) // 10 GiB
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to parse multipart form", zap.Error(err))
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var deployRequest API.DeployRequest
|
||||||
|
projectConfig := new(pkg.ProjectConfig)
|
||||||
|
if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil {
|
||||||
|
flux.logger.Errorw("Failed to decode config", zap.Error(err))
|
||||||
|
|
||||||
|
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
deployRequest.Config = *projectConfig
|
||||||
|
idStr := r.FormValue("id")
|
||||||
|
|
||||||
|
if idStr == "" {
|
||||||
|
id, err := uuid.NewRandom()
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to generate uuid", zap.Error(err))
|
||||||
|
http.Error(w, "Failed to generate uuid", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
deployRequest.Id = id
|
||||||
|
} else {
|
||||||
|
deployRequest.Id, err = uuid.Parse(idStr)
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to parse uuid", zap.Error(err))
|
||||||
|
http.Error(w, "Failed to parse uuid", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure the id exists in the database
|
||||||
|
app := flux.appManager.GetApp(deployRequest.Id)
|
||||||
|
if app == nil {
|
||||||
|
http.Error(w, "App not found", http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context())
|
||||||
|
if err != nil && err == util.ErrLocked {
|
||||||
|
// This will happen if the app is already being deployed
|
||||||
|
http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
deploymentLock.Unlock(deployRequest.Id)
|
||||||
|
}()
|
||||||
|
|
||||||
|
flusher, ok := w.(http.Flusher)
|
||||||
|
if !ok {
|
||||||
|
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send 200 to indicate a successful SSE connection to the client
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
|
||||||
|
eventChannel := make(chan API.DeploymentEvent, 10)
|
||||||
|
defer close(eventChannel)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
// make sure the connection doesnt close while there are SSE events being sent
|
||||||
|
defer wg.Wait()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(w http.ResponseWriter, flusher http.Flusher) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case event, ok := <-eventChannel:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ev := API.DeploymentEvent{
|
||||||
|
Message: event.Message,
|
||||||
|
}
|
||||||
|
|
||||||
|
eventJSON, err := json.Marshal(ev)
|
||||||
|
if err != nil {
|
||||||
|
// Write error directly to ResponseWriter
|
||||||
|
jsonErr := json.NewEncoder(w).Encode(err)
|
||||||
|
if jsonErr != nil {
|
||||||
|
fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", err.Error())
|
||||||
|
if flusher != nil {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "event: %s\n", event.Stage)
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
|
||||||
|
if flusher != nil {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.Stage == "error" || event.Stage == "complete" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(w, flusher)
|
||||||
|
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "start",
|
||||||
|
Message: "Uploading code",
|
||||||
|
}
|
||||||
|
|
||||||
|
deployRequest.Code, _, err = r.FormFile("code")
|
||||||
|
if err != nil {
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: "No code archive found",
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer deployRequest.Code.Close()
|
||||||
|
|
||||||
|
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: "Invalid flux.json, a name, url, and port must be specified",
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if projectConfig.Name == "all" {
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: "Reserved name 'all' is not allowed",
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String()))
|
||||||
|
|
||||||
|
projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id)
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Infow("Failed to upload code", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to upload code: %s", err),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if projectConfig.EnvFile != "" {
|
||||||
|
envPath := filepath.Join(projectPath, projectConfig.EnvFile)
|
||||||
|
// prevent path traversal
|
||||||
|
realEnvPath, err := filepath.EvalSymlinks(envPath)
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to eval symlinks", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to eval symlinks: %s", err),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.HasPrefix(realEnvPath, projectPath) {
|
||||||
|
flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
envBytes, err := os.Open(realEnvPath)
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to open env file", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to open env file: %v", err),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer envBytes.Close()
|
||||||
|
|
||||||
|
envVars, err := godotenv.Parse(envBytes)
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to parse env file", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to parse env file: %v", err),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, value := range envVars {
|
||||||
|
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pipe the output of the build process to the event channel
|
||||||
|
pipeGroup := sync.WaitGroup{}
|
||||||
|
streamPipe := func(pipe io.ReadCloser) {
|
||||||
|
pipeGroup.Add(1)
|
||||||
|
defer pipeGroup.Done()
|
||||||
|
defer pipe.Close()
|
||||||
|
|
||||||
|
scanner := bufio.NewScanner(pipe)
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Text()
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "cmd_output",
|
||||||
|
Message: line,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := scanner.Err(); err != nil {
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to read pipe: %s", err),
|
||||||
|
}
|
||||||
|
flux.logger.Errorw("Error reading pipe", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String()))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "preparing",
|
||||||
|
Message: "Preparing project",
|
||||||
|
}
|
||||||
|
|
||||||
|
// redirect stdout and stderr to the event channel
|
||||||
|
reader, writer := io.Pipe()
|
||||||
|
prepareCmd := exec.Command("go", "generate")
|
||||||
|
prepareCmd.Dir = projectPath
|
||||||
|
prepareCmd.Stdout = writer
|
||||||
|
prepareCmd.Stderr = writer
|
||||||
|
|
||||||
|
err = prepareCmd.Start()
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to prepare project: %s", err),
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go streamPipe(reader)
|
||||||
|
|
||||||
|
pipeGroup.Wait()
|
||||||
|
|
||||||
|
err = prepareCmd.Wait()
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to prepare project: %s", err),
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.Close()
|
||||||
|
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "building",
|
||||||
|
Message: "Building project image",
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, writer = io.Pipe()
|
||||||
|
flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
|
||||||
|
imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0))
|
||||||
|
buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder)
|
||||||
|
buildCmd.Dir = projectPath
|
||||||
|
buildCmd.Stdout = writer
|
||||||
|
buildCmd.Stderr = writer
|
||||||
|
|
||||||
|
err = buildCmd.Start()
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to build image", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to build image: %s", err),
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go streamPipe(reader)
|
||||||
|
|
||||||
|
pipeGroup.Wait()
|
||||||
|
|
||||||
|
err = buildCmd.Wait()
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to build image", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to build image: %s", err),
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
app := flux.appManager.GetApp(deployRequest.Id)
|
||||||
|
|
||||||
|
if app == nil {
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "creating",
|
||||||
|
Message: "Creating app, this might take a while...",
|
||||||
|
}
|
||||||
|
|
||||||
|
app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id)
|
||||||
|
} else {
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "upgrading",
|
||||||
|
Message: "Upgrading app, this might take a while...",
|
||||||
|
}
|
||||||
|
|
||||||
|
// we dont need to change `app` since this upgrade will use the same app and update it in place
|
||||||
|
err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
flux.logger.Errorw("Failed to deploy app", zap.Error(err))
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "error",
|
||||||
|
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var extApp API.App
|
||||||
|
extApp.Id = app.Id
|
||||||
|
extApp.Name = app.Name
|
||||||
|
extApp.DeploymentID = app.DeploymentID
|
||||||
|
|
||||||
|
eventChannel <- API.DeploymentEvent{
|
||||||
|
Stage: "complete",
|
||||||
|
Message: extApp,
|
||||||
|
}
|
||||||
|
|
||||||
|
flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String()))
|
||||||
|
}
|
||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
|
||||||
@@ -19,7 +20,7 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/juls0730/flux/internal/docker"
|
"github.com/juls0730/flux/internal/docker"
|
||||||
"github.com/juls0730/flux/internal/services/appManagerService"
|
"github.com/juls0730/flux/internal/services/appManagerService"
|
||||||
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
"github.com/juls0730/sentinel"
|
||||||
|
|
||||||
"github.com/juls0730/flux/pkg"
|
"github.com/juls0730/flux/pkg"
|
||||||
"github.com/juls0730/flux/pkg/API"
|
"github.com/juls0730/flux/pkg/API"
|
||||||
@@ -44,7 +45,7 @@ type FluxServer struct {
|
|||||||
|
|
||||||
docker *docker.DockerClient
|
docker *docker.DockerClient
|
||||||
|
|
||||||
proxy *proxyManagerService.ProxyManager
|
proxy *sentinel.ProxyManager
|
||||||
appManager *appManagerService.AppManager
|
appManager *appManagerService.AppManager
|
||||||
|
|
||||||
rootDir string
|
rootDir string
|
||||||
@@ -62,7 +63,12 @@ func NewServer() *FluxServer {
|
|||||||
|
|
||||||
config := zap.NewProductionConfig()
|
config := zap.NewProductionConfig()
|
||||||
|
|
||||||
if os.Getenv("DEBUG") == "true" {
|
debug, err := strconv.ParseBool(os.Getenv("DEBUG"))
|
||||||
|
if err != nil {
|
||||||
|
debug = false
|
||||||
|
}
|
||||||
|
|
||||||
|
if debug {
|
||||||
config = zap.NewDevelopmentConfig()
|
config = zap.NewDevelopmentConfig()
|
||||||
verbosity = -1
|
verbosity = -1
|
||||||
}
|
}
|
||||||
@@ -144,7 +150,8 @@ func NewServer() *FluxServer {
|
|||||||
// blocking until the iamge is pulled
|
// blocking until the iamge is pulled
|
||||||
io.Copy(io.Discard, events)
|
io.Copy(io.Discard, events)
|
||||||
|
|
||||||
flux.proxy = proxyManagerService.NewProxyManager(flux.logger)
|
requestLogger := &RequestLogger{logger: flux.logger}
|
||||||
|
flux.proxy = sentinel.NewProxyManager(requestLogger)
|
||||||
|
|
||||||
flux.appManager = appManagerService.NewAppManager(flux.db, flux.docker, flux.proxy, flux.logger)
|
flux.appManager = appManagerService.NewAppManager(flux.db, flux.docker, flux.proxy, flux.logger)
|
||||||
err = flux.appManager.Init()
|
err = flux.appManager.Init()
|
||||||
@@ -162,7 +169,12 @@ func (s *FluxServer) Stop() {
|
|||||||
func (s *FluxServer) ListenAndServe() error {
|
func (s *FluxServer) ListenAndServe() error {
|
||||||
s.logger.Infow("Starting server", zap.String("daemon_host", s.config.DaemonHost), zap.String("proxy_host", s.config.ProxyHost))
|
s.logger.Infow("Starting server", zap.String("daemon_host", s.config.DaemonHost), zap.String("proxy_host", s.config.ProxyHost))
|
||||||
|
|
||||||
go s.proxy.ListenAndServe(s.config.ProxyHost)
|
go func() {
|
||||||
|
err := s.proxy.ListenAndServe(s.config.ProxyHost)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Errorw("Failed to start proxy", zap.Error(err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
return http.ListenAndServe(s.config.DaemonHost, nil)
|
return http.ListenAndServe(s.config.DaemonHost, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,3 +260,11 @@ func (s *FluxServer) UploadAppCode(code io.Reader, appId uuid.UUID) (string, err
|
|||||||
|
|
||||||
return outputPath, nil
|
return outputPath, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RequestLogger struct {
|
||||||
|
logger *zap.SugaredLogger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RequestLogger) LogRequest(host string, status int, latency time.Duration, ip, method, path string) {
|
||||||
|
r.logger.Infow("Request", zap.String("host", host), zap.Int("status", status), zap.Duration("latency", latency), zap.String("ip", ip), zap.String("method", method), zap.String("path", path))
|
||||||
|
}
|
||||||
|
|||||||
@@ -176,9 +176,9 @@ func CreateContainer(ctx context.Context, imageName string, friendlyName string,
|
|||||||
// Updates Container in place
|
// Updates Container in place
|
||||||
func (c *Container) Upgrade(ctx context.Context, imageName string, environment []string, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error {
|
func (c *Container) Upgrade(ctx context.Context, imageName string, environment []string, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error {
|
||||||
// Create new container with new image
|
// Create new container with new image
|
||||||
logger.Debugw("Upgrading container", zap.String("container_id", string(c.ContainerID[:12])))
|
logger.Debugw("Upgrading container", zap.String("container_id", string(c.ContainerID)))
|
||||||
if c.Volumes == nil {
|
if c.Volumes == nil {
|
||||||
return fmt.Errorf("no volumes found for container %s", c.ContainerID[:12])
|
return fmt.Errorf("no volumes found for container %s", c.ContainerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
containerJSON, err := dockerClient.ContainerInspect(context.Background(), c.ContainerID)
|
containerJSON, err := dockerClient.ContainerInspect(context.Background(), c.ContainerID)
|
||||||
@@ -269,7 +269,7 @@ func (c *Container) Remove(ctx context.Context, dockerClient *docker.DockerClien
|
|||||||
|
|
||||||
func (c *Container) Start(ctx context.Context, initial bool, db *sql.DB, dockerClient *docker.DockerClient, logger *zap.SugaredLogger) error {
|
func (c *Container) Start(ctx context.Context, initial bool, db *sql.DB, dockerClient *docker.DockerClient, logger *zap.SugaredLogger) error {
|
||||||
logger.Debugf("Starting container %+v", c)
|
logger.Debugf("Starting container %+v", c)
|
||||||
logger.Info("Starting container", zap.String("container_id", string(c.ContainerID)[:12]))
|
logger.Infow("Starting container", zap.String("container_id", string(c.ContainerID)))
|
||||||
|
|
||||||
if !initial && c.Head {
|
if !initial && c.Head {
|
||||||
logger.Debug("Starting and repairing head container")
|
logger.Debug("Starting and repairing head container")
|
||||||
@@ -330,7 +330,7 @@ func (c *Container) Wait(ctx context.Context, port uint16, dockerClient *docker.
|
|||||||
func (c *Container) GetIp(dockerClient *docker.DockerClient, logger *zap.SugaredLogger) (string, error) {
|
func (c *Container) GetIp(dockerClient *docker.DockerClient, logger *zap.SugaredLogger) (string, error) {
|
||||||
containerJSON, err := dockerClient.ContainerInspect(context.Background(), c.ContainerID)
|
containerJSON, err := dockerClient.ContainerInspect(context.Background(), c.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorw("Failed to inspect container", zap.Error(err), zap.String("container_id", string(c.ContainerID[:12])))
|
logger.Errorw("Failed to inspect container", zap.Error(err), zap.String("container_id", string(c.ContainerID)))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"github.com/juls0730/flux/internal/docker"
|
"github.com/juls0730/flux/internal/docker"
|
||||||
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
||||||
"github.com/juls0730/flux/pkg"
|
"github.com/juls0730/flux/pkg"
|
||||||
|
"github.com/juls0730/sentinel"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -77,7 +78,7 @@ func (d *Deployment) Start(ctx context.Context, dockerClient *docker.DockerClien
|
|||||||
for _, container := range d.containers {
|
for _, container := range d.containers {
|
||||||
err := dockerClient.StartContainer(ctx, container.ContainerID)
|
err := dockerClient.StartContainer(ctx, container.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to start container (%s): %v", container.ContainerID[:12], err)
|
return fmt.Errorf("failed to start container (%s): %v", container.ContainerID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,7 +92,7 @@ func (d *Deployment) GetInternalUrl(dockerClient *docker.DockerClient) (*url.URL
|
|||||||
}
|
}
|
||||||
|
|
||||||
if containerJSON.NetworkSettings.IPAddress == "" {
|
if containerJSON.NetworkSettings.IPAddress == "" {
|
||||||
return nil, fmt.Errorf("no IP address found for container %s", d.Head().ContainerID[:12])
|
return nil, fmt.Errorf("no IP address found for container %s", d.Head().ContainerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, d.Port))
|
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, d.Port))
|
||||||
@@ -106,7 +107,7 @@ func (d *Deployment) Stop(ctx context.Context, dockerClient *docker.DockerClient
|
|||||||
for _, container := range d.containers {
|
for _, container := range d.containers {
|
||||||
err := dockerClient.StopContainer(ctx, container.ContainerID)
|
err := dockerClient.StopContainer(ctx, container.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to stop container (%s): %v", container.ContainerID[:12], err)
|
return fmt.Errorf("failed to stop container (%s): %v", container.ContainerID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -141,7 +142,7 @@ func (deployment *Deployment) Status(ctx context.Context, dockerClient *docker.D
|
|||||||
|
|
||||||
// if the head is running, but the supplemental container is stopped, return "failed"
|
// if the head is running, but the supplemental container is stopped, return "failed"
|
||||||
if headStatus.Status == "running" && containerStatus.Status != "running" {
|
if headStatus.Status == "running" && containerStatus.Status != "running" {
|
||||||
logger.Debugw("Supplemental container is not running but head is, returning to failed state", zap.String("container_id", string(container.ContainerID[:12])))
|
logger.Debugw("Supplemental container is not running but head is, returning to failed state", zap.String("container_id", string(container.ContainerID)))
|
||||||
for _, supplementalContainer := range deployment.containers {
|
for _, supplementalContainer := range deployment.containers {
|
||||||
err := dockerClient.StopContainer(ctx, supplementalContainer.ContainerID)
|
err := dockerClient.StopContainer(ctx, supplementalContainer.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -169,7 +170,7 @@ func (deployment *Deployment) Status(ctx context.Context, dockerClient *docker.D
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Takes an existing deployment, and gracefully upgrades the app to a new image
|
// Takes an existing deployment, and gracefully upgrades the app to a new image
|
||||||
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error {
|
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *sentinel.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error {
|
||||||
// copy the old head container since Upgrade updates the container in place
|
// copy the old head container since Upgrade updates the container in place
|
||||||
oldHeadContainer := *deployment.Head()
|
oldHeadContainer := *deployment.Head()
|
||||||
|
|
||||||
@@ -183,13 +184,14 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
|
|||||||
db.Exec("DELETE FROM containers WHERE id = ?", oldHeadContainer.ID)
|
db.Exec("DELETE FROM containers WHERE id = ?", oldHeadContainer.ID)
|
||||||
|
|
||||||
newHeadContainer := deployment.Head()
|
newHeadContainer := deployment.Head()
|
||||||
logger.Debugw("Starting container", zap.String("container_id", string(newHeadContainer.ContainerID[:12])))
|
logger.Debugw("Starting container", zap.String("container_id", string(newHeadContainer.ContainerID)))
|
||||||
err = newHeadContainer.Start(ctx, true, db, dockerClient, logger)
|
err = newHeadContainer.Start(ctx, true, db, dockerClient, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorw("Failed to start container", zap.Error(err))
|
logger.Errorw("Failed to start container", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Debugw("Waiting for container to start", zap.String("container_id", string(newHeadContainer.ContainerID)))
|
||||||
if err := newHeadContainer.Wait(ctx, projectConfig.Port, dockerClient); err != nil {
|
if err := newHeadContainer.Wait(ctx, projectConfig.Port, dockerClient); err != nil {
|
||||||
logger.Errorw("Failed to wait for container", zap.Error(err))
|
logger.Errorw("Failed to wait for container", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
@@ -201,14 +203,18 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully drained of connections
|
// Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully drained of connections
|
||||||
oldProxy, ok := proxyManager.Load(deployment.URL)
|
var oldProxy *sentinel.Proxy
|
||||||
|
var ok bool = false
|
||||||
|
if value, exists := proxyManager.Load(deployment.URL); exists {
|
||||||
|
oldProxy, ok = value.(*sentinel.Proxy)
|
||||||
|
}
|
||||||
|
|
||||||
newDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient)
|
newDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorw("Failed to get internal url", zap.Error(err))
|
logger.Errorw("Failed to get internal url", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
newProxy, err := proxyManagerService.NewDeploymentProxy(*newDeploymentInternalUrl)
|
newProxy, err := sentinel.NewDeploymentProxy(newDeploymentInternalUrl.String(), proxyManagerService.GetTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorw("Failed to create deployment proxy", zap.Error(err))
|
logger.Errorw("Failed to create deployment proxy", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
@@ -221,13 +227,24 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr
|
|||||||
// gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers
|
// gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers
|
||||||
if ok {
|
if ok {
|
||||||
go oldProxy.GracefulShutdown(func() {
|
go oldProxy.GracefulShutdown(func() {
|
||||||
err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
|
err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorw("Failed to stop container", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorw("Failed to remove container", zap.Error(err))
|
logger.Errorw("Failed to remove container", zap.Error(err))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
|
err := dockerClient.StopContainer(context.Background(), oldHeadContainer.ContainerID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorw("Failed to stop container", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
err = dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorw("Failed to remove container", zap.Error(err))
|
logger.Errorw("Failed to remove container", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
|
||||||
"github.com/juls0730/flux/internal/util"
|
"github.com/juls0730/flux/internal/util"
|
||||||
"github.com/juls0730/flux/pkg"
|
"github.com/juls0730/flux/pkg"
|
||||||
|
"github.com/juls0730/sentinel"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -18,12 +19,12 @@ type AppManager struct {
|
|||||||
util.TypedMap[uuid.UUID, *models.App]
|
util.TypedMap[uuid.UUID, *models.App]
|
||||||
nameIndex util.TypedMap[string, uuid.UUID]
|
nameIndex util.TypedMap[string, uuid.UUID]
|
||||||
logger *zap.SugaredLogger
|
logger *zap.SugaredLogger
|
||||||
proxyManager *proxyManagerService.ProxyManager
|
proxyManager *sentinel.ProxyManager
|
||||||
dockerClient *docker.DockerClient
|
dockerClient *docker.DockerClient
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, logger *zap.SugaredLogger) *AppManager {
|
func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *sentinel.ProxyManager, logger *zap.SugaredLogger) *AppManager {
|
||||||
return &AppManager{
|
return &AppManager{
|
||||||
db: db,
|
db: db,
|
||||||
dockerClient: dockerClient,
|
dockerClient: dockerClient,
|
||||||
@@ -87,7 +88,7 @@ func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, p
|
|||||||
return nil, fmt.Errorf("failed to get internal url: %v", err)
|
return nil, fmt.Errorf("failed to get internal url: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl)
|
newProxy, err := sentinel.NewDeploymentProxy(deploymentInternalUrl.String(), proxyManagerService.GetTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
appManager.logger.Errorw("Failed to create deployment proxy", zap.Error(err))
|
appManager.logger.Errorw("Failed to create deployment proxy", zap.Error(err))
|
||||||
return nil, fmt.Errorf("failed to create deployment proxy: %v", err)
|
return nil, fmt.Errorf("failed to create deployment proxy: %v", err)
|
||||||
@@ -293,7 +294,7 @@ func (am *AppManager) Init() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy, err := proxyManagerService.NewDeploymentProxy(*proxyURL)
|
proxy, err := sentinel.NewDeploymentProxy(proxyURL.String(), proxyManagerService.GetTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
am.logger.Errorw("Failed to create proxy", zap.Error(err))
|
am.logger.Errorw("Failed to create proxy", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -1,133 +1,18 @@
|
|||||||
package proxyManagerService
|
package proxyManagerService
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httputil"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/juls0730/flux/internal/util"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type DeploymentId int64
|
func GetTransport(target string) *http.Transport {
|
||||||
|
return &http.Transport{
|
||||||
// this is the object that oversees the proxying of requests to the correct deployment
|
Proxy: func(r *http.Request) (*url.URL, error) {
|
||||||
type ProxyManager struct {
|
return url.Parse(target)
|
||||||
util.TypedMap[string, *Proxy]
|
|
||||||
logger *zap.SugaredLogger
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewProxyManager(logger *zap.SugaredLogger) *ProxyManager {
|
|
||||||
return &ProxyManager{
|
|
||||||
logger: logger,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (proxyManager *ProxyManager) ListenAndServe(host string) error {
|
|
||||||
if host == "" {
|
|
||||||
host = "0.0.0.0:7465"
|
|
||||||
}
|
|
||||||
|
|
||||||
proxyManager.logger.Infof("Proxy server starting on http://%s", host)
|
|
||||||
if err := http.ListenAndServe(host, proxyManager); err != nil && err != http.ErrServerClosed {
|
|
||||||
return fmt.Errorf("failed to start proxy server: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stops forwarding traffic to a deployment
|
|
||||||
func (proxyManager *ProxyManager) RemoveDeployment(host string) {
|
|
||||||
proxyManager.Delete(host)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Starts forwarding traffic to a deployment. The deployment must be ready to recieve requests before this is called.
|
|
||||||
func (proxyManager *ProxyManager) AddProxy(host string, proxy *Proxy) {
|
|
||||||
proxyManager.logger.Debugw("Adding proxy", zap.String("host", host))
|
|
||||||
proxyManager.Store(host, proxy)
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function is responsible for taking an http request and forwarding it to the correct deployment
|
|
||||||
func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
host := r.Host
|
|
||||||
|
|
||||||
proxyManager.logger.Debugw("Proxying request", zap.String("host", host))
|
|
||||||
proxy, ok := proxyManager.Load(host)
|
|
||||||
if !ok {
|
|
||||||
http.Error(w, "Not found", http.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
proxy.proxyFunc.ServeHTTP(w, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Proxy struct {
|
|
||||||
forwardingFor url.URL
|
|
||||||
proxyFunc *httputil.ReverseProxy
|
|
||||||
gracePeriod time.Duration
|
|
||||||
activeRequests int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// type DeploymentProxy struct {
|
|
||||||
// deployment *models.Deployment
|
|
||||||
// proxy *httputil.ReverseProxy
|
|
||||||
// gracePeriod time.Duration
|
|
||||||
// activeRequests int64
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Creates a proxy for a given deployment
|
|
||||||
func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) {
|
|
||||||
proxy := &Proxy{
|
|
||||||
forwardingFor: forwardingFor,
|
|
||||||
gracePeriod: time.Second * 30,
|
|
||||||
activeRequests: 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
proxy.proxyFunc = &httputil.ReverseProxy{
|
|
||||||
Director: func(req *http.Request) {
|
|
||||||
req.URL = &url.URL{
|
|
||||||
Scheme: forwardingFor.Scheme,
|
|
||||||
Host: forwardingFor.Host,
|
|
||||||
Path: req.URL.Path,
|
|
||||||
}
|
|
||||||
req.Host = forwardingFor.Host
|
|
||||||
atomic.AddInt64(&proxy.activeRequests, 1)
|
|
||||||
},
|
},
|
||||||
Transport: &http.Transport{
|
|
||||||
MaxIdleConns: 100,
|
MaxIdleConns: 100,
|
||||||
IdleConnTimeout: 90 * time.Second,
|
IdleConnTimeout: 90 * time.Second,
|
||||||
MaxIdleConnsPerHost: 100,
|
MaxIdleConnsPerHost: 100,
|
||||||
},
|
|
||||||
ModifyResponse: func(resp *http.Response) error {
|
|
||||||
atomic.AddInt64(&proxy.activeRequests, -1)
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return proxy, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Drains connections from a proxy
|
|
||||||
func (p *Proxy) GracefulShutdown(shutdownFunc func()) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), p.gracePeriod)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
done := false
|
|
||||||
for !done {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
done = true
|
|
||||||
default:
|
|
||||||
if atomic.LoadInt64(&p.activeRequests) == 0 {
|
|
||||||
done = true
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
shutdownFunc()
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/juls0730/flux/pkg"
|
"github.com/juls0730/flux/pkg"
|
||||||
"github.com/juls0730/flux/pkg/API"
|
"github.com/juls0730/flux/pkg/API"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Project struct {
|
type Project struct {
|
||||||
@@ -14,7 +15,7 @@ type Project struct {
|
|||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, error) {
|
func GetProject(command string, args []string, config pkg.CLIConfig, logger *zap.SugaredLogger) (*Project, error) {
|
||||||
var projectName string
|
var projectName string
|
||||||
|
|
||||||
// we are in a project directory and the project is deployed
|
// we are in a project directory and the project is deployed
|
||||||
@@ -24,7 +25,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project,
|
|||||||
return nil, fmt.Errorf("failed to read .fluxid: %v", err)
|
return nil, fmt.Errorf("failed to read .fluxid: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-id/" + string(id))
|
app, err := GetRequest[API.App](config.DaemonURL+"/app/by-id/"+string(id), logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get app: %v", err)
|
return nil, fmt.Errorf("failed to get app: %v", err)
|
||||||
}
|
}
|
||||||
@@ -58,7 +59,7 @@ func GetProject(command string, args []string, config pkg.CLIConfig) (*Project,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// we are calling flux with a project name (ie `flux start my-project`)
|
// we are calling flux with a project name (ie `flux start my-project`)
|
||||||
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-name/" + projectName)
|
app, err := GetRequest[API.App](config.DaemonURL+"/app/by-name/"+projectName, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get app: %v", err)
|
return nil, fmt.Errorf("failed to get app: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,17 +6,26 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func isOk(statusCode int) bool {
|
||||||
|
return statusCode >= 200 && statusCode < 300
|
||||||
|
}
|
||||||
|
|
||||||
// make a function that makes an http GET request to the daemon and returns data of type T
|
// make a function that makes an http GET request to the daemon and returns data of type T
|
||||||
func GetRequest[T any](url string) (*T, error) {
|
func GetRequest[T any](url string, logger *zap.SugaredLogger) (*T, error) {
|
||||||
resp, err := http.Get(url)
|
resp, err := http.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Debugw("GET request failed", zap.String("url", url), zap.Error(err))
|
||||||
return nil, fmt.Errorf("http get request failed: %v", err)
|
return nil, fmt.Errorf("http get request failed: %v", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
logger.Debugw("GET request", zap.String("url", url), resp)
|
||||||
|
|
||||||
|
if !isOk(resp.StatusCode) {
|
||||||
responseBody, err := io.ReadAll(resp.Body)
|
responseBody, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error reading response body: %v", err)
|
return nil, fmt.Errorf("error reading response body: %v", err)
|
||||||
@@ -35,7 +44,7 @@ func GetRequest[T any](url string) (*T, error) {
|
|||||||
return &data, nil
|
return &data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeleteRequest(url string) error {
|
func DeleteRequest(url string, logger *zap.SugaredLogger) error {
|
||||||
req, err := http.NewRequest("DELETE", url, nil)
|
req, err := http.NewRequest("DELETE", url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to delete: %v", err)
|
return fmt.Errorf("failed to delete: %v", err)
|
||||||
@@ -46,7 +55,9 @@ func DeleteRequest(url string) error {
|
|||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
logger.Debugw("DELETE request", zap.String("url", url), resp)
|
||||||
|
|
||||||
|
if !isOk(resp.StatusCode) {
|
||||||
responseBody, err := io.ReadAll(resp.Body)
|
responseBody, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error reading response body: %v", err)
|
return fmt.Errorf("error reading response body: %v", err)
|
||||||
@@ -60,7 +71,7 @@ func DeleteRequest(url string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func PutRequest(url string, data io.Reader) error {
|
func PutRequest(url string, data io.Reader, logger *zap.SugaredLogger) error {
|
||||||
req, err := http.NewRequest("PUT", url, data)
|
req, err := http.NewRequest("PUT", url, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to put: %v", err)
|
return fmt.Errorf("failed to put: %v", err)
|
||||||
@@ -71,7 +82,9 @@ func PutRequest(url string, data io.Reader) error {
|
|||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
logger.Debugw("PUT request", zap.String("url", url), resp)
|
||||||
|
|
||||||
|
if !isOk(resp.StatusCode) {
|
||||||
responseBody, err := io.ReadAll(resp.Body)
|
responseBody, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error reading response body: %v", err)
|
return fmt.Errorf("error reading response body: %v", err)
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ func (dt *MutexLock[T]) Unlock(id T) {
|
|||||||
dt.mu.Lock()
|
dt.mu.Lock()
|
||||||
defer dt.mu.Unlock()
|
defer dt.mu.Unlock()
|
||||||
|
|
||||||
// Remove the app from deployed tracking
|
// Remove the object from the map (functionally unlocking it)
|
||||||
if cancel, exists := dt.deployed[id]; exists {
|
if cancel, exists := dt.deployed[id]; exists {
|
||||||
// Cancel the context
|
// Cancel the context
|
||||||
cancel()
|
cancel()
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
package pkg
|
package pkg
|
||||||
|
|
||||||
const Version = "2025.05.04-00"
|
const Version = "2025.05.15-18"
|
||||||
|
|||||||
75
test.sh
Executable file
75
test.sh
Executable file
@@ -0,0 +1,75 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
# Basic test for flux
|
||||||
|
|
||||||
|
set -ex
|
||||||
|
|
||||||
|
Flux_Database_Dir=$(mktemp -d)c
|
||||||
|
|
||||||
|
zqdgr build:all
|
||||||
|
|
||||||
|
tmux new-session -d -s daemon
|
||||||
|
# start daemon
|
||||||
|
tmux send-keys -t daemon "export FLUXD_ROOT_DIR=$Flux_Database_Dir" C-m
|
||||||
|
tmux send-keys -t daemon "DEBUG=true zqdgr run:daemon" C-m
|
||||||
|
|
||||||
|
|
||||||
|
# test daemon with the cli
|
||||||
|
tmux split-window -h
|
||||||
|
|
||||||
|
export FLUX_CLI_PATH=$PWD/flux
|
||||||
|
|
||||||
|
tmux send-keys -t daemon:0.1 "cd \$(mktemp -d)" C-m
|
||||||
|
tmux send-keys -t daemon:0.1 "cat << EOF > test.sh
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
set -xe
|
||||||
|
|
||||||
|
# wait for the daemon to initialize
|
||||||
|
sleep 2
|
||||||
|
|
||||||
|
go mod init testApp
|
||||||
|
$FLUX_CLI_PATH init --host-url testApp --project-port 8080 testApp
|
||||||
|
|
||||||
|
cat << ELOF > main.go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
\"fmt\"
|
||||||
|
\"net/http\"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
http.HandleFunc(\"/\", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fmt.Fprintf(w, \"Hello World\\n\")
|
||||||
|
})
|
||||||
|
http.ListenAndServe(\":8080\", nil)
|
||||||
|
}
|
||||||
|
ELOF
|
||||||
|
|
||||||
|
$FLUX_CLI_PATH deploy -q
|
||||||
|
|
||||||
|
curl -H \"Host: testApp\" localhost:7465
|
||||||
|
|
||||||
|
$FLUX_CLI_PATH stop
|
||||||
|
|
||||||
|
curl -H \"Host: testApp\" localhost:7465
|
||||||
|
|
||||||
|
$FLUX_CLI_PATH start
|
||||||
|
|
||||||
|
curl -H \"Host: testApp\" localhost:7465
|
||||||
|
|
||||||
|
sed -i 's/Hello World/Hello World 2/' main.go
|
||||||
|
|
||||||
|
$FLUX_CLI_PATH deploy -q
|
||||||
|
|
||||||
|
curl -H \"Host: testApp\" localhost:7465
|
||||||
|
|
||||||
|
$FLUX_CLI_PATH delete --no-confirm
|
||||||
|
EOF
|
||||||
|
"
|
||||||
|
|
||||||
|
|
||||||
|
tmux send-keys -t daemon:0.1 "chmod +x test.sh" C-m
|
||||||
|
tmux send-keys -t daemon:0.1 "./test.sh" C-m
|
||||||
|
tmux attach-session -d
|
||||||
@@ -7,9 +7,10 @@
|
|||||||
"scripts": {
|
"scripts": {
|
||||||
"build:daemon": "go build -o fluxd cmd/daemon/main.go",
|
"build:daemon": "go build -o fluxd cmd/daemon/main.go",
|
||||||
"build:cli": "go build -o flux cmd/cli/main.go",
|
"build:cli": "go build -o flux cmd/cli/main.go",
|
||||||
"build:all": "go build -o fluxd cmd/daemon/main.go && go build -o flux cmd/cli/main.go",
|
"build:all": "zqdgr build:daemon && zqdgr build:cli",
|
||||||
"run:daemon": "go run cmd/daemon/main.go",
|
"run:daemon": "go run cmd/daemon/main.go",
|
||||||
"run:cli": "go run cmd/flux/main.go"
|
"run:cli": "go run cmd/cli/main.go",
|
||||||
|
"test": "./test.sh"
|
||||||
},
|
},
|
||||||
"pattern": "**/*.go",
|
"pattern": "**/*.go",
|
||||||
"excluded_dirs": []
|
"excluded_dirs": []
|
||||||
|
|||||||
Reference in New Issue
Block a user