From 4ab58f6324737de8dd7efb2c244e290964102bfa Mon Sep 17 00:00:00 2001 From: Zoe <62722391+juls0730@users.noreply.github.com> Date: Sat, 3 May 2025 19:03:09 -0500 Subject: [PATCH] add app state, and fix a few bugs --- cmd/cli/commands/start.go | 7 +- cmd/cli/commands/stop.go | 7 +- cmd/cli/config.json | 2 +- internal/handlers/app.go | 18 +++++ internal/handlers/schema.sql | 2 + internal/handlers/server.go | 7 +- internal/models/app.go | 1 + internal/models/container.go | 50 +++++-------- internal/models/deployment.go | 12 +--- internal/models/volume.go | 46 ++++++++++++ .../services/appManagerService/appmanager.go | 72 ++++++++++--------- pkg/version.go | 2 +- 12 files changed, 143 insertions(+), 83 deletions(-) create mode 100644 internal/models/volume.go diff --git a/cmd/cli/commands/start.go b/cmd/cli/commands/start.go index 2fa4df4..ff1c14c 100644 --- a/cmd/cli/commands/start.go +++ b/cmd/cli/commands/start.go @@ -14,9 +14,12 @@ func StartCommand(ctx CommandCtx, args []string) error { // Put request to start the project, since the start endpoint is idempotent. // If the project is already running, this will return a 304 Not Modified - util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil) + err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/start", nil) + if err != nil { + return fmt.Errorf("failed to start %s: %v", projectName.Name, err) + } - fmt.Printf("Successfully started %s\n", projectName) + fmt.Printf("Successfully started %s\n", projectName.Name) return nil } diff --git a/cmd/cli/commands/stop.go b/cmd/cli/commands/stop.go index 3ef6a39..6649b19 100644 --- a/cmd/cli/commands/stop.go +++ b/cmd/cli/commands/stop.go @@ -12,8 +12,11 @@ func StopCommand(ctx CommandCtx, args []string) error { return err } - util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil) + err = util.PutRequest(ctx.Config.DaemonURL+"/app/"+projectName.Id+"/stop", nil) + if err != nil { + return fmt.Errorf("failed to stop %s: %v", projectName.Name, err) + } - fmt.Printf("Successfully stopped %s\n", projectName) + fmt.Printf("Successfully stopped %s\n", projectName.Name) return nil } diff --git a/cmd/cli/config.json b/cmd/cli/config.json index 2b21a95..831a506 100644 --- a/cmd/cli/config.json +++ b/cmd/cli/config.json @@ -1,3 +1,3 @@ { - "deamon_url": "http://127.0.0.1:5647" + "daemon_url": "http://127.0.0.1:5647" } \ No newline at end of file diff --git a/internal/handlers/app.go b/internal/handlers/app.go index 89ee6f9..e19405b 100644 --- a/internal/handlers/app.go +++ b/internal/handlers/app.go @@ -482,6 +482,14 @@ func (flux *FluxServer) StartApp(w http.ResponseWriter, r *http.Request) { return } + app.State = "running" + _, err = flux.db.ExecContext(r.Context(), "UPDATE apps SET state = ? WHERE id = ?", app.State, app.Id[:]) + if err != nil { + flux.logger.Errorw("Failed to update app state", zap.Error(err)) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + err = app.Deployment.Start(r.Context(), flux.docker) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -529,6 +537,14 @@ func (flux *FluxServer) StopApp(w http.ResponseWriter, r *http.Request) { return } + app.State = "stopped" + _, err = flux.db.ExecContext(r.Context(), "UPDATE apps SET state = ? WHERE id = ?", app.State, app.Id[:]) + if err != nil { + flux.logger.Errorw("Failed to update app state", zap.Error(err)) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + err = app.Deployment.Stop(r.Context(), flux.docker) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -572,5 +588,7 @@ func (flux *FluxServer) DeleteDeployHandler(w http.ResponseWriter, r *http.Reque return } + flux.proxy.RemoveDeployment(app.Deployment.URL) + w.WriteHeader(http.StatusOK) } diff --git a/internal/handlers/schema.sql b/internal/handlers/schema.sql index b987a58..03fd97a 100644 --- a/internal/handlers/schema.sql +++ b/internal/handlers/schema.sql @@ -7,12 +7,14 @@ CREATE TABLE IF NOT EXISTS deployments ( CREATE TABLE IF NOT EXISTS apps ( id BLOB PRIMARY KEY, name TEXT NOT NULL UNIQUE, + state TEXT NOT NULL, deployment_id INTEGER, FOREIGN KEY(deployment_id) REFERENCES deployments(id) ); CREATE TABLE IF NOT EXISTS containers ( id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE, + friendly_name TEXT NOT NULL, container_id TEXT NOT NULL, head BOOLEAN NOT NULL, deployment_id INTEGER NOT NULL, diff --git a/internal/handlers/server.go b/internal/handlers/server.go index 5709c9d..2c34a0c 100644 --- a/internal/handlers/server.go +++ b/internal/handlers/server.go @@ -43,7 +43,7 @@ type FluxServer struct { db *sql.DB docker *docker.DockerClient - // TODO: implement + proxy *proxyManagerService.ProxyManager appManager *appManagerService.AppManager @@ -147,7 +147,10 @@ func NewServer() *FluxServer { flux.proxy = proxyManagerService.NewProxyManager(flux.logger) flux.appManager = appManagerService.NewAppManager(flux.db, flux.docker, flux.proxy, flux.logger) - flux.appManager.Init() + err = flux.appManager.Init() + if err != nil { + flux.logger.Fatalw("Failed to initialize apps", zap.Error(err)) + } return flux } diff --git a/internal/models/app.go b/internal/models/app.go index 194bd33..100d75c 100644 --- a/internal/models/app.go +++ b/internal/models/app.go @@ -12,6 +12,7 @@ import ( type App struct { Id uuid.UUID `json:"id,omitempty"` Name string `json:"name,omitempty"` + State string `json:"state,omitempty"` Deployment *Deployment `json:"-"` DeploymentID int64 `json:"deployment_id,omitempty"` } diff --git a/internal/models/container.go b/internal/models/container.go index b372aa9..22fa4fe 100644 --- a/internal/models/container.go +++ b/internal/models/container.go @@ -13,25 +13,6 @@ import ( "go.uber.org/zap" ) -type Volume struct { - ID int64 `json:"id"` - Mountpoint string `json:"mountpoint"` - VolumeID string `json:"volume_id"` - ContainerID string `json:"container_id"` -} - -func (v *Volume) Remove(ctx context.Context, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error { - logger.Debugw("Removing volume", zap.String("volume_id", v.VolumeID)) - - _, err := db.ExecContext(ctx, "DELETE FROM volumes WHERE volume_id = ?", v.VolumeID) - if err != nil { - logger.Errorw("Failed to delete volume", zap.Error(err)) - return err - } - - return dockerClient.DeleteDockerVolume(ctx, v.VolumeID) -} - type Container struct { ID int64 `json:"id"` @@ -67,11 +48,11 @@ func CreateContainer(ctx context.Context, imageName string, friendlyName string, logger.Debugw("Creating container with image", zap.String("image", imageName)) - var volumes []*docker.DockerVolume + var volumes []*Volume // in the head container, we have a default volume where the project is mounted, this is important so that if the project uses sqlite for example, // all the data will not be lost the second the containers turns off. if head { - vol, err := dockerClient.CreateDockerVolume(ctx) + vol, err := CreateVolume(ctx, "/workspace", dockerClient, logger) if err != nil { logger.Errorw("Failed to create head's workspace volume", zap.Error(err)) return nil, err @@ -83,12 +64,6 @@ func CreateContainer(ctx context.Context, imageName string, friendlyName string, } for _, containerVolume := range containerVols { - vol, err := dockerClient.CreateDockerVolume(ctx) - if err != nil { - logger.Errorw("Failed to create volume", zap.Error(err)) - return nil, err - } - if containerVolume.Mountpoint == "" { return nil, fmt.Errorf("mountpoint is empty") } @@ -97,7 +72,12 @@ func CreateContainer(ctx context.Context, imageName string, friendlyName string, return nil, fmt.Errorf("invalid mountpoint") } - vol.Mountpoint = containerVolume.Mountpoint + vol, err := CreateVolume(ctx, containerVolume.Mountpoint, dockerClient, logger) + if err != nil { + logger.Errorw("Failed to create volume", zap.Error(err)) + return nil, err + } + volumes = append(volumes, vol) } @@ -132,8 +112,13 @@ func CreateContainer(ctx context.Context, imageName string, friendlyName string, io.Copy(io.Discard, image) } + dockerVols := make([]*docker.DockerVolume, 0) + for _, volume := range volumes { + dockerVols = append(dockerVols, &volume.DockerVolume) + } + logger.Debugw("Creating container", zap.String("image", imageName)) - dockerContainer, err := dockerClient.CreateDockerContainer(ctx, imageName, volumes, environment, hosts, nil) + dockerContainer, err := dockerClient.CreateDockerContainer(ctx, imageName, dockerVols, environment, hosts, nil) if err != nil { logger.Errorw("Failed to create container", zap.Error(err)) return nil, err @@ -143,9 +128,10 @@ func CreateContainer(ctx context.Context, imageName string, friendlyName string, ContainerID: dockerContainer.ID, Name: dockerContainer.Name, FriendlyName: friendlyName, + Volumes: volumes, } - err = db.QueryRow("INSERT INTO containers (container_id, head, deployment_id) VALUES (?, ?, ?) RETURNING id, container_id, head, deployment_id", string(c.ContainerID), head, deployment.ID).Scan(&c.ID, &c.ContainerID, &c.Head, &c.DeploymentID) + err = db.QueryRow("INSERT INTO containers (container_id, head, friendly_name, deployment_id) VALUES (?, ?, ?, ?) RETURNING id, container_id, head, deployment_id", string(c.ContainerID), head, friendlyName, deployment.ID).Scan(&c.ID, &c.ContainerID, &c.Head, &c.DeploymentID) if err != nil { logger.Errorw("Failed to insert container", zap.Error(err)) return nil, err @@ -166,7 +152,7 @@ func CreateContainer(ctx context.Context, imageName string, friendlyName string, for _, vol := range c.Volumes { logger.Debug("Inserting volume", zap.String("volume_id", vol.VolumeID), zap.String("mountpoint", vol.Mountpoint), zap.String("container_id", string(c.ContainerID))) - err = volumeInsertStmt.QueryRow(vol.VolumeID, vol.Mountpoint, c.ContainerID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID) + err = volumeInsertStmt.QueryRow(vol.VolumeID, vol.Mountpoint, c.ID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID) if err != nil { logger.Errorw("Failed to insert volume", zap.Error(err)) tx.Rollback() @@ -215,7 +201,7 @@ func (c *Container) Upgrade(ctx context.Context, imageName string, environment [ return err } - err = db.QueryRow("INSERT INTO containers (container_id, head, deployment_id) VALUES (?, ?, ?) RETURNING id, container_id, head, deployment_id", newDockerContainer.ID, c.Head, c.Deployment.ID).Scan(&c.ID, &c.ContainerID, &c.Head, &c.DeploymentID) + err = db.QueryRow("INSERT INTO containers (container_id, head, friendly_name, deployment_id) VALUES (?, ?, ?, ?) RETURNING id, container_id, head, deployment_id", newDockerContainer.ID, c.Head, c.FriendlyName, c.Deployment.ID).Scan(&c.ID, &c.ContainerID, &c.Head, &c.DeploymentID) if err != nil { logger.Errorw("Failed to insert container", zap.Error(err)) return err diff --git a/internal/models/deployment.go b/internal/models/deployment.go index 405b170..00255f3 100644 --- a/internal/models/deployment.go +++ b/internal/models/deployment.go @@ -173,14 +173,8 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr // copy the old head container since Upgrade updates the container in place oldHeadContainer := *deployment.Head() - oldDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient) - if err != nil { - logger.Errorw("Failed to get internal url", zap.Error(err)) - return err - } - // we only upgrade the head container, in the future we might want to allow upgrading supplemental containers, but this should work just fine for now. - err = deployment.Head().Upgrade(ctx, imageName, projectConfig.Environment, dockerClient, db, logger) + err := deployment.Head().Upgrade(ctx, imageName, projectConfig.Environment, dockerClient, db, logger) if err != nil { logger.Errorw("Failed to upgrade container", zap.Error(err)) return err @@ -207,7 +201,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr } // Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully drained of connections - oldProxy, ok := proxyManager.Load(oldDeploymentInternalUrl.String()) + oldProxy, ok := proxyManager.Load(deployment.URL) newDeploymentInternalUrl, err := deployment.GetInternalUrl(dockerClient) if err != nil { @@ -225,7 +219,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig *pkg.Pr deployment.URL = projectConfig.Url // gracefully shutdown the old proxy, or if it doesnt exist, just remove the containers - if !ok { + if ok { go oldProxy.GracefulShutdown(func() { err := dockerClient.DeleteDockerContainer(context.Background(), oldHeadContainer.ContainerID) if err != nil { diff --git a/internal/models/volume.go b/internal/models/volume.go new file mode 100644 index 0000000..9f8ce91 --- /dev/null +++ b/internal/models/volume.go @@ -0,0 +1,46 @@ +package models + +import ( + "context" + "database/sql" + + docker "github.com/juls0730/flux/internal/docker" + "go.uber.org/zap" +) + +type Volume struct { + ID int64 `json:"id"` + docker.DockerVolume + ContainerID int64 `json:"container_id"` +} + +// Creates a volume for a container, does not insert it into the database +func CreateVolume(ctx context.Context, mountpoint string, dockerClient *docker.DockerClient, logger *zap.SugaredLogger) (*Volume, error) { + logger.Debugw("Creating volume", zap.String("mountpoint", mountpoint)) + + dockerVol, err := dockerClient.CreateDockerVolume(ctx) + if err != nil { + logger.Errorw("Failed to create volume", zap.Error(err)) + return nil, err + } + + dockerVol.Mountpoint = mountpoint + + vol := &Volume{ + DockerVolume: *dockerVol, + } + + return vol, nil +} + +func (v *Volume) Remove(ctx context.Context, dockerClient *docker.DockerClient, db *sql.DB, logger *zap.SugaredLogger) error { + logger.Debugw("Removing volume", zap.String("volume_id", v.VolumeID)) + + _, err := db.ExecContext(ctx, "DELETE FROM volumes WHERE volume_id = ?", v.VolumeID) + if err != nil { + logger.Errorw("Failed to delete volume", zap.Error(err)) + return err + } + + return dockerClient.DeleteDockerVolume(ctx, v.VolumeID) +} diff --git a/internal/services/appManagerService/appmanager.go b/internal/services/appManagerService/appmanager.go index b65abd6..9d642af 100644 --- a/internal/services/appManagerService/appmanager.go +++ b/internal/services/appManagerService/appmanager.go @@ -33,7 +33,7 @@ func NewAppManager(db *sql.DB, dockerClient *docker.DockerClient, proxyManager * } func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, projectConfig *pkg.ProjectConfig, id uuid.UUID) (*models.App, error) { - app := models.App{ + app := &models.App{ Id: id, } appManager.logger.Debugw("Creating deployment", zap.String("id", app.Id.String())) @@ -69,13 +69,11 @@ func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, p return nil, fmt.Errorf("failed to create container: %v", err) } - var appIdBlob []byte - err = appManager.db.QueryRow("INSERT INTO apps (id, name, deployment_id) VALUES ($1, $2, $3) RETURNING id, name, deployment_id", app.Id[:], projectConfig.Name, app.Deployment.ID).Scan(&appIdBlob, &app.Name, &app.DeploymentID) + err = appManager.db.QueryRowContext(ctx, "INSERT INTO apps (id, name, state, deployment_id) VALUES ($1, $2, $3, $4) RETURNING name, state, deployment_id", app.Id[:], projectConfig.Name, "running", app.Deployment.ID).Scan(&app.Name, &app.State, &app.DeploymentID) if err != nil { appManager.logger.Errorw("Failed to insert app", zap.Error(err)) return nil, fmt.Errorf("failed to insert app: %v", err) } - app.Id = uuid.Must(uuid.FromBytes(appIdBlob)) err = app.Deployment.Start(ctx, appManager.dockerClient) if err != nil { @@ -83,9 +81,6 @@ func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, p return nil, fmt.Errorf("failed to start deployment: %v", err) } - appManager.AddApp(app.Id, app) - appPtr := appManager.GetApp(app.Id) - deploymentInternalUrl, err := app.Deployment.GetInternalUrl(appManager.dockerClient) if err != nil { appManager.logger.Errorw("Failed to get internal url", zap.Error(err)) @@ -98,9 +93,10 @@ func (appManager *AppManager) CreateApp(ctx context.Context, imageName string, p return nil, fmt.Errorf("failed to create deployment proxy: %v", err) } - appManager.proxyManager.AddProxy(appPtr.Deployment.URL, newProxy) + appManager.AddApp(app.Id, app) + appManager.proxyManager.AddProxy(app.Deployment.URL, newProxy) - return appPtr, nil + return app, nil } func (appManager *AppManager) Upgrade(ctx context.Context, appId uuid.UUID, imageName string, projectConfig *pkg.ProjectConfig) error { @@ -174,13 +170,13 @@ func (am *AppManager) RemoveApp(id uuid.UUID) { } // add a given app to the app manager -func (am *AppManager) AddApp(id uuid.UUID, app models.App) { +func (am *AppManager) AddApp(id uuid.UUID, app *models.App) { if app.Deployment == nil || app.Deployment.Containers() == nil || app.Deployment.Head() == nil || len(app.Deployment.Containers()) == 0 || app.Name == "" { panic("invalid app") } am.nameIndex.Store(app.Name, id) - am.Store(id, &app) + am.Store(id, app) } // nukes an app completely @@ -203,58 +199,52 @@ func (am *AppManager) DeleteApp(id uuid.UUID) error { } // Scan every app in the database, and create in memory structures if the deployment is already running -func (am *AppManager) Init() { +func (am *AppManager) Init() error { am.logger.Info("Initializing deployments") if am.db == nil { am.logger.Panic("DB is nil") } - appRows, err := am.db.Query("SELECT id, name, deployment_id FROM apps") + appRows, err := am.db.Query("SELECT id, name, state, deployment_id FROM apps") if err != nil { - am.logger.Errorw("Failed to get apps", zap.Error(err)) - return + return fmt.Errorf("failed to get apps: %v", err) } defer appRows.Close() - var apps []models.App + var apps []*models.App for appRows.Next() { - var app models.App + var app *models.App = new(models.App) var appIdBlob []byte - if err := appRows.Scan(&appIdBlob, &app.Name, &app.DeploymentID); err != nil { - am.logger.Warnw("Failed to scan app", zap.Error(err)) - return + if err := appRows.Scan(&appIdBlob, &app.Name, &app.State, &app.DeploymentID); err != nil { + return fmt.Errorf("failed to scan app: %v", err) } app.Id = uuid.Must(uuid.FromBytes(appIdBlob)) app.Deployment = models.NewDeployment() if app.Deployment == nil { - am.logger.Errorw("Failed to create deployment") - return + return fmt.Errorf("failed to create deployment") } err := am.db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&app.Deployment.ID, &app.Deployment.URL, &app.Deployment.Port) if err != nil { - am.logger.Errorw("Failed to get deployment", zap.Error(err)) - return + return fmt.Errorf("failed to get deployment: %v", err) } am.logger.Debugw("Found deployment", zap.Int64("id", app.Deployment.ID)) - containerRows, err := am.db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) + containerRows, err := am.db.Query("SELECT id, container_id, friendly_name, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) if err != nil { - am.logger.Warnw("Failed to query containers", zap.Error(err)) - return + return fmt.Errorf("failed to query containers: %v", err) } defer containerRows.Close() for containerRows.Next() { - var container models.Container - containerRows.Scan(&container.ID, &container.ContainerID, &container.DeploymentID, &container.Head) + var container *models.Container = new(models.Container) + containerRows.Scan(&container.ID, &container.ContainerID, &container.FriendlyName, &container.DeploymentID, &container.Head) container.Deployment = app.Deployment - volumeRows, err := am.db.Query("SELECT id, volume_id, container_id, mountpoint FROM volumes WHERE container_id = ?", container.ContainerID[:]) + volumeRows, err := am.db.Query("SELECT id, volume_id, container_id, mountpoint FROM volumes WHERE container_id = ?", container.ID) if err != nil { - am.logger.Warnw("Failed to query volumes", zap.Error(err)) - return + return fmt.Errorf("failed to query volumes: %v", err) } defer volumeRows.Close() @@ -264,10 +254,22 @@ func (am *AppManager) Init() { container.Volumes = append(container.Volumes, volume) } - app.Deployment.AppendContainer(&container) + app.Deployment.AppendContainer(container) } - // TODO: Store state of deployment in database and start it if it's stopped and should be started + // align the state of the deployment with the state of the app + switch app.State { + case "running": + err = app.Deployment.Start(context.Background(), am.dockerClient) + if err != nil { + return fmt.Errorf("failed to start deployment: %v", err) + } + case "stopped": + err = app.Deployment.Stop(context.Background(), am.dockerClient) + if err != nil { + return fmt.Errorf("failed to stop deployment: %v", err) + } + } apps = append(apps, app) } @@ -300,4 +302,6 @@ func (am *AppManager) Init() { am.proxyManager.AddProxy(app.Deployment.URL, proxy) am.logger.Debugw("Created proxy", zap.String("id", app.Id.String())) } + + return nil } diff --git a/pkg/version.go b/pkg/version.go index 68698f0..94629dc 100644 --- a/pkg/version.go +++ b/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "2025.05.02-17" +const Version = "2025.05.04-00"