Deployment Workflow - Pull-Based Architecture
Asynchronous deployment orchestration using event streaming, topology management, and status aggregation across multiple regions
Overview
The deployment workflow has been redesigned around the pull-based infrastructure model. Instead of the control plane pushing deployments directly to hosts, it now creates deployment topology entries and emits events that Krane agents in each region pull and apply.
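On the agent side, the loop is conceptually simple: hold a long-lived stream of deployment events from the control plane, apply each event to the local Kubernetes cluster, and report instance status back. The sketch below illustrates only that shape; every type and function name in it is an assumption, not the actual Krane API.
// Illustrative agent-side pull loop (not the actual Krane implementation).
// All type and function names here are assumptions for the sketch.
type deploymentEvent struct {
    Action       string // "apply" or "delete"
    DeploymentID string
    Image        string
}

type eventStream interface {
    Recv() (deploymentEvent, error)
}

func runPullLoop(
    ctx context.Context,
    stream eventStream,
    apply func(context.Context, deploymentEvent) error,
    remove func(context.Context, string) error,
    report func(deploymentID, status string),
) error {
    for {
        ev, err := stream.Recv()
        if err != nil {
            return err // caller reconnects with backoff and resyncs
        }
        switch ev.Action {
        case "apply":
            if err := apply(ctx, ev); err != nil {
                report(ev.DeploymentID, "failed")
                continue
            }
            report(ev.DeploymentID, "running")
        case "delete":
            _ = remove(ctx, ev.DeploymentID)
        }
    }
}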
Architecture Changes
Previous Push-Based Workflow
sequenceDiagram
participant User
participant API
participant Workflow
participant DB
participant Host1
participant Host2
User->>API: Deploy request
API->>Workflow: Start deployment
Workflow->>DB: Store config
par Direct push to hosts
Workflow->>Host1: Deploy container
Host1-->>Workflow: Success/Failure
and
Workflow->>Host2: Deploy container
Host2-->>Workflow: Success/Failure
end
Workflow->>DB: Update status
Workflow-->>API: Response
API-->>User: Result
New Pull-Based Workflow
sequenceDiagram
participant User
participant API
participant Workflow
participant DB
participant Ctrl as Control Plane
participant K1 as Krane (Region A)
participant K2 as Krane (Region B)
User->>API: Deploy request
API->>Workflow: Start deployment
Workflow->>DB: Create deployment
Workflow->>DB: Create topology entries
Note over Workflow: Emit events to regions
Workflow->>Ctrl: EmitDeploymentState(regions)
par Agents pull and apply
Ctrl-->>K1: Stream event
K1->>K1: Apply to Kubernetes
K1->>Ctrl: UpdateInstance
and
Ctrl-->>K2: Stream event
K2->>K2: Apply to Kubernetes
K2->>Ctrl: UpdateInstance
end
Ctrl->>DB: Update instance status
loop Poll for completion
Workflow->>DB: Check deployment status
end
Workflow-->>API: Success
API-->>User: Deployment ready
Database Schema
Deployment Topology
The new deployment_topology table tracks regional deployment distribution:
CREATE TABLE deployment_topology (
workspace_id VARCHAR(256) NOT NULL,
deployment_id VARCHAR(256) NOT NULL,
region VARCHAR(256) NOT NULL,
replicas INT NOT NULL,
status ENUM('starting','started','stopping','stopped') NOT NULL,
created_at BIGINT NOT NULL,
updated_at BIGINT,
PRIMARY KEY (deployment_id, region)
);
Deployment Status Tracking
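Deployments move through the states shown in the diagram below. A minimal sketch of a transition guard derived from that diagram follows; only the status names come from the diagram, the map-based validation itself is an assumption.
// Allowed deployment status transitions, mirroring the state diagram below.
// The guard helper is illustrative, not the actual implementation.
var allowedTransitions = map[string][]string{
    "pending":   {"building", "failed"},
    "building":  {"deploying", "failed"},
    "deploying": {"network", "failed"},
    "network":   {"ready", "failed"},
}

func canTransition(from, to string) bool {
    for _, next := range allowedTransitions[from] {
        if next == to {
            return true
        }
    }
    return false
}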
stateDiagram-v2
[*] --> pending: Deployment created
pending --> building: Docker build started
building --> deploying: Image ready
deploying --> network: Topology created
network --> ready: Instances running
pending --> failed: Error
building --> failed: Build failed
deploying --> failed: Deploy failed
network --> failed: Network failed
Workflow Implementation
Deploy Handler
The deploy handler in go/apps/ctrl/workflows/deploy/deploy_handler.go:
func (w *Workflow) Deploy(ctx restate.ObjectContext, req *hydrav1.DeployRequest) (*hydrav1.DeployResponse, error) {
    // 1. Find deployment and validate
    deployment, err := restate.Run(ctx, func(stepCtx restate.RunContext) (db.Deployment, error) {
        return db.Query.FindDeploymentById(stepCtx, w.db.RW(), req.GetDeploymentId())
    })
    if err != nil {
        return nil, err
    }
    // 2. Build Docker image if needed; otherwise use the image already
    // recorded on the deployment
    dockerImage := deployment.Image
    if req.GetBuildContextPath() != "" {
        dockerImage = w.buildImage(ctx, req)
    }
    // 3. Create deployment topology for target regions
    regions := w.determineTargetRegions(deployment)
    for _, region := range regions {
        if err := restate.Run(ctx, func(stepCtx restate.RunContext) error {
            return db.Query.InsertDeploymentTopology(stepCtx, w.db.RW(), db.InsertDeploymentTopologyParams{
                DeploymentID: deployment.ID,
                Region:       region,
                Replicas:     deployment.Replicas,
                Status:       db.DeploymentTopologyStatusStarting,
            })
        }); err != nil {
            return nil, err
        }
    }
    // 4. Emit deployment events to regions
    w.cluster.EmitDeploymentState(&ctrlv1.DeploymentState{
        Action: ctrlv1.DeploymentState_ACTION_APPLY,
        Deployment: &ctrlv1.Deployment{
            Id:            deployment.ID,
            Image:         dockerImage,
            CpuMillicores: deployment.CpuMillicores,
            MemoryMib:     deployment.MemoryMib,
        },
    }, regions)
    // 5. Poll for deployment completion
    return w.pollDeploymentStatus(ctx, deployment.ID)
}
Topology Management
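The flowchart below shows how topology entries fan out to regions and converge on a ready deployment. The "All Regions Ready?" decision corresponds to calculateDeploymentStatus, which the status-polling code in the next section calls. A hedged sketch of that aggregation follows; the row types, field names, and instance status constants are assumptions, not the actual schema bindings.
// DeploymentStatus is the aggregate inspected by the polling loop in the
// next section. Field names on the topology and instance rows are assumptions.
type DeploymentStatus struct {
    State   string // "pending", "deploying", "ready", or "failed"
    Message string
}

func (w *Workflow) calculateDeploymentStatus(
    topology []db.DeploymentTopology,
    instances []db.Instance,
) (DeploymentStatus, error) {
    if len(topology) == 0 {
        return DeploymentStatus{State: "pending"}, nil
    }
    // Count running instances per region and surface hard failures early.
    runningByRegion := make(map[string]int32)
    for _, inst := range instances {
        switch inst.Status {
        case db.InstanceStatusRunning:
            runningByRegion[inst.Region]++
        case db.InstanceStatusFailed:
            return DeploymentStatus{
                State:   "failed",
                Message: fmt.Sprintf("instance %s failed in %s", inst.ID, inst.Region),
            }, nil
        }
    }
    // Every region must reach its desired replica count before the
    // deployment is considered ready.
    for _, entry := range topology {
        if runningByRegion[entry.Region] < entry.Replicas {
            return DeploymentStatus{State: "deploying"}, nil
        }
    }
    return DeploymentStatus{State: "ready"}, nil
}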
flowchart TB
Start([Deploy Request])
Start --> DetermineRegions[Determine Target Regions]
DetermineRegions --> CreateTopology[Create Topology Entries]
CreateTopology --> EmitEvents[Emit to Regions]
EmitEvents --> Region1[Region A Processing]
EmitEvents --> Region2[Region B Processing]
Region1 --> Update1[Update Instances]
Region2 --> Update2[Update Instances]
Update1 --> CheckStatus
Update2 --> CheckStatus
CheckStatus{All Regions Ready?}
CheckStatus -->|No| Poll[Wait & Retry]
CheckStatus -->|Yes| Complete[Mark Deployment Ready]
Poll --> CheckStatus
Complete --> End([Success])
Status Polling
The workflow polls for deployment completion:
func (w *Workflow) pollDeploymentStatus(ctx restate.ObjectContext, deploymentID string) (*hydrav1.DeployResponse, error) {
    maxAttempts := 300 // 5 minutes with 1s intervals
    for attempt := 0; attempt < maxAttempts; attempt++ {
        status, err := restate.Run(ctx, func(stepCtx restate.RunContext) (DeploymentStatus, error) {
            // Check topology status
            topology, err := db.Query.ListDeploymentTopology(stepCtx, w.db.RO(), deploymentID)
            if err != nil {
                return DeploymentStatus{}, err
            }
            // Check instance status
            instances, err := db.Query.FindInstancesByDeploymentId(stepCtx, w.db.RO(), deploymentID)
            if err != nil {
                return DeploymentStatus{}, err
            }
            return w.calculateDeploymentStatus(topology, instances)
        })
        if err != nil {
            return nil, err
        }
        switch status.State {
        case "ready":
            return &hydrav1.DeployResponse{Success: true}, nil
        case "failed":
            return nil, fmt.Errorf("deployment failed: %s", status.Message)
        case "pending", "deploying":
            // Continue polling
            restate.Sleep(ctx, time.Second)
        }
    }
    return nil, fmt.Errorf("deployment timeout after %d attempts", maxAttempts)
}
Event Flow
Deployment Event Structure
message DeploymentState {
enum Action {
ACTION_UNSPECIFIED = 0;
ACTION_APPLY = 1;
ACTION_DELETE = 2;
}
Action action = 1;
Deployment deployment = 2;
}
message Deployment {
string id = 1;
string workspace_id = 2;
string project_id = 3;
string environment_id = 4;
string image = 5;
int32 cpu_millicores = 6;
int32 memory_mib = 7;
int32 replicas = 8;
map<string, string> env_vars = 9;
}
Event Emission
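Only agents whose region is in the target list receive the event, as the sequence diagram below illustrates. A minimal sketch of that fan-out follows; the agent registry layout and channel-based delivery are assumptions, not the actual ClusterService internals.
// Illustrative fan-out: deliver a DeploymentState only to connected agents
// whose region matches. Registry shape and channel delivery are assumptions.
type connectedAgent struct {
    ID     string
    Region string
    Events chan *ctrlv1.DeploymentState
}

type ClusterService struct {
    mu     sync.RWMutex
    agents map[string]connectedAgent
}

func (s *ClusterService) EmitDeploymentState(state *ctrlv1.DeploymentState, regions []string) {
    target := make(map[string]bool, len(regions))
    for _, r := range regions {
        target[r] = true
    }
    s.mu.RLock()
    defer s.mu.RUnlock()
    for _, agent := range s.agents {
        if !target[agent.Region] {
            continue // e.g. Krane-3 in eu-west receives nothing
        }
        select {
        case agent.Events <- state:
        default:
            // Slow consumer: drop and rely on the agent's periodic resync.
        }
    }
}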
sequenceDiagram
participant W as Workflow
participant CS as ClusterService
participant AR as Agent Registry
participant K1 as Krane-1 (us-east)
participant K2 as Krane-2 (us-west)
participant K3 as Krane-3 (eu-west)
W->>CS: EmitDeploymentState(regions=[us-east, us-west])
CS->>AR: Find matching agents
AR-->>CS: [Krane-1, Krane-2]
par Send to matching agents
CS-->>K1: DeploymentState
and
CS-->>K2: DeploymentState
end
Note over K3: No event (region not matched)
Instance Lifecycle
Instance Creation Flow
stateDiagram-v2
[*] --> Pending: Krane receives event
Pending --> Creating: Apply to Kubernetes
Creating --> Pending: Pod scheduled
Pending --> Running: Container started
Running --> [*]: Healthy
Creating --> Failed: Resource limits
Pending --> Failed: Image pull error
Running --> Failed: Container crash
Instance Status Updates
func (k *KraneAgent) handlePodEvent(pod *v1.Pod) {
status := k.podStatusToInstanceStatus(pod.Status)
update := &ctrlv1.UpdateInstanceRequest{
Change: &ctrlv1.UpdateInstanceRequest_Create_{
Create: &ctrlv1.UpdateInstanceRequest_Create{
DeploymentId: pod.Labels["deployment-id"],
PodName: pod.Name,
Address: pod.Status.PodIP,
CpuMillicores: pod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue(),
MemoryMib: pod.Spec.Containers[0].Resources.Requests.Memory().Value() / (1024 * 1024),
Status: status,
},
},
}
k.syncEngine.InstanceUpdateBuffer.Put(update)
}
Rollback Mechanism
Version Tracking
Each deployment maintains version history:
graph LR
D1[v1.0.0<br/>Image: app:1.0] --> D2[v1.1.0<br/>Image: app:1.1]
D2 --> D3[v1.2.0<br/>Image: app:1.2]
D3 -.->|Rollback| D2
Rollback Workflow
func (w *Workflow) Rollback(ctx restate.ObjectContext, req *hydrav1.RollbackRequest) error {
    // 1. Find target version
    targetDeployment, err := db.Query.FindDeploymentByVersion(ctx, w.db.RO(), req.Version)
    if err != nil {
        return err
    }
    // 2. Create a new deployment record reusing the old config. It must be
    // persisted before Deploy runs so FindDeploymentById can load it
    // (the insert query name here is illustrative).
    newDeployment := targetDeployment
    newDeployment.ID = uid.New("dep")
    newDeployment.CreatedAt = time.Now().UnixMilli()
    if err := db.Query.InsertDeployment(ctx, w.db.RW(), newDeployment); err != nil {
        return err
    }
    // 3. Trigger deployment with old image
    _, err = w.Deploy(ctx, &hydrav1.DeployRequest{
        DeploymentId: newDeployment.ID,
    })
    return err
}
Monitoring and Observability
Key Metrics
# Workflow metrics
deployment_workflow_duration_seconds: histogram
deployment_workflow_status: counter
deployment_build_duration_seconds: histogram
deployment_polling_attempts: histogram
# Topology metrics
deployment_topology_regions: gauge
deployment_topology_status: gauge
# Instance metrics
deployment_instances_total: gauge
deployment_instances_by_status: gauge
deployment_instance_startup_time_seconds: histogram
Deployment Tracing
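The span tree below can be produced with standard OpenTelemetry instrumentation. A hedged sketch follows, assuming an OpenTelemetry tracer is wired into the workflow; the span names come from the tree, everything else is illustrative.
// Illustrative span layout matching the tree below. The tracer name and the
// helper are assumptions; only the span names come from the diagram. The
// per-region deployment.apply.* spans would be started by the Krane agents
// and joined to this trace via context propagation.
func traceDeployment(ctx context.Context, deploymentID string, step func(context.Context, string) error) error {
    tracer := otel.Tracer("ctrl.deploy")

    ctx, root := tracer.Start(ctx, "deployment.workflow",
        trace.WithAttributes(attribute.String("deployment.id", deploymentID)))
    defer root.End()

    for _, name := range []string{
        "deployment.build",
        "deployment.topology.create",
        "deployment.events.emit",
        "deployment.status.poll",
    } {
        stepCtx, span := tracer.Start(ctx, name)
        err := step(stepCtx, name) // run the corresponding workflow step
        span.End()
        if err != nil {
            return err
        }
    }
    return nil
}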
graph TB
subgraph "Trace Span Tree"
Root[deployment.workflow]
Build[deployment.build]
Topology[deployment.topology.create]
Emit[deployment.events.emit]
Poll[deployment.status.poll]
Root --> Build
Root --> Topology
Root --> Emit
Root --> Poll
subgraph "Per Region"
Apply1[deployment.apply.us-east]
Apply2[deployment.apply.us-west]
end
Emit --> Apply1
Emit --> Apply2
end
Error Handling
Failure Scenarios
flowchart TB
Start([Deployment Start])
Start --> Build{Build Success?}
Build -->|No| BuildFail[Mark Failed<br/>Cleanup]
Build -->|Yes| Topology
Topology --> Emit{Events Sent?}
Emit -->|No| EmitFail[Retry Emission]
Emit -->|Yes| Poll
EmitFail --> EmitRetry{Retry Success?}
EmitRetry -->|No| MarkFailed[Mark Deployment Failed]
EmitRetry -->|Yes| Poll
Poll --> Check{Instances Ready?}
Check -->|Timeout| TimeoutFail[Mark Failed<br/>Trigger Cleanup]
Check -->|Failed| InstanceFail[Mark Failed<br/>Cleanup Instances]
Check -->|Success| Complete[Mark Ready]
BuildFail --> End([Failed])
MarkFailed --> End
TimeoutFail --> End
InstanceFail --> End
Complete --> Success([Success])
Cleanup on Failure
func (w *Workflow) cleanupFailedDeployment(ctx context.Context, deploymentID string) error {
// 1. Emit delete events to all regions
topology, err := db.Query.ListDeploymentTopology(ctx, w.db.RO(), deploymentID)
if err != nil {
return err
}
for _, entry := range topology {
w.cluster.EmitDeploymentState(&ctrlv1.DeploymentState{
Action: ctrlv1.DeploymentState_ACTION_DELETE,
Deployment: &ctrlv1.Deployment{
Id: deploymentID,
},
}, []string{entry.Region})
}
// 2. Mark topology as stopped
for _, entry := range topology {
db.Query.UpdateDeploymentTopologyStatus(ctx, w.db.RW(),
deploymentID, entry.Region, db.DeploymentTopologyStatusStopped)
}
// 3. Delete instance records
return db.Query.DeleteInstancesByDeploymentId(ctx, w.db.RW(), deploymentID)
}
Configuration
Deployment Parameters
# Default deployment configuration
deployment:
defaultReplicas: 2
defaultCpuMillicores: 500
defaultMemoryMib: 512
polling:
interval: 1s
maxAttempts: 300 # 5 minutes
regions:
- us-east-1
- us-west-2
- eu-west-1
Feature Flags
type DeploymentFeatures struct {
MultiRegion bool
AutoScaling bool
BlueGreenDeployment bool
CanaryRollout bool
}
func (w *Workflow) getFeatures(workspaceID string) DeploymentFeatures {
// Check workspace tier and enabled features
return DeploymentFeatures{
MultiRegion: w.isFeatureEnabled(workspaceID, "multi_region"),
// ...
}
}
Future Enhancements
Progressive Rollout
graph LR
subgraph "Canary Deployment"
V1[v1.0<br/>90% traffic]
V2[v2.0<br/>10% traffic]
end
V1 --> Monitor{Metrics OK?}
V2 --> Monitor
Monitor -->|Yes| Promote[Increase v2.0 traffic]
Monitor -->|No| Rollback[Route all to v1.0]
Multi-Version Support
Support for running multiple versions simultaneously:
type DeploymentStrategy struct {
Type string // "rolling", "blue-green", "canary"
Config map[string]interface{}
}
func (w *Workflow) deployWithStrategy(
ctx context.Context,
deployment *Deployment,
strategy DeploymentStrategy,
) error {
switch strategy.Type {
case "canary":
return w.deployCanary(ctx, deployment, strategy.Config)
case "blue-green":
return w.deployBlueGreen(ctx, deployment, strategy.Config)
default:
return w.deployRolling(ctx, deployment)
}
}
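A hedged usage sketch for the call site: pick the strategy from per-project configuration and fall back to a rolling deploy. The projectConfig lookup and its fields are illustrative assumptions.
// Illustrative call site; projectConfig and its fields are assumptions.
strategy := DeploymentStrategy{Type: "rolling"}
if cfg, ok := w.projectConfig(deployment.ProjectID); ok && cfg.Strategy != "" {
    strategy = DeploymentStrategy{
        Type:   cfg.Strategy, // "rolling", "blue-green", or "canary"
        Config: cfg.StrategyConfig,
    }
}
if err := w.deployWithStrategy(ctx, deployment, strategy); err != nil {
    return err
}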