fix pipe closed issue, and a lot of other stuff

This commit is contained in:
Zoe
2024-12-14 02:49:05 -06:00
parent 7689999413
commit de22bd20c9
23 changed files with 1032 additions and 795 deletions

View File

@@ -3,17 +3,17 @@ package server
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
type App struct {
ID int64 `json:"id,omitempty"`
Deployment *Deployment `json:"deployment,omitempty"`
Deployment *Deployment `json:"-"`
Name string `json:"name,omitempty"`
DeploymentID int64 `json:"deployment_id,omitempty"`
}
@@ -22,12 +22,12 @@ func CreateApp(ctx context.Context, imageName string, projectPath string, projec
app := &App{
Name: projectConfig.Name,
}
log.Printf("Creating deployment %s...\n", app.Name)
logger.Debugw("Creating deployment", zap.String("name", app.Name))
deployment, err := CreateDeployment(projectConfig.Port, projectConfig.Url, Flux.db)
app.Deployment = deployment
if err != nil {
log.Printf("Failed to create deployment: %v", err)
logger.Errorw("Failed to create deployment", zap.Error(err))
return nil, err
}
@@ -60,7 +60,7 @@ func CreateApp(ctx context.Context, imageName string, projectPath string, projec
}
func (app *App) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string) error {
log.Printf("Upgrading deployment %s...\n", app.Name)
logger.Debugw("Upgrading deployment", zap.String("name", app.Name))
// if deploy is not started, start it
deploymentStatus, err := app.Deployment.Status(ctx)
@@ -84,15 +84,17 @@ func (app *App) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, im
}
func (app *App) Remove(ctx context.Context) error {
Flux.appManager.RemoveApp(app.Name)
err := app.Deployment.Remove(ctx)
if err != nil {
log.Printf("Failed to remove deployment: %v\n", err)
logger.Errorw("Failed to remove deployment", zap.Error(err))
return err
}
_, err = Flux.db.Exec("DELETE FROM apps WHERE id = ?", app.ID)
if err != nil {
log.Printf("Failed to delete app: %v\n", err)
logger.Errorw("Failed to delete app", zap.Error(err))
return err
}
@@ -129,6 +131,10 @@ func (am *AppManager) GetAllApps() []*App {
return apps
}
func (am *AppManager) RemoveApp(name string) {
am.Delete(name)
}
func (am *AppManager) AddApp(name string, app *App) {
if app.Deployment.Containers == nil || app.Deployment.Head == nil || len(app.Deployment.Containers) == 0 {
panic("nil containers")
@@ -154,15 +160,15 @@ func (am *AppManager) DeleteApp(name string) error {
}
func (am *AppManager) Init() {
log.Printf("Initializing deployments...\n")
logger.Info("Initializing deployments")
if Flux.db == nil {
log.Panicf("DB is nil")
logger.Panic("DB is nil")
}
rows, err := Flux.db.Query("SELECT id, name, deployment_id FROM apps")
if err != nil {
log.Printf("Failed to query apps: %v\n", err)
logger.Warnw("Failed to query apps", zap.Error(err))
return
}
defer rows.Close()
@@ -171,7 +177,7 @@ func (am *AppManager) Init() {
for rows.Next() {
var app App
if err := rows.Scan(&app.ID, &app.Name, &app.DeploymentID); err != nil {
log.Printf("Failed to scan app: %v\n", err)
logger.Warnw("Failed to scan app", zap.Error(err))
return
}
apps = append(apps, app)
@@ -185,7 +191,7 @@ func (am *AppManager) Init() {
rows, err = Flux.db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID)
if err != nil {
log.Printf("Failed to query containers: %v\n", err)
logger.Warnw("Failed to query containers", zap.Error(err))
return
}
defer rows.Close()
@@ -199,7 +205,7 @@ func (am *AppManager) Init() {
if container.Head {
if headContainer != nil {
log.Fatalf("Several containers are marked as head")
logger.Fatal("Several containers are marked as head")
}
headContainer = &container
@@ -207,7 +213,7 @@ func (am *AppManager) Init() {
rows, err := Flux.db.Query("SELECT id, volume_id, container_id, mountpoint FROM volumes WHERE container_id = ?", container.ContainerID[:])
if err != nil {
log.Printf("Failed to query volumes: %v\n", err)
logger.Warnw("Failed to query volumes", zap.Error(err))
return
}
defer rows.Close()
@@ -222,7 +228,7 @@ func (am *AppManager) Init() {
}
if headContainer == nil {
log.Fatalf("head container is nil!")
logger.Fatal("head container is nil!")
}
deployment.Head = headContainer
@@ -231,7 +237,7 @@ func (am *AppManager) Init() {
status, err := deployment.Status(context.Background())
if err != nil {
log.Printf("Failed to get deployment status: %v\n", err)
logger.Warnw("Failed to get deployment status", zap.Error(err))
continue
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
@@ -16,6 +15,7 @@ import (
"github.com/docker/docker/api/types/volume"
"github.com/joho/godotenv"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
var (
@@ -49,7 +49,7 @@ func CreateDockerVolume(ctx context.Context) (vol *Volume, err error) {
return nil, fmt.Errorf("failed to create volume: %v", err)
}
log.Printf("Volume %s created at %s\n", dockerVolume.Name, dockerVolume.Mountpoint)
logger.Debugw("Volume created", zap.String("volume_id", dockerVolume.Name), zap.String("mountpoint", dockerVolume.Mountpoint))
vol = &Volume{
VolumeID: dockerVolume.Name,
@@ -78,7 +78,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
}
}
log.Printf("Creating container %s...\n", containerName)
logger.Debugw("Creating container", zap.String("container_id", containerName))
resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{
Image: imageName,
Env: projectConfig.Environment,
@@ -115,7 +115,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
}
func CreateContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig, head bool, deployment *Deployment) (c *Container, err error) {
log.Printf("Creating container with image %s\n", imageName)
logger.Debugw("Creating container with image", zap.String("image", imageName))
if projectConfig.EnvFile != "" {
envBytes, err := os.Open(filepath.Join(projectPath, projectConfig.EnvFile))
@@ -145,7 +145,7 @@ func CreateContainer(ctx context.Context, imageName, projectPath string, project
if volumeInsertStmt == nil {
volumeInsertStmt, err = Flux.db.Prepare("INSERT INTO volumes (volume_id, mountpoint, container_id) VALUES (?, ?, ?) RETURNING id, volume_id, mountpoint, container_id")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
logger.Errorw("Failed to prepare statement", zap.Error(err))
return nil, err
}
}
@@ -185,7 +185,7 @@ func CreateContainer(ctx context.Context, imageName, projectPath string, project
func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (*Container, error) {
// Create new container with new image
log.Printf("Upgrading container %s...\n", c.ContainerID[:12])
logger.Debugw("Upgrading container", zap.ByteString("container_id", c.ContainerID[:12]))
if c.Volumes == nil {
return nil, fmt.Errorf("no volumes found for container %s", c.ContainerID[:12])
}
@@ -208,7 +208,7 @@ func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string,
var containerIDString string
err = containerInsertStmt.QueryRow(newContainer.ContainerID[:], c.Head, c.Deployment.ID).Scan(&newContainer.ID, &containerIDString, &newContainer.Head, &newContainer.DeploymentID)
if err != nil {
log.Printf("Failed to insert container: %v\n", err)
logger.Errorw("Failed to insert container", zap.Error(err))
return nil, err
}
copy(newContainer.ContainerID[:], containerIDString)
@@ -223,7 +223,7 @@ func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string,
vol = &newContainer.Volumes[0]
volumeUpdateStmt.QueryRow(newContainer.ContainerID[:], vol.ID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID)
log.Printf("Upgraded container")
logger.Debug("Upgraded container")
return newContainer, nil
}
@@ -245,7 +245,7 @@ func (c *Container) Remove(ctx context.Context) error {
tx, err := Flux.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
logger.Errorw("Failed to begin transaction", zap.Error(err))
return err
}
@@ -269,7 +269,7 @@ func (c *Container) Remove(ctx context.Context) error {
}
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v\n", err)
logger.Errorw("Failed to commit transaction", zap.Error(err))
return err
}
@@ -362,7 +362,7 @@ func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) er
}
func RemoveVolume(ctx context.Context, volumeID string) error {
log.Printf("Removed volume %s\n", volumeID)
logger.Debugw("Removed volume", zap.String("volume_id", volumeID))
if err := Flux.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil {
return fmt.Errorf("failed to remove volume (%s): %v", volumeID, err)

View File

@@ -7,13 +7,13 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"os/exec"
"sync"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
var (
@@ -74,9 +74,9 @@ func (dt *DeploymentLock) CompleteDeployment(appName string) {
var deploymentLock = NewDeploymentLock()
type DeploymentEvent struct {
Stage string `json:"stage"`
Message string `json:"message"`
StatusCode int `json:"status,omitempty"`
Stage string `json:"stage"`
Message interface{} `json:"message"`
StatusCode int `json:"status,omitempty"`
}
func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
@@ -90,7 +90,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
err := r.ParseMultipartForm(10 << 30) // 10 GiB
if err != nil {
log.Printf("Failed to parse multipart form: %v\n", err)
logger.Errorw("Failed to parse multipart form", zap.Error(err))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
@@ -105,7 +105,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
var projectConfig pkg.ProjectConfig
if err := json.NewDecoder(deployRequest.Config).Decode(&projectConfig); err != nil {
log.Printf("Failed to decode config: %v\n", err)
logger.Errorw("Failed to decode config", zap.Error(err))
http.Error(w, "Invalid flux.json", http.StatusBadRequest)
return
@@ -118,7 +118,10 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
defer deploymentLock.CompleteDeployment(projectConfig.Name)
go func() {
<-ctx.Done()
deploymentLock.CompleteDeployment(projectConfig.Name)
}()
flusher, ok := w.(http.Flusher)
if !ok {
@@ -147,9 +150,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
ev := struct {
Message string `json:"message"`
}{
ev := pkg.DeploymentEvent{
Message: event.Message,
}
@@ -207,11 +208,11 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
log.Printf("Deploying project %s to %s\n", projectConfig.Name, projectConfig.Url)
logger.Infow("Deploying project", zap.String("name", projectConfig.Name), zap.String("url", projectConfig.Url))
projectPath, err := s.UploadAppCode(deployRequest.Code, projectConfig)
if err != nil {
log.Printf("Failed to upload code: %v\n", err)
logger.Infow("Failed to upload code", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upload code: %s", err),
@@ -221,11 +222,11 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
// Streams the each line of the pipe into the eventChannel, this closes the pipe when the function exits
var pipeGroup sync.WaitGroup
streamPipe := func(pipe io.ReadCloser) {
// we need a wait group because otherwise the function *could* exit before the pipe is closed
// and wreck havoc on every future request
wg.Add(1)
defer wg.Done()
pipeGroup.Add(1)
defer pipeGroup.Done()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
@@ -241,11 +242,11 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
Stage: "error",
Message: fmt.Sprintf("Failed to read pipe: %s", err),
}
log.Printf("Error reading pipe: %s\n", err)
logger.Errorw("Error reading pipe", zap.Error(err))
}
}
log.Printf("Preparing project %s...\n", projectConfig.Name)
logger.Debugw("Preparing project", zap.String("name", projectConfig.Name))
eventChannel <- DeploymentEvent{
Stage: "preparing",
Message: "Preparing project",
@@ -255,7 +256,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
prepareCmd.Dir = projectPath
cmdOut, err := prepareCmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get stdout pipe: %v\n", err)
logger.Errorw("Failed to get stdout pipe", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stdout pipe: %s", err),
@@ -266,7 +267,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
cmdErr, err := prepareCmd.StderrPipe()
if err != nil {
log.Printf("Failed to get stderr pipe: %v\n", err)
logger.Errorw("Failed to get stderr pipe", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stderr pipe: %s", err),
@@ -275,12 +276,26 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
err = prepareCmd.Start()
if err != nil {
logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
go streamPipe(cmdOut)
go streamPipe(cmdErr)
err = prepareCmd.Run()
pipeGroup.Wait()
err = prepareCmd.Wait()
if err != nil {
log.Printf("Failed to prepare project: %s\n", err)
logger.Errorw("Failed to prepare project", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
@@ -295,13 +310,13 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
Message: "Building project image",
}
log.Printf("Building image for project %s...\n", projectConfig.Name)
logger.Debugw("Building image for project", zap.String("name", projectConfig.Name))
imageName := fmt.Sprintf("flux_%s-image", projectConfig.Name)
buildCmd := exec.Command("pack", "build", imageName, "--builder", s.config.Builder)
buildCmd.Dir = projectPath
cmdOut, err = buildCmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get stdout pipe: %v\n", err)
logger.Errorw("Failed to get stdout pipe", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stdout pipe: %s", err),
@@ -312,7 +327,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
cmdErr, err = buildCmd.StderrPipe()
if err != nil {
log.Printf("Failed to get stderr pipe: %v\n", err)
logger.Errorw("Failed to get stderr pipe", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stderr pipe: %s", err),
@@ -322,12 +337,26 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
err = buildCmd.Start()
if err != nil {
logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
go streamPipe(cmdOut)
go streamPipe(cmdErr)
err = buildCmd.Run()
pipeGroup.Wait()
err = buildCmd.Wait()
if err != nil {
log.Printf("Failed to build image: %s\n", err)
logger.Errorw("Failed to build image", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
@@ -347,7 +376,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
if app == nil {
app, err = CreateApp(ctx, imageName, projectPath, projectConfig)
if err != nil {
log.Printf("Failed to create app: %v", err)
logger.Errorw("Failed to create app", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to create app: %s", err),
@@ -359,7 +388,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
} else {
err = app.Upgrade(ctx, projectConfig, imageName, projectPath)
if err != nil {
log.Printf("Failed to upgrade app: %v", err)
logger.Errorw("Failed to upgrade app", zap.Error(err))
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
@@ -370,26 +399,12 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
}
responseJSON, err := json.Marshal(DeployResponse{
App: *app,
})
if err != nil {
log.Printf("Failed to marshal deploy response: %v\n", err)
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to marshal deploy response: %s", err),
StatusCode: http.StatusInternalServerError,
}
return
}
eventChannel <- DeploymentEvent{
Stage: "complete",
Message: string(responseJSON),
Message: app,
}
log.Printf("App %s deployed successfully!\n", app.Name)
logger.Infow("App deployed successfully", zap.String("name", app.Name))
}
func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) {
@@ -457,12 +472,12 @@ func (s *FluxServer) StopDeployHandler(w http.ResponseWriter, r *http.Request) {
func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
log.Printf("Deleting deployment %s...\n", name)
logger.Debugw("Deleting deployment", zap.String("name", name))
err := Flux.appManager.DeleteApp(name)
if err != nil {
log.Printf("Failed to delete app: %v\n", err)
logger.Errorw("Failed to delete app", zap.Error(err))
http.Error(w, err.Error(), http.StatusNotFound)
return
}
@@ -472,9 +487,9 @@ func (s *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Request)
func (s *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.Request) {
for _, app := range Flux.appManager.GetAllApps() {
err := app.Remove(r.Context())
err := Flux.appManager.DeleteApp(app.Name)
if err != nil {
log.Printf("Failed to remove app: %v\n", err)
logger.Errorw("Failed to remove app", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -485,12 +500,12 @@ func (s *FluxServer) DeleteAllDeploymentsHandler(w http.ResponseWriter, r *http.
func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) {
// for each app, get the deployment status
var apps []*pkg.App
var apps []pkg.App
for _, app := range Flux.appManager.GetAllApps() {
var extApp pkg.App
deploymentStatus, err := app.Deployment.Status(r.Context())
if err != nil {
log.Printf("Failed to get deployment status: %v\n", err)
logger.Errorw("Failed to get deployment status", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -499,7 +514,7 @@ func (s *FluxServer) ListAppsHandler(w http.ResponseWriter, r *http.Request) {
extApp.Name = app.Name
extApp.DeploymentID = app.DeploymentID
extApp.DeploymentStatus = deploymentStatus
apps = append(apps, &extApp)
apps = append(apps, extApp)
}
w.Header().Set("Content-Type", "application/json")

View File

@@ -4,9 +4,9 @@ import (
"context"
"database/sql"
"fmt"
"log"
"github.com/juls0730/flux/pkg"
"go.uber.org/zap"
)
var (
@@ -30,14 +30,14 @@ func CreateDeployment(port uint16, appUrl string, db *sql.DB) (*Deployment, erro
if deploymentInsertStmt == nil {
deploymentInsertStmt, err = db.Prepare("INSERT INTO deployments (url, port) VALUES ($1, $2) RETURNING id, url, port")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
logger.Errorw("Failed to prepare statement", zap.Error(err))
return nil, err
}
}
err = deploymentInsertStmt.QueryRow(appUrl, port).Scan(&deployment.ID, &deployment.URL, &deployment.Port)
if err != nil {
log.Printf("Failed to insert deployment: %v\n", err)
logger.Errorw("Failed to insert deployment", zap.Error(err))
return nil, err
}
@@ -52,7 +52,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
container, err := deployment.Head.Upgrade(ctx, imageName, projectPath, projectConfig)
if err != nil {
log.Printf("Failed to upgrade container: %v\n", err)
logger.Errorw("Failed to upgrade container", zap.Error(err))
return err
}
@@ -60,20 +60,20 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
deployment.Head = container
deployment.Containers = append(deployment.Containers, container)
log.Printf("Starting container %s...\n", container.ContainerID[:12])
logger.Debugw("Starting container", zap.ByteString("container_id", container.ContainerID[:12]))
err = container.Start(ctx)
if err != nil {
log.Printf("Failed to start container: %v\n", err)
logger.Errorw("Failed to start container", zap.Error(err))
return err
}
if err := container.Wait(ctx, projectConfig.Port); err != nil {
log.Printf("Failed to wait for container: %v\n", err)
logger.Errorw("Failed to wait for container", zap.Error(err))
return err
}
if _, err := Flux.db.Exec("UPDATE deployments SET url = ?, port = ? WHERE id = ?", projectConfig.Url, projectConfig.Port, deployment.ID); err != nil {
log.Printf("Failed to update deployment: %v\n", err)
logger.Errorw("Failed to update deployment", zap.Error(err))
return err
}
@@ -81,13 +81,13 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
oldProxy := deployment.Proxy
deployment.Proxy, err = deployment.NewDeploymentProxy()
if err != nil {
log.Printf("Failed to create deployment proxy: %v\n", err)
logger.Errorw("Failed to create deployment proxy", zap.Error(err))
return err
}
tx, err := Flux.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
logger.Errorw("Failed to begin transaction", zap.Error(err))
return err
}
@@ -95,13 +95,13 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
var oldContainers []*Container
for _, container := range deployment.Containers {
if existingContainers[string(container.ContainerID[:])] {
log.Printf("Deleting container from db: %s\n", container.ContainerID[:12])
logger.Debugw("Deleting container from db", zap.ByteString("container_id", container.ContainerID[:12]))
_, err = tx.Exec("DELETE FROM containers WHERE id = ?", container.ID)
oldContainers = append(oldContainers, container)
if err != nil {
log.Printf("Failed to delete container: %v\n", err)
logger.Errorw("Failed to delete container", zap.Error(err))
tx.Rollback()
return err
}
@@ -113,7 +113,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
}
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v\n", err)
logger.Errorw("Failed to commit transaction", zap.Error(err))
return err
}
@@ -123,7 +123,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
for _, container := range oldContainers {
err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:]))
if err != nil {
log.Printf("Failed to remove container: %v\n", err)
logger.Errorw("Failed to remove container", zap.Error(err))
}
}
}
@@ -136,14 +136,16 @@ func (d *Deployment) Remove(ctx context.Context) error {
for _, container := range d.Containers {
err := container.Remove(ctx)
if err != nil {
log.Printf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err)
logger.Errorf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err)
return err
}
}
Flux.proxy.RemoveDeployment(d)
_, err := Flux.db.Exec("DELETE FROM deployments WHERE id = ?", d.ID)
if err != nil {
log.Printf("Failed to delete deployment: %v\n", err)
logger.Errorw("Failed to delete deployment", zap.Error(err))
return err
}
@@ -154,7 +156,7 @@ func (d *Deployment) Start(ctx context.Context) error {
for _, container := range d.Containers {
err := container.Start(ctx)
if err != nil {
log.Printf("Failed to start container: %v\n", err)
logger.Errorf("Failed to start container (%s): %v\n", container.ContainerID[:12], err)
return err
}
}
@@ -171,7 +173,7 @@ func (d *Deployment) Stop(ctx context.Context) error {
for _, container := range d.Containers {
err := container.Stop(ctx)
if err != nil {
log.Printf("Failed to start container: %v\n", err)
logger.Errorf("Failed to start container (%s): %v\n", container.ContainerID[:12], err)
return err
}
}
@@ -195,7 +197,7 @@ func (d *Deployment) Status(ctx context.Context) (string, error) {
for _, container := range d.Containers {
containerStatus, err := container.Status(ctx)
if err != nil {
log.Printf("Failed to get container status: %v\n", err)
logger.Errorw("Failed to get container status", zap.Error(err))
return "", err
}

View File

@@ -3,13 +3,14 @@ package server
import (
"context"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/url"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
)
type Proxy struct {
@@ -21,11 +22,7 @@ func (p *Proxy) RemoveDeployment(deployment *Deployment) {
}
func (p *Proxy) AddDeployment(deployment *Deployment) {
if deployment.Containers == nil {
panic("containers is nil")
}
log.Printf("Adding deployment %s\n", deployment.URL)
logger.Debugw("Adding deployment", zap.String("url", deployment.URL))
p.deployments.Store(deployment.URL, deployment)
}
@@ -114,7 +111,7 @@ func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) {
for _, container := range oldContainers {
err := RemoveDockerContainer(context.Background(), string(container.ContainerID[:]))
if err != nil {
log.Printf("Failed to remove container (%s): %v\n", container.ContainerID[:12], err)
logger.Errorw("Failed to remove container", zap.Error(err))
}
}
}

View File

@@ -8,10 +8,10 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
_ "embed"
@@ -19,6 +19,8 @@ import (
"github.com/docker/docker/client"
"github.com/juls0730/flux/pkg"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
@@ -31,7 +33,8 @@ var (
Level: 0,
},
}
Flux *FluxServer
Flux *FluxServer
logger *zap.SugaredLogger
)
type FluxServerConfig struct {
@@ -46,92 +49,125 @@ type FluxServer struct {
rootDir string
appManager *AppManager
dockerClient *client.Client
Logger *zap.SugaredLogger
}
func NewFluxServer() *FluxServer {
dockerClient, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
logger.Fatalw("Failed to create docker client", zap.Error(err))
}
rootDir := os.Getenv("FLUXD_ROOT_DIR")
if rootDir == "" {
rootDir = "/var/fluxd"
}
db, err := sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db"))
if err != nil {
logger.Fatalw("Failed to open database", zap.Error(err))
}
_, err = db.Exec(string(schemaBytes))
if err != nil {
logger.Fatalw("Failed to create database schema", zap.Error(err))
}
return &FluxServer{
db: db,
proxy: &Proxy{},
appManager: new(AppManager),
rootDir: rootDir,
dockerClient: dockerClient,
}
}
func (s *FluxServer) Stop() {
s.Logger.Sync()
}
func NewServer() *FluxServer {
Flux = new(FluxServer)
verbosity, err := strconv.Atoi(os.Getenv("FLUXD_VERBOSITY"))
if err != nil {
verbosity = 0
}
config := zap.NewProductionConfig()
if os.Getenv("DEBUG") == "true" {
config = zap.NewDevelopmentConfig()
verbosity = -1
}
config.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosity))
lameLogger, err := config.Build()
logger = lameLogger.Sugar()
if err != nil {
logger.Fatalw("Failed to create logger", zap.Error(err))
}
Flux = NewFluxServer()
Flux.Logger = logger
var serverConfig FluxServerConfig
Flux.rootDir = os.Getenv("FLUXD_ROOT_DIR")
if Flux.rootDir == "" {
Flux.rootDir = "/var/fluxd"
}
// parse config, if it doesnt exist, create it and use the default config
configPath := filepath.Join(Flux.rootDir, "config.json")
if _, err := os.Stat(configPath); err != nil {
if err := os.MkdirAll(Flux.rootDir, 0755); err != nil {
log.Fatalf("Failed to create fluxd directory: %v\n", err)
logger.Fatalw("Failed to create fluxd directory", zap.Error(err))
}
configBytes, err := json.Marshal(DefaultConfig)
if err != nil {
log.Fatalf("Failed to marshal default config: %v\n", err)
logger.Fatalw("Failed to marshal default config", zap.Error(err))
}
log.Printf("Config file not found, creating default config file at %s\n", configPath)
logger.Debugw("Config file not found creating default config file at", zap.String("path", configPath))
if err := os.WriteFile(configPath, configBytes, 0644); err != nil {
log.Fatalf("Failed to write config file: %v\n", err)
logger.Fatalw("Failed to write config file", zap.Error(err))
}
}
configFile, err := os.ReadFile(configPath)
if err != nil {
log.Fatalf("Failed to read config file: %v\n", err)
logger.Fatalw("Failed to read config file", zap.Error(err))
}
if err := json.Unmarshal(configFile, &serverConfig); err != nil {
log.Fatalf("Failed to parse config file: %v\n", err)
logger.Fatalw("Failed to parse config file", zap.Error(err))
}
Flux.config = serverConfig
Flux.dockerClient, err = client.NewClientWithOpts(client.FromEnv)
if err != nil {
log.Fatalf("Failed to create docker client: %v\n", err)
}
log.Printf("Pulling builder image %s, this may take a while...\n", serverConfig.Builder)
logger.Infof("Pulling builder image %s this may take a while...", serverConfig.Builder)
events, err := Flux.dockerClient.ImagePull(context.Background(), fmt.Sprintf("%s:latest", serverConfig.Builder), image.PullOptions{})
if err != nil {
log.Fatalf("Failed to pull builder image: %v\n", err)
logger.Fatalw("Failed to pull builder image", zap.Error(err))
}
// wait for the iamge to be pulled
// blocking wait for the iamge to be pulled
io.Copy(io.Discard, events)
log.Printf("Successfully pulled builder image %s\n", serverConfig.Builder)
logger.Infow("Successfully pulled builder image", zap.String("image", serverConfig.Builder))
if err := os.MkdirAll(filepath.Join(Flux.rootDir, "apps"), 0755); err != nil {
log.Fatalf("Failed to create apps directory: %v\n", err)
logger.Fatalw("Failed to create apps directory", zap.Error(err))
}
Flux.db, err = sql.Open("sqlite3", filepath.Join(Flux.rootDir, "fluxd.db"))
if err != nil {
log.Fatalf("Failed to open database: %v\n", err)
}
_, err = Flux.db.Exec(string(schemaBytes))
if err != nil {
log.Fatalf("Failed to create database schema: %v\n", err)
}
Flux.appManager = new(AppManager)
Flux.appManager.Init()
Flux.proxy = &Proxy{}
port := os.Getenv("FLUXD_PROXY_PORT")
if port == "" {
port = "7465"
}
go func() {
log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port)
logger.Infof("Proxy server starting on http://127.0.0.1:%s", port)
if err := http.ListenAndServe(fmt.Sprintf(":%s", port), Flux.proxy); err != nil && err != http.ErrServerClosed {
log.Fatalf("Proxy server error: %v", err)
logger.Fatalw("Proxy server error", zap.Error(err))
}
}()
@@ -142,7 +178,7 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf
var err error
projectPath := filepath.Join(s.rootDir, "apps", projectConfig.Name)
if err = os.MkdirAll(projectPath, 0755); err != nil {
log.Printf("Failed to create project directory: %v\n", err)
logger.Errorw("Failed to create project directory", zap.Error(err))
return "", err
}
@@ -156,7 +192,7 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf
if s.config.Compression.Enabled {
gzReader, err = gzip.NewReader(code)
if err != nil {
log.Printf("Failed to create gzip reader: %v\n", err)
logger.Infow("Failed to create gzip reader", zap.Error(err))
return "", err
}
}
@@ -168,14 +204,14 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf
tarReader = tar.NewReader(code)
}
log.Printf("Extracting files for %s...\n", projectPath)
logger.Infow("Extracting files for project", zap.String("project", projectPath))
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Failed to read tar header: %v\n", err)
logger.Debugw("Failed to read tar header", zap.Error(err))
return "", err
}
@@ -186,24 +222,24 @@ func (s *FluxServer) UploadAppCode(code io.Reader, projectConfig pkg.ProjectConf
switch header.Typeflag {
case tar.TypeDir:
if err = os.MkdirAll(path, 0755); err != nil {
log.Printf("Failed to extract directory: %v\n", err)
logger.Debugw("Failed to extract directory", zap.Error(err))
return "", err
}
case tar.TypeReg:
if err = os.MkdirAll(filepath.Dir(path), 0755); err != nil {
log.Printf("Failed to extract directory: %v\n", err)
logger.Debugw("Failed to extract directory", zap.Error(err))
return "", err
}
outFile, err := os.Create(path)
if err != nil {
log.Printf("Failed to extract file: %v\n", err)
logger.Debugw("Failed to extract file", zap.Error(err))
return "", err
}
defer outFile.Close()
if _, err = io.Copy(outFile, tarReader); err != nil {
log.Printf("Failed to copy file during extraction: %v\n", err)
logger.Debugw("Failed to copy file during extraction", zap.Error(err))
return "", err
}
}