Massive architectural rework

This commit massively overhauls the project's structure to simplify
development. Most parts are now properly compartmentalized, and
dependencies are passed in a sane way rather than through a pile of
global variables.
Zoe
2025-05-02 12:15:40 -05:00
parent f4bf2ff5a1
commit c891c24843
50 changed files with 2684 additions and 2410 deletions
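At the heart of the rework is constructor injection: the docker client, proxy manager, and app manager are now built once in NewServer and handed to each other explicitly. A minimal sketch of that wiring, abbreviated from internal/handlers/server.go below (not a verbatim excerpt):

dockerClient := docker.NewDocker(nil, logger) // nil: connect using the environment
proxy := proxyManagerService.NewProxyManager(logger)
appManager := appManagerService.NewAppManager(db, dockerClient, proxy, logger)
appManager.Init()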


@@ -0,0 +1,172 @@
package docker
import (
"context"
"fmt"
"io"
"net/http"
"time"
dockerTypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/pkg/namesgenerator"
"go.uber.org/zap"
)
type DockerContainer struct {
ID DockerID
Name string
Volumes []*DockerVolume
}
// Creates a container in the docker daemon and returns the descriptor for the container
func (d *DockerClient) CreateDockerContainer(ctx context.Context, imageName string, vols []*DockerVolume, environment []string, hosts []string, name *string) (*DockerContainer, error) {
for _, host := range hosts {
if host == ":" {
return nil, fmt.Errorf("invalid host %s", host)
}
}
if name == nil {
containerName := fmt.Sprintf("flux-%s", namesgenerator.GetRandomName(0))
name = &containerName
}
d.logger.Debugw("Creating container", zap.String("container_name", *name))
mounts := make([]mount.Mount, len(vols))
volumes := make(map[string]struct{}, len(vols))
for i, volume := range vols {
volumes[volume.VolumeID] = struct{}{}
mounts[i] = mount.Mount{
Type: mount.TypeVolume,
Source: volume.VolumeID,
Target: volume.Mountpoint,
ReadOnly: false,
}
}
resp, err := d.client.ContainerCreate(ctx, &container.Config{
Image: imageName,
Env: environment,
Volumes: volumes,
Labels: map[string]string{
"managed-by": "flux",
},
},
&container.HostConfig{
RestartPolicy: container.RestartPolicy{Name: container.RestartPolicyUnlessStopped},
NetworkMode: "bridge",
Mounts: mounts,
ExtraHosts: hosts,
},
nil,
nil,
*name,
)
if err != nil {
return nil, err
}
c := &DockerContainer{
ID: DockerID(resp.ID),
Name: *name,
Volumes: vols,
}
return c, nil
}
func (d *DockerClient) ContainerRemove(ctx context.Context, containerID DockerID, options container.RemoveOptions) error {
d.logger.Debugw("Removing container", zap.String("container_id", string(containerID)))
return d.client.ContainerRemove(ctx, string(containerID), options)
}
func (d *DockerClient) StartContainer(ctx context.Context, containerID DockerID) error {
return d.client.ContainerStart(ctx, string(containerID), container.StartOptions{})
}
// ContainerWait blocks until the container answers with a 200 status code on the given port, or times out after 30 seconds
func (d *DockerClient) ContainerWait(ctx context.Context, containerID DockerID, port uint16) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return fmt.Errorf("container failed to become ready in time")
default:
containerJSON, err := d.ContainerInspect(ctx, containerID)
if err != nil {
return err
}
if containerJSON.State.Running {
resp, err := http.Get(fmt.Sprintf("http://%s:%d/", containerJSON.NetworkSettings.IPAddress, port))
if err == nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
}
}
}
time.Sleep(time.Second)
}
}
}
func (d *DockerClient) DeleteDockerContainer(ctx context.Context, containerID DockerID) error {
d.logger.Debugw("Removing container", zap.String("container_id", string(containerID)))
return d.client.ContainerRemove(ctx, string(containerID), container.RemoveOptions{})
}
func (d *DockerClient) GetContainerIp(containerID DockerID) (string, error) {
containerJSON, err := d.client.ContainerInspect(context.Background(), string(containerID))
if err != nil {
return "", err
}
ip := containerJSON.NetworkSettings.IPAddress
return ip, nil
}
type ContainerStatus struct {
// Can be "created", "running", "paused", "restarting", "removing", "exited", or "dead"
Status string
ExitCode int
}
func (d *DockerClient) GetContainerStatus(containerID DockerID) (*ContainerStatus, error) {
containerJSON, err := d.client.ContainerInspect(context.Background(), string(containerID))
if err != nil {
return nil, err
}
containerStatus := &ContainerStatus{
Status: containerJSON.State.Status,
ExitCode: containerJSON.State.ExitCode,
}
return containerStatus, nil
}
func (d *DockerClient) StopContainer(ctx context.Context, containerID DockerID) error {
d.logger.Debugw("Stopping container", zap.String("container_id", string(containerID[:12])))
return d.client.ContainerStop(ctx, string(containerID), container.StopOptions{})
}
func (d *DockerClient) ImagePull(ctx context.Context, imageName string, options image.PullOptions) (io.ReadCloser, error) {
d.logger.Debugw("Pulling image", zap.String("image", imageName))
return d.client.ImagePull(ctx, imageName, options)
}
func (d *DockerClient) ContainerInspect(ctx context.Context, containerID DockerID) (dockerTypes.ContainerJSON, error) {
d.logger.Debugw("Inspecting container", zap.String("container_id", string(containerID)))
return d.client.ContainerInspect(ctx, string(containerID))
}
func (d *DockerClient) ContainerStart(ctx context.Context, containerID string, options container.StartOptions) error {
d.logger.Debugw("Starting container", zap.String("container_id", containerID))
return d.client.ContainerStart(ctx, containerID, options)
}
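For orientation, a hedged usage sketch of the lifecycle this file exposes; `d` is a constructed *DockerClient and `vols` a hypothetical volume list:

ctx := context.Background()
c, err := d.CreateDockerContainer(ctx, "nginx:latest", vols, nil, nil, nil) // nil name: a flux-* name is generated
if err != nil {
panic(err)
}
if err := d.StartContainer(ctx, c.ID); err != nil {
panic(err)
}
// polls http://<container ip>:80/ until it answers 200 or the 30s timeout elapses
if err := d.ContainerWait(ctx, c.ID, 80); err != nil {
panic(err)
}
_ = d.StopContainer(ctx, c.ID)
_ = d.DeleteDockerContainer(ctx, c.ID)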

29 internal/docker/docker.go Normal file

@@ -0,0 +1,29 @@
package docker
import (
"github.com/docker/docker/client"
"go.uber.org/zap"
)
type DockerID string
// DockerClient bundles a connection to the docker daemon with a logger
type DockerClient struct {
client *client.Client
logger *zap.SugaredLogger
}
func NewDocker(rawDockerClient *client.Client, logger *zap.SugaredLogger) *DockerClient {
if rawDockerClient == nil {
var err error
rawDockerClient, err = client.NewClientWithOpts(client.FromEnv)
if err != nil {
logger.Fatalw("Failed to create docker client", zap.Error(err))
}
}
return &DockerClient{
client: rawDockerClient,
logger: logger,
}
}
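Constructing the client is the usual entry point; passing nil for the raw client makes NewDocker build one from the environment (client.FromEnv), which is exactly how NewServer calls it later in this diff. An illustrative snippet:

logger := zap.NewExample().Sugar()
d := docker.NewDocker(nil, logger) // nil: client is created via client.FromEnv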

37 internal/docker/volume.go Normal file

@@ -0,0 +1,37 @@
package docker
import (
"context"
"fmt"
"github.com/docker/docker/api/types/volume"
"go.uber.org/zap"
)
type DockerVolume struct {
VolumeID string
Mountpoint string
}
func (d *DockerClient) CreateDockerVolume(ctx context.Context) (vol *DockerVolume, err error) {
dockerVolume, err := d.client.VolumeCreate(ctx, volume.CreateOptions{
Driver: "local",
DriverOpts: map[string]string{},
})
if err != nil {
return nil, fmt.Errorf("failed to create volume: %v", err)
}
d.logger.Debugw("Volume created", zap.String("volume_id", dockerVolume.Name), zap.String("mountpoint", dockerVolume.Mountpoint))
vol = &DockerVolume{
VolumeID: dockerVolume.Name,
}
return vol, nil
}
func (d *DockerClient) DeleteDockerVolume(ctx context.Context, volumeID string) error {
d.logger.Debugw("Removing volume", zap.String("volume_id", volumeID))
return d.client.VolumeRemove(ctx, volumeID, true)
}
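A short hedged sketch of how these two calls pair up; note that CreateDockerVolume only fills in VolumeID, so the caller assigns Mountpoint afterwards (as CreateContainer does later in this diff):

vol, err := d.CreateDockerVolume(ctx)
if err != nil {
panic(err)
}
vol.Mountpoint = "/workspace" // caller's responsibility; not set by CreateDockerVolume
defer d.DeleteDockerVolume(ctx, vol.VolumeID) // force-removes the volume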

576 internal/handlers/app.go Normal file

@@ -0,0 +1,576 @@
package handlers
import (
"bufio"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/google/uuid"
"github.com/joho/godotenv"
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/flux/internal/util"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
"go.uber.org/zap"
)
var deploymentLock *util.MutexLock[uuid.UUID] = util.NewMutexLock[uuid.UUID]()
func (flux *FluxServer) DeployNewApp(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "test/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
err := r.ParseMultipartForm(10 << 30) // 10 GiB
if err != nil {
flux.logger.Errorw("Failed to parse multipart form", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var deployRequest API.DeployRequest
projectConfig := new(pkg.ProjectConfig)
if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil {
flux.logger.Errorw("Failed to decode config", zap.Error(err))
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
return
}
deployRequest.Config = *projectConfig
idStr := r.FormValue("id")
if idStr == "" {
id, err := uuid.NewRandom()
if err != nil {
flux.logger.Errorw("Failed to generate uuid", zap.Error(err))
http.Error(w, "Failed to generate uuid", http.StatusInternalServerError)
return
}
deployRequest.Id = id
} else {
deployRequest.Id, err = uuid.Parse(idStr)
if err != nil {
flux.logger.Errorw("Failed to parse uuid", zap.Error(err))
http.Error(w, "Failed to parse uuid", http.StatusBadRequest)
return
}
// make sure the id exists in the database
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
}
ctx, err := deploymentLock.Lock(deployRequest.Id, r.Context())
if err == util.ErrLocked {
// This will happen if the app is already being deployed
http.Error(w, "Cannot deploy app, it's already being deployed", http.StatusConflict)
return
}
go func() {
<-ctx.Done()
deploymentLock.Unlock(deployRequest.Id)
}()
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusMultiStatus)
eventChannel := make(chan API.DeploymentEvent, 10)
defer close(eventChannel)
var wg sync.WaitGroup
// make sure the connection doesn't close while there are SSE events being sent
defer wg.Wait()
wg.Add(1)
go func(w http.ResponseWriter, flusher http.Flusher) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case event, ok := <-eventChannel:
if !ok {
return
}
ev := API.DeploymentEvent{
Message: event.Message,
}
eventJSON, err := json.Marshal(ev)
if err != nil {
// fall back to a hand-written SSE error event if the payload cannot be marshaled
fmt.Fprintf(w, "event: error\ndata: {\"message\": %q}\n\n", err.Error())
if flusher != nil {
flusher.Flush()
}
return
}
fmt.Fprintf(w, "event: %s\n", event.Stage)
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
if flusher != nil {
flusher.Flush()
}
if event.Stage == "error" || event.Stage == "complete" {
return
}
}
}
}(w, flusher)
eventChannel <- API.DeploymentEvent{
Stage: "start",
Message: "Uploading code",
}
deployRequest.Code, _, err = r.FormFile("code")
if err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "No code archive found",
}
return
}
defer deployRequest.Code.Close()
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Invalid flux.json, a name, url, and port must be specified",
}
return
}
if projectConfig.Name == "all" {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: "Reserved name 'all' is not allowed",
}
return
}
flux.logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String()))
projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id)
if err != nil {
flux.logger.Infow("Failed to upload code", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upload code: %s", err),
}
return
}
if projectConfig.EnvFile != "" {
envPath := filepath.Join(projectPath, projectConfig.EnvFile)
// prevent path traversal
realEnvPath, err := filepath.EvalSymlinks(envPath)
if err != nil {
flux.logger.Errorw("Failed to eval symlinks", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to eval symlinks: %s", err),
}
return
}
if realEnvPath != projectPath && !strings.HasPrefix(realEnvPath, projectPath+string(os.PathSeparator)) {
flux.logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile),
}
return
}
envFile, err := os.Open(realEnvPath)
if err != nil {
flux.logger.Errorw("Failed to open env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to open env file: %v", err),
}
return
}
defer envFile.Close()
envVars, err := godotenv.Parse(envFile)
if err != nil {
flux.logger.Errorw("Failed to parse env file", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to parse env file: %v", err),
}
return
}
for key, value := range envVars {
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
}
}
// pipe the output of the build process to the event channel
pipeGroup := sync.WaitGroup{}
// callers do pipeGroup.Add(1) before launching this in a goroutine so that
// pipeGroup.Wait cannot race ahead of the Add
streamPipe := func(pipe io.ReadCloser) {
defer pipeGroup.Done()
defer pipe.Close()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
line := scanner.Text()
eventChannel <- API.DeploymentEvent{
Stage: "cmd_output",
Message: line,
}
}
if err := scanner.Err(); err != nil {
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to read pipe: %s", err),
}
flux.logger.Errorw("Error reading pipe", zap.Error(err))
}
}
flux.logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String()))
eventChannel <- API.DeploymentEvent{
Stage: "preparing",
Message: "Preparing project",
}
// redirect stdout and stderr to the event channel
reader, writer := io.Pipe()
prepareCmd := exec.Command("go", "generate")
prepareCmd.Dir = projectPath
prepareCmd.Stdout = writer
prepareCmd.Stderr = writer
err = prepareCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
pipeGroup.Add(1)
go streamPipe(reader)
err = prepareCmd.Wait()
// close the write end so the scanner sees EOF, then wait for the pipe to drain
writer.Close()
pipeGroup.Wait()
if err != nil {
flux.logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
}
return
}
eventChannel <- API.DeploymentEvent{
Stage: "building",
Message: "Building project image",
}
reader, writer = io.Pipe()
flux.logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0))
buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder)
buildCmd.Dir = projectPath
buildCmd.Stdout = writer
buildCmd.Stderr = writer
err = buildCmd.Start()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
pipeGroup.Add(1)
go streamPipe(reader)
err = buildCmd.Wait()
writer.Close()
pipeGroup.Wait()
if err != nil {
flux.logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
}
return
}
app := flux.appManager.GetApp(deployRequest.Id)
if app == nil {
eventChannel <- API.DeploymentEvent{
Stage: "creating",
Message: "Creating app, this might take a while...",
}
app, err = flux.appManager.CreateApp(r.Context(), imageName, projectConfig, deployRequest.Id)
} else {
eventChannel <- API.DeploymentEvent{
Stage: "upgrading",
Message: "Upgrading app, this might take a while...",
}
// we don't need to change `app` since this upgrade will use the same app and update it in place
err = flux.appManager.Upgrade(r.Context(), app.Id, imageName, projectConfig)
}
if err != nil {
flux.logger.Errorw("Failed to deploy app", zap.Error(err))
eventChannel <- API.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
}
return
}
var extApp API.App
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
eventChannel <- API.DeploymentEvent{
Stage: "complete",
Message: extApp,
}
flux.logger.Infow("App deployed successfully", zap.String("id", app.Id.String()))
}
func (flux *FluxServer) GetAllApps(w http.ResponseWriter, r *http.Request) {
apps := []API.App{} // encode as [] rather than null when there are no apps
for _, app := range flux.appManager.GetAllApps() {
var extApp API.App
deploymentStatus, err := app.Deployment.Status(r.Context(), flux.docker, flux.logger)
if err != nil {
flux.logger.Errorw("Failed to get deployment status", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
extApp.DeploymentStatus = deploymentStatus
apps = append(apps, extApp)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(apps)
}
func (flux *FluxServer) GetAppById(w http.ResponseWriter, r *http.Request) {
id, err := uuid.Parse(r.PathValue("id"))
if err != nil {
http.Error(w, "Invalid app id", http.StatusBadRequest)
return
}
app := flux.appManager.GetApp(id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
var extApp API.App
deploymentStatus, err := app.Deployment.Status(r.Context(), flux.docker, flux.logger)
if err != nil {
flux.logger.Errorw("Failed to get deployment status", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
extApp.DeploymentStatus = deploymentStatus
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(extApp)
}
func (flux *FluxServer) GetAppByName(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
app := flux.appManager.GetAppByName(name)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
var extApp API.App
deploymentStatus, err := app.Deployment.Status(r.Context(), flux.docker, flux.logger)
if err != nil {
flux.logger.Errorw("Failed to get deployment status", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
extApp.DeploymentStatus = deploymentStatus
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(extApp)
}
func (flux *FluxServer) StartApp(w http.ResponseWriter, r *http.Request) {
id, err := uuid.Parse(r.PathValue("id"))
if err != nil {
http.Error(w, "Invalid app id", http.StatusBadRequest)
return
}
app := flux.appManager.GetApp(id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
status, err := app.Deployment.Status(r.Context(), flux.docker, flux.logger)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if status == "running" {
http.Error(w, "App is already running", http.StatusNotModified)
return
}
err = app.Deployment.Start(r.Context(), flux.docker)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
deploymentInternalUrl, err := app.Deployment.GetInternalUrl(flux.docker)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
flux.proxy.AddProxy(app.Deployment.URL, newProxy)
w.WriteHeader(http.StatusOK)
}
func (flux *FluxServer) StopApp(w http.ResponseWriter, r *http.Request) {
id, err := uuid.Parse(r.PathValue("id"))
if err != nil {
http.Error(w, "Invalid app id", http.StatusBadRequest)
return
}
app := flux.appManager.GetApp(id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
status, err := app.Deployment.Status(r.Context(), flux.docker, flux.logger)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if status == "stopped" || status == "failed" {
http.Error(w, "App is already stopped", http.StatusNotModified)
return
}
err = app.Deployment.Stop(r.Context(), flux.docker)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
flux.proxy.RemoveDeployment(app.Deployment.URL)
w.WriteHeader(http.StatusOK)
}
func (flux *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) {
apps := flux.appManager.GetAllApps()
for _, app := range apps {
err := flux.appManager.DeleteApp(app.Id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
w.WriteHeader(http.StatusOK)
}
func (flux *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) {
id, err := uuid.Parse(r.PathValue("id"))
if err != nil {
http.Error(w, "Invalid app id", http.StatusBadRequest)
return
}
app := flux.appManager.GetApp(id)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
err = flux.appManager.DeleteApp(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
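Since DeployNewApp speaks server-sent events, a client has to read `event:`/`data:` line pairs off the response body. A hypothetical client-side consumer (the function name and transport are assumptions; the framing matches the handler above, which needs bufio, fmt, io, and strings imported):

func readDeployEvents(body io.Reader) {
var stage string
scanner := bufio.NewScanner(body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "event: ") {
stage = strings.TrimPrefix(line, "event: ")
} else if strings.HasPrefix(line, "data: ") {
fmt.Printf("[%s] %s\n", stage, strings.TrimPrefix(line, "data: "))
if stage == "complete" || stage == "error" {
return // the stream ends after these stages
}
}
}
}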

247 internal/handlers/server.go Normal file

@@ -0,0 +1,247 @@
package handlers
import (
"archive/tar"
"compress/gzip"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
_ "embed"
"github.com/docker/docker/api/types/image"
"github.com/google/uuid"
"github.com/juls0730/flux/internal/docker"
"github.com/juls0730/flux/internal/services/appManagerService"
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
//go:embed schema.sql
schema string
DefaultConfig = pkg.DaemonConfig{
Builder: "paketobuildpacks/builder-jammy-tiny",
CompressionLevel: 0,
DaemonHost: "0.0.0.0:5647",
ProxyHost: "0.0.0.0:7465",
}
)
type FluxServer struct {
db *sql.DB
docker *docker.DockerClient
// TODO: implement
proxy *proxyManagerService.ProxyManager
appManager *appManagerService.AppManager
rootDir string
config pkg.DaemonConfig
logger *zap.SugaredLogger
}
func NewServer() *FluxServer {
flux := &FluxServer{}
verbosity, err := strconv.Atoi(os.Getenv("FLUXD_VERBOSITY"))
if err != nil {
verbosity = 0
}
config := zap.NewProductionConfig()
if os.Getenv("DEBUG") == "true" {
config = zap.NewDevelopmentConfig()
verbosity = -1
}
config.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity))
lameLogger, err := config.Build()
if err != nil {
fmt.Printf("Failed to create logger: %v\n", err)
os.Exit(1)
}
flux.logger = lameLogger.Sugar()
rootDir := os.Getenv("FLUXD_ROOT_DIR")
if rootDir == "" {
rootDir = "/var/fluxd"
}
flux.rootDir = rootDir
configPath := filepath.Join(flux.rootDir, "config.json")
if _, err := os.Stat(configPath); err != nil {
if err := os.MkdirAll(rootDir, 0755); err != nil {
flux.logger.Fatalw("Failed to create fluxd directory", zap.Error(err))
}
configBytes, err := json.Marshal(DefaultConfig)
if err != nil {
flux.logger.Fatalw("Failed to marshal default config", zap.Error(err))
}
fmt.Printf("Config file not found creating default config file at %s\n", configPath)
if err := os.WriteFile(configPath, configBytes, 0644); err != nil {
flux.logger.Fatalw("Failed to write config file", zap.Error(err))
}
}
configFile, err := os.ReadFile(configPath)
if err != nil {
flux.logger.Fatalw("Failed to read config file", zap.Error(err))
}
// apply the config file over the default config; this way, any missing fields
// are filled in with the default values
flux.config = DefaultConfig
if err := json.Unmarshal(configFile, &flux.config); err != nil {
flux.logger.Fatalw("Failed to parse config file", zap.Error(err))
}
dbPath := filepath.Join(flux.rootDir, "fluxd.db")
flux.db, err = sql.Open("sqlite3", dbPath)
if err != nil {
flux.logger.Fatalw("Failed to open database", zap.Error(err))
}
err = flux.db.Ping()
if err != nil {
flux.logger.Fatalw("Failed to ping database", zap.Error(err))
}
_, err = flux.db.Exec(schema)
if err != nil {
flux.logger.Fatalw("Failed to create database schema", zap.Error(err))
}
// NewDocker exits fatally on failure itself, so no error check is needed here
flux.docker = docker.NewDocker(nil, flux.logger)
flux.logger.Infof("Pulling builder image %s, this may take a while...", flux.config.Builder)
events, err := flux.docker.ImagePull(context.Background(), fmt.Sprintf("%s:latest", flux.config.Builder), image.PullOptions{})
if err != nil {
flux.logger.Fatalw("Failed to pull builder image", zap.Error(err))
}
// block until the image is pulled
io.Copy(io.Discard, events)
flux.proxy = proxyManagerService.NewProxyManager(flux.logger)
flux.appManager = appManagerService.NewAppManager(flux.db, flux.docker, flux.proxy, flux.logger)
flux.appManager.Init()
return flux
}
func (s *FluxServer) Stop() {
s.logger.Sync()
}
func (s *FluxServer) ListenAndServe() error {
s.logger.Infow("Starting server", zap.String("daemon_host", s.config.DaemonHost), zap.String("proxy_host", s.config.ProxyHost))
go s.proxy.ListenAndServe(s.config.ProxyHost)
return http.ListenAndServe(s.config.DaemonHost, nil)
}
func (s *FluxServer) DaemonInfoHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(API.Info{
CompressionLevel: s.config.CompressionLevel,
Version: pkg.Version,
})
}
// UploadAppCode extracts an uploaded tar archive into a temporary directory and returns the path to that directory
func (s *FluxServer) UploadAppCode(code io.Reader, appId uuid.UUID) (string, error) {
var err error
outputPath, err := os.MkdirTemp(os.TempDir(), appId.String())
if err != nil {
s.logger.Errorw("Failed to create project directory", zap.Error(err))
return "", err
}
var gzReader *gzip.Reader
defer func() {
if gzReader != nil {
gzReader.Close()
}
}()
if s.config.CompressionLevel > 0 {
gzReader, err = gzip.NewReader(code)
if err != nil {
s.logger.Infow("Failed to create gzip reader", zap.Error(err))
return "", err
}
}
var tarReader *tar.Reader
if gzReader != nil {
tarReader = tar.NewReader(gzReader)
} else {
tarReader = tar.NewReader(code)
}
s.logger.Infow("Extracting files for project", zap.String("project", outputPath))
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
s.logger.Debugw("Failed to read tar header", zap.Error(err))
return "", err
}
// Construct full path and reject traversal (zip-slip) in archive entries
path := filepath.Join(outputPath, header.Name)
if path != outputPath && !strings.HasPrefix(path, outputPath+string(os.PathSeparator)) {
return "", fmt.Errorf("invalid path in archive: %s", header.Name)
}
// Handle different file types
switch header.Typeflag {
case tar.TypeDir:
if err = os.MkdirAll(path, 0755); err != nil {
s.logger.Debugw("Failed to extract directory", zap.Error(err))
return "", err
}
case tar.TypeReg:
if err = os.MkdirAll(filepath.Dir(path), 0755); err != nil {
s.logger.Debugw("Failed to extract directory", zap.Error(err))
return "", err
}
outFile, err := os.Create(path)
if err != nil {
s.logger.Debugw("Failed to extract file", zap.Error(err))
return "", err
}
if _, err = io.Copy(outFile, tarReader); err != nil {
// close eagerly rather than defer, so handles are not held for the whole loop
outFile.Close()
s.logger.Debugw("Failed to copy file during extraction", zap.Error(err))
return "", err
}
outFile.Close()
}
}
return outputPath, nil
}

28 internal/models/app.go Normal file

@@ -0,0 +1,28 @@
package models
import (
"context"
"database/sql"
"github.com/google/uuid"
docker "github.com/juls0730/flux/internal/docker"
"go.uber.org/zap"
)
type App struct {
Id uuid.UUID `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Deployment *Deployment `json:"-"`
DeploymentID int64 `json:"deployment_id,omitempty"`
}
func (app *App) Remove(ctx context.Context, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error {
if err := app.Deployment.Remove(ctx, dockerClient, db, logger); err != nil {
logger.Errorw("Failed to remove deployment", zap.Error(err))
return err
}
_, err := db.ExecContext(ctx, "DELETE FROM apps WHERE id = ?", app.Id[:])
if err != nil {
logger.Errorw("Failed to delete app", zap.Error(err))
return err
}
return nil
}


@@ -0,0 +1,354 @@
package models
import (
"context"
"database/sql"
"fmt"
"io"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
docker "github.com/juls0730/flux/internal/docker"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
type Volume struct {
ID int64 `json:"id"`
Mountpoint string `json:"mountpoint"`
VolumeID string `json:"volume_id"`
ContainerID string `json:"container_id"`
}
func (v *Volume) Remove(ctx context.Context, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error {
logger.Debugw("Removing volume", zap.String("volume_id", v.VolumeID))
_, err := db.ExecContext(ctx, "DELETE FROM volumes WHERE volume_id = ?", v.VolumeID)
if err != nil {
logger.Errorw("Failed to delete volume", zap.Error(err))
return err
}
return dockerClient.DeleteDockerVolume(ctx, v.VolumeID)
}
type Container struct {
ID int64 `json:"id"`
Name string `json:"name"` // name of the container in the docker daemon
ContainerID docker.DockerID `json:"container_id"`
Head bool `json:"head"` // if the container is the head of the deployment
FriendlyName string `json:"friendly_name"` // name used by other containers to reach this container
Volumes []*Volume `json:"volumes"`
Deployment *Deployment `json:"-"`
DeploymentID int64 `json:"deployment_id"`
}
// Create a container given a container configuration and a deployment. This will do a few things:
//
// 1. Create the container in the docker daemon
//
// 2. Create the volumes for the container
//
// 3. Insert the container and volumes into the database
//
// This will not mess with containers already in the Deployment object, it is expected that this function will only be
// called when the app is initially created
func CreateContainer(ctx context.Context, imageName string, friendlyName string, head bool, environment []string, containerVols []pkg.Volume, deployment *Deployment, logger *zap.SugaredLogger, dockerClient *docker.DockerClient, db *sql.DB) (c *Container, err error) {
if friendlyName == "" {
return nil, fmt.Errorf("container friendly name is empty")
}
if imageName == "" {
return nil, fmt.Errorf("container image name is empty")
}
logger.Debugw("Creating container with image", zap.String("image", imageName))
var volumes []*docker.DockerVolume
// the head container gets a default volume where the project is mounted; this is important so that,
// if the project uses sqlite for example, the data is not lost the second the container turns off
if head {
vol, err := dockerClient.CreateDockerVolume(ctx)
if err != nil {
logger.Errorw("Failed to create head's workspace volume", zap.Error(err))
return nil, err
}
vol.Mountpoint = "/workspace"
volumes = append(volumes, vol)
}
for _, containerVolume := range containerVols {
vol, err := dockerClient.CreateDockerVolume(ctx)
if err != nil {
logger.Errorw("Failed to create volume", zap.Error(err))
return nil, err
}
if containerVolume.Mountpoint == "" {
return nil, fmt.Errorf("mountpoint is empty")
}
if containerVolume.Mountpoint == "/workspace" || containerVolume.Mountpoint == "/" {
return nil, fmt.Errorf("invalid mountpoint")
}
vol.Mountpoint = containerVolume.Mountpoint
volumes = append(volumes, vol)
}
// if the container is the head, build a list of hostnames that the container can reach by name for this deployment
// TODO: this host list should be consistent across all containers in the deployment, not just the head
var hosts []string
if head {
logger.Debug("Building host list")
for _, container := range deployment.Containers() {
ip, err := container.GetIp(dockerClient, logger)
if err != nil {
logger.Errorw("Failed to get container ip", zap.Error(err))
return nil, err
}
hosts = append(hosts, fmt.Sprintf("%s:%s", container.FriendlyName, ip))
}
}
// if the container is not the head, pull the image from docker hub
if !head {
logger.Debug("Pulling image", zap.String("image", imageName))
image, err := dockerClient.ImagePull(ctx, imageName, image.PullOptions{})
if err != nil {
logger.Errorw("Failed to pull image", zap.Error(err))
return nil, err
}
// block until the image is pulled
io.Copy(io.Discard, image)
}
logger.Debugw("Creating container", zap.String("image", imageName))
dockerContainer, err := dockerClient.CreateDockerContainer(ctx, imageName, volumes, environment, hosts, nil)
if err != nil {
logger.Errorw("Failed to create container", zap.Error(err))
return nil, err
}
c = &Container{
ContainerID: dockerContainer.ID,
Name: dockerContainer.Name,
FriendlyName: friendlyName,
}
// attach the created volumes to the model so the insert loop below persists them
for _, vol := range volumes {
c.Volumes = append(c.Volumes, &Volume{VolumeID: vol.VolumeID, Mountpoint: vol.Mountpoint, ContainerID: string(c.ContainerID)})
}
err = db.QueryRow("INSERT INTO containers (container_id, head, deployment_id) VALUES (?, ?, ?) RETURNING id, container_id, head, deployment_id", string(c.ContainerID), head, deployment.ID).Scan(&c.ID, &c.ContainerID, &c.Head, &c.DeploymentID)
if err != nil {
logger.Errorw("Failed to insert container", zap.Error(err))
return nil, err
}
tx, err := db.Begin()
if err != nil {
logger.Errorw("Failed to begin transaction", zap.Error(err))
return nil, err
}
volumeInsertStmt, err := tx.Prepare("INSERT INTO volumes (volume_id, mountpoint, container_id) VALUES (?, ?, ?) RETURNING id, volume_id, mountpoint, container_id")
if err != nil {
logger.Errorw("Failed to prepare statement", zap.Error(err))
tx.Rollback()
return nil, err
}
for _, vol := range c.Volumes {
logger.Debug("Inserting volume", zap.String("volume_id", vol.VolumeID), zap.String("mountpoint", vol.Mountpoint), zap.String("container_id", string(c.ContainerID)))
err = volumeInsertStmt.QueryRow(vol.VolumeID, vol.Mountpoint, c.ContainerID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID)
if err != nil {
logger.Errorw("Failed to insert volume", zap.Error(err))
tx.Rollback()
return nil, err
}
}
err = tx.Commit()
if err != nil {
logger.Errorw("Failed to commit transaction", zap.Error(err))
tx.Rollback()
return nil, err
}
c.Deployment = deployment
deployment.AppendContainer(c)
return c, nil
}
// Updates Container in place
func (c *Container) Upgrade(ctx context.Context, imageName string, environment []string, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error {
// Create new container with new image
logger.Debugw("Upgrading container", zap.String("container_id", string(c.ContainerID[:12])))
if c.Volumes == nil {
return fmt.Errorf("no volumes found for container %s", c.ContainerID[:12])
}
containerJSON, err := dockerClient.ContainerInspect(ctx, c.ContainerID)
if err != nil {
return err
}
hosts := containerJSON.HostConfig.ExtraHosts
var dockerVolumes []*docker.DockerVolume
for _, volume := range c.Volumes {
dockerVolumes = append(dockerVolumes, &docker.DockerVolume{
VolumeID: volume.VolumeID,
Mountpoint: volume.Mountpoint,
})
}
newDockerContainer, err := dockerClient.CreateDockerContainer(ctx, imageName, dockerVolumes, environment, hosts, nil)
if err != nil {
return err
}
err = db.QueryRow("INSERT INTO containers (container_id, head, deployment_id) VALUES (?, ?, ?) RETURNING id, container_id, head, deployment_id", newDockerContainer.ID, c.Head, c.Deployment.ID).Scan(&c.ID, &c.ContainerID, &c.Head, &c.DeploymentID)
if err != nil {
logger.Errorw("Failed to insert container", zap.Error(err))
return err
}
tx, err := db.Begin()
if err != nil {
logger.Errorw("Failed to begin transaction", zap.Error(err))
return err
}
volumeUpdateStmt, err := tx.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, mountpoint, container_id")
if err != nil {
tx.Rollback()
logger.Errorw("Failed to prepare statement", zap.Error(err))
return err
}
for _, vol := range c.Volumes {
// volumes.container_id stores the docker container ID (as inserted above), not the row ID
err = volumeUpdateStmt.QueryRow(string(c.ContainerID), vol.ID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID)
if err != nil {
tx.Rollback()
logger.Error("Failed to update volume", zap.Error(err))
return err
}
}
err = tx.Commit()
if err != nil {
tx.Rollback()
logger.Errorw("Failed to commit transaction", zap.Error(err))
return err
}
logger.Debug("Upgraded container")
return nil
}
func (c *Container) Remove(ctx context.Context, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error {
logger.Debugw("Removing container", zap.String("container_id", string(c.ContainerID)))
err := dockerClient.StopContainer(ctx, c.ContainerID)
if err != nil {
logger.Errorw("Failed to stop container", zap.Error(err))
return err
}
for _, volume := range c.Volumes {
logger.Debugw("Removing volume", zap.String("volume_id", volume.VolumeID))
err := volume.Remove(ctx, dockerClient, db, logger)
if err != nil {
return err
}
}
_, err = db.ExecContext(ctx, "DELETE FROM containers WHERE container_id = ?", c.ContainerID)
if err != nil {
logger.Errorw("Failed to delete container", zap.Error(err))
return err
}
return dockerClient.ContainerRemove(ctx, c.ContainerID, container.RemoveOptions{})
}
func (c *Container) Start(ctx context.Context, initial bool, db *sql.DB, dockerClient *docker.DockerClient, logger *zap.SugaredLogger) error {
logger.Debugf("Starting container %+v", c)
logger.Info("Starting container", zap.String("container_id", string(c.ContainerID)[:12]))
if !initial && c.Head {
logger.Debug("Starting and repairing head container")
containerJSON, err := dockerClient.ContainerInspect(ctx, c.ContainerID)
if err != nil {
return err
}
// remove yourself
dockerClient.ContainerRemove(ctx, c.ContainerID, container.RemoveOptions{})
var volumes []*docker.DockerVolume
var hosts []string
for _, volume := range c.Volumes {
volumes = append(volumes, &docker.DockerVolume{
VolumeID: volume.VolumeID,
Mountpoint: volume.Mountpoint,
})
}
for _, supplementalContainer := range c.Deployment.Containers() {
if supplementalContainer.Head {
continue
}
ip, err := supplementalContainer.GetIp(dockerClient, logger)
if err != nil {
return err
}
hosts = append(hosts, fmt.Sprintf("%s:%s", supplementalContainer.FriendlyName, ip))
}
// recreate yourself
resp, err := dockerClient.CreateDockerContainer(ctx,
containerJSON.Image,
volumes,
containerJSON.Config.Env,
hosts,
&c.Name,
)
if err != nil {
return err
}
c.ContainerID = resp.ID
db.Exec("UPDATE containers SET container_id = ? WHERE id = ?", string(c.ContainerID), c.ID)
}
return dockerClient.ContainerStart(ctx, string(c.ContainerID), container.StartOptions{})
}
func (c *Container) Wait(ctx context.Context, port uint16, dockerClient *docker.DockerClient) error {
return dockerClient.ContainerWait(ctx, c.ContainerID, port)
}
func (c *Container) GetIp(dockerClient *docker.DockerClient, logger *zap.SugaredLogger) (string, error) {
containerJSON, err := dockerClient.ContainerInspect(context.Background(), c.ContainerID)
if err != nil {
logger.Errorw("Failed to inspect container", zap.Error(err), zap.String("container_id", string(c.ContainerID[:12])))
return "", err
}
ip := containerJSON.NetworkSettings.IPAddress
return ip, nil
}
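Putting the documented three-step flow together, a hedged example of creating a head container for a deployment; db, deployment, logger, and dockerClient are assumed to be wired up as elsewhere in this diff, and the image name is illustrative:

c, err := CreateContainer(ctx,
"fluxi-example-image", // image produced by `pack build`
"my-app", // friendly name other containers use to reach it
true, // head: gets the persistent /workspace volume
[]string{"PORT=8080"}, // environment
nil, // no extra pkg.Volume mounts
deployment, logger, dockerClient, db)
if err != nil {
logger.Fatalw("create failed", zap.Error(err))
}
_ = c.Start(ctx, true, db, dockerClient, logger) // initial start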


@@ -0,0 +1,243 @@
package models
import (
"context"
"database/sql"
"fmt"
"net/url"
"github.com/juls0730/flux/internal/docker"
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
type Deployment struct {
ID int64 `json:"id"`
containers []*Container `json:"-"`
URL string `json:"url"`
Port uint16 `json:"port"`
headCache *Container
}
func NewDeployment() *Deployment {
return &Deployment{
containers: make([]*Container, 0),
}
}
func (d *Deployment) Remove(ctx context.Context, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error {
logger.Debugw("Removing deployment", zap.Int64("id", d.ID))
for _, container := range d.containers {
err := container.Remove(ctx, dockerClient, db, logger)
if err != nil {
return err
}
}
db.ExecContext(ctx, "DELETE FROM deployments WHERE id = ?", d.ID)
return nil
}
func (d *Deployment) Head() *Container {
if d.headCache != nil {
return d.headCache
}
for _, container := range d.containers {
if container.Head {
d.headCache = container
return container
}
}
return nil
}
func (d *Deployment) Containers() []*Container {
if d.containers == nil {
return nil
}
// copy the slice so that we don't modify the original
containers := make([]*Container, len(d.containers))
copy(containers, d.containers)
return containers
}
func (d *Deployment) AppendContainer(container *Container) {
d.headCache = nil
d.containers = append(d.containers, container)
}
func (d *Deployment) Start(ctx context.Context, dockerClient *docker.DockerClient) error {
for _, container := range d.containers {
err := dockerClient.StartContainer(ctx, container.ContainerID)
if err != nil {
return fmt.Errorf("failed to start container (%s): %v", container.ContainerID[:12], err)
}
}
return nil
}
func (d *Deployment) GetInternalUrl(dockerClient *docker.DockerClient) (*url.URL, error) {
containerJSON, err := dockerClient.ContainerInspect(context.Background(), d.Head().ContainerID)
if err != nil {
return nil, err
}
if containerJSON.NetworkSettings.IPAddress == "" {
return nil, fmt.Errorf("no IP address found for container %s", d.Head().ContainerID[:12])
}
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, d.Port))
if err != nil {
return nil, err
}
return containerUrl, nil
}
func (d *Deployment) Stop(ctx context.Context, dockerClient *docker.DockerClient) error {
for _, container := range d.containers {
err := dockerClient.StopContainer(ctx, container.ContainerID)
if err != nil {
return fmt.Errorf("failed to stop container (%s): %v", container.ContainerID[:12], err)
}
}
return nil
}
// Status gets the status of the head container and attempts to bring the supplemental containers into an aligned state
func (deployment *Deployment) Status(ctx context.Context, dockerClient *docker.DockerClient, logger *zap.SugaredLogger) (string, error) {
// first, get the status of the head container
headStatus, err := dockerClient.GetContainerStatus(deployment.Head().ContainerID)
if err != nil {
return "", err
}
// then, check the status of all supplemental containers
for _, container := range deployment.containers {
if container.Head {
continue
}
containerStatus, err := dockerClient.GetContainerStatus(container.ContainerID)
if err != nil {
return "", err
}
// if the head is stopped, but the supplemental container is running, stop the supplemental container
if headStatus.Status != "running" && containerStatus.Status == "running" {
err := dockerClient.StopContainer(ctx, container.ContainerID)
if err != nil {
return "", err
}
}
// if the head is running, but the supplemental container is stopped, return "failed"
if headStatus.Status == "running" && containerStatus.Status != "running" {
logger.Debugw("Supplemental container is not running but head is, returning to failed state", zap.String("container_id", string(container.ContainerID[:12])))
for _, supplementalContainer := range deployment.containers {
err := dockerClient.StopContainer(ctx, supplementalContainer.ContainerID)
if err != nil {
return "", err
}
}
return "failed", nil
}
}
switch headStatus.Status {
case "running":
return "running", nil
case "exited", "dead":
if headStatus.ExitCode != 0 {
// non-zero exit code in unix terminology means the program did not complete successfully
return "failed", nil
}
return "stopped", nil
default:
return "stopped", nil
}
}
// Takes an existing deployment, and gracefully upgrades the app to a new image
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, db *sql.DB, logger *zap.SugaredLogger) error {
// copy the old head container since Upgrade updates the container in place
oldHeadContainer := *deployment.Head()
oldDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient)
if err != nil {
logger.Errorw("Failed to get internal url", zap.Error(err))
return err
}
// we only upgrade the head container; in the future we might want to allow upgrading supplemental containers, but this should work just fine for now
err = deployment.Head().Upgrade(ctx, imageName, projectConfig.Environment, dockerClient, db, logger)
if err != nil {
logger.Errorw("Failed to upgrade container", zap.Error(err))
return err
}
db.Exec("DELETE FROM containers WHERE id = ?", oldHeadContainer.ID)
newHeadContainer := deployment.Head()
logger.Debugw("Starting container", zap.String("container_id", string(newHeadContainer.ContainerID[:12])))
err = newHeadContainer.Start(ctx, true, db, dockerClient, logger)
if err != nil {
logger.Errorw("Failed to start container", zap.Error(err))
return err
}
if err := newHeadContainer.Wait(ctx, projectConfig.Port, dockerClient); err != nil {
logger.Errorw("Failed to wait for container", zap.Error(err))
return err
}
if _, err := db.Exec("UPDATE deployments SET url = ?, port = ? WHERE id = ?", projectConfig.Url, projectConfig.Port, deployment.ID); err != nil {
logger.Errorw("Failed to update deployment", zap.Error(err))
return err
}
// Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully drained of connections
oldProxy, ok := proxyManager.Load(oldDeploymentInternalUrl.String())
newDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient)
if err != nil {
logger.Errorw("Failed to get internal url", zap.Error(err))
return err
}
newProxy, err := proxyManagerService.NewDeploymentProxy(*newDeploymentInternalUrl)
if err != nil {
logger.Errorw("Failed to create deployment proxy", zap.Error(err))
return err
}
proxyManager.RemoveDeployment(deployment.URL)
proxyManager.AddProxy(projectConfig.Url, newProxy)
deployment.URL = projectConfig.Url
// gracefully shut down the old proxy if it exists, otherwise just remove the container
if ok {
go oldProxy.GracefulShutdown(func() {
err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
if err != nil {
logger.Errorw("Failed to remove container", zap.Error(err))
}
})
} else {
err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID)
if err != nil {
logger.Errorw("Failed to remove container", zap.Error(err))
}
}
return nil
}
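This upgrade path is the one DeployNewApp ultimately drives. A hedged sketch of invoking it directly, assuming the dependencies are wired as in NewServer and the image name is illustrative:

if err := deployment.Upgrade(ctx, projectConfig, "fluxi-new-image",
dockerClient, proxyManager, db, logger); err != nil {
logger.Errorw("Failed to upgrade deployment", zap.Error(err))
}
// on success the proxy routes projectConfig.Url to the new head container,
// and the old head is deleted once its proxy drains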


@@ -1,320 +0,0 @@
package server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"github.com/google/uuid"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
type App struct {
Id uuid.UUID `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Deployment *Deployment `json:"-"`
DeploymentID int64 `json:"deployment_id,omitempty"`
flux *FluxServer
}
func (flux *FluxServer) GetAppByNameHandler(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
app := flux.appManager.GetAppByName(name)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
var extApp pkg.App
deploymentStatus, err := app.Deployment.Status(r.Context())
if err != nil {
logger.Errorw("Failed to get deployment status", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
extApp.DeploymentStatus = deploymentStatus
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(extApp)
}
// Create the initial app row in the database and create and start the deployment. The app is the overarching data
// structure that contains all of the data for a project
func (flux *FluxServer) CreateApp(ctx context.Context, imageName string, projectPath string, projectConfig *pkg.ProjectConfig, id uuid.UUID) (*App, error) {
app := &App{
Id: id,
flux: flux,
}
logger.Debugw("Creating deployment", zap.String("id", app.Id.String()))
deployment, err := flux.CreateDeployment(projectConfig.Port, projectConfig.Url)
app.Deployment = deployment
if err != nil {
logger.Errorw("Failed to create deployment", zap.Error(err))
return nil, err
}
for _, container := range projectConfig.Containers {
c, err := flux.CreateContainer(ctx, &container, false, deployment, container.Name)
if err != nil {
return nil, fmt.Errorf("failed to create container: %v", err)
}
c.Start(ctx, true)
}
headContainer := pkg.Container{
ImageName: imageName,
Volumes: projectConfig.Volumes,
Environment: projectConfig.Environment,
}
// this call does a lot for us, see it's documentation for more info
_, err = flux.CreateContainer(ctx, &headContainer, true, deployment, projectConfig.Name)
if err != nil {
return nil, fmt.Errorf("failed to create container: %v", err)
}
// create app in the database
var appIdBlob []byte
err = appInsertStmt.QueryRow(id[:], projectConfig.Name, deployment.ID).Scan(&appIdBlob, &app.Name, &app.DeploymentID)
if err != nil {
return nil, fmt.Errorf("failed to insert app: %v", err)
}
app.Id, err = uuid.FromBytes(appIdBlob)
if err != nil {
return nil, fmt.Errorf("failed to parse app id: %v", err)
}
err = deployment.Start(ctx)
if err != nil {
return nil, fmt.Errorf("failed to start deployment: %v", err)
}
flux.appManager.AddApp(app.Id, app)
return app, nil
}
func (app *App) Upgrade(ctx context.Context, imageName string, projectPath string, projectConfig *pkg.ProjectConfig) error {
logger.Debugw("Upgrading deployment", zap.String("id", app.Id.String()))
// if deploy is not started, start it
deploymentStatus, err := app.Deployment.Status(ctx)
if err != nil {
return fmt.Errorf("failed to get deployment status: %v", err)
}
if deploymentStatus != "running" {
err = app.Deployment.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start deployment: %v", err)
}
}
app.flux.db.Exec("UPDATE apps SET name = ? WHERE id = ?", projectConfig.Name, app.Id[:])
err = app.Deployment.Upgrade(ctx, projectConfig, imageName, projectPath)
if err != nil {
return fmt.Errorf("failed to upgrade deployment: %v", err)
}
return nil
}
// delete an app and deployment from the database, and its project files from disk.
func (app *App) Remove(ctx context.Context) error {
app.flux.appManager.RemoveApp(app.Id)
err := app.Deployment.Remove(ctx)
if err != nil {
logger.Errorw("Failed to remove deployment", zap.Error(err))
return err
}
_, err = app.flux.db.Exec("DELETE FROM apps WHERE id = ?", app.Id[:])
if err != nil {
logger.Errorw("Failed to delete app", zap.Error(err))
return err
}
projectPath := filepath.Join(app.flux.rootDir, "apps", app.Id.String())
err = os.RemoveAll(projectPath)
if err != nil {
return fmt.Errorf("failed to remove project directory: %v", err)
}
return nil
}
type AppManager struct {
pkg.TypedMap[uuid.UUID, *App]
nameIndex pkg.TypedMap[string, uuid.UUID]
}
func (am *AppManager) GetAppByName(name string) *App {
id, ok := am.nameIndex.Load(name)
if !ok {
return nil
}
return am.GetApp(id)
}
func (am *AppManager) GetApp(id uuid.UUID) *App {
app, exists := am.Load(id)
if !exists {
return nil
}
return app
}
func (am *AppManager) GetAllApps() []*App {
var apps []*App
am.Range(func(key uuid.UUID, app *App) bool {
apps = append(apps, app)
return true
})
return apps
}
// removes an app from the app manager
func (am *AppManager) RemoveApp(id uuid.UUID) {
app, ok := am.Load(id)
if !ok {
return
}
am.nameIndex.Delete(app.Name)
am.Delete(id)
}
// add a given app to the app manager
func (am *AppManager) AddApp(id uuid.UUID, app *App) {
if app.Deployment.Containers == nil || app.Deployment.Head == nil || len(app.Deployment.Containers) == 0 || app.Name == "" {
panic("invalid app")
}
am.nameIndex.Store(app.Name, id)
am.Store(id, app)
}
// nukes an app completely
func (am *AppManager) DeleteApp(id uuid.UUID) error {
app := am.GetApp(id)
if app == nil {
return fmt.Errorf("app not found")
}
// calls RemoveApp
err := app.Remove(context.Background())
if err != nil {
return err
}
return nil
}
// Scan every app in the database, and create in memory structures if the deployment is already running
func (am *AppManager) Init() {
logger.Info("Initializing deployments")
if Flux.db == nil {
logger.Panic("DB is nil")
}
rows, err := Flux.db.Query("SELECT id, name, deployment_id FROM apps")
if err != nil {
logger.Warnw("Failed to query apps", zap.Error(err))
return
}
defer rows.Close()
var apps []App
for rows.Next() {
var app App
var appIdBlob []byte
if err := rows.Scan(&appIdBlob, &app.Name, &app.DeploymentID); err != nil {
logger.Warnw("Failed to scan app", zap.Error(err))
return
}
app.Id = uuid.Must(uuid.FromBytes(appIdBlob))
app.flux = Flux
apps = append(apps, app)
}
for _, app := range apps {
deployment := &Deployment{}
var headContainer *Container
Flux.db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port)
deployment.Containers = make([]*Container, 0)
rows, err = Flux.db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID)
if err != nil {
logger.Warnw("Failed to query containers", zap.Error(err))
return
}
defer rows.Close()
for rows.Next() {
var container Container
var containerIDString string
rows.Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head)
container.Deployment = deployment
copy(container.ContainerID[:], containerIDString)
if container.Head {
if headContainer != nil {
logger.Fatal("Several containers are marked as head")
}
headContainer = &container
}
rows, err := Flux.db.Query("SELECT id, volume_id, container_id, mountpoint FROM volumes WHERE container_id = ?", container.ContainerID[:])
if err != nil {
logger.Warnw("Failed to query volumes", zap.Error(err))
return
}
defer rows.Close()
for rows.Next() {
volume := new(Volume)
rows.Scan(&volume.ID, &volume.VolumeID, &volume.ContainerID, &volume.Mountpoint)
container.Volumes = append(container.Volumes, volume)
}
deployment.Containers = append(deployment.Containers, &container)
}
if headContainer == nil {
logger.Fatal("head container is nil!")
}
deployment.Head = headContainer
app.Deployment = deployment
am.AddApp(app.Id, &app)
status, err := deployment.Status(context.Background())
if err != nil {
logger.Warnw("Failed to get deployment status", zap.Error(err))
continue
}
if status != "running" {
continue
}
deployment.Proxy, _ = deployment.NewDeploymentProxy()
Flux.proxy.AddDeployment(deployment)
}
}


@@ -1,527 +0,0 @@
package server
import (
"context"
"database/sql"
"fmt"
"io"
"net/http"
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
var (
containerInsertStmt *sql.Stmt
)
type Volume struct {
ID int64 `json:"id"`
VolumeID string `json:"volume_id"`
Mountpoint string `json:"mountpoint"`
ContainerID string `json:"container_id"`
}
type Container struct {
ID int64 `json:"id"`
Head bool `json:"head"` // if the container is the head of the deployment
FriendlyName string `json:"friendly_name"` // name used by other containers to reach this container
Name string `json:"name"` // name of the container in the docker daemon
Deployment *Deployment `json:"-"`
Volumes []*Volume `json:"volumes"`
ContainerID [64]byte `json:"container_id"`
DeploymentID int64 `json:"deployment_id"`
}
// Creates a volume in the docker daemon and returns the descriptor for the volume
func CreateDockerVolume(ctx context.Context) (vol *Volume, err error) {
dockerVolume, err := Flux.dockerClient.VolumeCreate(ctx, volume.CreateOptions{
Driver: "local",
DriverOpts: map[string]string{},
})
if err != nil {
return nil, fmt.Errorf("failed to create volume: %v", err)
}
logger.Debugw("Volume created", zap.String("volume_id", dockerVolume.Name), zap.String("mountpoint", dockerVolume.Mountpoint))
vol = &Volume{
VolumeID: dockerVolume.Name,
}
return vol, nil
}
// Creates a container in the docker daemon and returns the descriptor for the container
func CreateDockerContainer(ctx context.Context, imageName string, vols []*Volume, environment []string, hosts []string) (*Container, error) {
for _, host := range hosts {
if host == ":" {
return nil, fmt.Errorf("invalid host %s", host)
}
}
containerName := fmt.Sprintf("flux-%s", namesgenerator.GetRandomName(0))
logger.Debugw("Creating container", zap.String("container_id", containerName))
mounts := make([]mount.Mount, len(vols))
volumes := make(map[string]struct{}, len(vols))
for i, volume := range vols {
volumes[volume.VolumeID] = struct{}{}
mounts[i] = mount.Mount{
Type: mount.TypeVolume,
Source: volume.VolumeID,
Target: volume.Mountpoint,
ReadOnly: false,
}
}
resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{
Image: imageName,
Env: environment,
Volumes: volumes,
Labels: map[string]string{
"managed-by": "flux",
},
},
&container.HostConfig{
RestartPolicy: container.RestartPolicy{Name: container.RestartPolicyUnlessStopped},
NetworkMode: "bridge",
Mounts: mounts,
ExtraHosts: hosts,
},
nil,
nil,
containerName,
)
if err != nil {
return nil, err
}
c := &Container{
ContainerID: [64]byte([]byte(resp.ID)),
Volumes: vols,
Name: containerName,
}
return c, nil
}
// Create a container given a container configuration and a deployment. This will do a few things:
// 1. Create the container in the docker daemon
// 2. Create the volumes for the container
// 3. Insert the container and volumes into the database
func (flux *FluxServer) CreateContainer(ctx context.Context, container *pkg.Container, head bool, deployment *Deployment, friendlyName string) (c *Container, err error) {
if friendlyName == "" {
return nil, fmt.Errorf("container friendly name is empty")
}
if container.ImageName == "" {
return nil, fmt.Errorf("container image name is empty")
}
logger.Debugw("Creating container with image", zap.String("image", container.ImageName))
var volumes []*Volume
// in the head container, we have a default volume where the project is mounted; this is important so that, if the project uses sqlite for example,
// the data is not lost the second the container turns off.
if head {
vol, err := CreateDockerVolume(ctx)
if err != nil {
return nil, err
}
vol.Mountpoint = "/workspace"
volumes = append(volumes, vol)
}
for _, containerVolume := range container.Volumes {
vol, err := CreateDockerVolume(ctx)
if err != nil {
return nil, err
}
if containerVolume.Mountpoint == "" {
return nil, fmt.Errorf("mountpoint is empty")
}
if containerVolume.Mountpoint == "/workspace" || containerVolume.Mountpoint == "/" {
return nil, fmt.Errorf("invalid mountpoint")
}
vol.Mountpoint = containerVolume.Mountpoint
volumes = append(volumes, vol)
}
// if the container is the head, build a list of hostnames that the container can reach by name for this deployment
// TODO: this host list should be consistent across all containers in the deployment, not just the head
var hosts []string
if head {
for _, container := range deployment.Containers {
containerIP, err := container.GetIp()
if err != nil {
return nil, err
}
hosts = append(hosts, fmt.Sprintf("%s:%s", container.FriendlyName, containerIP))
}
}
// if the container is not the head, pull the image from docker hub
if !head {
image, err := Flux.dockerClient.ImagePull(ctx, container.ImageName, image.PullOptions{})
if err != nil {
logger.Errorw("Failed to pull image", zap.Error(err))
return nil, err
}
// block until the image is pulled
io.Copy(io.Discard, image)
}
c, err = CreateDockerContainer(ctx, container.ImageName, volumes, container.Environment, hosts)
if err != nil {
return nil, err
}
c.FriendlyName = friendlyName
var containerIDString string
err = containerInsertStmt.QueryRow(c.ContainerID[:], head, deployment.ID).Scan(&c.ID, &containerIDString, &c.Head, &c.DeploymentID)
if err != nil {
return nil, err
}
copy(c.ContainerID[:], containerIDString)
tx, err := flux.db.Begin()
if err != nil {
return nil, err
}
volumeInsertStmt, err := tx.Prepare("INSERT INTO volumes (volume_id, mountpoint, container_id) VALUES (?, ?, ?) RETURNING id, volume_id, mountpoint, container_id")
if err != nil {
logger.Errorw("Failed to prepare statement", zap.Error(err))
tx.Rollback()
return nil, err
}
for _, vol := range c.Volumes {
err = volumeInsertStmt.QueryRow(vol.VolumeID, vol.Mountpoint, c.ContainerID[:]).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID)
if err != nil {
tx.Rollback()
return nil, err
}
}
err = tx.Commit()
if err != nil {
tx.Rollback()
return nil, err
}
c.Deployment = deployment
if head {
deployment.Head = c
}
deployment.Containers = append(deployment.Containers, c)
return c, nil
}
func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string, environment []string) (*Container, error) {
// Create new container with new image
logger.Debugw("Upgrading container", zap.ByteString("container_id", c.ContainerID[:12]))
if c.Volumes == nil {
return nil, fmt.Errorf("no volumes found for container %s", c.ContainerID[:12])
}
containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(c.ContainerID[:]))
if err != nil {
return nil, err
}
hosts := containerJSON.HostConfig.ExtraHosts
newContainer, err := CreateDockerContainer(ctx, imageName, c.Volumes, environment, hosts)
if err != nil {
return nil, err
}
newContainer.Deployment = c.Deployment
var containerIDString string
err = containerInsertStmt.QueryRow(newContainer.ContainerID[:], c.Head, c.Deployment.ID).Scan(&newContainer.ID, &containerIDString, &newContainer.Head, &newContainer.DeploymentID)
if err != nil {
logger.Errorw("Failed to insert container", zap.Error(err))
return nil, err
}
copy(newContainer.ContainerID[:], containerIDString)
tx, err := Flux.db.Begin()
if err != nil {
logger.Errorw("Failed to begin transaction", zap.Error(err))
return nil, err
}
volumeUpdateStmt, err := tx.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, mountpoint, container_id")
if err != nil {
tx.Rollback()
return nil, err
}
for _, vol := range newContainer.Volumes {
err = volumeUpdateStmt.QueryRow(newContainer.ContainerID[:], vol.ID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID)
if err != nil {
tx.Rollback()
logger.Error("Failed to update volume", zap.Error(err))
return nil, err
}
}
err = tx.Commit()
if err != nil {
tx.Rollback()
return nil, err
}
logger.Debug("Upgraded container")
return newContainer, nil
}
// initial indicates whether the container was just created; if not, we need to fix the extra hosts field, since it's not
// guaranteed that the supplemental containers have the same ip as they had when the deployment was last running
func (c *Container) Start(ctx context.Context, initial bool) error {
if !initial && c.Head {
containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, string(c.ContainerID[:]))
if err != nil {
return err
}
// remove yourself
Flux.dockerClient.ContainerRemove(ctx, string(c.ContainerID[:]), container.RemoveOptions{})
var volumes map[string]struct{} = make(map[string]struct{})
var hosts []string
var mounts []mount.Mount
for _, volume := range c.Volumes {
volumes[volume.VolumeID] = struct{}{}
mounts = append(mounts, mount.Mount{
Type: mount.TypeVolume,
Source: volume.VolumeID,
Target: volume.Mountpoint,
ReadOnly: false,
})
}
for _, supplementalContainer := range c.Deployment.Containers {
if supplementalContainer.Head {
continue
}
ip, err := supplementalContainer.GetIp()
if err != nil {
return err
}
hosts = append(hosts, fmt.Sprintf("%s:%s", supplementalContainer.FriendlyName, ip))
}
// recreate yourself
// TODO: pull this out so it stays in sync with CreateDockerContainer
resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{
Image: containerJSON.Image,
Env: containerJSON.Config.Env,
Volumes: volumes,
Labels: map[string]string{
"managed-by": "flux",
},
},
&container.HostConfig{
RestartPolicy: container.RestartPolicy{Name: container.RestartPolicyUnlessStopped},
NetworkMode: "bridge",
Mounts: mounts,
ExtraHosts: hosts,
},
nil,
nil,
c.Name,
)
if err != nil {
return err
}
c.ContainerID = [64]byte([]byte(resp.ID))
Flux.db.Exec("UPDATE containers SET container_id = ? WHERE id = ?", c.ContainerID[:], c.ID)
}
return Flux.dockerClient.ContainerStart(ctx, string(c.ContainerID[:]), container.StartOptions{})
}
func (c *Container) Stop(ctx context.Context) error {
return Flux.dockerClient.ContainerStop(ctx, string(c.ContainerID[:]), container.StopOptions{})
}
// Stop and remove a container and all of its volumes
func (c *Container) Remove(ctx context.Context) error {
err := RemoveDockerContainer(ctx, string(c.ContainerID[:]))
if err != nil {
return fmt.Errorf("failed to remove container (%s): %v", c.ContainerID[:12], err)
}
tx, err := Flux.db.Begin()
if err != nil {
logger.Errorw("Failed to begin transaction", zap.Error(err))
return err
}
_, err = tx.Exec("DELETE FROM containers WHERE container_id = ?", c.ContainerID[:])
if err != nil {
tx.Rollback()
return err
}
for _, volume := range c.Volumes {
if err := RemoveVolume(ctx, volume.VolumeID); err != nil {
tx.Rollback()
return fmt.Errorf("failed to remove volume (%s): %v", volume.VolumeID, err)
}
_, err = tx.Exec("DELETE FROM volumes WHERE volume_id = ?", volume.VolumeID)
if err != nil {
tx.Rollback()
return err
}
}
if err := tx.Commit(); err != nil {
logger.Errorw("Failed to commit transaction", zap.Error(err))
return err
}
return nil
}
func (c *Container) Wait(ctx context.Context, port uint16) error {
return WaitForDockerContainer(ctx, string(c.ContainerID[:]), port)
}
type ContainerStatus struct {
Status string
ExitCode int
}
func (c *Container) Status(ctx context.Context) (*ContainerStatus, error) {
containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, string(c.ContainerID[:]))
if err != nil {
return nil, err
}
containerStatus := &ContainerStatus{
Status: containerJSON.State.Status,
ExitCode: containerJSON.State.ExitCode,
}
return containerStatus, nil
}
func (c *Container) GetIp() (string, error) {
containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(c.ContainerID[:]))
if err != nil {
return "", err
}
ip := containerJSON.NetworkSettings.IPAddress
return ip, nil
}
// Stops and deletes a container from the docker daemon
func RemoveDockerContainer(ctx context.Context, containerID string) error {
if err := Flux.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil {
return fmt.Errorf("failed to stop container (%s): %v", containerID[:12], err)
}
if err := Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil {
return fmt.Errorf("failed to remove container (%s): %v", containerID[:12], err)
}
return nil
}
// scuffed af "health check" for docker containers
func WaitForDockerContainer(ctx context.Context, containerID string, containerPort uint16) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return fmt.Errorf("container failed to become ready in time")
default:
containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return err
}
if containerJSON.State.Running {
resp, err := http.Get(fmt.Sprintf("http://%s:%d/", containerJSON.NetworkSettings.IPAddress, containerPort))
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}
}
time.Sleep(time.Second)
}
}
}
func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) error {
timeout := 30
err := Flux.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{
Timeout: &timeout,
})
if err != nil {
return fmt.Errorf("failed to stop container: %v", err)
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{})
default:
containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return err
}
if !containerJSON.State.Running {
return Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{})
}
time.Sleep(time.Second)
}
}
}
func RemoveVolume(ctx context.Context, volumeID string) error {
logger.Debugw("Removed volume", zap.String("volume_id", volumeID))
if err := Flux.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil {
return fmt.Errorf("failed to remove volume (%s): %v", volumeID, err)
}
return nil
}

View File

@@ -1,579 +0,0 @@
package server
import (
"bufio"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/google/uuid"
"github.com/joho/godotenv"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
var (
appInsertStmt *sql.Stmt
)
type DeployRequest struct {
Id uuid.UUID `form:"id"`
Config pkg.ProjectConfig `form:"config"`
Code multipart.File `form:"code"`
}
type DeploymentLock struct {
mu sync.Mutex
deployed map[uuid.UUID]context.CancelFunc
}
func NewDeploymentLock() *DeploymentLock {
return &DeploymentLock{
deployed: make(map[uuid.UUID]context.CancelFunc),
}
}
// This function will lock a deployment based on an app id so that the same app cannot be deployed twice simultaneously
func (dt *DeploymentLock) LockDeployment(appId uuid.UUID, ctx context.Context) (context.Context, error) {
dt.mu.Lock()
defer dt.mu.Unlock()
// Check if the app is already being deployed
if _, exists := dt.deployed[appId]; exists {
return nil, fmt.Errorf("app is already being deployed")
}
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(ctx)
// Store the cancel function
dt.deployed[appId] = cancel
return ctx, nil
}
// This function will unlock a deployment based on an app id so that the same app can be deployed again (you would call this after a deployment has completed)
func (dt *DeploymentLock) UnlockDeployment(appId uuid.UUID) {
dt.mu.Lock()
defer dt.mu.Unlock()
// Remove the app from deployed tracking
if cancel, exists := dt.deployed[appId]; exists {
// Cancel the context
cancel()
// Remove from map
delete(dt.deployed, appId)
}
}
var deploymentLock = NewDeploymentLock()
type DeploymentEvent struct {
Stage string `json:"stage"`
Message interface{} `json:"message"`
StatusCode int `json:"status,omitempty"`
}
func (flux *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
if flux.appManager == nil {
panic("App manager is nil")
}
w.Header().Set("Content-Type", "test/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
err := r.ParseMultipartForm(10 << 30) // 10 GiB
if err != nil {
logger.Errorw("Failed to parse multipart form", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var deployRequest DeployRequest
projectConfig := new(pkg.ProjectConfig)
if err := json.Unmarshal([]byte(r.FormValue("config")), &projectConfig); err != nil {
logger.Errorw("Failed to decode config", zap.Error(err))
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
return
}
deployRequest.Config = *projectConfig
idStr := r.FormValue("id")
if idStr == "" {
id, err := uuid.NewRandom()
if err != nil {
logger.Errorw("Failed to generate uuid", zap.Error(err))
http.Error(w, "Failed to generate uuid", http.StatusInternalServerError)
return
}
deployRequest.Id = id
} else {
deployRequest.Id, err = uuid.Parse(idStr)
if err != nil {
logger.Errorw("Failed to parse uuid", zap.Error(err))
http.Error(w, "Failed to parse uuid", http.StatusBadRequest)
return
}
}
ctx, err := deploymentLock.LockDeployment(deployRequest.Id, r.Context())
if err != nil {
// This will happen if the app is already being deployed
http.Error(w, err.Error(), http.StatusConflict)
return
}
go func() {
<-ctx.Done()
deploymentLock.UnlockDeployment(deployRequest.Id)
}()
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusMultiStatus)
eventChannel := make(chan DeploymentEvent, 10)
defer close(eventChannel)
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func(w http.ResponseWriter, flusher http.Flusher) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case event, ok := <-eventChannel:
if !ok {
return
}
ev := pkg.DeploymentEvent{
Message: event.Message,
}
eventJSON, err := json.Marshal(ev)
if err != nil {
// Write error directly to ResponseWriter
jsonErr := json.NewEncoder(w).Encode(err)
if jsonErr != nil {
fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n")
return
}
fmt.Fprintf(w, "data: %s\n\n", err.Error())
if flusher != nil {
flusher.Flush()
}
return
}
fmt.Fprintf(w, "event: %s\n", event.Stage)
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
if flusher != nil {
flusher.Flush()
}
if event.Stage == "error" || event.Stage == "complete" {
return
}
}
}
}(w, flusher)
eventChannel <- DeploymentEvent{
Stage: "start",
Message: "Uploading code",
}
deployRequest.Code, _, err = r.FormFile("code")
if err != nil {
eventChannel <- DeploymentEvent{
Stage: "error",
Message: "No code archive found",
StatusCode: http.StatusBadRequest,
}
return
}
defer deployRequest.Code.Close()
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
eventChannel <- DeploymentEvent{
Stage: "error",
Message: "Invalid flux.json, a name, url, and port must be specified",
StatusCode: http.StatusBadRequest,
}
return
}
logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url), zap.String("id", deployRequest.Id.String()))
projectPath, err := flux.UploadAppCode(deployRequest.Code, deployRequest.Id)
if err != nil {
logger.Infow("Failed to upload code", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upload code: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
// We need to pre-process EnvFile since docker has no concept of where the file is; we read it ourselves
// and place all of its contents into the environment field so that docker can find it later
if projectConfig.EnvFile != "" {
envPath := filepath.Join(projectPath, projectConfig.EnvFile)
// prevent path traversal
realEnvPath, err := filepath.EvalSymlinks(envPath)
if err != nil {
logger.Errorw("Failed to eval symlinks", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to eval symlinks: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
if !strings.HasPrefix(realEnvPath, projectPath) {
logger.Errorw("Env file is not in project directory", zap.String("env_file", projectConfig.EnvFile))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Env file is not in project directory: %s", projectConfig.EnvFile),
StatusCode: http.StatusBadRequest,
}
return
}
envBytes, err := os.Open(realEnvPath)
if err != nil {
logger.Errorw("Failed to open env file", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to open env file: %v", err),
StatusCode: http.StatusInternalServerError,
}
return
}
defer envBytes.Close()
envVars, err := godotenv.Parse(envBytes)
if err != nil {
logger.Errorw("Failed to parse env file", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to parse env file: %v", err),
StatusCode: http.StatusInternalServerError,
}
return
}
for key, value := range envVars {
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
}
}
pipeGroup := sync.WaitGroup{}
// the caller must call pipeGroup.Add(1) before launching streamPipe in a goroutine
streamPipe := func(pipe io.ReadCloser) {
defer pipeGroup.Done()
defer pipe.Close()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
line := scanner.Text()
eventChannel <- DeploymentEvent{
Stage: "cmd_output",
Message: line,
}
}
if err := scanner.Err(); err != nil {
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to read pipe: %s", err),
}
logger.Errorw("Error reading pipe", zap.Error(err))
}
}
logger.Debugw("Preparing project", zap.String("name", projectConfig.Name), zap.String("id", deployRequest.Id.String()))
eventChannel <- DeploymentEvent{
Stage: "preparing",
Message: "Preparing project",
}
// redirect stdout and stderr to the event channel
reader, writer := io.Pipe()
prepareCmd := exec.Command("go", "generate")
prepareCmd.Dir = projectPath
prepareCmd.Stdout = writer
prepareCmd.Stderr = writer
err = prepareCmd.Start()
if err != nil {
logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
pipeGroup.Add(1)
go streamPipe(reader)
err = prepareCmd.Wait()
// close the writer so the scanner sees EOF, then wait for the pipe to drain
writer.Close()
pipeGroup.Wait()
if err != nil {
logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
eventChannel <- DeploymentEvent{
Stage: "building",
Message: "Building project image",
}
reader, writer = io.Pipe()
logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
imageName := fmt.Sprintf("fluxi-%s", namesgenerator.GetRandomName(0))
buildCmd := exec.Command("pack", "build", imageName, "--builder", flux.config.Builder)
buildCmd.Dir = projectPath
buildCmd.Stdout = writer
buildCmd.Stderr = writer
err = buildCmd.Start()
if err != nil {
logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
pipeGroup.Add(1)
go streamPipe(reader)
err = buildCmd.Wait()
// close the writer so the scanner sees EOF, then wait for the pipe to drain
writer.Close()
pipeGroup.Wait()
if err != nil {
logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
app := flux.appManager.GetApp(deployRequest.Id)
eventChannel <- DeploymentEvent{
Stage: "creating",
Message: "Creating deployment",
}
if app == nil {
app, err = flux.CreateApp(ctx, imageName, projectPath, projectConfig, deployRequest.Id)
} else {
err = app.Upgrade(ctx, imageName, projectPath, projectConfig)
}
if err != nil {
logger.Errorw("Failed to deploy app", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
var extApp pkg.App
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
eventChannel <- DeploymentEvent{
Stage: "complete",
Message: extApp,
}
logger.Infow("App deployed successfully", zap.String("id", app.Id.String()))
}
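The handler above streams deployment progress as server-sent events ("event:" then "data:" lines). A minimal client-side sketch of consuming that stream follows; the URL, request construction, and variable names (daemonURL, contentType, body) are illustrative, not part of fluxd:
resp, err := http.Post(daemonURL+"/deploy", contentType, body)
if err != nil {
return err
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
switch {
case strings.HasPrefix(line, "event: "):
stage := strings.TrimPrefix(line, "event: ")
if stage == "error" || stage == "complete" {
// the server ends the stream after these stages
}
case strings.HasPrefix(line, "data: "):
payload := strings.TrimPrefix(line, "data: ") // JSON-encoded pkg.DeploymentEvent
_ = payload
}
}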
func (flux *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) {
appId, err := uuid.Parse(r.PathValue("id"))
if err != nil {
http.Error(w, "Invalid app id", http.StatusBadRequest)
return
}
app := flux.appManager.GetApp(appId)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
status, err := app.Deployment.Status(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if status == "running" {
http.Error(w, "App is already running", http.StatusBadRequest)
return
}
err = app.Deployment.Start(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if app.Deployment.Proxy == nil {
app.Deployment.Proxy, _ = app.Deployment.NewDeploymentProxy()
}
w.WriteHeader(http.StatusOK)
}
func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) {
appId, err := uuid.Parse(r.PathValue("id"))
if err != nil {
http.Error(w, "Invalid app id", http.StatusBadRequest)
return
}
app := Flux.appManager.GetApp(appId)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
status, err := app.Deployment.Status(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if status == "stopped" || status == "failed" {
http.Error(w, "App is already stopped", http.StatusBadRequest)
return
}
err = app.Deployment.Stop(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) {
appId, err := uuid.Parse(r.PathValue("id"))
if err != nil {
http.Error(w, "Invalid app id", http.StatusBadRequest)
return
}
logger.Debugw("Deleting deployment", zap.String("id", appId.String()))
err = Flux.appManager.DeleteApp(appId)
if err != nil {
logger.Errorw("Failed to delete app", zap.Error(err))
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
}
func (s *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) {
if s.config.DisableDeleteAll {
http.Error(w, "Delete all is disabled", http.StatusForbidden)
return
}
for _, app := range Flux.appManager.GetAllApps() {
err := Flux.appManager.DeleteApp(app.Id)
if err != nil {
logger.Errorw("Failed to remove app", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
w.WriteHeader(http.StatusOK)
}
func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) {
// for each app, get the deployment status
var apps []pkg.App
for _, app := range Flux.appManager.GetAllApps() {
var extApp pkg.App
deploymentStatus, err := app.Deployment.Status(r.Context())
if err != nil {
logger.Errorw("Failed to get deployment status", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
extApp.Id = app.Id
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
extApp.DeploymentStatus = deploymentStatus
apps = append(apps, extApp)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(apps)
}
func (s *FluxServer) DaemonInfoHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pkg.Info{
Compression: s.config.Compression,
Version: pkg.Version,
})
}

View File

@@ -1,193 +0,0 @@
package server
import (
"context"
"database/sql"
"fmt"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
var (
deploymentInsertStmt *sql.Stmt
)
type Deployment struct {
ID int64 `json:"id"`
Head *Container `json:"head,omitempty"`
Containers []*Container `json:"containers,omitempty"`
Proxy *DeploymentProxy `json:"-"`
URL string `json:"url"`
Port uint16 `json:"port"`
}
// Creates a deployment row in the database, containing the URL the app should be hosted on (its public hostname)
// and the port that the web server is listening on
func (flux *FluxServer) CreateDeployment(port uint16, appUrl string) (*Deployment, error) {
var deployment Deployment
err := deploymentInsertStmt.QueryRow(appUrl, port).Scan(&deployment.ID, &deployment.URL, &deployment.Port)
if err != nil {
logger.Errorw("Failed to insert deployment", zap.Error(err))
return nil, err
}
return &deployment, nil
}
// Takes an existing deployment, and gracefully upgrades the app to a new image
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.ProjectConfig, imageName string, projectPath string) error {
// we only upgrade the head container; in the future we might want to allow upgrading supplemental containers, but this works just fine for now.
newHeadContainer, err := deployment.Head.Upgrade(ctx, imageName, projectPath, projectConfig.Environment)
if err != nil {
logger.Errorw("Failed to upgrade container", zap.Error(err))
return err
}
oldHeadContainer := deployment.Head
Flux.db.Exec("DELETE FROM containers WHERE id = ?", oldHeadContainer.ID)
var containers []*Container
for _, container := range deployment.Containers {
if !container.Head {
containers = append(containers, container)
}
}
deployment.Head = newHeadContainer
deployment.Containers = append(containers, newHeadContainer)
logger.Debugw("Starting container", zap.ByteString("container_id", newHeadContainer.ContainerID[:12]))
err = newHeadContainer.Start(ctx, true)
if err != nil {
logger.Errorw("Failed to start container", zap.Error(err))
return err
}
if err := newHeadContainer.Wait(ctx, projectConfig.Port); err != nil {
logger.Errorw("Failed to wait for container", zap.Error(err))
return err
}
if _, err := Flux.db.Exec("UPDATE deployments SET url = ?, port = ? WHERE id = ?", projectConfig.Url, projectConfig.Port, deployment.ID); err != nil {
logger.Errorw("Failed to update deployment", zap.Error(err))
return err
}
// Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully drained of connections
oldProxy := deployment.Proxy
deployment.Proxy, err = deployment.NewDeploymentProxy()
if err != nil {
logger.Errorw("Failed to create deployment proxy", zap.Error(err))
return err
}
// gracefully shut down the old proxy, or if it doesn't exist, just remove the containers
if oldProxy != nil {
go oldProxy.GracefulShutdown([]*Container{oldHeadContainer})
} else {
err := RemoveDockerContainer(context.Background(), string(oldHeadContainer.ContainerID[:]))
if err != nil {
logger.Errorw("Failed to remove container", zap.Error(err))
}
}
return nil
}
// Remove a deployment and all of its containers
func (d *Deployment) Remove(ctx context.Context) error {
for _, container := range d.Containers {
err := container.Remove(ctx)
if err != nil {
logger.Errorf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err)
return err
}
}
Flux.proxy.RemoveDeployment(d)
_, err := Flux.db.Exec("DELETE FROM deployments WHERE id = ?", d.ID)
if err != nil {
logger.Errorw("Failed to delete deployment", zap.Error(err))
return err
}
return nil
}
func (d *Deployment) Start(ctx context.Context) error {
for _, container := range d.Containers {
err := container.Start(ctx, false)
if err != nil {
logger.Errorf("Failed to start container (%s): %v\n", container.ContainerID[:12], err)
return err
}
}
if d.Proxy == nil {
d.Proxy, _ = d.NewDeploymentProxy()
Flux.proxy.AddDeployment(d)
}
return nil
}
func (d *Deployment) Stop(ctx context.Context) error {
for _, container := range d.Containers {
err := container.Stop(ctx)
if err != nil {
logger.Errorf("Failed to start container (%s): %v\n", container.ContainerID[:12], err)
return err
}
}
Flux.proxy.RemoveDeployment(d)
d.Proxy = nil
return nil
}
// return the status of a deployment, either "running", "failed", "stopped", or "pending", errors if not all
// containers are in the same state
func (d *Deployment) Status(ctx context.Context) (string, error) {
var status *ContainerStatus
if d == nil {
return "", fmt.Errorf("deployment is nil")
}
if d.Containers == nil {
return "", fmt.Errorf("containers are nil")
}
for _, container := range d.Containers {
containerStatus, err := container.Status(ctx)
if err != nil {
logger.Errorw("Failed to get container status", zap.Error(err))
return "", err
}
// if not all containers are in the same state
if status != nil && status.Status != containerStatus.Status {
return "", fmt.Errorf("malformed deployment")
}
status = containerStatus
}
switch status.Status {
case "running":
return "running", nil
case "exited":
if status.ExitCode != 0 {
// a non-zero exit code in unix terminology means the program did not complete successfully
return "failed", nil
}
return "stopped", nil
default:
return "pending", nil
}
}

View File

@@ -1,128 +0,0 @@
package server
import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
)
type Proxy struct {
// map[string]*Deployment
deployments sync.Map
}
// Stops forwarding traffic to a deployment
func (p *Proxy) RemoveDeployment(deployment *Deployment) {
p.deployments.Delete(deployment.URL)
}
// Starts forwarding traffic to a deployment. The deployment must be ready to receive requests before this is called.
func (p *Proxy) AddDeployment(deployment *Deployment) {
logger.Debugw("Adding deployment", zap.String("url", deployment.URL))
p.deployments.Store(deployment.URL, deployment)
}
// This function is responsible for taking an http request and forwarding it to the correct deployment
func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host := r.Host
deployment, ok := p.deployments.Load(host)
if !ok {
http.Error(w, "Not found", http.StatusNotFound)
return
}
// on response from the server, this is decremented
atomic.AddInt64(&deployment.(*Deployment).Proxy.activeRequests, 1)
deployment.(*Deployment).Proxy.proxy.ServeHTTP(w, r)
}
type DeploymentProxy struct {
deployment *Deployment
proxy *httputil.ReverseProxy
gracePeriod time.Duration
activeRequests int64
}
// Creates a proxy for a given deployment
func (deployment *Deployment) NewDeploymentProxy() (*DeploymentProxy, error) {
if deployment == nil {
return nil, fmt.Errorf("deployment is nil")
}
containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(deployment.Head.ContainerID[:]))
if err != nil {
return nil, err
}
if containerJSON.NetworkSettings.IPAddress == "" {
return nil, fmt.Errorf("no IP address found for container %s", deployment.Head.ContainerID[:12])
}
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, deployment.Port))
if err != nil {
return nil, err
}
proxy := &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL = &url.URL{
Scheme: containerUrl.Scheme,
Host: containerUrl.Host,
Path: req.URL.Path,
}
req.Host = containerUrl.Host
},
Transport: &http.Transport{
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 100,
},
ModifyResponse: func(resp *http.Response) error {
atomic.AddInt64(&deployment.Proxy.activeRequests, -1)
return nil
},
}
return &DeploymentProxy{
deployment: deployment,
proxy: proxy,
gracePeriod: time.Second * 30,
activeRequests: 0,
}, nil
}
// Drains connections from a proxy
func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) {
ctx, cancel := context.WithTimeout(context.Background(), dp.gracePeriod)
defer cancel()
done := false
for !done {
select {
case <-ctx.Done():
done = true
default:
if atomic.LoadInt64(&dp.activeRequests) == 0 {
done = true
}
time.Sleep(time.Second)
}
}
for _, container := range oldContainers {
err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:]))
if err != nil {
logger.Errorw("Failed to remove container", zap.Error(err))
}
}
}

View File

@@ -1,284 +0,0 @@
package server
import (
"archive/tar"
"compress/gzip"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
_ "embed"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/client"
"github.com/google/uuid"
"github.com/juls0730/flux/pkg"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
//go:embed schema.sql
schemaBytes []byte
DefaultConfig = FluxServerConfig{
Builder: "paketobuildpacks/builder-jammy-tiny",
Compression: pkg.Compression{
Enabled: false,
Level: 0,
},
}
Flux *FluxServer
logger *zap.SugaredLogger
)
type FluxServerConfig struct {
Builder string `json:"builder"`
DisableDeleteAll bool `json:"disable_delete_all"`
Compression pkg.Compression `json:"compression"`
}
type FluxServer struct {
config FluxServerConfig
db *sql.DB
proxy *Proxy
rootDir string
appManager *AppManager
dockerClient *client.Client
Logger *zap.SugaredLogger
}
func NewFluxServer() *FluxServer {
dockerClient, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
logger.Fatalw("Failed to create docker client", zap.Error(err))
}
rootDir := os.Getenv("FLUXD_ROOT_DIR")
if rootDir == "" {
rootDir = "/var/fluxd"
}
if err := os.MkdirAll(rootDir, 0755); err != nil {
logger.Fatalw("Failed to create fluxd directory", zap.Error(err))
}
db, err := sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db"))
if err != nil {
logger.Fatalw("Failed to open database", zap.Error(err))
}
_, err = db.Exec(string(schemaBytes))
if err != nil {
logger.Fatalw("Failed to create database schema", zap.Error(err))
}
err = PrepareDBStatements(db)
if err != nil {
logger.Fatalw("Failed to prepare database statements", zap.Error(err))
}
return &FluxServer{
db: db,
proxy: &Proxy{},
appManager: new(AppManager),
rootDir: rootDir,
dockerClient: dockerClient,
}
}
func (s *FluxServer) Stop() {
s.Logger.Sync()
}
func NewServer() *FluxServer {
verbosity, err := strconv.Atoi(os.Getenv("FLUXD_VERBOSITY"))
if err != nil {
verbosity = 0
}
config := zap.NewProductionConfig()
if os.Getenv("DEBUG") == "true" {
config = zap.NewDevelopmentConfig()
verbosity = -1
}
config.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity))
lameLogger, err := config.Build()
if err != nil {
logger.Fatalw("Failed to create logger", zap.Error(err))
}
logger = lameLogger.Sugar()
Flux = NewFluxServer()
Flux.Logger = logger
var serverConfig FluxServerConfig
// parse the config; if it doesn't exist, create it and use the default config
configPath := filepath.Join(Flux.rootDir, "config.json")
if _, err := os.Stat(configPath); err != nil {
if err := os.MkdirAll(Flux.rootDir, 0755); err != nil {
logger.Fatalw("Failed to create fluxd directory", zap.Error(err))
}
configBytes, err := json.Marshal(DefaultConfig)
if err != nil {
logger.Fatalw("Failed to marshal default config", zap.Error(err))
}
logger.Debugw("Config file not found creating default config file at", zap.String("path", configPath))
if err := os.WriteFile(configPath, configBytes, 0644); err != nil {
logger.Fatalw("Failed to write config file", zap.Error(err))
}
}
configFile, err := os.ReadFile(configPath)
if err != nil {
logger.Fatalw("Failed to read config file", zap.Error(err))
}
if err := json.Unmarshal(configFile, &serverConfig); err != nil {
logger.Fatalw("Failed to parse config file", zap.Error(err))
}
Flux.config = serverConfig
logger.Infof("Pulling builder image %s this may take a while...", serverConfig.Builder)
events, err := Flux.dockerClient.ImagePull(context.Background(), fmt.Sprintf("%s:latest", serverConfig.Builder), image.PullOptions{})
if err != nil {
logger.Fatalw("Failed to pull builder image", zap.Error(err))
}
// block until the image is pulled
io.Copy(io.Discard, events)
logger.Infow("Successfully pulled builder image", zap.String("image", serverConfig.Builder))
if err := os.MkdirAll(filepath.Join(Flux.rootDir, "apps"), 0755); err != nil {
logger.Fatalw("Failed to create apps directory", zap.Error(err))
}
Flux.appManager.Init()
port := os.Getenv("FLUXD_PROXY_PORT")
if port == "" {
port = "7465"
}
go func() {
logger.Infof("Proxy server starting on http://127.0.0.1:%s", port)
if err := http.ListenAndServe(fmt.Sprintf(":%s", port), Flux.proxy); err != nil {
logger.Fatalw("Failed to start proxy server", zap.Error(err))
}
}()
return Flux
}
// UploadAppCode extracts an uploaded project archive into the server's apps directory. We have to upload the entire
// project since we need to build the project ourselves to work with the buildpacks
func (s *FluxServer) UploadAppCode(code io.Reader, appId uuid.UUID) (string, error) {
var err error
projectPath := filepath.Join(s.rootDir, "apps", appId.String())
if err = os.MkdirAll(projectPath, 0755); err != nil {
logger.Errorw("Failed to create project directory", zap.Error(err))
return "", err
}
var gzReader *gzip.Reader
defer func() {
if gzReader != nil {
gzReader.Close()
}
}()
if s.config.Compression.Enabled {
gzReader, err = gzip.NewReader(code)
if err != nil {
logger.Infow("Failed to create gzip reader", zap.Error(err))
return "", err
}
}
var tarReader *tar.Reader
if gzReader != nil {
tarReader = tar.NewReader(gzReader)
} else {
tarReader = tar.NewReader(code)
}
logger.Infow("Extracting files for project", zap.String("project", projectPath))
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
logger.Debugw("Failed to read tar header", zap.Error(err))
return "", err
}
// Construct full path, rejecting path traversal via malicious archive entries
path := filepath.Join(projectPath, header.Name)
if path != projectPath && !strings.HasPrefix(path, projectPath+string(os.PathSeparator)) {
return "", fmt.Errorf("invalid file path in archive: %s", header.Name)
}
// Handle different file types
switch header.Typeflag {
case tar.TypeDir:
if err = os.MkdirAll(path, 0755); err != nil {
logger.Debugw("Failed to extract directory", zap.Error(err))
return "", err
}
case tar.TypeReg:
if err = os.MkdirAll(filepath.Dir(path), 0755); err != nil {
logger.Debugw("Failed to extract directory", zap.Error(err))
return "", err
}
outFile, err := os.Create(path)
if err != nil {
logger.Debugw("Failed to extract file", zap.Error(err))
return "", err
}
if _, err = io.Copy(outFile, tarReader); err != nil {
outFile.Close()
logger.Debugw("Failed to copy file during extraction", zap.Error(err))
return "", err
}
// close eagerly instead of deferring so we don't hold every file open until extraction finishes
outFile.Close()
}
}
return projectPath, nil
}
// TODO: split each prepare statement into its corresponding module so the statements are easier to find
func PrepareDBStatements(db *sql.DB) error {
var err error
appInsertStmt, err = db.Prepare("INSERT INTO apps (id, name, deployment_id) VALUES ($1, $2, $3) RETURNING id, name, deployment_id")
if err != nil {
return fmt.Errorf("failed to prepare statement: %v", err)
}
containerInsertStmt, err = db.Prepare("INSERT INTO containers (container_id, head, deployment_id) VALUES (?, ?, ?) RETURNING id, container_id, head, deployment_id")
if err != nil {
return err
}
deploymentInsertStmt, err = db.Prepare("INSERT INTO deployments (url, port) VALUES ($1, $2) RETURNING id, url, port")
if err != nil {
logger.Errorw("Failed to prepare statement", zap.Error(err))
return err
}
return nil
}

View File

@@ -0,0 +1,303 @@
package appManagerService
import (
"context"
"database/sql"
"fmt"
"github.com/google/uuid"
"github.com/juls0730/flux/internal/docker"
models "github.com/juls0730/flux/internal/models"
proxyManagerService "github.com/juls0730/flux/internal/services/proxy"
"github.com/juls0730/flux/internal/util"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
type AppManager struct {
util.TypedMap[uuid.UUID, *models.App]
nameIndex util.TypedMap[string, uuid.UUID]
logger *zap.SugaredLogger
proxyManager *proxyManagerService.ProxyManager
dockerClient *docker.DockerClient
db *sql.DB
}
func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager *proxyManagerService.ProxyManager, logger *zap.SugaredLogger) *AppManager {
return &AppManager{
db: db,
dockerClient: dockerClient,
proxyManager: proxyManager,
logger: logger,
}
}
func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, projectConfig *pkg.ProjectConfig, id uuid.UUID) (*models.App, error) {
app := models.App{
Id: id,
}
appManager.logger.Debugw("Creating deployment", zap.String("id", app.Id.String()))
app.Deployment = models.NewDeployment()
if app.Deployment == nil {
appManager.logger.Errorw("Failed to create deployment")
return nil, fmt.Errorf("failed to create deployment")
}
if err := appManager.db.QueryRowContext(ctx, "INSERT INTO deployments (url, port) VALUES ($1, $2) RETURNING id, url, port", projectConfig.Url, projectConfig.Port).Scan(&app.Deployment.ID, &app.Deployment.URL, &app.Deployment.Port); err != nil {
appManager.logger.Errorw("Failed to create deployment", zap.Error(err))
return nil, err
}
for _, container := range projectConfig.Containers {
// Create a container given a container configuration and a deployment. This will do a few things:
// 1. Create the container in the docker daemon
// 2. Create the volumes for the container
// 3. Insert the container and volumes into the database
c, err := models.CreateContainer(ctx, container.ImageName, container.Name, false, container.Environment, container.Volumes, app.Deployment, appManager.logger, appManager.dockerClient, appManager.db)
if err != nil {
appManager.logger.Errorw("Failed to create container", zap.Error(err))
return nil, fmt.Errorf("failed to create container: %v", err)
}
if err := c.Start(ctx, true, appManager.db, appManager.dockerClient, appManager.logger); err != nil {
appManager.logger.Errorw("Failed to start container", zap.Error(err))
return nil, fmt.Errorf("failed to start container: %v", err)
}
}
_, err := models.CreateContainer(ctx, imageName, projectConfig.Name, true, projectConfig.Environment, projectConfig.Volumes, app.Deployment, appManager.logger, appManager.dockerClient, appManager.db)
if err != nil {
appManager.logger.Errorw("Failed to create container", zap.Error(err))
return nil, fmt.Errorf("failed to create container: %v", err)
}
var appIdBlob []byte
err = appManager.db.QueryRow("INSERT INTO apps (id, name, deployment_id) VALUES ($1, $2, $3) RETURNING id, name, deployment_id", app.Id[:], projectConfig.Name, app.Deployment.ID).Scan(&appIdBlob, &app.Name, &app.DeploymentID)
if err != nil {
appManager.logger.Errorw("Failed to insert app", zap.Error(err))
return nil, fmt.Errorf("failed to insert app: %v", err)
}
app.Id = uuid.Must(uuid.FromBytes(appIdBlob))
err = app.Deployment.Start(ctx, appManager.dockerClient)
if err != nil {
appManager.logger.Errorw("Failed to start deployment", zap.Error(err))
return nil, fmt.Errorf("failed to start deployment: %v", err)
}
appManager.AddApp(app.Id, app)
appPtr := appManager.GetApp(app.Id)
deploymentInternalUrl, err := app.Deployment.GetInternalUrl(appManager.dockerClient)
if err != nil {
appManager.logger.Errorw("Failed to get internal url", zap.Error(err))
return nil, fmt.Errorf("failed to get internal url: %v", err)
}
newProxy, err := proxyManagerService.NewDeploymentProxy(*deploymentInternalUrl)
if err != nil {
appManager.logger.Errorw("Failed to create deployment proxy", zap.Error(err))
return nil, fmt.Errorf("failed to create deployment proxy: %v", err)
}
appManager.proxyManager.AddProxy(appPtr.Deployment.URL, newProxy)
return appPtr, nil
}
func (appManager *AppManager) Upgrade(ctx context.Context, appId uuid.UUID, imageName string, projectConfig *pkg.ProjectConfig) error {
appManager.logger.Debugw("Upgrading app", zap.String("app_id", appId.String()), zap.String("image_name", imageName))
app := appManager.GetApp(appId)
if app == nil {
appManager.logger.Errorw("App not found, but upgrade called", zap.String("app_id", appId.String()))
return fmt.Errorf("failed to get app")
}
deploymentStatus, err := app.Deployment.Status(ctx, appManager.dockerClient, appManager.logger)
if err != nil {
appManager.logger.Errorw("Failed to get deployment status", zap.Error(err))
return fmt.Errorf("failed to get deployment status: %v", err)
}
if deploymentStatus != "running" {
err = app.Deployment.Start(ctx, appManager.dockerClient)
if err != nil {
appManager.logger.Errorw("Failed to start deployment", zap.Error(err))
return fmt.Errorf("failed to start deployment: %v", err)
}
}
err = app.Deployment.Upgrade(ctx, projectConfig, imageName, appManager.dockerClient, appManager.proxyManager, appManager.db, appManager.logger)
if err != nil {
appManager.logger.Errorw("Failed to upgrade deployment", zap.Error(err))
return fmt.Errorf("failed to upgrade deployment: %v", err)
}
return nil
}
func (am *AppManager) GetAppByName(name string) *models.App {
id, ok := am.nameIndex.Load(name)
if !ok {
return nil
}
return am.GetApp(id)
}
func (am *AppManager) GetApp(id uuid.UUID) *models.App {
app, exists := am.Load(id)
if !exists {
return nil
}
return app
}
func (am *AppManager) GetAllApps() []*models.App {
var apps []*models.App
am.Range(func(key uuid.UUID, app *models.App) bool {
apps = append(apps, app)
return true
})
return apps
}
// removes an app from the app manager
func (am *AppManager) RemoveApp(id uuid.UUID) {
app, ok := am.Load(id)
if !ok {
return
}
am.nameIndex.Delete(app.Name)
am.Delete(id)
}
// add a given app to the app manager
func (am *AppManager) AddApp(id uuid.UUID, app models.App) {
if app.Deployment == nil || app.Deployment.Containers() == nil || app.Deployment.Head() == nil || len(app.Deployment.Containers()) == 0 || app.Name == "" {
panic("invalid app")
}
am.nameIndex.Store(app.Name, id)
am.Store(id, &app)
}
// nukes an app completely
func (am *AppManager) DeleteApp(id uuid.UUID) error {
app := am.GetApp(id)
if app == nil {
return fmt.Errorf("app not found")
}
am.logger.Debugw("Deleting app", zap.String("id", id.String()))
// calls RemoveApp
err := app.Remove(context.Background(), am.dockerClient, am.db, am.logger)
if err != nil {
am.logger.Errorw("Failed to remove app", zap.Error(err))
return err
}
return nil
}
// Scan every app in the database, rebuild the in-memory structures, and register proxies for deployments that are already running
func (am *AppManager) Init() {
am.logger.Info("Initializing deployments")
if am.db == nil {
am.logger.Panic("DB is nil")
}
appRows, err := am.db.Query("SELECT id, name, deployment_id FROM apps")
if err != nil {
am.logger.Errorw("Failed to get apps", zap.Error(err))
return
}
defer appRows.Close()
var apps []models.App
for appRows.Next() {
var app models.App
var appIdBlob []byte
if err := appRows.Scan(&appIdBlob, &app.Name, &app.DeploymentID); err != nil {
am.logger.Warnw("Failed to scan app", zap.Error(err))
return
}
app.Id = uuid.Must(uuid.FromBytes(appIdBlob))
app.Deployment = models.NewDeployment()
if app.Deployment == nil {
am.logger.Errorw("Failed to create deployment")
return
}
err := am.db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&app.Deployment.ID, &app.Deployment.URL, &app.Deployment.Port)
if err != nil {
am.logger.Errorw("Failed to get deployment", zap.Error(err))
return
}
am.logger.Debugw("Found deployment", zap.Int64("id", app.Deployment.ID))
containerRows, err := am.db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID)
if err != nil {
am.logger.Warnw("Failed to query containers", zap.Error(err))
return
}
defer containerRows.Close()
for containerRows.Next() {
var container models.Container
containerRows.Scan(&container.ID, &container.ContainerID, &container.DeploymentID, &container.Head)
container.Deployment = app.Deployment
volumeRows, err := am.db.Query("SELECT id, volume_id, container_id, mountpoint FROM volumes WHERE container_id = ?", container.ContainerID[:])
if err != nil {
am.logger.Warnw("Failed to query volumes", zap.Error(err))
return
}
defer volumeRows.Close()
for volumeRows.Next() {
volume := new(models.Volume)
volumeRows.Scan(&volume.ID, &volume.VolumeID, &volume.ContainerID, &volume.Mountpoint)
container.Volumes = append(container.Volumes, volume)
}
app.Deployment.AppendContainer(&container)
}
// TODO: Store state of deployment in database and start it if it's stopped and should be started
apps = append(apps, app)
}
for _, app := range apps {
am.AddApp(app.Id, app)
am.logger.Debugw("Added app", zap.String("id", app.Id.String()))
status, err := app.Deployment.Status(context.Background(), am.dockerClient, am.logger)
if err != nil {
am.logger.Warnw("Failed to get deployment status", zap.Error(err))
continue
}
if status != "running" {
continue
}
proxyURL, err := app.Deployment.GetInternalUrl(am.dockerClient)
if err != nil {
am.logger.Errorw("Failed to parse proxy url", zap.Error(err))
continue
}
proxy, err := proxyManagerService.NewDeploymentProxy(*proxyURL)
if err != nil {
am.logger.Errorw("Failed to create proxy", zap.Error(err))
continue
}
am.proxyManager.AddProxy(app.Deployment.URL, proxy)
am.logger.Debugw("Created proxy", zap.String("id", app.Id.String()))
}
}

View File

@@ -0,0 +1,133 @@
package proxyManagerService
import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"sync/atomic"
"time"
"github.com/juls0730/flux/internal/util"
"go.uber.org/zap"
)
type DeploymentId int64
// this is the object that oversees the proxying of requests to the correct deployment
type ProxyManager struct {
util.TypedMap[string, *Proxy]
logger *zap.SugaredLogger
}
func NewProxyManager(logger *zap.SugaredLogger) *ProxyManager {
return &ProxyManager{
logger: logger,
}
}
func (proxyManager *ProxyManager) ListenAndServe(host string) error {
if host == "" {
host = "0.0.0.0:7465"
}
proxyManager.logger.Infof("Proxy server starting on http://%s", host)
if err := http.ListenAndServe(host, proxyManager); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("failed to start proxy server: %v", err)
}
return nil
}
// Stops forwarding traffic to a deployment
func (proxyManager *ProxyManager) RemoveDeployment(host string) {
proxyManager.Delete(host)
}
// Starts forwarding traffic to a deployment. The deployment must be ready to receive requests before this is called.
func (proxyManager *ProxyManager) AddProxy(host string, proxy *Proxy) {
proxyManager.logger.Debugw("Adding proxy", zap.String("host", host))
proxyManager.Store(host, proxy)
}
// This function is responsible for taking an http request and forwarding it to the correct deployment
func (proxyManager *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host := r.Host
proxyManager.logger.Debugw("Proxying request", zap.String("host", host))
proxy, ok := proxyManager.Load(host)
if !ok {
http.Error(w, "Not found", http.StatusNotFound)
return
}
proxy.proxyFunc.ServeHTTP(w, r)
}
type Proxy struct {
forwardingFor url.URL
proxyFunc *httputil.ReverseProxy
gracePeriod time.Duration
activeRequests int64
}
// Creates a proxy for a given deployment
func NewDeploymentProxy(forwardingFor url.URL) (*Proxy, error) {
proxy := &Proxy{
forwardingFor: forwardingFor,
gracePeriod: time.Second * 30,
activeRequests: 0,
}
proxy.proxyFunc = &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL = &url.URL{
Scheme: forwardingFor.Scheme,
Host: forwardingFor.Host,
Path: req.URL.Path,
}
req.Host = forwardingFor.Host
atomic.AddInt64(&proxy.activeRequests, 1)
},
Transport: &http.Transport{
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 100,
},
ModifyResponse: func(resp *http.Response) error {
atomic.AddInt64(&proxy.activeRequests, -1)
return nil
},
}
return proxy, nil
}
// Drains connections from a proxy
func (p *Proxy) GracefulShutdown(shutdownFunc func()) {
ctx, cancel := context.WithTimeout(context.Background(), p.gracePeriod)
defer cancel()
done := false
for !done {
select {
case <-ctx.Done():
done = true
default:
if atomic.LoadInt64(&p.activeRequests) == 0 {
done = true
}
time.Sleep(time.Second)
}
}
shutdownFunc()
}
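A minimal wiring sketch for the proxy manager, assuming a backend address already resolved from the container; the host name and address below are illustrative:
logger := zap.NewExample().Sugar()
pm := NewProxyManager(logger)
backendURL, err := url.Parse("http://172.17.0.2:8080") // hypothetical container address
if err != nil {
logger.Fatal(err)
}
proxy, err := NewDeploymentProxy(*backendURL)
if err != nil {
logger.Fatal(err)
}
pm.AddProxy("myapp.example.com", proxy) // requests with Host: myapp.example.com now reach the backend
logger.Fatal(pm.ListenAndServe("0.0.0.0:7465")) // blocks serving the proxy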

View File

@@ -0,0 +1,70 @@
package cli_util
import (
"encoding/json"
"fmt"
"os"
"github.com/juls0730/flux/pkg"
"github.com/juls0730/flux/pkg/API"
)
type Project struct {
Id string `json:"id"`
Name string `json:"name"`
}
func GetProject(command string, args []string, config pkg.CLIConfig) (*Project, error) {
var projectName string
// we are in a project directory and the project is deployed
if _, err := os.Stat(".fluxid"); err == nil {
id, err := os.ReadFile(".fluxid")
if err != nil {
return nil, fmt.Errorf("failed to read .fluxid: %v", err)
}
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-id/" + string(id))
if err != nil {
return nil, fmt.Errorf("failed to get app: %v", err)
}
return &Project{
Id: app.Id.String(),
Name: app.Name,
}, nil
}
// we are calling flux from a project directory, but the project isn't deployed yet
if len(args) == 0 {
if _, err := os.Stat("flux.json"); err != nil {
return nil, fmt.Errorf("the current directory is not a flux project, please run flux %[1]s in the project directory", command)
}
fluxConfigFile, err := os.Open("flux.json")
if err != nil {
return nil, fmt.Errorf("failed to open flux.json: %v", err)
}
defer fluxConfigFile.Close()
var config pkg.ProjectConfig
if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil {
return nil, fmt.Errorf("failed to decode flux.json: %v", err)
}
projectName = config.Name
} else {
projectName = args[0]
}
// we are calling flux with a project name (ie `flux start my-project`)
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-name/" + projectName)
if err != nil {
return nil, fmt.Errorf("failed to get app: %v", err)
}
return &Project{
Id: app.Id.String(),
Name: app.Name,
}, nil
}
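A call-site sketch for the resolution logic above, assuming args and config come from the CLI's flag parsing (both hypothetical here):
project, err := GetProject("start", args, config)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
fmt.Printf("operating on %s (%s)\n", project.Name, project.Id)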

View File

@@ -0,0 +1,86 @@
package cli_util
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
)
// GetRequest makes an HTTP GET request to the daemon and decodes the JSON response body into a value of type T
func GetRequest[T any](url string) (*T, error) {
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("http get request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
}
responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n"))
return nil, fmt.Errorf("http get request failed: %s", responseBody)
}
var data T
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, fmt.Errorf("failed to decode http response: %v", err)
}
return &data, nil
}
func DeleteRequest(url string) error {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return fmt.Errorf("failed to delete: %v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to delete: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading response body: %v", err)
}
responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n"))
return fmt.Errorf("delete failed: %s", responseBody)
}
return nil
}
func PutRequest(url string, data io.Reader) error {
req, err := http.NewRequest("PUT", url, data)
if err != nil {
return fmt.Errorf("failed to put: %v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to put: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading response body: %v", err)
}
responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n"))
return fmt.Errorf("put failed: %s", responseBody)
}
return nil
}
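A usage sketch for the helpers above; the endpoint paths are illustrative, not the daemon's actual routes:
app, err := GetRequest[API.App](config.DaemonURL + "/app/by-name/my-project")
if err != nil {
return err
}
// tear the app down again via the same helpers (hypothetical route)
if err := DeleteRequest(config.DaemonURL + "/app/" + app.Id.String()); err != nil {
return err
}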

View File

@@ -0,0 +1,72 @@
package cli_util
import (
"fmt"
"os"
"sync"
)
type CustomSpinnerWriter struct {
currentSpinnerMsg string
lock sync.Mutex
}
func NewCustomSpinnerWriter() *CustomSpinnerWriter {
return &CustomSpinnerWriter{
currentSpinnerMsg: "",
lock: sync.Mutex{},
}
}
func (w *CustomSpinnerWriter) Write(p []byte) (n int, err error) {
w.lock.Lock()
defer w.lock.Unlock()
n, err = os.Stdout.Write(p)
if err != nil {
return n, err
}
w.currentSpinnerMsg = string(p)
return len(p), nil
}
type CustomStdout struct {
spinner *CustomSpinnerWriter
lock sync.Mutex
}
func NewCustomStdout(spinner *CustomSpinnerWriter) *CustomStdout {
return &CustomStdout{
spinner: spinner,
lock: sync.Mutex{},
}
}
// We have this custom writer because we want a spinner at the bottom of the terminal, but we don't want it to
// interfere with the output of the command
func (w *CustomStdout) Write(p []byte) (n int, err error) {
w.lock.Lock()
defer w.lock.Unlock()
// clear line and carriage return
n, err = os.Stdout.Write(fmt.Appendf(nil, "\033[2K\r%s", p))
if err != nil {
return n, err
}
nn, err := os.Stdout.Write([]byte(w.spinner.currentSpinnerMsg))
if err != nil {
return n, err
}
n = nn + n
return n, nil
}
func (w *CustomStdout) Printf(format string, a ...interface{}) (n int, err error) {
str := fmt.Sprintf(format, a...)
return w.Write([]byte(str))
}
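A minimal sketch of how the two writers cooperate; the spinner library is assumed to write its frames through spinnerWriter:
spinnerWriter := NewCustomSpinnerWriter()
stdout := NewCustomStdout(spinnerWriter)
// command output goes through stdout: it clears the spinner line, prints, then redraws the spinner
stdout.Printf("extracted %d files\n", 42)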

51
internal/util/lock.go Normal file
View File

@@ -0,0 +1,51 @@
package util
import (
"context"
"fmt"
"sync"
)
var ErrLocked = fmt.Errorf("item is locked")
type MutexLock[T comparable] struct {
mu sync.Mutex
deployed map[T]context.CancelFunc
}
func NewMutexLock[T comparable]() *MutexLock[T] {
return &MutexLock[T]{
deployed: make(map[T]context.CancelFunc),
}
}
func (dt *MutexLock[T]) Lock(id T, ctx context.Context) (context.Context, error) {
dt.mu.Lock()
defer dt.mu.Unlock()
// Check if the object is locked
if _, exists := dt.deployed[id]; exists {
return nil, ErrLocked
}
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(ctx)
// Store the cancel function
dt.deployed[id] = cancel
return ctx, nil
}
func (dt *MutexLock[T]) Unlock(id T) {
dt.mu.Lock()
defer dt.mu.Unlock()
// Remove the app from deployed tracking
if cancel, exists := dt.deployed[id]; exists {
// Cancel the context
cancel()
// Remove from map
delete(dt.deployed, id)
}
}
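A minimal usage sketch, assuming appId is a uuid.UUID (any comparable key type works):
lock := NewMutexLock[uuid.UUID]()
ctx, err := lock.Lock(appId, context.Background())
if err != nil {
// ErrLocked: another deployment of this app is in flight
return err
}
defer lock.Unlock(appId) // Unlock also cancels ctx
_ = ctx // long-running work should watch ctx.Done()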

30
internal/util/typedmap.go Normal file
View File

@@ -0,0 +1,30 @@
package util
import "sync"
type TypedMap[K comparable, V any] struct {
internal sync.Map
}
func (m *TypedMap[K, V]) Load(key K) (V, bool) {
val, ok := m.internal.Load(key)
if !ok {
var zero V
return zero, false
}
return val.(V), true
}
func (m *TypedMap[K, V]) Store(key K, value V) {
m.internal.Store(key, value)
}
func (m *TypedMap[K, V]) Delete(key K) {
m.internal.Delete(key)
}
func (m *TypedMap[K, V]) Range(f func(key K, value V) bool) {
m.internal.Range(func(k, v any) bool {
return f(k.(K), v.(V))
})
}
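A minimal usage sketch of the type-safe wrapper:
var m TypedMap[string, int]
m.Store("replicas", 3)
if v, ok := m.Load("replicas"); ok {
fmt.Println("replicas =", v) // replicas = 3
}
m.Range(func(k string, v int) bool {
return true // returning false stops the iteration
})
m.Delete("replicas")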