random fixes and a REVERSE PROXY!!!

This commit is contained in:
Zoe
2024-12-04 06:16:28 -06:00
parent 1bb4377a89
commit 11561e864c
11 changed files with 443 additions and 132 deletions

View File

@@ -4,9 +4,9 @@ import (
"context"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
@@ -14,7 +14,6 @@ import (
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/joho/godotenv"
"github.com/juls0730/fluxd/models"
)
@@ -71,22 +70,22 @@ func (cm *ContainerManager) CreateContainer(ctx context.Context, imageName, proj
resp, err := cm.dockerClient.ContainerCreate(ctx, &container.Config{
Image: imageName,
Env: projectConfig.Environment,
ExposedPorts: nat.PortSet{
nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): {},
},
// ExposedPorts: nat.PortSet{
// nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): {},
// },
Volumes: map[string]struct{}{
vol.Name: {},
},
},
&container.HostConfig{
PortBindings: nat.PortMap{
nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): []nat.PortBinding{
{
HostIP: "0.0.0.0",
HostPort: strconv.Itoa(projectConfig.Port),
},
},
},
// PortBindings: nat.PortMap{
// nat.Port(fmt.Sprintf("%d/tcp", projectConfig.Port)): []nat.PortBinding{
// {
// HostIP: "0.0.0.0",
// HostPort: strconv.Itoa(projectConfig.Port),
// },
// },
// },
Mounts: []mount.Mount{
{
Type: mount.TypeVolume,
@@ -129,6 +128,64 @@ func (cm *ContainerManager) RemoveContainer(ctx context.Context, containerID str
return nil
}
func (cm *ContainerManager) WaitForContainer(ctx context.Context, containerID string, containerPort int) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return fmt.Errorf("container failed to become ready in time")
default:
containerJSON, err := cm.dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return err
}
if containerJSON.State.Running {
resp, err := http.Get(fmt.Sprintf("http://%s:%d/", containerJSON.NetworkSettings.IPAddress, containerPort))
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}
}
time.Sleep(time.Second)
}
}
}
func (cm *ContainerManager) GracefullyRemoveContainer(ctx context.Context, containerID string) error {
timeout := 30
err := cm.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{
Timeout: &timeout,
})
if err != nil {
return fmt.Errorf("Failed to stop container: %v", err)
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return cm.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{})
default:
containerJSON, err := cm.dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return err
}
if !containerJSON.State.Running {
return cm.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{})
}
time.Sleep(time.Second)
}
}
}
func (cm *ContainerManager) RemoveVolume(ctx context.Context, volumeID string) error {
if err := cm.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil {
return fmt.Errorf("Failed to remove existing volume: %v", err)
@@ -147,7 +204,7 @@ func (cm *ContainerManager) findExistingContainers(ctx context.Context, containe
var existingContainers []string
for _, container := range containers {
if strings.HasPrefix(container.Names[0], fmt.Sprintf("/%s", containerPrefix)) {
if strings.HasPrefix(container.Names[0], fmt.Sprintf("/%s-", containerPrefix)) {
existingContainers = append(existingContainers, container.ID)
}
}

View File

@@ -56,8 +56,8 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
if projectConfig.Urls == nil || len(projectConfig.Urls) == 0 {
http.Error(w, "No deployment urls specified", http.StatusBadRequest)
if projectConfig.Url == "" {
http.Error(w, "No deployment url specified", http.StatusBadRequest)
return
}
@@ -66,7 +66,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
return
}
log.Printf("Deploying project %s to %s\n", projectConfig.Name, projectConfig.Urls)
log.Printf("Deploying project %s to %s\n", projectConfig.Name, projectConfig.Url)
projectPath, err := s.UploadAppCode(deployRequest.Code, projectConfig)
if err != nil {
@@ -97,7 +97,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
var app models.App
s.db.QueryRow("SELECT id FROM apps WHERE name = ?", projectConfig.Name).Scan(&app.ID)
s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE name = ?", projectConfig.Name).Scan(&app.ID, &app.Name, &app.DeploymentID)
if app.ID == 0 {
configBytes, err := json.Marshal(projectConfig)
@@ -137,7 +137,25 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
s.db.QueryRow("SELECT id, name, deployment_id FROM apps WHERE id = ?", appID).Scan(&app.ID, &app.Name, &app.DeploymentID)
err = s.StartDeployment(r.Context(), app.DeploymentID)
if err != nil {
log.Printf("Failed to start deployment: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
} else {
// if deploy is not started, start it
deploymentStatus, err := s.GetDeploymentStatus(r.Context(), app.DeploymentID)
if deploymentStatus != "started" || err != nil {
err = s.StartDeployment(r.Context(), app.DeploymentID)
if err != nil {
log.Printf("Failed to start deployment: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
err = s.UpgradeDeployment(r.Context(), app.DeploymentID, projectConfig, imageName, projectPath)
if err != nil {
log.Printf("Failed to upgrade deployment: %v\n", err)
@@ -146,13 +164,6 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) {
}
}
err = s.StartDeployment(r.Context(), app.DeploymentID)
if err != nil {
log.Printf("Failed to start deployment: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("App %s deployed successfully!\n", app.Name)
json.NewEncoder(w).Encode(DeployResponse{

View File

@@ -5,14 +5,13 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"github.com/juls0730/fluxd/models"
)
// Creates a deployment and containers in the database
func (s *FluxServer) CreateDeployment(ctx context.Context, projectConfig models.ProjectConfig, containerID string) (int64, error) {
deploymentResult, err := s.db.Exec("INSERT INTO deployments (urls) VALUES (?)", strings.Join(projectConfig.Urls, ","))
deploymentResult, err := s.db.Exec("INSERT INTO deployments (url) VALUES (?)", projectConfig.Url)
if err != nil {
log.Printf("Failed to insert deployment: %v\n", err)
return 0, err
@@ -45,33 +44,28 @@ func (s *FluxServer) UpgradeDeployment(ctx context.Context, deploymentID int64,
return fmt.Errorf("Failed to find existing containers: %v", err)
}
tx, err := s.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
return err
}
// TODO: swap containers if they are running and have the same image so that we can have a constant uptime
for _, existingContainer := range existingContainers {
log.Printf("Stopping existing container: %s\n", existingContainer[0:12])
tx.Exec("DELETE FROM containers WHERE container_id = ?", existingContainer)
err = s.containerManager.RemoveContainer(ctx, existingContainer)
if err != nil {
return err
}
}
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v\n", err)
return err
}
fmt.Printf("There are %d existing containers\n", len(existingContainers))
// Deploy new container before deleting old one
containerID, err := s.containerManager.CreateContainer(ctx, imageName, projectPath, projectConfig)
if err != nil {
log.Printf("Failed to create container: %v\n", err)
return err
}
err = s.containerManager.StartContainer(ctx, containerID)
if err != nil {
log.Printf("Failed to start container: %v\n", err)
return err
}
if err := s.containerManager.WaitForContainer(ctx, containerID, projectConfig.Port); err != nil {
log.Printf("Failed to wait for container: %v\n", err)
return err
}
s.Proxy.AddContainer(projectConfig, containerID)
s.db.Exec("INSERT INTO containers (container_id, deployment_id) VALUES (?, ?)", containerID, deploymentID)
// update app in the database
@@ -80,6 +74,29 @@ func (s *FluxServer) UpgradeDeployment(ctx context.Context, deploymentID int64,
return err
}
// TODO: swap containers if they are running and have the same image so that we can have a constant uptime
tx, err := s.db.Begin()
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
return err
}
for _, existingContainer := range existingContainers {
log.Printf("Stopping existing container: %s\n", existingContainer[0:12])
tx.Exec("DELETE FROM containers WHERE container_id = ?", existingContainer)
err = s.containerManager.GracefullyRemoveContainer(ctx, existingContainer)
if err != nil {
tx.Rollback()
return err
}
}
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v\n", err)
return err
}
return nil
}
@@ -102,8 +119,19 @@ func (s *FluxServer) StartDeployment(ctx context.Context, deploymentID int64) er
containerIds = append(containerIds, newContainerId)
}
var projectConfigStr []byte
s.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr)
var projectConfig models.ProjectConfig
if err := json.Unmarshal(projectConfigStr, &projectConfig); err != nil {
return err
}
if projectConfig.Name == "" {
return fmt.Errorf("No project config found for deployment")
}
for _, containerId := range containerIds {
err := s.containerManager.StartContainer(ctx, containerId)
s.Proxy.AddContainer(projectConfig, containerId)
if err != nil {
log.Printf("Failed to start container: %v\n", err)
return err
@@ -143,8 +171,19 @@ func (s *FluxServer) StopDeployment(ctx context.Context, deploymentID int64) err
containerIds = append(containerIds, newContainerId)
}
var projectConfigStr []byte
s.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr)
var projectConfig models.ProjectConfig
if err := json.Unmarshal(projectConfigStr, &projectConfig); err != nil {
return err
}
if projectConfig.Name == "" {
return fmt.Errorf("No project config found for deployment")
}
for _, containerId := range containerIds {
err := s.containerManager.StopContainer(ctx, containerId)
s.Proxy.RemoveContainer(containerId)
if err != nil {
log.Printf("Failed to start container: %v\n", err)
return err
@@ -176,7 +215,7 @@ func (s *FluxServer) GetStatus(ctx context.Context, containerID string) (string,
func (s *FluxServer) GetDeploymentStatus(ctx context.Context, deploymentID int64) (string, error) {
var deployment models.Deployments
s.db.QueryRow("SELECT id, urls FROM deployments WHERE id = ?", deploymentID).Scan(&deployment.ID, &deployment.URLs)
s.db.QueryRow("SELECT id, url FROM deployments WHERE id = ?", deploymentID).Scan(&deployment.ID, &deployment.URL)
var containerIds []string
rows, err := s.db.Query("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID)

192
server/proxy.go Normal file
View File

@@ -0,0 +1,192 @@
package server
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/juls0730/fluxd/models"
)
type ContainerProxy struct {
mu sync.RWMutex
urlMap map[string]*containerRoute
db *sql.DB
cm *ContainerManager
activeConns int64
}
type containerRoute struct {
containerID string
port int
url string
proxy *httputil.ReverseProxy
isActive bool
}
func (cp *ContainerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cp.mu.RLock()
defer cp.mu.RUnlock()
// Extract app name from host
appUrl := r.Host
var container *containerRoute
container, exists := cp.urlMap[appUrl]
if !exists || !container.isActive {
container = &containerRoute{
url: appUrl,
}
var deploymentID int64
cp.db.QueryRow("SELECT id FROM deployments WHERE url = ?", appUrl).Scan(&deploymentID)
if deploymentID == 0 {
fmt.Printf("No deployment found for url: %s\n", appUrl)
http.Error(w, "Container not found", http.StatusNotFound)
return
}
cp.db.QueryRow("SELECT container_id FROM containers WHERE deployment_id = ?", deploymentID).Scan(&container.containerID)
if container.containerID == "" {
fmt.Printf("No container found for deployment: %d\n", deploymentID)
http.Error(w, "Container not found", http.StatusNotFound)
return
}
var projectConfigStr string
if err := cp.db.QueryRow("SELECT project_config FROM apps WHERE deployment_id = ?", deploymentID).Scan(&projectConfigStr); err != nil || projectConfigStr == "" {
http.Error(w, "Container not found", http.StatusNotFound)
return
}
var projectConfig models.ProjectConfig
if err := json.Unmarshal([]byte(projectConfigStr), &projectConfig); err != nil {
http.Error(w, "Failed to parse json", http.StatusNotFound)
return
}
container.port = projectConfig.Port
cp.urlMap[appUrl] = container
}
if container.proxy == nil {
containerJSON, err := cp.cm.dockerClient.ContainerInspect(r.Context(), container.containerID)
if err != nil {
log.Printf("Failed to inspect container: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
url, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, container.port))
if err != nil {
log.Printf("Failed to parse URL: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
container.proxy = httputil.NewSingleHostReverseProxy(url)
}
container.proxy.ServeHTTP(w, r)
}
func (cp *ContainerProxy) AddContainer(projectConfig models.ProjectConfig, containerID string) error {
cp.mu.Lock()
defer cp.mu.Unlock()
containerJSON, err := cp.cm.dockerClient.ContainerInspect(context.Background(), containerID)
if err != nil {
log.Printf("Failed to inspect container: %v\n", err)
return err
}
containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, projectConfig.Port))
if err != nil {
return err
}
proxy := httputil.NewSingleHostReverseProxy(containerUrl)
originalDirector := proxy.Director
proxy.Director = func(req *http.Request) {
atomic.AddInt64(&cp.activeConns, 1)
originalDirector(req)
}
proxy.ModifyResponse = func(resp *http.Response) error {
atomic.AddInt64(&cp.activeConns, -1)
return nil
}
// Handle errors
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
log.Printf("Proxy error: %v", err)
http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
}
newRoute := &containerRoute{
url: projectConfig.Url,
proxy: proxy,
port: projectConfig.Port,
isActive: true,
}
cp.urlMap[projectConfig.Url] = newRoute
return nil
}
func (cp *ContainerProxy) RemoveContainer(containerID string) error {
cp.mu.Lock()
defer cp.mu.Unlock()
var deploymentID int64
if err := cp.db.QueryRow("SELECT deployment_id FROM containers WHERE id = ?", containerID).Scan(&deploymentID); err != nil {
return err
}
var url string
if err := cp.db.QueryRow("SELECT url FROM deployments WHERE id = ?", deploymentID).Scan(&url); err != nil {
return err
}
container, exists := cp.urlMap[url]
if !exists {
return fmt.Errorf("container not found")
}
container.isActive = false
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
delete(cp.urlMap, url)
return nil
default:
if atomic.LoadInt64(&cp.activeConns) == 0 {
delete(cp.urlMap, url)
return nil
}
time.Sleep(100 * time.Millisecond)
}
}
}
func (cp *ContainerProxy) Start() {
server := &http.Server{
Addr: ":7465",
Handler: cp,
}
go func() {
log.Printf("Proxy server starting on http://127.0.0.1:7465\n")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Proxy server error: %v", err)
}
}()
}

View File

@@ -19,6 +19,6 @@ CREATE TABLE IF NOT EXISTS containers (
CREATE TABLE IF NOT EXISTS deployments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
urls TEXT NOT NULL,
url TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

View File

@@ -30,6 +30,7 @@ type FluxServer struct {
containerManager *ContainerManager
config FluxServerConfig
db *sql.DB
Proxy *ContainerProxy
rootDir string
}
@@ -103,7 +104,12 @@ func NewServer() *FluxServer {
containerManager: containerManager,
config: serverConfig,
db: db,
rootDir: rootDir,
Proxy: &ContainerProxy{
urlMap: make(map[string]*containerRoute),
db: db,
cm: containerManager,
},
rootDir: rootDir,
}
}