package server import ( "context" "database/sql" "fmt" "io" "net/http" "strings" "time" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/image" "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/volume" "github.com/juls0730/flux/pkg" "go.uber.org/zap" ) var ( containerInsertStmt *sql.Stmt ) type Volume struct { ID int64 `json:"id"` VolumeID string `json:"volume_id"` Mountpoint string `json:"mountpoint"` ContainerID string `json:"container_id"` } type Container struct { ID int64 `json:"id"` Head bool `json:"head"` // if the container is the head of the deployment Name string `json:"name"` Deployment *Deployment `json:"-"` Volumes []*Volume `json:"volumes"` ContainerID [64]byte `json:"container_id"` DeploymentID int64 `json:"deployment_id"` } // Creates a volume in the docker daemon and returns the descriptor for the volume func CreateDockerVolume(ctx context.Context) (vol *Volume, err error) { dockerVolume, err := Flux.dockerClient.VolumeCreate(ctx, volume.CreateOptions{ Driver: "local", DriverOpts: map[string]string{}, }) if err != nil { return nil, fmt.Errorf("failed to create volume: %v", err) } logger.Debugw("Volume created", zap.String("volume_id", dockerVolume.Name), zap.String("mountpoint", dockerVolume.Mountpoint)) vol = &Volume{ VolumeID: dockerVolume.Name, } return vol, nil } // Creates a container in the docker daemon and returns the descriptor for the container func CreateDockerContainer(ctx context.Context, imageName string, projectName string, vols []*Volume, environment []string, hosts []string) (*Container, error) { for _, host := range hosts { if host == ":" { return nil, fmt.Errorf("invalid host %s", host) } } safeImageName := strings.ReplaceAll(imageName, "/", "_") containerName := fmt.Sprintf("flux_%s-%s-%s", safeImageName, projectName, time.Now().Format("20060102-150405")) logger.Debugw("Creating container", zap.String("container_id", containerName)) mounts := make([]mount.Mount, len(vols)) volumes := make(map[string]struct{}, len(vols)) for i, volume := range vols { volumes[volume.VolumeID] = struct{}{} mounts[i] = mount.Mount{ Type: mount.TypeVolume, Source: volume.VolumeID, Target: volume.Mountpoint, ReadOnly: false, } } resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{ Image: imageName, Env: environment, Volumes: volumes, }, &container.HostConfig{ RestartPolicy: container.RestartPolicy{Name: container.RestartPolicyUnlessStopped}, NetworkMode: "bridge", Mounts: mounts, ExtraHosts: hosts, }, nil, nil, containerName, ) if err != nil { return nil, err } c := &Container{ ContainerID: [64]byte([]byte(resp.ID)), Volumes: vols, } return c, nil } // Create a container given a container configuration and a deployment. This will do a few things: // 1. Create the container in the docker daemon // 2. Create the volumes for the container // 3. Insert the container and volumes into the database func CreateContainer(ctx context.Context, container *pkg.Container, projectName string, head bool, deployment *Deployment) (c *Container, err error) { if container.Name == "" { return nil, fmt.Errorf("container name is empty") } if container.ImageName == "" { return nil, fmt.Errorf("container image name is empty") } logger.Debugw("Creating container with image", zap.String("image", container.ImageName)) var volumes []*Volume // in the head container, we have a default volume where the project is mounted, this is important so that if the project uses sqlite for example, // all the data will not be lost the second the containers turns off. if head { vol, err := CreateDockerVolume(ctx) if err != nil { return nil, err } vol.Mountpoint = "/workspace" volumes = append(volumes, vol) } for _, containerVolume := range container.Volumes { vol, err := CreateDockerVolume(ctx) if err != nil { return nil, err } if containerVolume.Mountpoint == "" { return nil, fmt.Errorf("mountpoint is empty") } if containerVolume.Mountpoint == "/workspace" || containerVolume.Mountpoint == "/" { return nil, fmt.Errorf("invalid mountpoint") } vol.Mountpoint = containerVolume.Mountpoint volumes = append(volumes, vol) } // if the container is the head, build a list of hostnames that the container can reach by name for this deployment // TODO: this host list should be consistent across all containers in the deployment, not just the head var hosts []string if head { for _, container := range deployment.Containers { containerName, err := container.GetIp() if err != nil { return nil, err } hosts = append(hosts, fmt.Sprintf("%s:%s", container.Name, containerName)) } } // if the container is not the head, pull the image from docker hub if !head { image, err := Flux.dockerClient.ImagePull(ctx, container.ImageName, image.PullOptions{}) if err != nil { logger.Errorw("Failed to pull image", zap.Error(err)) return nil, err } // blcok untile the image is pulled io.Copy(io.Discard, image) } c, err = CreateDockerContainer(ctx, container.ImageName, projectName, volumes, container.Environment, hosts) if err != nil { return nil, err } c.Name = container.Name var containerIDString string err = containerInsertStmt.QueryRow(c.ContainerID[:], head, deployment.ID).Scan(&c.ID, &containerIDString, &c.Head, &c.DeploymentID) if err != nil { return nil, err } copy(c.ContainerID[:], containerIDString) tx, err := Flux.db.Begin() if err != nil { return nil, err } volumeInsertStmt, err := tx.Prepare("INSERT INTO volumes (volume_id, mountpoint, container_id) VALUES (?, ?, ?) RETURNING id, volume_id, mountpoint, container_id") if err != nil { logger.Errorw("Failed to prepare statement", zap.Error(err)) tx.Rollback() return nil, err } for _, vol := range c.Volumes { err = volumeInsertStmt.QueryRow(vol.VolumeID, vol.Mountpoint, c.ContainerID[:]).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID) if err != nil { tx.Rollback() return nil, err } } err = tx.Commit() if err != nil { tx.Rollback() return nil, err } c.Deployment = deployment if head { deployment.Head = c } deployment.Containers = append(deployment.Containers, c) return c, nil } func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string, projectConfig *pkg.ProjectConfig) (*Container, error) { // Create new container with new image logger.Debugw("Upgrading container", zap.ByteString("container_id", c.ContainerID[:12])) if c.Volumes == nil { return nil, fmt.Errorf("no volumes found for container %s", c.ContainerID[:12]) } var hosts []string for _, container := range c.Deployment.Containers { containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(container.ContainerID[:])) if err != nil { return nil, err } hosts = containerJSON.HostConfig.ExtraHosts } newContainer, err := CreateDockerContainer(ctx, imageName, projectConfig.Name, c.Volumes, projectConfig.Environment, hosts) if err != nil { return nil, err } newContainer.Deployment = c.Deployment var containerIDString string err = containerInsertStmt.QueryRow(newContainer.ContainerID[:], c.Head, c.Deployment.ID).Scan(&newContainer.ID, &containerIDString, &newContainer.Head, &newContainer.DeploymentID) if err != nil { logger.Errorw("Failed to insert container", zap.Error(err)) return nil, err } copy(newContainer.ContainerID[:], containerIDString) tx, err := Flux.db.Begin() if err != nil { logger.Errorw("Failed to begin transaction", zap.Error(err)) return nil, err } volumeUpdateStmt, err := tx.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, mountpoint, container_id") if err != nil { tx.Rollback() return nil, err } for _, vol := range newContainer.Volumes { err = volumeUpdateStmt.QueryRow(newContainer.ContainerID[:], vol.ID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID) if err != nil { tx.Rollback() logger.Error("Failed to update volume", zap.Error(err)) return nil, err } } err = tx.Commit() if err != nil { tx.Rollback() return nil, err } logger.Debug("Upgraded container") return newContainer, nil } // initial indicates if the container was just created, because if not, we need to fix the extra hsots since it's not guaranteed that the supplemental containers have the same ip // as they had when the deployment was previously on func (c *Container) Start(ctx context.Context, initial bool) error { if !initial && c.Head { containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, string(c.ContainerID[:])) if err != nil { return err } // remove yourself Flux.dockerClient.ContainerRemove(ctx, string(c.ContainerID[:]), container.RemoveOptions{}) var volumes map[string]struct{} = make(map[string]struct{}) var hosts []string var mounts []mount.Mount for _, volume := range c.Volumes { volumes[volume.VolumeID] = struct{}{} mounts = append(mounts, mount.Mount{ Type: mount.TypeVolume, Source: volume.VolumeID, Target: volume.Mountpoint, ReadOnly: false, }) } for _, supplementalContainer := range c.Deployment.Containers { if supplementalContainer.Head { continue } ip, err := supplementalContainer.GetIp() if err != nil { return err } hosts = append(hosts, fmt.Sprintf("%s:%s", supplementalContainer.Name, ip)) } // recreate yourself resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{ Image: containerJSON.Image, Env: containerJSON.Config.Env, Volumes: volumes, }, &container.HostConfig{ RestartPolicy: container.RestartPolicy{Name: container.RestartPolicyUnlessStopped}, NetworkMode: "bridge", Mounts: mounts, ExtraHosts: hosts, }, nil, nil, c.Name, ) if err != nil { return err } c.ContainerID = [64]byte([]byte(resp.ID)) Flux.db.Exec("UPDATE containers SET container_id = ? WHERE id = ?", c.ContainerID[:], c.ID) } return Flux.dockerClient.ContainerStart(ctx, string(c.ContainerID[:]), container.StartOptions{}) } func (c *Container) Stop(ctx context.Context) error { return Flux.dockerClient.ContainerStop(ctx, string(c.ContainerID[:]), container.StopOptions{}) } // Stop and remove a container and all of its volumes func (c *Container) Remove(ctx context.Context) error { err := RemoveDockerContainer(ctx, string(c.ContainerID[:])) if err != nil { return fmt.Errorf("failed to remove container (%s): %v", c.ContainerID[:12], err) } tx, err := Flux.db.Begin() if err != nil { logger.Errorw("Failed to begin transaction", zap.Error(err)) return err } _, err = tx.Exec("DELETE FROM containers WHERE container_id = ?", c.ContainerID[:]) if err != nil { tx.Rollback() return err } for _, volume := range c.Volumes { if err := RemoveVolume(ctx, volume.VolumeID); err != nil { tx.Rollback() return fmt.Errorf("failed to remove volume (%s): %v", volume.VolumeID, err) } _, err = tx.Exec("DELETE FROM volumes WHERE volume_id = ?", volume.VolumeID) if err != nil { tx.Rollback() return err } } if err := tx.Commit(); err != nil { logger.Errorw("Failed to commit transaction", zap.Error(err)) return err } return nil } func (c *Container) Wait(ctx context.Context, port uint16) error { return WaitForDockerContainer(ctx, string(c.ContainerID[:]), port) } type ContainerStatus struct { Status string ExitCode int } func (c *Container) Status(ctx context.Context) (*ContainerStatus, error) { containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, string(c.ContainerID[:])) if err != nil { return nil, err } containerStatus := &ContainerStatus{ Status: containerJSON.State.Status, ExitCode: containerJSON.State.ExitCode, } return containerStatus, nil } func (c *Container) GetIp() (string, error) { containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(c.ContainerID[:])) if err != nil { return "", err } ip := containerJSON.NetworkSettings.IPAddress return ip, nil } // Stops and deletes a container from the docker daemon func RemoveDockerContainer(ctx context.Context, containerID string) error { if err := Flux.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil { return fmt.Errorf("failed to stop container (%s): %v", containerID[:12], err) } if err := Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil { return fmt.Errorf("failed to remove container (%s): %v", containerID[:12], err) } return nil } // scuffed af "health check" for docker containers func WaitForDockerContainer(ctx context.Context, containerID string, containerPort uint16) error { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() for { select { case <-ctx.Done(): return fmt.Errorf("container failed to become ready in time") default: containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, containerID) if err != nil { return err } if containerJSON.State.Running { resp, err := http.Get(fmt.Sprintf("http://%s:%d/", containerJSON.NetworkSettings.IPAddress, containerPort)) if err == nil && resp.StatusCode == http.StatusOK { return nil } } time.Sleep(time.Second) } } } func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) error { timeout := 30 err := Flux.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{ Timeout: &timeout, }) if err != nil { return fmt.Errorf("failed to stop container: %v", err) } ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second) defer cancel() for { select { case <-ctx.Done(): return Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) default: containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, containerID) if err != nil { return err } if !containerJSON.State.Running { return Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) } time.Sleep(time.Second) } } } func RemoveVolume(ctx context.Context, volumeID string) error { logger.Debugw("Removed volume", zap.String("volume_id", volumeID)) if err := Flux.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { return fmt.Errorf("failed to remove volume (%s): %v", volumeID, err) } return nil } func findExistingDockerContainers(ctx context.Context, containerPrefix string) (map[string]bool, error) { containers, err := Flux.dockerClient.ContainerList(ctx, container.ListOptions{ All: true, }) if err != nil { return nil, err } var existingContainers map[string]bool = make(map[string]bool) for _, container := range containers { if strings.HasPrefix(container.Names[0], fmt.Sprintf("/%s-", containerPrefix)) { existingContainers[container.ID] = true } } return existingContainers, nil }