fix steaming, stale data, proxy bugs and more

This commit is contained in:
Zoe
2024-12-13 03:33:04 -06:00
parent e46bb05b39
commit 7689999413
13 changed files with 798 additions and 658 deletions

View File

@@ -12,10 +12,10 @@ import (
)
type App struct {
ID int64 `json:"id,omitempty"`
Deployment Deployment `json:"deployment,omitempty"`
Name string `json:"name,omitempty"`
DeploymentID int64 `json:"deployment_id,omitempty"`
ID int64 `json:"id,omitempty"`
Deployment *Deployment `json:"deployment,omitempty"`
Name string `json:"name,omitempty"`
DeploymentID int64 `json:"deployment_id,omitempty"`
}
func CreateApp(ctx context.Context, imageName string, projectPath string, projectConfig pkg.ProjectConfig) (*App, error) {
@@ -24,51 +24,36 @@ func CreateApp(ctx context.Context, imageName string, projectPath string, projec
}
log.Printf("Creating deployment %s...\n", app.Name)
container, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig)
if err != nil || container == nil {
return nil, fmt.Errorf("Failed to create container: %v", err)
}
deployment, err := CreateDeployment(*container, projectConfig.Port, projectConfig.Url, Flux.db)
deployment, err := CreateDeployment(projectConfig.Port, projectConfig.Url, Flux.db)
app.Deployment = deployment
if err != nil {
log.Printf("Failed to create deployment: %v", err)
return nil, err
}
container, err := CreateContainer(ctx, imageName, projectPath, projectConfig, true, deployment)
if err != nil || container == nil {
return nil, fmt.Errorf("failed to create container: %v", err)
}
if appInsertStmt == nil {
appInsertStmt, err = Flux.db.Prepare("INSERT INTO apps (name, deployment_id) VALUES ($1, $2) RETURNING id, name, deployment_id")
if err != nil {
return nil, fmt.Errorf("Failed to prepare statement: %v", err)
return nil, fmt.Errorf("failed to prepare statement: %v", err)
}
}
// create app in the database
err = appInsertStmt.QueryRow(projectConfig.Name, deployment.ID).Scan(&app.ID, &app.Name, &app.DeploymentID)
if err != nil {
return nil, fmt.Errorf("Failed to insert app: %v", err)
return nil, fmt.Errorf("failed to insert app: %v", err)
}
err = deployment.Start(ctx)
if err != nil {
return nil, fmt.Errorf("Failed to start deployment: %v", err)
return nil, fmt.Errorf("failed to start deployment: %v", err)
}
var headContainer *Container
for _, container := range deployment.Containers {
if container.Head {
headContainer = &container
}
}
deployment.Proxy, err = NewDeploymentProxy(&deployment, headContainer)
if err != nil {
return nil, fmt.Errorf("Failed to create deployment proxy: %v", err)
}
Flux.proxy.AddDeployment(&deployment)
Flux.appManager.AddApp(app.Name, app)
return app, nil
@@ -79,16 +64,20 @@ func (app *App) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, im
// if deploy is not started, start it
deploymentStatus, err := app.Deployment.Status(ctx)
if deploymentStatus != "running" || err != nil {
if err != nil {
return fmt.Errorf("failed to get deployment status: %v", err)
}
if deploymentStatus != "running" {
err = app.Deployment.Start(ctx)
if err != nil {
return fmt.Errorf("Failed to start deployment: %v", err)
return fmt.Errorf("failed to start deployment: %v", err)
}
}
err = app.Deployment.Upgrade(ctx, projectConfig, imageName, projectPath)
if err != nil {
return fmt.Errorf("Failed to upgrade deployment: %v", err)
return fmt.Errorf("failed to upgrade deployment: %v", err)
}
return nil
@@ -110,7 +99,7 @@ func (app *App) Remove(ctx context.Context) error {
projectPath := filepath.Join(Flux.rootDir, "apps", app.Name)
err = os.RemoveAll(projectPath)
if err != nil {
return fmt.Errorf("Failed to remove project directory: %v", err)
return fmt.Errorf("failed to remove project directory: %v", err)
}
return nil
@@ -141,13 +130,17 @@ func (am *AppManager) GetAllApps() []*App {
}
func (am *AppManager) AddApp(name string, app *App) {
if app.Deployment.Containers == nil || app.Deployment.Head == nil || len(app.Deployment.Containers) == 0 {
panic("nil containers")
}
am.Store(name, app)
}
func (am *AppManager) DeleteApp(name string) error {
app := am.GetApp(name)
if app == nil {
return fmt.Errorf("App not found")
return fmt.Errorf("app not found")
}
err := app.Remove(context.Background())
@@ -185,10 +178,10 @@ func (am *AppManager) Init() {
}
for _, app := range apps {
var deployment Deployment
deployment := &Deployment{}
var headContainer *Container
Flux.db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port)
deployment.Containers = make([]Container, 0)
deployment.Containers = make([]*Container, 0)
rows, err = Flux.db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID)
if err != nil {
@@ -201,19 +194,18 @@ func (am *AppManager) Init() {
var container Container
var containerIDString string
rows.Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head)
container.Deployment = &deployment
container.Deployment = deployment
copy(container.ContainerID[:], containerIDString)
if container.Head {
if headContainer != nil {
log.Fatalf("Several containers are marked as head")
}
headContainer = &container
}
deployment.Containers = append(deployment.Containers, container)
}
for i, container := range deployment.Containers {
var volumes []Volume
rows, err := Flux.db.Query("SELECT id, volume_id, container_id FROM volumes WHERE container_id = ?", container.ID)
rows, err := Flux.db.Query("SELECT id, volume_id, container_id, mountpoint FROM volumes WHERE container_id = ?", container.ContainerID[:])
if err != nil {
log.Printf("Failed to query volumes: %v\n", err)
return
@@ -222,17 +214,32 @@ func (am *AppManager) Init() {
for rows.Next() {
var volume Volume
rows.Scan(&volume.ID, &volume.VolumeID, &volume.ContainerID)
volumes = append(volumes, volume)
rows.Scan(&volume.ID, &volume.VolumeID, &volume.ContainerID, &volume.Mountpoint)
container.Volumes = append(container.Volumes, volume)
}
deployment.Containers[i].Volumes = volumes
deployment.Containers = append(deployment.Containers, &container)
}
deployment.Proxy, _ = NewDeploymentProxy(&deployment, headContainer)
if headContainer == nil {
log.Fatalf("head container is nil!")
}
deployment.Head = headContainer
app.Deployment = deployment
am.AddApp(app.Name, &app)
status, err := deployment.Status(context.Background())
if err != nil {
log.Printf("Failed to get deployment status: %v\n", err)
continue
}
if status != "running" {
continue
}
deployment.Proxy, _ = deployment.NewDeploymentProxy()
Flux.proxy.AddDeployment(deployment)
}
}

View File

@@ -2,6 +2,7 @@ package server
import (
"context"
"database/sql"
"fmt"
"log"
"net/http"
@@ -17,10 +18,17 @@ import (
"github.com/juls0730/flux/pkg"
)
var (
volumeInsertStmt *sql.Stmt
volumeUpdateStmt *sql.Stmt
containerInsertStmt *sql.Stmt
)
type Volume struct {
ID int64 `json:"id"`
VolumeID string `json:"volume_id"`
ContainerID int64 `json:"container_id"`
Mountpoint string `json:"mountpoint"`
ContainerID string `json:"container_id"`
}
type Container struct {
@@ -32,14 +40,13 @@ type Container struct {
DeploymentID int64 `json:"deployment_id"`
}
func CreateDockerVolume(ctx context.Context, name string) (vol *Volume, err error) {
func CreateDockerVolume(ctx context.Context) (vol *Volume, err error) {
dockerVolume, err := Flux.dockerClient.VolumeCreate(ctx, volume.CreateOptions{
Driver: "local",
DriverOpts: map[string]string{},
Name: name,
})
if err != nil {
return nil, fmt.Errorf("Failed to create volume: %v", err)
return nil, fmt.Errorf("failed to create volume: %v", err)
}
log.Printf("Volume %s created at %s\n", dockerVolume.Name, dockerVolume.Mountpoint)
@@ -51,21 +58,19 @@ func CreateDockerVolume(ctx context.Context, name string) (vol *Volume, err erro
return vol, nil
}
func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (c *Container, err error) {
log.Printf("Deploying container with image %s\n", imageName)
func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig, vol *Volume) (*Container, error) {
containerName := fmt.Sprintf("%s-%s", projectConfig.Name, time.Now().Format("20060102-150405"))
if projectConfig.EnvFile != "" {
envBytes, err := os.Open(filepath.Join(projectPath, projectConfig.EnvFile))
if err != nil {
return nil, fmt.Errorf("Failed to open env file: %v", err)
return nil, fmt.Errorf("failed to open env file: %v", err)
}
defer envBytes.Close()
envVars, err := godotenv.Parse(envBytes)
if err != nil {
return nil, fmt.Errorf("Failed to parse env file: %v", err)
return nil, fmt.Errorf("failed to parse env file: %v", err)
}
for key, value := range envVars {
@@ -73,8 +78,6 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
}
}
vol, err := CreateDockerVolume(ctx, fmt.Sprintf("flux_%s-volume", projectConfig.Name))
log.Printf("Creating container %s...\n", containerName)
resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{
Image: imageName,
@@ -90,7 +93,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
{
Type: mount.TypeVolume,
Source: vol.VolumeID,
Target: "/workspace",
Target: vol.Mountpoint,
ReadOnly: false,
},
},
@@ -100,18 +103,131 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p
containerName,
)
if err != nil {
return nil, fmt.Errorf("Failed to create container: %v", err)
return nil, err
}
c = &Container{
c := &Container{
ContainerID: [64]byte([]byte(resp.ID)),
Volumes: []Volume{*vol},
}
log.Printf("Created new container: %s\n", containerName)
return c, nil
}
func CreateContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig, head bool, deployment *Deployment) (c *Container, err error) {
log.Printf("Creating container with image %s\n", imageName)
if projectConfig.EnvFile != "" {
envBytes, err := os.Open(filepath.Join(projectPath, projectConfig.EnvFile))
if err != nil {
return nil, fmt.Errorf("failed to open env file: %v", err)
}
defer envBytes.Close()
envVars, err := godotenv.Parse(envBytes)
if err != nil {
return nil, fmt.Errorf("failed to parse env file: %v", err)
}
for key, value := range envVars {
projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value))
}
}
var vol *Volume
vol, err = CreateDockerVolume(ctx)
if err != nil {
return nil, err
}
vol.Mountpoint = "/workspace"
if volumeInsertStmt == nil {
volumeInsertStmt, err = Flux.db.Prepare("INSERT INTO volumes (volume_id, mountpoint, container_id) VALUES (?, ?, ?) RETURNING id, volume_id, mountpoint, container_id")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return nil, err
}
}
c, err = CreateDockerContainer(ctx, imageName, projectPath, projectConfig, vol)
if err != nil {
return nil, err
}
if containerInsertStmt == nil {
containerInsertStmt, err = Flux.db.Prepare("INSERT INTO containers (container_id, head, deployment_id) VALUES ($1, $2, $3) RETURNING id, container_id, head, deployment_id")
if err != nil {
return nil, err
}
}
var containerIDString string
err = containerInsertStmt.QueryRow(c.ContainerID[:], head, deployment.ID).Scan(&c.ID, &containerIDString, &c.Head, &c.DeploymentID)
if err != nil {
return nil, err
}
copy(c.ContainerID[:], containerIDString)
err = volumeInsertStmt.QueryRow(vol.VolumeID, vol.Mountpoint, c.ContainerID[:]).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID)
if err != nil {
return nil, err
}
c.Deployment = deployment
if head {
deployment.Head = c
}
deployment.Containers = append(deployment.Containers, c)
return c, nil
}
func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (*Container, error) {
// Create new container with new image
log.Printf("Upgrading container %s...\n", c.ContainerID[:12])
if c.Volumes == nil {
return nil, fmt.Errorf("no volumes found for container %s", c.ContainerID[:12])
}
vol := &c.Volumes[0]
newContainer, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig, vol)
if err != nil {
return nil, err
}
newContainer.Deployment = c.Deployment
if containerInsertStmt == nil {
containerInsertStmt, err = Flux.db.Prepare("INSERT INTO containers (container_id, head, deployment_id) VALUES ($1, $2, $3) RETURNING id, container_id, head, deployment_id")
if err != nil {
return nil, err
}
}
var containerIDString string
err = containerInsertStmt.QueryRow(newContainer.ContainerID[:], c.Head, c.Deployment.ID).Scan(&newContainer.ID, &containerIDString, &newContainer.Head, &newContainer.DeploymentID)
if err != nil {
log.Printf("Failed to insert container: %v\n", err)
return nil, err
}
copy(newContainer.ContainerID[:], containerIDString)
if volumeUpdateStmt == nil {
volumeUpdateStmt, err = Flux.db.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, mountpoint, container_id")
if err != nil {
return nil, err
}
}
vol = &newContainer.Volumes[0]
volumeUpdateStmt.QueryRow(newContainer.ContainerID[:], vol.ID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID)
log.Printf("Upgraded container")
return newContainer, nil
}
func (c *Container) Start(ctx context.Context) error {
return Flux.dockerClient.ContainerStart(ctx, string(c.ContainerID[:]), container.StartOptions{})
}
@@ -124,7 +240,7 @@ func (c *Container) Remove(ctx context.Context) error {
err := RemoveDockerContainer(ctx, string(c.ContainerID[:]))
if err != nil {
return fmt.Errorf("Failed to remove container (%s): %v", c.ContainerID[:12], err)
return fmt.Errorf("failed to remove container (%s): %v", c.ContainerID[:12], err)
}
tx, err := Flux.db.Begin()
@@ -142,7 +258,7 @@ func (c *Container) Remove(ctx context.Context) error {
for _, volume := range c.Volumes {
if err := RemoveVolume(ctx, volume.VolumeID); err != nil {
tx.Rollback()
return fmt.Errorf("Failed to remove volume (%s): %v", volume.VolumeID, err)
return fmt.Errorf("failed to remove volume (%s): %v", volume.VolumeID, err)
}
_, err = tx.Exec("DELETE FROM volumes WHERE volume_id = ?", volume.VolumeID)
@@ -176,11 +292,11 @@ func (c *Container) Status(ctx context.Context) (string, error) {
// RemoveContainer stops and removes a container, but be warned that this will not remove the container from the database
func RemoveDockerContainer(ctx context.Context, containerID string) error {
if err := Flux.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil {
return fmt.Errorf("Failed to stop container (%s): %v", containerID[:12], err)
return fmt.Errorf("failed to stop container (%s): %v", containerID[:12], err)
}
if err := Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil {
return fmt.Errorf("Failed to remove container (%s): %v", containerID[:12], err)
return fmt.Errorf("failed to remove container (%s): %v", containerID[:12], err)
}
return nil
@@ -220,7 +336,7 @@ func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) er
Timeout: &timeout,
})
if err != nil {
return fmt.Errorf("Failed to stop container: %v", err)
return fmt.Errorf("failed to stop container: %v", err)
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
@@ -249,7 +365,7 @@ func RemoveVolume(ctx context.Context, volumeID string) error {
log.Printf("Removed volume %s\n", volumeID)
if err := Flux.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil {
return fmt.Errorf("Failed to remove volume (%s): %v", volumeID, err)
return fmt.Errorf("failed to remove volume (%s): %v", volumeID, err)
}
return nil

View File

@@ -73,6 +73,12 @@ func (dt *DeploymentLock) CompleteDeployment(appName string) {
var deploymentLock = NewDeploymentLock()
type DeploymentEvent struct {
Stage string `json:"stage"`
Message string `json:"message"`
StatusCode int `json:"status,omitempty"`
}
func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
if Flux.appManager == nil {
panic("App manager is nil")
@@ -120,59 +126,84 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
eventChannel := make(chan pkg.DeploymentEvent, 10)
w.WriteHeader(http.StatusMultiStatus)
eventChannel := make(chan DeploymentEvent, 10)
defer close(eventChannel)
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
go func(w http.ResponseWriter, flusher http.Flusher) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case event, ok := <-eventChannel:
if !ok {
return
}
eventJSON, err := json.Marshal(event)
ev := struct {
Message string `json:"message"`
}{
Message: event.Message,
}
eventJSON, err := json.Marshal(ev)
if err != nil {
// Write error directly to ResponseWriter
jsonErr := json.NewEncoder(w).Encode(err)
if jsonErr != nil {
fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n")
return
}
fmt.Fprintf(w, "data: %s\n\n", err.Error())
flusher.Flush()
if flusher != nil {
flusher.Flush()
}
return
}
fmt.Fprintf(w, "event: %s\n", event.Stage)
fmt.Fprintf(w, "data: %s\n\n", eventJSON)
flusher.Flush()
case <-ctx.Done():
return
if flusher != nil {
flusher.Flush()
}
if event.Stage == "error" || event.Stage == "complete" {
return
}
}
}
}()
}(w, flusher)
eventChannel <- pkg.DeploymentEvent{
eventChannel <- DeploymentEvent{
Stage: "start",
Message: "Uploading code",
}
deployRequest.Code, _, err = r.FormFile("code")
if err != nil {
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: "No code archive found",
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: "No code archive found",
StatusCode: http.StatusBadRequest,
}
http.Error(w, "No code archive found", http.StatusBadRequest)
return
}
defer deployRequest.Code.Close()
if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 {
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: "Invalid flux.json, a name, url, and port must be specified",
Error: "Invalid flux.json, a name, url, and port must be specified",
eventChannel <- DeploymentEvent{
Stage: "error",
Message: "Invalid flux.json, a name, url, and port must be specified",
StatusCode: http.StatusBadRequest,
}
http.Error(w, "Invalid flux.json, a name, url, and port must be specified", http.StatusBadRequest)
return
}
@@ -181,27 +212,32 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
projectPath, err := s.UploadAppCode(deployRequest.Code, projectConfig)
if err != nil {
log.Printf("Failed to upload code: %v\n", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: "Failed to upload code",
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upload code: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Streams the each line of the pipe into the eventChannel, this closes the pipe when the function exits
streamPipe := func(pipe io.ReadCloser) {
// we need a wait group because otherwise the function *could* exit before the pipe is closed
// and wreck havoc on every future request
wg.Add(1)
defer wg.Done()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
line := scanner.Text()
eventChannel <- pkg.DeploymentEvent{
eventChannel <- DeploymentEvent{
Stage: "cmd_output",
Message: fmt.Sprintf("%s", line),
Message: line,
}
}
if err := scanner.Err(); err != nil {
eventChannel <- pkg.DeploymentEvent{
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to read pipe: %s", err),
}
@@ -210,7 +246,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
log.Printf("Preparing project %s...\n", projectConfig.Name)
eventChannel <- pkg.DeploymentEvent{
eventChannel <- DeploymentEvent{
Stage: "preparing",
Message: "Preparing project",
}
@@ -220,25 +256,22 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
cmdOut, err := prepareCmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get stdout pipe: %v\n", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stdout pipe: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stdout pipe: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, fmt.Sprintf("Failed to get stdout pipe: %s", err), http.StatusInternalServerError)
return
}
cmdErr, err := prepareCmd.StderrPipe()
if err != nil {
log.Printf("Failed to get stderr pipe: %v\n", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stderr pipe: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stderr pipe: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, fmt.Sprintf("Failed to get stderr pipe: %s", err), http.StatusInternalServerError)
return
}
@@ -248,19 +281,16 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
err = prepareCmd.Run()
if err != nil {
log.Printf("Failed to prepare project: %s\n", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to prepare project: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, fmt.Sprintf("Failed to prepare project: %s", err), http.StatusInternalServerError)
return
}
cmdOut.Close()
cmdErr.Close()
eventChannel <- pkg.DeploymentEvent{
eventChannel <- DeploymentEvent{
Stage: "building",
Message: "Building project image",
}
@@ -272,25 +302,23 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
cmdOut, err = buildCmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get stdout pipe: %v\n", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stdout pipe: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stdout pipe: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, fmt.Sprintf("Failed to get stdout pipe: %s", err), http.StatusInternalServerError)
return
}
cmdErr, err = buildCmd.StderrPipe()
if err != nil {
log.Printf("Failed to get stderr pipe: %v\n", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stderr pipe: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to get stderr pipe: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, fmt.Sprintf("Failed to get stderr pipe: %s", err), http.StatusInternalServerError)
return
}
@@ -300,21 +328,18 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
err = buildCmd.Run()
if err != nil {
log.Printf("Failed to build image: %s\n", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to build image: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, fmt.Sprintf("Failed to build image: %s", err), http.StatusInternalServerError)
return
}
cmdOut.Close()
cmdErr.Close()
app := Flux.appManager.GetApp(projectConfig.Name)
eventChannel <- pkg.DeploymentEvent{
eventChannel <- DeploymentEvent{
Stage: "creating",
Message: "Creating deployment",
}
@@ -323,26 +348,24 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
app, err = CreateApp(ctx, imageName, projectPath, projectConfig)
if err != nil {
log.Printf("Failed to create app: %v", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to create app: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to create app: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
} else {
err = app.Upgrade(ctx, projectConfig, imageName, projectPath)
if err != nil {
log.Printf("Failed to upgrade app: %v", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to upgrade app: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
@@ -352,27 +375,21 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
})
if err != nil {
log.Printf("Failed to marshal deploy response: %v\n", err)
eventChannel <- pkg.DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to marshal deploy response: %s", err),
Error: err.Error(),
eventChannel <- DeploymentEvent{
Stage: "error",
Message: fmt.Sprintf("Failed to marshal deploy response: %s", err),
StatusCode: http.StatusInternalServerError,
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
eventChannel <- pkg.DeploymentEvent{
eventChannel <- DeploymentEvent{
Stage: "complete",
Message: fmt.Sprintf("%s", responseJSON),
Message: string(responseJSON),
}
log.Printf("App %s deployed successfully!\n", app.Name)
close(eventChannel)
// make sure all the events are flushed
wg.Wait()
}
func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) {
@@ -402,14 +419,7 @@ func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request)
}
if app.Deployment.Proxy == nil {
var headContainer *Container
for _, container := range app.Deployment.Containers {
if container.Head {
headContainer = &container
}
}
app.Deployment.Proxy, _ = NewDeploymentProxy(&app.Deployment, headContainer)
app.Deployment.Proxy, _ = app.Deployment.NewDeploymentProxy()
}
w.WriteHeader(http.StatusOK)

View File

@@ -11,21 +11,19 @@ import (
var (
deploymentInsertStmt *sql.Stmt
containerInsertStmt *sql.Stmt
volumeInsertStmt *sql.Stmt
updateVolumeStmt *sql.Stmt
)
type Deployment struct {
ID int64 `json:"id"`
Containers []Container `json:"containers,omitempty"`
Head *Container `json:"head,omitempty"`
Containers []*Container `json:"containers,omitempty"`
Proxy *DeploymentProxy `json:"-"`
URL string `json:"url"`
Port uint16 `json:"port"`
}
// Creates a deployment and containers in the database
func CreateDeployment(container Container, port uint16, appUrl string, db *sql.DB) (Deployment, error) {
func CreateDeployment(port uint16, appUrl string, db *sql.DB) (*Deployment, error) {
var deployment Deployment
var err error
@@ -33,115 +31,33 @@ func CreateDeployment(container Container, port uint16, appUrl string, db *sql.D
deploymentInsertStmt, err = db.Prepare("INSERT INTO deployments (url, port) VALUES ($1, $2) RETURNING id, url, port")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return Deployment{}, err
return nil, err
}
}
err = deploymentInsertStmt.QueryRow(appUrl, port).Scan(&deployment.ID, &deployment.URL, &deployment.Port)
if err != nil {
log.Printf("Failed to insert deployment: %v\n", err)
return Deployment{}, err
return nil, err
}
if containerInsertStmt == nil {
containerInsertStmt, err = db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return Deployment{}, err
}
}
var containerIDString string
err = containerInsertStmt.QueryRow(container.ContainerID[:], deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head)
if err != nil {
log.Printf("Failed to get container id: %v\n", err)
return Deployment{}, err
}
copy(container.ContainerID[:], containerIDString)
for i, volume := range container.Volumes {
if volumeInsertStmt == nil {
volumeInsertStmt, err = db.Prepare("INSERT INTO volumes (volume_id, container_id) VALUES (?, ?) RETURNING id, volume_id, container_id")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return Deployment{}, err
}
}
if err := volumeInsertStmt.QueryRow(volume.VolumeID, container.ID).Scan(&container.Volumes[i].ID, &container.Volumes[i].VolumeID, &container.Volumes[i].ContainerID); err != nil {
log.Printf("Failed to insert volume: %v\n", err)
return Deployment{}, err
}
}
container.Deployment = &deployment
deployment.Containers = append(deployment.Containers, container)
return deployment, nil
return &deployment, nil
}
func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string) error {
existingContainers, err := findExistingDockerContainers(ctx, projectConfig.Name)
if err != nil {
return fmt.Errorf("Failed to find existing containers: %v", err)
return fmt.Errorf("failed to find existing containers: %v", err)
}
// Deploy new container before deleting old one
c, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig)
if err != nil || c == nil {
log.Printf("Failed to create container: %v\n", err)
return err
}
var container Container = *c
if containerInsertStmt == nil {
containerInsertStmt, err = Flux.db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return err
}
}
var containerIDString string
err = containerInsertStmt.QueryRow(container.ContainerID[:], deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head)
container, err := deployment.Head.Upgrade(ctx, imageName, projectPath, projectConfig)
if err != nil {
log.Printf("Failed to get container id: %v\n", err)
log.Printf("Failed to upgrade container: %v\n", err)
return err
}
container.Deployment = deployment
// the space time complexity of this is pretty bad, but it works
for _, existingContainer := range deployment.Containers {
if !existingContainer.Head {
continue
}
for _, volume := range existingContainer.Volumes {
var targetVolume *Volume
for i, volume := range container.Volumes {
if volume.VolumeID == volume.VolumeID {
targetVolume = &container.Volumes[i]
break
}
}
if updateVolumeStmt == nil {
updateVolumeStmt, err = Flux.db.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, container_id")
if err != nil {
log.Printf("Failed to prepare statement: %v\n", err)
return err
}
}
err := updateVolumeStmt.QueryRow(container.ID, volume.ID).Scan(&targetVolume.ID, &targetVolume.VolumeID, &targetVolume.ContainerID)
if err != nil {
log.Printf("Failed to update volume: %v\n", err)
return err
}
}
}
copy(container.ContainerID[:], containerIDString)
// copy(container.ContainerID[:], containerIDString)
deployment.Head = container
deployment.Containers = append(deployment.Containers, container)
log.Printf("Starting container %s...\n", container.ContainerID[:12])
@@ -163,7 +79,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
// Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully shutdown
oldProxy := deployment.Proxy
deployment.Proxy, err = NewDeploymentProxy(deployment, &container)
deployment.Proxy, err = deployment.NewDeploymentProxy()
if err != nil {
log.Printf("Failed to create deployment proxy: %v\n", err)
return err
@@ -175,16 +91,17 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
return err
}
var containers []Container
var containers []*Container
var oldContainers []*Container
for _, container := range deployment.Containers {
if existingContainers[string(container.ContainerID[:])] {
log.Printf("Deleting container from db: %s\n", container.ContainerID[:12])
_, err = tx.Exec("DELETE FROM containers WHERE id = ?", container.ID)
oldContainers = append(oldContainers, &container)
oldContainers = append(oldContainers, container)
if err != nil {
log.Printf("Failed to delete container: %v\n", err)
tx.Rollback()
return err
}
@@ -212,9 +129,6 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro
}
deployment.Containers = containers
Flux.proxy.AddDeployment(deployment)
return nil
}
@@ -245,6 +159,11 @@ func (d *Deployment) Start(ctx context.Context) error {
}
}
if d.Proxy == nil {
d.Proxy, _ = d.NewDeploymentProxy()
Flux.proxy.AddDeployment(d)
}
return nil
}
@@ -257,19 +176,20 @@ func (d *Deployment) Stop(ctx context.Context) error {
}
}
Flux.proxy.RemoveDeployment(d)
d.Proxy = nil
return nil
}
func (d *Deployment) Status(ctx context.Context) (string, error) {
var status string
if d == nil {
fmt.Printf("Deployment is nil\n")
return "stopped", nil
return "", fmt.Errorf("deployment is nil")
}
if d.Containers == nil {
fmt.Printf("Containers are nil\n")
return "stopped", nil
return "", fmt.Errorf("containers are nil")
}
for _, container := range d.Containers {
@@ -281,7 +201,7 @@ func (d *Deployment) Status(ctx context.Context) (string, error) {
// if not all containers are in the same state
if status != "" && status != containerStatus {
return "", fmt.Errorf("Malformed deployment")
return "", fmt.Errorf("malformed deployment")
}
status = containerStatus

View File

@@ -16,7 +16,15 @@ type Proxy struct {
deployments sync.Map
}
func (p *Proxy) RemoveDeployment(deployment *Deployment) {
p.deployments.Delete(deployment.URL)
}
func (p *Proxy) AddDeployment(deployment *Deployment) {
if deployment.Containers == nil {
panic("containers is nil")
}
log.Printf("Adding deployment %s\n", deployment.URL)
p.deployments.Store(deployment.URL, deployment)
}
@@ -37,24 +45,23 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type DeploymentProxy struct {
deployment *Deployment
currentHead *Container
proxy *httputil.ReverseProxy
gracePeriod time.Duration
activeRequests int64
}
func NewDeploymentProxy(deployment *Deployment, head *Container) (*DeploymentProxy, error) {
func (deployment *Deployment) NewDeploymentProxy() (*DeploymentProxy, error) {
if deployment == nil {
return nil, fmt.Errorf("Deployment is nil")
return nil, fmt.Errorf("deployment is nil")
}
containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(head.ContainerID[:]))
containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(deployment.Head.ContainerID[:]))
if err != nil {
return nil, err
}
if containerJSON.NetworkSettings.IPAddress == "" {
return nil, fmt.Errorf("No IP address found for container %s", head.ContainerID[:12])
return nil, fmt.Errorf("no IP address found for container %s", deployment.Head.ContainerID[:12])
}
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, deployment.Port))
@@ -80,7 +87,6 @@ func NewDeploymentProxy(deployment *Deployment, head *Container) (*DeploymentPro
return &DeploymentProxy{
deployment: deployment,
currentHead: head,
proxy: proxy,
gracePeriod: time.Second * 30,
activeRequests: 0,
@@ -91,22 +97,18 @@ func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) {
ctx, cancel := context.WithTimeout(context.Background(), dp.gracePeriod)
defer cancel()
// Create a channel to signal when wait group is done
for {
done := false
for !done {
select {
case <-ctx.Done():
break
done = true
default:
if atomic.LoadInt64(&dp.activeRequests) == 0 {
break
done = true
}
time.Sleep(time.Second)
}
if atomic.LoadInt64(&dp.activeRequests) == 0 || ctx.Err() != nil {
break
}
}
for _, container := range oldContainers {

View File

@@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS containers (
CREATE TABLE IF NOT EXISTS volumes (
id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
volume_id TEXT NOT NULL,
mountpoint TEXT NOT NULL,
container_id INTEGER NOT NULL,
FOREIGN KEY(container_id) REFERENCES containers(id)
);

View File

@@ -123,12 +123,6 @@ func NewServer() *FluxServer {
Flux.proxy = &Proxy{}
Flux.appManager.Range(func(key, value interface{}) bool {
app := value.(*App)
Flux.proxy.AddDeployment(&app.Deployment)
return true
})
port := os.Getenv("FLUXD_PROXY_PORT")
if port == "" {
port = "7465"