upload config, manage volumes, and gen architecture improvements

This commit is contained in:
Zoe
2024-12-09 02:53:26 -06:00
parent f88a5b3db5
commit ee8e3a253f
9 changed files with 720 additions and 598 deletions

View File

@@ -20,10 +20,17 @@ import (
var dockerClient *client.Client
type Volume struct {
ID int64 `json:"id"`
VolumeID string `json:"volume_id"`
ContainerID int64 `json:"container_id"`
}
type Container struct {
ID int64 `json:"id"`
Head bool `json:"head"` // if the container is the head of the deployment
Deployment *Deployment
Volumes []Volume `json:"volumes"`
ContainerID [64]byte `json:"container_id"`
DeploymentID int64 `json:"deployment_id"`
}
@@ -38,7 +45,26 @@ func init() {
}
}
func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (string, error) {
func CreateVolume(ctx context.Context, name string) (vol *Volume, err error) {
dockerVolume, err := dockerClient.VolumeCreate(ctx, volume.CreateOptions{
Driver: "local",
DriverOpts: map[string]string{},
Name: name,
})
if err != nil {
return nil, fmt.Errorf("Failed to create volume: %v", err)
}
log.Printf("Volume %s created at %s\n", dockerVolume.Name, dockerVolume.Mountpoint)
vol = &Volume{
VolumeID: dockerVolume.Name,
}
return vol, nil
}
func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (c *Container, err error) {
log.Printf("Deploying container with image %s\n", imageName)
containerName := fmt.Sprintf("%s-%s", projectConfig.Name, time.Now().Format("20060102-150405"))
@@ -46,13 +72,13 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
if projectConfig.EnvFile != "" {
envBytes, err := os.Open(filepath.Join(projectPath, projectConfig.EnvFile))
if err != nil {
return "", fmt.Errorf("Failed to open env file: %v", err)
return nil, fmt.Errorf("Failed to open env file: %v", err)
}
defer envBytes.Close()
envVars, err := godotenv.Parse(envBytes)
if err != nil {
return "", fmt.Errorf("Failed to parse env file: %v", err)
return nil, fmt.Errorf("Failed to parse env file: %v", err)
}
for key, value := range envVars {
@@ -60,23 +86,14 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
}
}
vol, err := dockerClient.VolumeCreate(ctx, volume.CreateOptions{
Driver: "local",
DriverOpts: map[string]string{},
Name: fmt.Sprintf("flux_%s-volume", projectConfig.Name),
})
if err != nil {
return "", fmt.Errorf("Failed to create volume: %v", err)
}
log.Printf("Volume %s created at %s\n", vol.Name, vol.Mountpoint)
vol, err := CreateVolume(ctx, fmt.Sprintf("flux_%s-volume", projectConfig.Name))
log.Printf("Creating container %s...\n", containerName)
resp, err := dockerClient.ContainerCreate(ctx, &container.Config{
Image: imageName,
Env: projectConfig.Environment,
Volumes: map[string]struct{}{
vol.Name: {},
vol.VolumeID: {},
},
},
&container.HostConfig{
@@ -85,7 +102,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
Mounts: []mount.Mount{
{
Type: mount.TypeVolume,
Source: vol.Name,
Source: vol.VolumeID,
Target: "/workspace",
ReadOnly: false,
},
@@ -96,11 +113,16 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
containerName,
)
if err != nil {
return "", fmt.Errorf("Failed to create container: %v", err)
return nil, fmt.Errorf("Failed to create container: %v", err)
}
c = &Container{
ContainerID: [64]byte([]byte(resp.ID)),
Volumes: []Volume{*vol},
}
log.Printf("Created new container: %s\n", containerName)
return resp.ID, nil
return c, nil
}
func (c *Container) Start(ctx context.Context) error {
@@ -112,7 +134,43 @@ func (c *Container) Stop(ctx context.Context) error {
}
func (c *Container) Remove(ctx context.Context) error {
return RemoveDockerContainer(ctx, string(c.ContainerID[:]))
err := RemoveDockerContainer(ctx, string(c.ContainerID[:]))
if err != nil {
return fmt.Errorf("Failed to remove container (%s): %v", c.ContainerID[:12], err)
}
tx, err := Flux.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
return err
}
_, err = tx.Exec("DELETE FROM containers WHERE container_id = ?", c.ContainerID[:])
if err != nil {
tx.Rollback()
return err
}
for _, volume := range c.Volumes {
if err := RemoveVolume(ctx, volume.VolumeID); err != nil {
tx.Rollback()
return fmt.Errorf("Failed to remove volume (%s): %v", volume.VolumeID, err)
}
_, err = tx.Exec("DELETE FROM volumes WHERE volume_id = ?", volume.VolumeID)
if err != nil {
tx.Rollback()
return err
}
}
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v\n", err)
return err
}
return nil
}
func (c *Container) Wait(ctx context.Context, port uint16) error {
@@ -122,11 +180,11 @@ func (c *Container) Wait(ctx context.Context, port uint16) error {
// RemoveContainer stops and removes a container, but be warned that this will not remove the container from the database
func RemoveDockerContainer(ctx context.Context, containerID string) error {
if err := dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil {
return fmt.Errorf("Failed to stop existing container: %v", err)
return fmt.Errorf("Failed to stop container (%s): %v", containerID[:12], err)
}
if err := dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil {
return fmt.Errorf("Failed to remove existing container: %v", err)
return fmt.Errorf("Failed to remove container (%s): %v", containerID[:12], err)
}
return nil
@@ -192,8 +250,10 @@ func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) er
}
func RemoveVolume(ctx context.Context, volumeID string) error {
log.Printf("Removed volume %s\n", volumeID)
if err := dockerClient.VolumeRemove(ctx, volumeID, true); err != nil {
return fmt.Errorf("Failed to remove existing volume: %v", err)
return fmt.Errorf("Failed to remove volume (%s): %v", volumeID, err)
}
return nil

View File

@@ -7,9 +7,7 @@ import (
"log"
"mime/multipart"
"net/http"
"os"
"os/exec"
"path/filepath"
"github.com/juls0730/fluxd/pkg"
)
@@ -93,7 +91,11 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
app := Apps.GetApp(projectConfig.Name)
if Flux.appManager == nil {
panic("App manager is nil")
}
app := Flux.appManager.GetApp(projectConfig.Name)
if app == nil {
app = &App{
@@ -101,14 +103,14 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
log.Printf("Creating deployment %s...\n", app.Name)
containerID, err := CreateDockerContainer(r.Context(), imageName, projectPath, projectConfig)
if err != nil {
container, err := CreateDockerContainer(r.Context(), imageName, projectPath, projectConfig)
if err != nil || container == nil {
log.Printf("Failed to create container: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
deployment, err := CreateDeployment(containerID, projectConfig.Port, projectConfig.Url, s.db)
deployment, err := CreateDeployment(*container, projectConfig.Port, projectConfig.Url, s.db)
app.Deployment = deployment
if err != nil {
log.Printf("Failed to create deployment: %v\n", err)
@@ -154,9 +156,9 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
ReverseProxy.AddDeployment(&deployment)
Flux.proxy.AddDeployment(&deployment)
Apps.AddApp(app.Name, app)
Flux.appManager.AddApp(app.Name, app)
} else {
log.Printf("Upgrading deployment %s...\n", app.Name)
@@ -171,7 +173,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
}
err = app.Deployment.Upgrade(r.Context(), projectConfig, imageName, projectPath, s)
err = app.Deployment.Upgrade(r.Context(), projectConfig, imageName, projectPath)
if err != nil {
log.Printf("Failed to upgrade deployment: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -189,7 +191,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
app := Apps.GetApp(name)
app := Flux.appManager.GetApp(name)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
@@ -218,7 +220,7 @@ func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request)
func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
app := Apps.GetApp(name)
app := Flux.appManager.GetApp(name)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
@@ -246,159 +248,37 @@ func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) {
func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
var err error
app := Apps.GetApp(name)
if app == nil {
http.Error(w, "App not found", http.StatusNotFound)
return
}
log.Printf("Deleting deployment %s...\n", name)
for _, container := range app.Deployment.Containers {
err = RemoveDockerContainer(r.Context(), string(container.ContainerID[:]))
if err != nil {
log.Printf("Failed to remove container: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
err := Flux.appManager.DeleteApp(name)
err = RemoveVolume(r.Context(), fmt.Sprintf("flux_%s-volume", name))
if err != nil {
log.Printf("Failed to remove volume: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
tx, err := s.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = tx.Exec("DELETE FROM deployments WHERE id = ?", app.DeploymentID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete deployment: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = tx.Exec("DELETE FROM containers WHERE deployment_id = ?", app.DeploymentID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete containers: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = tx.Exec("DELETE FROM apps WHERE id = ?", app.ID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete app: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusNotFound)
return
}
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
projectPath := filepath.Join(s.rootDir, "apps", name)
err = os.RemoveAll(projectPath)
if err != nil {
log.Printf("Failed to remove project directory: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Apps.DeleteApp(name)
w.WriteHeader(http.StatusOK)
}
func (s *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) {
var err error
for _, app := range Apps.GetAllApps() {
for _, container := range app.Deployment.Containers {
err = RemoveDockerContainer(r.Context(), string(container.ContainerID[:]))
if err != nil {
log.Printf("Failed to remove container: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
err = RemoveVolume(r.Context(), fmt.Sprintf("flux_%s-volume", app.Name))
for _, app := range Flux.appManager.GetAllApps() {
err := app.Remove(r.Context())
if err != nil {
log.Printf("Failed to remove volume: %v\n", err)
log.Printf("Failed to remove app: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
tx, err := s.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = tx.Exec("DELETE FROM deployments")
if err != nil {
tx.Rollback()
log.Printf("Failed to delete deployments: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = tx.Exec("DELETE FROM containers")
if err != nil {
tx.Rollback()
log.Printf("Failed to delete containers: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = tx.Exec("DELETE FROM apps")
if err != nil {
tx.Rollback()
log.Printf("Failed to delete apps: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := os.RemoveAll(filepath.Join(s.rootDir, "apps")); err != nil {
log.Printf("Failed to remove apps directory: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := os.RemoveAll(filepath.Join(s.rootDir, "deployments")); err != nil {
log.Printf("Failed to remove deployments directory: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) {
// for each app, get the deployment status
var apps []*pkg.App
for _, app := range Apps.GetAllApps() {
for _, app := range Flux.appManager.GetAllApps() {
var extApp pkg.App
deploymentStatus, err := app.Deployment.Status(r.Context())
if err != nil {
@@ -417,3 +297,10 @@ func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(apps)
}
func (s *FluxServer) DaemonInfoHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pkg.Info{
Compression: s.config.Compression,
})
}

View File

@@ -5,15 +5,18 @@ import (
"database/sql"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"github.com/juls0730/fluxd/pkg"
)
var (
Apps *AppManager = new(AppManager)
deploymentInsertStmt *sql.Stmt
containerInsertStmt *sql.Stmt
volumeInsertStmt *sql.Stmt
updateVolumeStmt *sql.Stmt
)
type AppManager struct {
@@ -27,6 +30,28 @@ type App struct {
DeploymentID int64 `json:"deployment_id,omitempty"`
}
func (app *App) Remove(ctx context.Context) error {
err := app.Deployment.Remove(ctx)
if err != nil {
log.Printf("Failed to remove deployment: %v\n", err)
return err
}
_, err = Flux.db.Exec("DELETE FROM apps WHERE id = ?", app.ID)
if err != nil {
log.Printf("Failed to delete app: %v\n", err)
return err
}
projectPath := filepath.Join(Flux.rootDir, "apps", app.Name)
err = os.RemoveAll(projectPath)
if err != nil {
return fmt.Errorf("Failed to remove project directory: %v", err)
}
return nil
}
type Deployment struct {
ID int64 `json:"id"`
Containers []Container `json:"-"`
@@ -59,18 +84,30 @@ func (am *AppManager) AddApp(name string, app *App) {
am.Store(name, app)
}
func (am *AppManager) DeleteApp(name string) {
func (am *AppManager) DeleteApp(name string) error {
app := am.GetApp(name)
if app == nil {
return fmt.Errorf("App not found")
}
err := app.Remove(context.Background())
if err != nil {
return err
}
am.Delete(name)
return nil
}
func (am *AppManager) Init() {
func (am *AppManager) Init(db *sql.DB) {
log.Printf("Initializing deployments...\n")
if DB == nil {
if db == nil {
log.Panicf("DB is nil")
}
rows, err := DB.Query("SELECT id, name, deployment_id FROM apps")
rows, err := db.Query("SELECT id, name, deployment_id FROM apps")
if err != nil {
log.Printf("Failed to query apps: %v\n", err)
return
@@ -90,14 +127,15 @@ func (am *AppManager) Init() {
for _, app := range apps {
var deployment Deployment
var headContainer *Container
DB.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port)
db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port)
deployment.Containers = make([]Container, 0)
rows, err := DB.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID)
rows, err = db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID)
if err != nil {
log.Printf("Failed to query containers: %v\n", err)
return
}
defer rows.Close()
for rows.Next() {
var container Container
@@ -113,6 +151,24 @@ func (am *AppManager) Init() {
deployment.Containers = append(deployment.Containers, container)
}
for i, container := range deployment.Containers {
var volumes []Volume
rows, err := db.Query("SELECT id, volume_id, container_id FROM volumes WHERE container_id = ?", container.ID)
if err != nil {
log.Printf("Failed to query volumes: %v\n", err)
return
}
defer rows.Close()
for rows.Next() {
var volume Volume
rows.Scan(&volume.ID, &volume.VolumeID, &volume.ContainerID)
volumes = append(volumes, volume)
}
deployment.Containers[i].Volumes = volumes
}
deployment.Proxy, err = NewDeploymentProxy(&deployment, headContainer)
if err != nil {
log.Printf("Failed to create deployment proxy: %v\n", err)
@@ -121,12 +177,12 @@ func (am *AppManager) Init() {
app.Deployment = deployment
Apps.AddApp(app.Name, &app)
am.AddApp(app.Name, &app)
}
}
// Creates a deployment and containers in the database
func CreateDeployment(containerID string, port uint16, appUrl string, db *sql.DB) (Deployment, error) {
func CreateDeployment(container Container, port uint16, appUrl string, db *sql.DB) (Deployment, error) {
var deployment Deployment
var err error
@@ -144,7 +200,6 @@ func CreateDeployment(containerID string, port uint16, appUrl string, db *sql.DB
return Deployment{}, err
}
var container Container
if containerInsertStmt == nil {
containerInsertStmt, err = db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head")
if err != nil {
@@ -154,35 +209,50 @@ func CreateDeployment(containerID string, port uint16, appUrl string, db *sql.DB
}
var containerIDString string
err = containerInsertStmt.QueryRow(containerID, deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head)
err = containerInsertStmt.QueryRow(container.ContainerID[:], deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head)
if err != nil {
log.Printf("Failed to get container id: %v\n", err)
return Deployment{}, err
}
copy(container.ContainerID[:], containerIDString)
for i, volume := range container.Volumes {
if volumeInsertStmt == nil {
volumeInsertStmt, err = db.Prepare("INSERT INTO volumes (volume_id, container_id) VALUES (?, ?) RETURNING id, volume_id, container_id")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return Deployment{}, err
}
}
if err := volumeInsertStmt.QueryRow(volume.VolumeID, container.ID).Scan(&container.Volumes[i].ID, &container.Volumes[i].VolumeID, &container.Volumes[i].ContainerID); err != nil {
log.Printf("Failed to insert volume: %v\n", err)
return Deployment{}, err
}
}
container.Deployment = &deployment
deployment.Containers = append(deployment.Containers, container)
return deployment, nil
}
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string, s *FluxServer) error {
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string) error {
existingContainers, err := findExistingDockerContainers(ctx, projectConfig.Name)
if err != nil {
return fmt.Errorf("Failed to find existing containers: %v", err)
}
// Deploy new container before deleting old one
containerID, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig)
if err != nil {
c, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig)
if err != nil || c == nil {
log.Printf("Failed to create container: %v\n", err)
return err
}
var container Container
var container Container = *c
if containerInsertStmt == nil {
containerInsertStmt, err = DB.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head")
containerInsertStmt, err = Flux.db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return err
@@ -190,17 +260,48 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
}
var containerIDString string
err = containerInsertStmt.QueryRow(containerID, deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head)
err = containerInsertStmt.QueryRow(container.ContainerID[:], deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head)
if err != nil {
log.Printf("Failed to get container id: %v\n", err)
return err
}
container.Deployment = deployment
// the space time complexity of this is pretty bad, but it works
for _, existingContainer := range deployment.Containers {
if !existingContainer.Head {
continue
}
for _, volume := range existingContainer.Volumes {
var targetVolume *Volume
for i, volume := range container.Volumes {
if volume.VolumeID == volume.VolumeID {
targetVolume = &container.Volumes[i]
break
}
}
if updateVolumeStmt == nil {
updateVolumeStmt, err = Flux.db.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, container_id")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return err
}
}
err := updateVolumeStmt.QueryRow(container.ID, volume.ID).Scan(&targetVolume.ID, &targetVolume.VolumeID, &targetVolume.ContainerID)
if err != nil {
log.Printf("Failed to update volume: %v\n", err)
return err
}
}
}
copy(container.ContainerID[:], containerIDString)
deployment.Containers = append(deployment.Containers, container)
log.Printf("Starting container %s...\n", containerID)
log.Printf("Starting container %s...\n", container.ContainerID[:])
err = container.Start(ctx)
if err != nil {
log.Printf("Failed to start container: %v\n", err)
@@ -212,7 +313,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
return err
}
tx, err := s.db.Begin()
tx, err := Flux.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
return err
@@ -235,12 +336,6 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
return err
}
tx, err = s.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
return err
}
// Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully shutdown
oldProxy := deployment.Proxy
deployment.Proxy, err = NewDeploymentProxy(deployment, &container)
@@ -249,13 +344,19 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
return err
}
tx, err = Flux.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
return err
}
var containers []Container
var oldContainers []*Container
for _, container := range deployment.Containers {
if existingContainers[string(container.ContainerID[:])] {
log.Printf("Stopping existing container: %s\n", container.ContainerID[0:12])
log.Printf("Deleting container from db: %s\n", container.ContainerID[0:12])
_, err = tx.Exec("DELETE FROM containers WHERE container_id = ?", string(container.ContainerID[:]))
_, err = tx.Exec("DELETE FROM containers WHERE id = ?", container.ID)
oldContainers = append(oldContainers, &container)
if err != nil {
@@ -269,19 +370,26 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
containers = append(containers, container)
}
if oldProxy != nil {
go oldProxy.GracefulShutdown(oldContainers)
}
deployment.Containers = containers
ReverseProxy.AddDeployment(deployment)
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v\n", err)
return err
}
if oldProxy != nil {
go oldProxy.GracefulShutdown(oldContainers)
} else {
for _, container := range oldContainers {
err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:]))
if err != nil {
log.Printf("Failed to remove container: %v\n", err)
}
}
}
deployment.Containers = containers
Flux.proxy.AddDeployment(deployment)
return nil
}
@@ -295,6 +403,24 @@ func arrayContains(arr []string, str string) bool {
return false
}
func (d *Deployment) Remove(ctx context.Context) error {
for _, container := range d.Containers {
err := container.Remove(ctx)
if err != nil {
log.Printf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err)
return err
}
}
_, err := Flux.db.Exec("DELETE FROM deployments WHERE id = ?", d.ID)
if err != nil {
log.Printf("Failed to delete deployment: %v\n", err)
return err
}
return nil
}
func (d *Deployment) Start(ctx context.Context) error {
for _, container := range d.Containers {
err := container.Start(ctx)

View File

@@ -7,14 +7,11 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"os"
"sync"
"sync/atomic"
"time"
)
var ReverseProxy *Proxy
type Proxy struct {
deployments sync.Map
}
@@ -111,32 +108,7 @@ func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) {
for _, container := range oldContainers {
err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:]))
if err != nil {
log.Printf("Failed to remove container: %v\n", err)
log.Printf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err)
}
}
}
func InitProxy(apps *AppManager) {
ReverseProxy = &Proxy{}
apps.Range(func(key, value interface{}) bool {
app := value.(*App)
ReverseProxy.AddDeployment(&app.Deployment)
return true
})
}
func InitReverseProxy() {
InitProxy(Apps)
port := os.Getenv("FLUXD_PROXY_PORT")
if port == "" {
port = "7465"
}
go func() {
log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port)
if err := http.ListenAndServe(fmt.Sprintf(":%s", port), ReverseProxy); err != nil && err != http.ErrServerClosed {
log.Fatalf("Proxy server error: %v", err)
}
}()
}

View File

@@ -1,20 +1,27 @@
CREATE TABLE IF NOT EXISTS deployments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL,
id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
url TEXT NOT NULL UNIQUE,
port INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS apps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
name TEXT NOT NULL UNIQUE,
deployment_id INTEGER,
FOREIGN KEY(deployment_id) REFERENCES deployments(id)
);
CREATE TABLE IF NOT EXISTS containers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
container_id TEXT NOT NULL,
head BOOLEAN NOT NULL,
deployment_id INTEGER NOT NULL,
FOREIGN KEY(deployment_id) REFERENCES deployments(id)
);
CREATE TABLE IF NOT EXISTS volumes (
id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
volume_id TEXT NOT NULL,
container_id INTEGER NOT NULL,
FOREIGN KEY(container_id) REFERENCES containers(id)
);

View File

@@ -5,8 +5,10 @@ import (
"compress/gzip"
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
@@ -21,18 +23,25 @@ var (
schemaBytes []byte
DefaultConfig = FluxServerConfig{
Builder: "paketobuildpacks/builder-jammy-tiny",
Compression: pkg.Compression{
Enabled: false,
Level: 0,
},
}
DB *sql.DB
Flux *FluxServer
)
type FluxServerConfig struct {
Builder string `json:"builder"`
Builder string `json:"builder"`
Compression pkg.Compression `json:"compression"`
}
type FluxServer struct {
config FluxServerConfig
db *sql.DB
rootDir string
config FluxServerConfig
db *sql.DB
proxy *Proxy
rootDir string
appManager *AppManager
}
func NewServer() *FluxServer {
@@ -74,40 +83,79 @@ func NewServer() *FluxServer {
log.Fatalf("Failed to create apps directory: %v\n", err)
}
DB, err = sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db"))
db, err := sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db"))
if err != nil {
log.Fatalf("Failed to open database: %v\n", err)
}
_, err = DB.Exec(string(schemaBytes))
_, err = db.Exec(string(schemaBytes))
if err != nil {
log.Fatalf("Failed to create database schema: %v\n", err)
}
Apps.Init()
appManager := new(AppManager)
appManager.Init(db)
return &FluxServer{
config: serverConfig,
db: DB,
rootDir: rootDir,
proxy := &Proxy{}
appManager.Range(func(key, value interface{}) bool {
app := value.(*App)
proxy.AddDeployment(&app.Deployment)
return true
})
port := os.Getenv("FLUXD_PROXY_PORT")
if port == "" {
port = "7465"
}
go func() {
log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port)
if err := http.ListenAndServe(fmt.Sprintf(":%s", port), proxy); err != nil && err != http.ErrServerClosed {
log.Fatalf("Proxy server error: %v", err)
}
}()
Flux = &FluxServer{
config: serverConfig,
db: db,
proxy: proxy,
appManager: appManager,
rootDir: rootDir,
}
return Flux
}
func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConfig) (string, error) {
var err error
projectPath := filepath.Join(s.rootDir, "apps", projectConfig.Name)
if err := os.MkdirAll(projectPath, 0755); err != nil {
if err = os.MkdirAll(projectPath, 0755); err != nil {
log.Printf("Failed to create project directory: %v\n", err)
return "", err
}
gzReader, err := gzip.NewReader(code)
if err != nil {
log.Printf("Failed to create gzip reader: %v\n", err)
return "", err
}
defer gzReader.Close()
var gzReader *gzip.Reader
defer func() {
if gzReader != nil {
gzReader.Close()
}
}()
tarReader := tar.NewReader(gzReader)
if s.config.Compression.Enabled {
gzReader, err = gzip.NewReader(code)
if err != nil {
log.Printf("Failed to create gzip reader: %v\n", err)
return "", err
}
}
var tarReader *tar.Reader
if gzReader != nil {
tarReader = tar.NewReader(gzReader)
} else {
tarReader = tar.NewReader(code)
}
log.Printf("Extracting files for %s...\n", projectPath)
for {
@@ -126,12 +174,12 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf
// Handle different file types
switch header.Typeflag {
case tar.TypeDir:
if err := os.MkdirAll(path, 0755); err != nil {
if err = os.MkdirAll(path, 0755); err != nil {
log.Printf("Failed to extract directory: %v\n", err)
return "", err
}
case tar.TypeReg:
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
if err = os.MkdirAll(filepath.Dir(path), 0755); err != nil {
log.Printf("Failed to extract directory: %v\n", err)
return "", err
}
@@ -143,7 +191,7 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf
}
defer outFile.Close()
if _, err := io.Copy(outFile, tarReader); err != nil {
if _, err = io.Copy(outFile, tarReader); err != nil {
log.Printf("Failed to copy file during extraction: %v\n", err)
return "", err
}