Unkey

Deployment Workflow - Pull-Based Architecture

Asynchronous deployment orchestration using event streaming, topology management, and status aggregation across multiple regions

Overview

The deployment workflow has been redesigned around the pull-based infrastructure model. Instead of pushing containers directly to hosts, the control plane now writes deployment topology entries and emits events, which the Krane agent in each region pulls and applies.

Architecture Changes

Previous Push-Based Workflow

sequenceDiagram
    participant User
    participant API
    participant Workflow
    participant DB
    participant Host1
    participant Host2
    
    User->>API: Deploy request
    API->>Workflow: Start deployment
    Workflow->>DB: Store config
    
    par Direct push to hosts
        Workflow->>Host1: Deploy container
        Host1-->>Workflow: Success/Failure
    and
        Workflow->>Host2: Deploy container
        Host2-->>Workflow: Success/Failure
    end
    
    Workflow->>DB: Update status
    Workflow-->>API: Response
    API-->>User: Result

New Pull-Based Workflow

sequenceDiagram
    participant User
    participant API
    participant Workflow
    participant DB
    participant Ctrl as Control Plane
    participant K1 as Krane (Region A)
    participant K2 as Krane (Region B)
    
    User->>API: Deploy request
    API->>Workflow: Start deployment
    
    Workflow->>DB: Create deployment
    Workflow->>DB: Create topology entries
    
    Note over Workflow: Emit events to regions
    Workflow->>Ctrl: EmitDeploymentState(regions)
    
    par Agents pull and apply
        Ctrl-->>K1: Stream event
        K1->>K1: Apply to Kubernetes
        K1->>Ctrl: UpdateInstance
    and
        Ctrl-->>K2: Stream event
        K2->>K2: Apply to Kubernetes
        K2->>Ctrl: UpdateInstance
    end
    
    Ctrl->>DB: Update instance status
    
    loop Poll for completion
        Workflow->>DB: Check deployment status
    end
    
    Workflow-->>API: Success
    API-->>User: Deployment ready

Database Schema

Deployment Topology

The new deployment_topology table tracks regional deployment distribution:

CREATE TABLE deployment_topology (
    workspace_id VARCHAR(256) NOT NULL,
    deployment_id VARCHAR(256) NOT NULL,
    region VARCHAR(256) NOT NULL,
    replicas INT NOT NULL,
    status ENUM('starting','started','stopping','stopped') NOT NULL,
    created_at BIGINT NOT NULL,
    updated_at BIGINT,
    PRIMARY KEY (deployment_id, region)
);

Deployment Status Tracking

stateDiagram-v2
    [*] --> pending: Deployment created
    pending --> building: Docker build started
    building --> deploying: Image ready
    deploying --> network: Topology created
    network --> ready: Instances running
    
    pending --> failed: Error
    building --> failed: Build failed
    deploying --> failed: Deploy failed
    network --> failed: Network failed
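
The transitions in the diagram can be encoded as a lookup table so that status updates are validated before they are written. The following is a minimal sketch; the `State` type, the `transitions` map, and `CanTransition` are illustrative names and are not taken from the Unkey codebase.

```go
package main

import "fmt"

// State names mirror the diagram above; the type itself is hypothetical.
type State string

const (
	Pending   State = "pending"
	Building  State = "building"
	Deploying State = "deploying"
	Network   State = "network"
	Ready     State = "ready"
	Failed    State = "failed"
)

// transitions lists every edge drawn in the state diagram.
// Ready and Failed have no outgoing edges, so they are absent as keys.
var transitions = map[State][]State{
	Pending:   {Building, Failed},
	Building:  {Deploying, Failed},
	Deploying: {Network, Failed},
	Network:   {Ready, Failed},
}

// CanTransition reports whether the diagram contains an edge from -> to.
func CanTransition(from, to State) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Pending, Building)) // true
	fmt.Println(CanTransition(Building, Ready))   // false
	fmt.Println(CanTransition(Ready, Failed))     // false
}
```

A table like this keeps the allowed edges in one place, so adding a state only requires touching the map.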

Workflow Implementation

Deploy Handler

The deploy handler lives in go/apps/ctrl/workflows/deploy/deploy_handler.go and runs five steps:

func (w *Workflow) Deploy(ctx restate.ObjectContext, req *hydrav1.DeployRequest) (*hydrav1.DeployResponse, error) {
    // 1. Find deployment and validate
    deployment, err := restate.Run(ctx, func(stepCtx restate.RunContext) (db.Deployment, error) {
        return db.Query.FindDeploymentById(stepCtx, w.db.RW(), req.GetDeploymentId())
    })
    if err != nil {
        return nil, fmt.Errorf("find deployment: %w", err)
    }

    // 2. Build Docker image if needed.
    // dockerImage stays empty when no build is requested; the prebuilt-image path is not shown here.
    var dockerImage string
    if req.GetBuildContextPath() != "" {
        dockerImage = w.buildImage(ctx, req)
    }

    // 3. Create deployment topology for target regions (one durable step per region)
    regions := w.determineTargetRegions(deployment)
    for _, region := range regions {
        _, err := restate.Run(ctx, func(stepCtx restate.RunContext) (restate.Void, error) {
            return restate.Void{}, db.Query.InsertDeploymentTopology(stepCtx, w.db.RW(), db.InsertDeploymentTopologyParams{
                DeploymentID: deployment.ID,
                Region:       region,
                Replicas:     deployment.Replicas,
                Status:       db.DeploymentTopologyStatusStarting,
            })
        })
        if err != nil {
            return nil, fmt.Errorf("create topology for %s: %w", region, err)
        }
    }

    // 4. Emit deployment events to regions
    w.cluster.EmitDeploymentState(&ctrlv1.DeploymentState{
        Action: ctrlv1.DeploymentState_ACTION_APPLY,
        Deployment: &ctrlv1.Deployment{
            Id:            deployment.ID,
            Image:         dockerImage,
            CpuMillicores: deployment.CpuMillicores,
            MemoryMib:     deployment.MemoryMib,
        },
    }, regions)

    // 5. Poll for deployment completion
    return w.pollDeploymentStatus(ctx, deployment.ID)
}

Topology Management

flowchart TB
    Start([Deploy Request])
    Start --> DetermineRegions[Determine Target Regions]
    
    DetermineRegions --> CreateTopology[Create Topology Entries]
    
    CreateTopology --> EmitEvents[Emit to Regions]
    
    EmitEvents --> Region1[Region A Processing]
    EmitEvents --> Region2[Region B Processing]
    
    Region1 --> Update1[Update Instances]
    Region2 --> Update2[Update Instances]
    
    Update1 --> CheckStatus
    Update2 --> CheckStatus
    
    CheckStatus{All Regions Ready?}
    CheckStatus -->|No| Poll[Wait & Retry]
    CheckStatus -->|Yes| Complete[Mark Deployment Ready]
    
    Poll --> CheckStatus
    Complete --> End([Success])

Status Polling

The workflow polls for deployment completion:

func (w *Workflow) pollDeploymentStatus(ctx restate.ObjectContext, deploymentID string) (*hydrav1.DeployResponse, error) {
    const maxAttempts = 300 // 5 minutes at 1s intervals

    for attempt := 0; attempt < maxAttempts; attempt++ {
        status, err := restate.Run(ctx, func(stepCtx restate.RunContext) (DeploymentStatus, error) {
            // Check topology status
            topology, err := db.Query.ListDeploymentTopology(stepCtx, w.db.RO(), deploymentID)
            if err != nil {
                return DeploymentStatus{}, err
            }

            // Check instance status
            instances, err := db.Query.FindInstancesByDeploymentId(stepCtx, w.db.RO(), deploymentID)
            if err != nil {
                return DeploymentStatus{}, err
            }

            return w.calculateDeploymentStatus(topology, instances)
        })
        if err != nil {
            return nil, fmt.Errorf("check deployment status: %w", err)
        }

        switch status.State {
        case "ready":
            return &hydrav1.DeployResponse{Success: true}, nil
        case "failed":
            return nil, fmt.Errorf("deployment failed: %s", status.Message)
        default:
            // Any non-terminal state (pending, building, deploying, network):
            // sleep durably, then poll again. Without the sleep the loop would spin.
            if err := restate.Sleep(ctx, time.Second); err != nil {
                return nil, err
            }
        }
    }

    return nil, fmt.Errorf("deployment timeout after %d attempts", maxAttempts)
}
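
The polling loop relies on calculateDeploymentStatus, which is not shown. One plausible aggregation rule is sketched below, under stated assumptions: instance records carry a region and a status string, a deployment is ready once every region has at least its requested replica count running, and any failed instance fails the whole deployment. The types and the `aggregateStatus` name are hypothetical stand-ins, not the actual implementation.

```go
package main

import "fmt"

// TopologyEntry and Instance are simplified, hypothetical row types.
type TopologyEntry struct {
	Region   string
	Replicas int
}

type Instance struct {
	Region string
	Status string // assumed values: "pending", "running", "failed"
}

type DeploymentStatus struct {
	State   string
	Message string
}

// aggregateStatus folds per-region topology and instance rows into one state.
func aggregateStatus(topology []TopologyEntry, instances []Instance) DeploymentStatus {
	running := make(map[string]int)
	for _, inst := range instances {
		switch inst.Status {
		case "failed":
			return DeploymentStatus{State: "failed", Message: "instance failed in " + inst.Region}
		case "running":
			running[inst.Region]++
		}
	}
	if len(topology) == 0 {
		return DeploymentStatus{State: "pending", Message: "no topology entries yet"}
	}
	for _, entry := range topology {
		if running[entry.Region] < entry.Replicas {
			return DeploymentStatus{
				State:   "deploying",
				Message: fmt.Sprintf("%s: %d/%d running", entry.Region, running[entry.Region], entry.Replicas),
			}
		}
	}
	return DeploymentStatus{State: "ready"}
}

func main() {
	topology := []TopologyEntry{
		{Region: "us-east", Replicas: 2},
		{Region: "us-west", Replicas: 1},
	}
	instances := []Instance{
		{Region: "us-east", Status: "running"},
		{Region: "us-east", Status: "running"},
		{Region: "us-west", Status: "pending"},
	}
	fmt.Println(aggregateStatus(topology, instances).State) // deploying
}
```

Treating a single failed instance as fatal is the strictest choice; a real implementation might tolerate restarts before giving up.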

Event Flow

Deployment Event Structure

message DeploymentState {
    enum Action {
        ACTION_UNSPECIFIED = 0;
        ACTION_APPLY = 1;
        ACTION_DELETE = 2;
    }
    
    Action action = 1;
    Deployment deployment = 2;
}
 
message Deployment {
    string id = 1;
    string workspace_id = 2;
    string project_id = 3;
    string environment_id = 4;
    string image = 5;
    int32 cpu_millicores = 6;
    int32 memory_mib = 7;
    int32 replicas = 8;
    map<string, string> env_vars = 9;
}

Event Emission

sequenceDiagram
    participant W as Workflow
    participant CS as ClusterService
    participant AR as Agent Registry
    participant K1 as Krane-1 (us-east)
    participant K2 as Krane-2 (us-west)
    participant K3 as Krane-3 (eu-west)
    
    W->>CS: EmitDeploymentState(regions=[us-east, us-west])
    
    CS->>AR: Find matching agents
    AR-->>CS: [Krane-1, Krane-2]
    
    par Send to matching agents
        CS-->>K1: DeploymentState
    and
        CS-->>K2: DeploymentState
    end
    
    Note over K3: No event (region not matched)
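
The region matching in the diagram above can be illustrated with an in-memory fan-out. This is only a sketch: `Hub`, `Register`, and `Emit` are hypothetical stand-ins for ClusterService and its agent registry, streams are modelled as buffered channels, and the code is not safe for concurrent use.

```go
package main

import "fmt"

// DeploymentState is a trimmed-down, hypothetical version of the event.
type DeploymentState struct {
	Action       string
	DeploymentID string
}

// Hub maps each region to the streams of the agents registered there.
type Hub struct {
	agents map[string][]chan DeploymentState
}

func NewHub() *Hub {
	return &Hub{agents: make(map[string][]chan DeploymentState)}
}

// Register opens a buffered stream for an agent in the given region.
func (h *Hub) Register(region string) <-chan DeploymentState {
	ch := make(chan DeploymentState, 16)
	h.agents[region] = append(h.agents[region], ch)
	return ch
}

// Emit delivers state to every agent in the listed regions and
// returns how many agents received it.
func (h *Hub) Emit(state DeploymentState, regions []string) int {
	delivered := 0
	for _, region := range regions {
		for _, ch := range h.agents[region] {
			select {
			case ch <- state:
				delivered++
			default:
				// Buffer full: this sketch drops the event. A real system
				// would need backpressure or redelivery.
			}
		}
	}
	return delivered
}

func main() {
	hub := NewHub()
	east := hub.Register("us-east")
	west := hub.Register("us-west")
	eu := hub.Register("eu-west")

	n := hub.Emit(DeploymentState{Action: "apply", DeploymentID: "dep_123"},
		[]string{"us-east", "us-west"})
	fmt.Println(n, len(east), len(west), len(eu)) // 2 1 1 0
}
```

The eu-west agent stays idle because its region is not in the target list, matching the note on Krane-3.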

Instance Lifecycle

Instance Creation Flow

stateDiagram-v2
    [*] --> Pending: Krane receives event
    Pending --> Creating: Apply to Kubernetes
    Creating --> Pending: Pod scheduled
    Pending --> Running: Container started
    Running --> [*]: Healthy
    
    Creating --> Failed: Resource limits
    Pending --> Failed: Image pull error
    Running --> Failed: Container crash

Instance Status Updates

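func (k *KraneAgent) handlePodEvent(pod *v1.Pod) {
    // Resource requests are read from the first container; skip pods that have none.
    if len(pod.Spec.Containers) == 0 {
        return
    }
    requests := pod.Spec.Containers[0].Resources.Requests
    status := k.podStatusToInstanceStatus(pod.Status)

    update := &ctrlv1.UpdateInstanceRequest{
        // Oneof wrapper around the nested Create message. protoc-gen-go appends an
        // underscore when the wrapper name would collide with the message type.
        Change: &ctrlv1.UpdateInstanceRequest_Create_{
            Create: &ctrlv1.UpdateInstanceRequest_Create{
                DeploymentId:  pod.Labels["deployment-id"],
                PodName:       pod.Name,
                Address:       pod.Status.PodIP,
                CpuMillicores: requests.Cpu().MilliValue(),
                MemoryMib:     requests.Memory().Value() / (1024 * 1024), // bytes to MiB
                Status:        status,
            },
        },
    }

    // Buffer the update; the sync engine sends it to the control plane.
    k.syncEngine.InstanceUpdateBuffer.Put(update)
}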
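
The handler above delegates to podStatusToInstanceStatus, which is not shown. A minimal sketch of such a mapping follows, assuming the instance status is a plain string and using the standard Kubernetes pod phases as input. The function name and the output values are illustrative assumptions.

```go
package main

import "fmt"

// instanceStatusFromPhase maps a Kubernetes pod phase to a hypothetical
// instance status string. The phase names are the ones Kubernetes defines;
// the returned values are assumptions made for this sketch.
func instanceStatusFromPhase(phase string) string {
	switch phase {
	case "Pending":
		return "pending"
	case "Running":
		return "running"
	case "Failed":
		return "failed"
	case "Succeeded":
		return "stopped" // all containers exited cleanly
	default:
		return "unknown" // covers the "Unknown" phase and anything unexpected
	}
}

// bytesToMiB mirrors the conversion used for the memory request.
func bytesToMiB(b int64) int64 {
	return b / (1024 * 1024)
}

func main() {
	fmt.Println(instanceStatusFromPhase("Running"), bytesToMiB(512*1024*1024)) // running 512
}
```

A pod in the Running phase is not necessarily ready to serve traffic, so an implementation that gates routing on this status would probably also inspect the pod's readiness condition.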

Rollback Mechanism

Version Tracking

Each deployment maintains version history:

graph LR
    D1[v1.0.0<br/>Image: app:1.0] --> D2[v1.1.0<br/>Image: app:1.1]
    D2 --> D3[v1.2.0<br/>Image: app:1.2]
    D3 -.->|Rollback| D2

Rollback Workflow

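func (w *Workflow) Rollback(ctx restate.ObjectContext, req *hydrav1.RollbackRequest) error {
    // 1. Find target version
    targetDeployment, err := db.Query.FindDeploymentByVersion(ctx, req.Version)
    if err != nil {
        return fmt.Errorf("find deployment version: %w", err)
    }

    // 2. Create new deployment with old config.
    // In a durable handler, ID generation and timestamps should run inside a
    // restate.Run step so that replays reuse the same values.
    newDeployment := targetDeployment
    newDeployment.ID = uid.New("dep")
    newDeployment.CreatedAt = time.Now()
    // Persisting newDeployment is elided here. Deploy looks the record up by ID,
    // so it has to be stored before step 3.

    // 3. Trigger deployment with old image
    _, err = w.Deploy(ctx, &hydrav1.DeployRequest{
        DeploymentId: newDeployment.ID,
    })
    return err
}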

Monitoring and Observability

Key Metrics

# Workflow metrics
deployment_workflow_duration_seconds: histogram
deployment_workflow_status: counter
deployment_build_duration_seconds: histogram
deployment_polling_attempts: histogram
 
# Topology metrics
deployment_topology_regions: gauge
deployment_topology_status: gauge
 
# Instance metrics
deployment_instances_total: gauge
deployment_instances_by_status: gauge
deployment_instance_startup_time_seconds: histogram

Deployment Tracing

graph TB
    subgraph "Trace Span Tree"
        Root[deployment.workflow]
        Build[deployment.build]
        Topology[deployment.topology.create]
        Emit[deployment.events.emit]
        Poll[deployment.status.poll]
        
        Root --> Build
        Root --> Topology
        Root --> Emit
        Root --> Poll
        
        subgraph "Per Region"
            Apply1[deployment.apply.us-east]
            Apply2[deployment.apply.us-west]
        end
        
        Emit --> Apply1
        Emit --> Apply2
    end

Error Handling

Failure Scenarios

flowchart TB
    Start([Deployment Start])
    
    Start --> Build{Build Success?}
    Build -->|No| BuildFail[Mark Failed<br/>Cleanup]
    Build -->|Yes| Topology
    
    Topology --> Emit{Events Sent?}
    Emit -->|No| EmitFail[Retry Emission]
    Emit -->|Yes| Poll
    
    EmitFail --> EmitRetry{Retry Success?}
    EmitRetry -->|No| MarkFailed[Mark Deployment Failed]
    EmitRetry -->|Yes| Poll
    
    Poll --> Check{Instances Ready?}
    Check -->|Timeout| TimeoutFail[Mark Failed<br/>Trigger Cleanup]
    Check -->|Failed| InstanceFail[Mark Failed<br/>Cleanup Instances]
    Check -->|Success| Complete[Mark Ready]
    
    BuildFail --> End([Failed])
    MarkFailed --> End
    TimeoutFail --> End
    InstanceFail --> End
    Complete --> Success([Success])

Cleanup on Failure

func (w *Workflow) cleanupFailedDeployment(ctx context.Context, deploymentID string) error {
    // 1. Emit delete events to all regions
    topology, err := db.Query.ListDeploymentTopology(ctx, w.db.RO(), deploymentID)
    if err != nil {
        return err
    }

    for _, entry := range topology {
        w.cluster.EmitDeploymentState(&ctrlv1.DeploymentState{
            Action: ctrlv1.DeploymentState_ACTION_DELETE,
            Deployment: &ctrlv1.Deployment{
                Id: deploymentID,
            },
        }, []string{entry.Region})
    }

    // 2. Mark topology as stopped, stopping at the first database error
    for _, entry := range topology {
        if err := db.Query.UpdateDeploymentTopologyStatus(ctx, w.db.RW(),
            deploymentID, entry.Region, db.DeploymentTopologyStatusStopped); err != nil {
            return err
        }
    }

    // 3. Delete instance records
    return db.Query.DeleteInstancesByDeploymentId(ctx, w.db.RW(), deploymentID)
}

Configuration

Deployment Parameters

# Default deployment configuration
deployment:
  defaultReplicas: 2
  defaultCpuMillicores: 500
  defaultMemoryMib: 512
  
  polling:
    interval: 1s
    maxAttempts: 300  # 5 minutes
  
  regions:
    - us-east-1
    - us-west-2
    - eu-west-1

Feature Flags

type DeploymentFeatures struct {
    MultiRegion         bool
    AutoScaling         bool
    BlueGreenDeployment bool
    CanaryRollout       bool
}
 
func (w *Workflow) getFeatures(workspaceID string) DeploymentFeatures {
    // Check workspace tier and enabled features
    return DeploymentFeatures{
        MultiRegion: w.isFeatureEnabled(workspaceID, "multi_region"),
        // ...
    }
}

Future Enhancements

Progressive Rollout

graph LR
    subgraph "Canary Deployment"
        V1[v1.0<br/>90% traffic]
        V2[v2.0<br/>10% traffic]
    end
    
    V1 --> Monitor{Metrics OK?}
    V2 --> Monitor
    
    Monitor -->|Yes| Promote[Increase v2.0 traffic]
    Monitor -->|No| Rollback[Route all to v1.0]
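
The promote-or-rollback decision in the diagram could be reduced to a small step function. Since this feature is only planned, the sketch below is speculative: `nextWeight`, the fixed step size, and the single health flag are all assumptions made for illustration.

```go
package main

import "fmt"

// nextWeight returns the percentage of traffic the new version should receive
// after one monitoring interval. Unhealthy metrics route everything back to
// the old version; healthy metrics increase the share by step, capped at 100.
func nextWeight(current, step int, healthy bool) int {
	if !healthy {
		return 0
	}
	next := current + step
	if next > 100 {
		next = 100
	}
	return next
}

func main() {
	fmt.Println(nextWeight(10, 20, true))  // 30
	fmt.Println(nextWeight(90, 20, true))  // 100
	fmt.Println(nextWeight(30, 20, false)) // 0
}
```

Starting from the 10% share in the diagram, a step of 20 would reach full traffic after five healthy intervals.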

Multi-Version Support

A planned strategy type would let the workflow run multiple versions simultaneously:

type DeploymentStrategy struct {
    Type   string // "rolling", "blue-green", "canary"
    Config map[string]interface{}
}
 
func (w *Workflow) deployWithStrategy(
    ctx context.Context, 
    deployment *Deployment,
    strategy DeploymentStrategy,
) error {
    switch strategy.Type {
    case "canary":
        return w.deployCanary(ctx, deployment, strategy.Config)
    case "blue-green":
        return w.deployBlueGreen(ctx, deployment, strategy.Config)
    default:
        return w.deployRolling(ctx, deployment)
    }
}