Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions gen/proto/ctrl/v1/cluster.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 25 additions & 5 deletions gen/proto/ctrl/v1/deployment.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions gen/proto/hydra/v1/deploy_restate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions svc/ctrl/proto/ctrl/v1/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ message ApplyDeployment {
// healthcheck is a JSON-encoded Healthcheck struct for configuring liveness/readiness probes.
// If empty, no probes are configured.
optional bytes healthcheck = 17;

// app_id identifies the app within the project.
// Used for pod labels so Vector can extract it into runtime logs.
string app_id = 18;
}

// DeleteDeployment identifies a deployment to remove from the cluster.
Expand Down
4 changes: 4 additions & 0 deletions svc/ctrl/proto/ctrl/v1/deployment.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ message CreateDeploymentRequest {
// Container command override (e.g., ["./app", "serve"])
// If not specified, the container's default entrypoint/cmd is used
repeated string command = 9;

// App slug within the project. Defaults to "default" if empty.
string app_slug = 10;
}

message GitCommitInfo {
Expand Down Expand Up @@ -68,6 +71,7 @@ message Deployment {
string workspace_id = 2;
string project_id = 3;
string environment_id = 4;
string app_id = 21;

// Source information
string git_commit_sha = 5;
Expand Down
3 changes: 2 additions & 1 deletion svc/ctrl/proto/hydra/v1/deploy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ option go_package = "github.com/unkeyed/unkey/gen/proto/hydra/v1;hydrav1";

// DeployService orchestrates the lifecycle of application deployments as
// durable Restate workflows. Each RPC is idempotent and can safely resume from
// any step after a crash.
// any step after a crash. Workflows are keyed by app_id to allow concurrent
// deployments across different apps within the same project.
//
// Deploy handles the full pipeline from building Docker images through
// provisioning containers and configuring domain routing. Rollback and Promote
Expand Down
1 change: 1 addition & 0 deletions svc/ctrl/services/cluster/rpc_watch_deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (s *Service) deploymentRowToState(row db.ListDeploymentTopologyByRegionRow)
WorkspaceId: row.Deployment.WorkspaceID,
ProjectId: row.Deployment.ProjectID,
EnvironmentId: row.Deployment.EnvironmentID,
AppId: row.Deployment.AppID,
Replicas: row.DeploymentTopology.DesiredReplicas,
Image: row.Deployment.Image.String,
CpuMillicores: int64(row.Deployment.CpuMillicores),
Expand Down
89 changes: 56 additions & 33 deletions svc/ctrl/services/deployment/create_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ const (
// CreateDeployment creates a new deployment record and initiates an async Restate
// workflow. The deployment source must be a prebuilt Docker image.
//
// The method looks up the project to infer the workspace, validates the
// environment exists, fetches environment variables, and persists the deployment
// with status "pending" before triggering the workflow. Git commit metadata is
// optional but validated when provided: timestamps must be Unix epoch milliseconds
// and cannot be more than one hour in the future.
// The method looks up the project and app to infer the workspace, validates the
// environment exists, fetches app-scoped environment variables with template
// resolution, and persists the deployment with status "pending" before triggering
// the workflow. Git commit metadata is optional but validated when provided:
// timestamps must be Unix epoch milliseconds and cannot be more than one hour
// in the future.
//
// The workflow runs asynchronously keyed by project ID, so only one deployment
// per project executes at a time. Returns the deployment ID and initial status.
// The workflow runs asynchronously keyed by app ID, so only one deployment
// per app executes at a time. Returns the deployment ID and initial status.
func (s *Service) CreateDeployment(
ctx context.Context,
req *connect.Request[ctrlv1.CreateDeploymentRequest],
Expand All @@ -62,19 +63,36 @@ func (s *Service) CreateDeployment(
}
workspaceID := project.WorkspaceID

// Look up default app
appRow, err := db.Query.FindAppByProjectAndSlug(ctx, s.db.RO(), db.FindAppByProjectAndSlugParams{
ProjectID: project.ID,
Slug: "default",
// Default app_slug to "default" for backwards compatibility
appSlug := req.Msg.GetAppSlug()
if appSlug == "" {
appSlug = "default"
}

// Lookup app with build and runtime settings for this environment
appWithSettings, err := db.Query.FindAppWithSettings(ctx, s.db.RO(), db.FindAppWithSettingsParams{
ProjectID: project.ID,
Slug: appSlug,
EnvironmentID: req.Msg.GetEnvironmentSlug(), // will be resolved below
})
// If app not found, fall back to looking up app and environment separately
// This handles the case where settings haven't been created yet
if err != nil && db.IsNotFound(err) {
return nil, connect.NewError(connect.CodeNotFound,
fmt.Errorf("app '%s' not found in project '%s' or missing settings for environment '%s'",
appSlug, req.Msg.GetProjectId(), req.Msg.GetEnvironmentSlug()))
}
if err != nil {
if db.IsNotFound(err) {
return nil, connect.NewError(connect.CodeNotFound,
fmt.Errorf("default app not found for project: %s", req.Msg.GetProjectId()))
}
return nil, connect.NewError(connect.CodeInternal, err)
return nil, connect.NewError(connect.CodeInternal,
fmt.Errorf("failed to lookup app: %w", err))
}

app := appWithSettings.App
appBuildSettings := appWithSettings.AppBuildSetting
appRuntimeSettings := appWithSettings.AppRuntimeSetting
_ = appBuildSettings // build settings used by the workflow, not here

// Verify the environment exists
envSettings, err := db.Query.FindEnvironmentWithSettingsByProjectIdAndSlug(ctx, s.db.RO(), db.FindEnvironmentWithSettingsByProjectIdAndSlugParams{
WorkspaceID: workspaceID,
ProjectID: project.ID,
Expand All @@ -91,19 +109,22 @@ func (s *Service) CreateDeployment(
}
env := envSettings.Environment

// Fetch environment variables and build secrets blob
envVars, err := db.Query.FindEnvironmentVariablesByEnvironmentId(ctx, s.db.RO(), env.ID)
// Fetch app-scoped environment variables
appEnvVars, err := db.Query.FindAppEnvVarsByAppAndEnv(ctx, s.db.RO(), db.FindAppEnvVarsByAppAndEnvParams{
AppID: app.ID,
EnvironmentID: env.ID,
})
if err != nil {
return nil, connect.NewError(connect.CodeInternal,
fmt.Errorf("failed to fetch environment variables: %w", err))
fmt.Errorf("failed to fetch app environment variables: %w", err))
}

secretsBlob := []byte{}
if len(envVars) > 0 {
if len(appEnvVars) > 0 {
secretsConfig := &ctrlv1.SecretsConfig{
Secrets: make(map[string]string, len(envVars)),
Secrets: make(map[string]string, len(appEnvVars)),
}
for _, ev := range envVars {
for _, ev := range appEnvVars {
secretsConfig.Secrets[ev.Key] = ev.Value
}

Expand Down Expand Up @@ -161,20 +182,21 @@ func (s *Service) CreateDeployment(

logger.Info("deployment will use prebuilt image",
"deployment_id", deploymentID,
"app_id", app.ID,
"image", dockerImage)

// Insert deployment into database, snapshotting settings from environment
// Insert deployment into database, snapshotting settings from app
err = db.Query.InsertDeployment(ctx, s.db.RW(), db.InsertDeploymentParams{
ID: deploymentID,
K8sName: uid.DNS1035(12),
WorkspaceID: workspaceID,
ProjectID: req.Msg.GetProjectId(),
AppID: appRow.App.ID,
AppID: app.ID,
EnvironmentID: env.ID,
OpenapiSpec: sql.NullString{String: "", Valid: false},
SentinelConfig: envSettings.EnvironmentRuntimeSetting.SentinelConfig,
SentinelConfig: appRuntimeSettings.SentinelConfig,
EncryptedEnvironmentVariables: secretsBlob,
Command: envSettings.EnvironmentRuntimeSetting.Command,
Command: appRuntimeSettings.Command,
Status: db.DeploymentsStatusPending,
CreatedAt: now,
UpdatedAt: sql.NullInt64{Valid: false, Int64: 0},
Expand All @@ -184,11 +206,11 @@ func (s *Service) CreateDeployment(
GitCommitAuthorHandle: sql.NullString{String: gitCommitAuthorHandle, Valid: gitCommitAuthorHandle != ""},
GitCommitAuthorAvatarUrl: sql.NullString{String: gitCommitAuthorAvatarURL, Valid: gitCommitAuthorAvatarURL != ""},
GitCommitTimestamp: sql.NullInt64{Int64: gitCommitTimestamp, Valid: gitCommitTimestamp != 0},
CpuMillicores: envSettings.EnvironmentRuntimeSetting.CpuMillicores,
MemoryMib: envSettings.EnvironmentRuntimeSetting.MemoryMib,
Port: envSettings.EnvironmentRuntimeSetting.Port,
ShutdownSignal: db.DeploymentsShutdownSignal(envSettings.EnvironmentRuntimeSetting.ShutdownSignal),
Healthcheck: envSettings.EnvironmentRuntimeSetting.Healthcheck,
CpuMillicores: appRuntimeSettings.CpuMillicores,
MemoryMib: appRuntimeSettings.MemoryMib,
Port: appRuntimeSettings.Port,
ShutdownSignal: db.DeploymentsShutdownSignal(appRuntimeSettings.ShutdownSignal),
Healthcheck: appRuntimeSettings.Healthcheck,
})
if err != nil {
logger.Error("failed to insert deployment", "error", err.Error())
Expand All @@ -199,6 +221,7 @@ func (s *Service) CreateDeployment(
"deployment_id", deploymentID,
"workspace_id", workspaceID,
"project_id", req.Msg.GetProjectId(),
"app_id", app.ID,
"environment", env.ID,
"docker_image", dockerImage,
)
Expand All @@ -221,8 +244,8 @@ func (s *Service) CreateDeployment(
},
}

// Send deployment request asynchronously (fire-and-forget)
invocation, err := s.deploymentClient(project.ID).
// Send deployment request asynchronously (fire-and-forget), keyed by app ID
invocation, err := s.deploymentClient(app.ID).
Deploy().
Send(ctx, deployReq)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions svc/ctrl/services/deployment/promote.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
// This is typically used after a rollback to restore the original deployment, or
// to switch traffic to a new deployment that was previously in a preview state.
// The workflow runs synchronously (blocking until complete) and is keyed by
// project ID to prevent concurrent promotion operations on the same project.
// app ID to prevent concurrent promotion operations on the same app.
func (s *Service) Promote(ctx context.Context, req *connect.Request[ctrlv1.PromoteRequest]) (*connect.Response[ctrlv1.PromoteResponse], error) {
logger.Info("initiating promotion via Restate",
"target", req.Msg.GetTargetDeploymentId(),
)

// Get target deployment to determine project ID for keying
// Get target deployment to determine app ID for keying
targetDeployment, err := db.Query.FindDeploymentById(ctx, s.db.RO(), req.Msg.GetTargetDeploymentId())
if err != nil {
if db.IsNotFound(err) {
Expand All @@ -34,9 +34,9 @@ func (s *Service) Promote(ctx context.Context, req *connect.Request[ctrlv1.Promo
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to get deployment: %w", err))
}

// Call the Restate workflow using project ID as the key
// This ensures only one operation per project can run at a time
_, err = s.deploymentClient(targetDeployment.ProjectID).
// Call the Restate workflow using app ID as the key
// This ensures only one operation per app can run at a time
_, err = s.deploymentClient(targetDeployment.AppID).
Promote().
Request(ctx, &hydrav1.PromoteRequest{
TargetDeploymentId: req.Msg.GetTargetDeploymentId(),
Expand Down
Loading