Skip to content

Commit 9c9a9ee

Browse files
fix: reap unused fly apps from past deployments (#774)
This change implements a new temporal workflow that will reap unused fly.io machines. It is triggered as a child workflow every time a deployment is processed (the parent workflow). The child workflow is invoked with a given project id and it will reap all fly machines provisioned in old deployments leaving only machines in the 3 most recent deployments. The goal is to keep fly.io machines, a capped resource, in check as more users play with Gram Functions.
1 parent 51baba0 commit 9c9a9ee

File tree

13 files changed

+615
-1
lines changed

13 files changed

+615
-1
lines changed

server/database/sqlc.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,8 @@ sql:
329329
- schema: schema.sql
330330
queries: ../internal/functions/queries.sql
331331
engine: postgresql
332+
database:
333+
managed: true
332334
gen:
333335
go:
334336
package: "repo"

server/internal/background/activities.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type Activities struct {
3434
processDeployment *activities.ProcessDeployment
3535
provisionFunctionsAccess *activities.ProvisionFunctionsAccess
3636
deployFunctionRunners *activities.DeployFunctionRunners
37+
reapFlyApps *activities.ReapFlyApps
3738
refreshBillingUsage *activities.RefreshBillingUsage
3839
refreshOpenRouterKey *activities.RefreshOpenRouterKey
3940
slackChatCompletion *activities.SlackChatCompletion
@@ -72,6 +73,7 @@ func NewActivities(
7273
processDeployment: activities.NewProcessDeployment(logger, tracerProvider, meterProvider, db, features, assetStorage, billingRepo),
7374
provisionFunctionsAccess: activities.NewProvisionFunctionsAccess(logger, db, encryption),
7475
deployFunctionRunners: activities.NewDeployFunctionRunners(logger, db, functionsDeployer, functionsVersion, encryption),
76+
reapFlyApps: activities.NewReapFlyApps(logger, meterProvider, db, functionsDeployer, 3),
7577
refreshBillingUsage: activities.NewRefreshBillingUsage(logger, db, billingRepo),
7678
refreshOpenRouterKey: activities.NewRefreshOpenRouterKey(logger, db, openrouter),
7779
slackChatCompletion: activities.NewSlackChatCompletionActivity(logger, slackClient, chatClient),
@@ -144,3 +146,7 @@ func (a *Activities) DeployFunctionRunners(ctx context.Context, req activities.D
144146
func (a *Activities) ValidateDeployment(ctx context.Context, projectID uuid.UUID, deploymentID uuid.UUID) error {
145147
return a.validateDeployment.Do(ctx, projectID, deploymentID)
146148
}
149+
150+
func (a *Activities) ReapFlyApps(ctx context.Context, req activities.ReapFlyAppsRequest) (*activities.ReapFlyAppsResult, error) {
151+
return a.reapFlyApps.Do(ctx, req)
152+
}

server/internal/background/activities/metrics.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
meterFunctionsToolsSkipped = "functions.tools.skipped"
2727
meterFunctionsToolsCounter = "functions.tools.count"
2828
meterFunctionsProcessedDuration = "functions.processed.duration"
29+
30+
meterFlyAppReaperReapCount = "flyapp_reaper.reap.count"
2931
)
3032

3133
type metrics struct {
@@ -40,6 +42,8 @@ type metrics struct {
4042
functionsToolsSkipped metric.Int64Counter
4143
functionsToolsCounter metric.Int64Counter
4244
functionsProcessedDuration metric.Float64Histogram
45+
46+
flyAppReaperReapCount metric.Int64Counter
4347
}
4448

4549
func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
@@ -120,6 +124,15 @@ func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
120124
logger.ErrorContext(ctx, "failed to create metric", attr.SlogMetricName(meterFunctionsProcessedDuration), attr.SlogError(err))
121125
}
122126

127+
flyAppReaperReapCount, err := meter.Int64Counter(
128+
meterFlyAppReaperReapCount,
129+
metric.WithDescription("Number of fly apps reaped by the reaper workflow"),
130+
metric.WithUnit("{app}"),
131+
)
132+
if err != nil {
133+
logger.ErrorContext(ctx, "failed to create metric", attr.SlogMetricName(meterFlyAppReaperReapCount), attr.SlogError(err))
134+
}
135+
123136
return &metrics{
124137
opSkipped: opSkipped,
125138
openAPIUpgradeCounter: openAPIUpgradeCounter,
@@ -129,6 +142,7 @@ func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
129142
functionsToolsSkipped: functionsToolsSkipped,
130143
functionsToolsCounter: functionsToolsCounter,
131144
functionsProcessedDuration: functionsProcessedDuration,
145+
flyAppReaperReapCount: flyAppReaperReapCount,
132146
}
133147
}
134148

@@ -214,3 +228,14 @@ func (m *metrics) RecordFunctionsProcessed(
214228
))
215229
}
216230
}
231+
232+
func (m *metrics) RecordFlyAppReaperReapCount(ctx context.Context, success int64, fail int64) {
233+
if counter := m.flyAppReaperReapCount; counter != nil {
234+
counter.Add(ctx, success, metric.WithAttributes(
235+
attr.Outcome(o11y.OutcomeSuccess),
236+
))
237+
counter.Add(ctx, fail, metric.WithAttributes(
238+
attr.Outcome(o11y.OutcomeFailure),
239+
))
240+
}
241+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package activities
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
7+
"github.com/google/uuid"
8+
"github.com/jackc/pgx/v5/pgtype"
9+
"github.com/jackc/pgx/v5/pgxpool"
10+
"go.opentelemetry.io/otel/metric"
11+
"go.temporal.io/sdk/temporal"
12+
13+
"github.com/speakeasy-api/gram/server/internal/attr"
14+
"github.com/speakeasy-api/gram/server/internal/functions"
15+
funcrepo "github.com/speakeasy-api/gram/server/internal/functions/repo"
16+
"github.com/speakeasy-api/gram/server/internal/oops"
17+
)
18+
19+
type ReapFlyAppsRequest struct {
20+
Scope FunctionsReaperScope
21+
22+
ProjectID uuid.NullUUID
23+
}
24+
25+
type ReapFlyAppsResult struct {
26+
Reaped int
27+
Errors int
28+
}
29+
30+
type ReapFlyApps struct {
31+
logger *slog.Logger
32+
metrics *metrics
33+
db *pgxpool.Pool
34+
deployer functions.Deployer
35+
keepCount int64
36+
}
37+
38+
func NewReapFlyApps(
39+
logger *slog.Logger,
40+
meterProvider metric.MeterProvider,
41+
db *pgxpool.Pool,
42+
deployer functions.Deployer,
43+
keepCount int64,
44+
) *ReapFlyApps {
45+
return &ReapFlyApps{
46+
logger: logger.With(attr.SlogComponent("flyio-reaper")),
47+
metrics: newMetrics(newMeter(meterProvider), logger),
48+
db: db,
49+
deployer: deployer,
50+
keepCount: keepCount,
51+
}
52+
}
53+
54+
func (r *ReapFlyApps) Do(ctx context.Context, req ReapFlyAppsRequest) (*ReapFlyAppsResult, error) {
55+
logger := r.logger
56+
57+
switch {
58+
case req.Scope == FunctionsReaperScopeProject && req.ProjectID.UUID == uuid.Nil:
59+
return nil, temporal.NewApplicationErrorWithOptions("project ID must be set for project-scoped reaper", "reaper_error", temporal.ApplicationErrorOptions{
60+
NonRetryable: true,
61+
Cause: nil,
62+
})
63+
case req.Scope == FunctionsReaperScopeGlobal && req.ProjectID.UUID != uuid.Nil:
64+
return nil, temporal.NewApplicationErrorWithOptions("project ID must not be set for global reaper", "reaper_error", temporal.ApplicationErrorOptions{
65+
NonRetryable: true,
66+
Cause: nil,
67+
})
68+
}
69+
70+
repo := funcrepo.New(r.db)
71+
72+
// Get all apps that should be reaped (keeping only the most recent N per project)
73+
appsToReap, err := repo.GetFlyAppsToReap(ctx, funcrepo.GetFlyAppsToReapParams{
74+
KeepCount: pgtype.Int8{Int64: r.keepCount, Valid: true},
75+
// Starting with a small batch size for now and we'll increase later on
76+
// after some observation.
77+
BatchSize: pgtype.Int8{Int64: 20, Valid: true},
78+
})
79+
if err != nil {
80+
return nil, oops.E(oops.CodeUnexpected, err, "failed to query apps to reap").Log(ctx, logger)
81+
}
82+
83+
if len(appsToReap) == 0 {
84+
logger.InfoContext(ctx, "no apps to reap")
85+
return &ReapFlyAppsResult{
86+
Reaped: 0,
87+
Errors: 0,
88+
}, nil
89+
}
90+
91+
result := &ReapFlyAppsResult{
92+
Reaped: 0,
93+
Errors: 0,
94+
}
95+
96+
for _, app := range appsToReap {
97+
appLogger := logger.With(
98+
attr.SlogFlyAppInternalID(app.ID.String()),
99+
attr.SlogFlyAppName(app.AppName),
100+
attr.SlogFlyOrgSlug(app.FlyOrgSlug),
101+
attr.SlogProjectID(app.ProjectID.String()),
102+
attr.SlogDeploymentID(app.DeploymentID.String()),
103+
attr.SlogDeploymentFunctionsID(app.FunctionID.String()),
104+
)
105+
106+
appLogger.InfoContext(ctx, "reaping fly app")
107+
108+
if err := r.deployer.Reap(ctx, functions.ReapRequest{
109+
ProjectID: app.ProjectID,
110+
DeploymentID: app.DeploymentID,
111+
FunctionID: app.FunctionID,
112+
}); err != nil {
113+
appLogger.ErrorContext(ctx, "failed to reap app", attr.SlogError(err))
114+
result.Errors++
115+
continue
116+
}
117+
118+
result.Reaped++
119+
appLogger.InfoContext(ctx, "successfully reaped fly app")
120+
}
121+
122+
r.metrics.RecordFlyAppReaperReapCount(ctx, int64(result.Reaped), int64(result.Errors))
123+
124+
return result, nil
125+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package activities
2+
3+
type FunctionsReaperScope string
4+
5+
const (
6+
FunctionsReaperScopeGlobal FunctionsReaperScope = "global"
7+
FunctionsReaperScopeProject FunctionsReaperScope = "project"
8+
)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package background
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"go.temporal.io/api/enums/v1"
9+
"go.temporal.io/sdk/client"
10+
"go.temporal.io/sdk/temporal"
11+
"go.temporal.io/sdk/workflow"
12+
13+
"github.com/google/uuid"
14+
"github.com/speakeasy-api/gram/server/internal/background/activities"
15+
)
16+
17+
type FunctionsReaperWorkflowParams struct {
18+
Scope activities.FunctionsReaperScope
19+
20+
ProjectID uuid.NullUUID
21+
}
22+
23+
type FunctionsReaperWorkflowResult struct {
24+
AppsReaped int
25+
Errors int
26+
}
27+
28+
func ExecuteProjectFunctionsReaperWorkflow(ctx context.Context, temporalClient client.Client, projectID uuid.UUID) (client.WorkflowRun, error) {
29+
return temporalClient.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
30+
ID: "v1:functions-reaper:" + projectID.String(),
31+
TaskQueue: string(TaskQueueMain),
32+
WorkflowIDConflictPolicy: enums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
33+
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
34+
WorkflowRunTimeout: time.Minute * 10,
35+
}, ProcessDeploymentWorkflow, FunctionsReaperWorkflowParams{
36+
Scope: activities.FunctionsReaperScopeProject,
37+
ProjectID: uuid.NullUUID{UUID: projectID, Valid: projectID != uuid.Nil},
38+
})
39+
}
40+
41+
func FunctionsReaperWorkflow(ctx workflow.Context, params FunctionsReaperWorkflowParams) (*FunctionsReaperWorkflowResult, error) {
42+
// This can stay nil/unassigned. Temporal just uses this to get activity names.
43+
// The actual activities are registered in the CLI layer (cmd/gram/worker.go).
44+
var a *Activities
45+
46+
logger := workflow.GetLogger(ctx)
47+
48+
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
49+
StartToCloseTimeout: 3 * time.Minute,
50+
RetryPolicy: &temporal.RetryPolicy{
51+
InitialInterval: time.Second,
52+
MaximumInterval: 1 * time.Minute,
53+
BackoffCoefficient: 2,
54+
MaximumAttempts: 3,
55+
},
56+
})
57+
58+
var result activities.ReapFlyAppsResult
59+
err := workflow.ExecuteActivity(
60+
ctx,
61+
a.ReapFlyApps,
62+
activities.ReapFlyAppsRequest{
63+
Scope: params.Scope,
64+
ProjectID: params.ProjectID,
65+
},
66+
).Get(ctx, &result)
67+
if err != nil {
68+
return nil, fmt.Errorf("failed to reap functions: %w", err)
69+
}
70+
71+
logger.Info("functions reaper completed",
72+
"apps_reaped", result.Reaped,
73+
"errors", result.Errors,
74+
)
75+
76+
return &FunctionsReaperWorkflowResult{
77+
AppsReaped: result.Reaped,
78+
Errors: result.Errors,
79+
}, nil
80+
}

server/internal/background/worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func NewTemporalWorker(
150150
temporalWorker.RegisterActivity(activities.TransitionDeployment)
151151
temporalWorker.RegisterActivity(activities.ProvisionFunctionsAccess)
152152
temporalWorker.RegisterActivity(activities.DeployFunctionRunners)
153+
temporalWorker.RegisterActivity(activities.ReapFlyApps)
153154
temporalWorker.RegisterActivity(activities.GetSlackProjectContext)
154155
temporalWorker.RegisterActivity(activities.PostSlackMessage)
155156
temporalWorker.RegisterActivity(activities.SlackChatCompletion)
@@ -164,6 +165,7 @@ func NewTemporalWorker(
164165
temporalWorker.RegisterActivity(activities.ValidateDeployment)
165166

166167
temporalWorker.RegisterWorkflow(ProcessDeploymentWorkflow)
168+
temporalWorker.RegisterWorkflow(FunctionsReaperWorkflow)
167169
temporalWorker.RegisterWorkflow(SlackEventWorkflow)
168170
temporalWorker.RegisterWorkflow(OpenrouterKeyRefreshWorkflow)
169171
temporalWorker.RegisterWorkflow(CustomDomainRegistrationWorkflow)

server/internal/deployments/impl.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/speakeasy-api/gram/server/internal/packages/semver"
3737
"github.com/speakeasy-api/gram/server/internal/thirdparty/posthog"
3838
"go.temporal.io/sdk/client"
39+
"go.temporal.io/sdk/temporal"
3940
)
4041

4142
type Service struct {
@@ -852,6 +853,17 @@ func (s *Service) resolvePackages(ctx context.Context, tx *packagesRepo.Queries,
852853
}
853854

854855
func (s *Service) startDeployment(ctx context.Context, logger *slog.Logger, projectID uuid.UUID, deploymentID uuid.UUID, dep *types.Deployment) (string, error) {
856+
defer func() {
857+
logger.InfoContext(ctx, "starting project-scoped functions reaper")
858+
_, err := background.ExecuteProjectFunctionsReaperWorkflow(ctx, s.temporal, projectID)
859+
if err != nil && !temporal.IsWorkflowExecutionAlreadyStartedError(err) {
860+
logger.ErrorContext(
861+
ctx, "failed to start project-scoped functions reaper workflow",
862+
attr.SlogError(err),
863+
)
864+
}
865+
}()
866+
855867
wr, err := background.ExecuteProcessDeploymentWorkflow(ctx, s.temporal, background.ProcessDeploymentWorkflowParams{
856868
ProjectID: projectID,
857869
DeploymentID: deploymentID,

server/internal/functions/deploy.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Orchestrator interface {
1919

2020
type Deployer interface {
2121
Deploy(context.Context, RunnerDeployRequest) (*RunnerDeployResult, error)
22+
Reap(context.Context, ReapRequest) error
2223
}
2324

2425
type ToolCaller interface {
@@ -102,3 +103,9 @@ type RunnerResourceReadRequest struct {
102103
ResourceURI string
103104
ResourceName string
104105
}
106+
107+
type ReapRequest struct {
108+
ProjectID uuid.UUID
109+
DeploymentID uuid.UUID
110+
FunctionID uuid.UUID
111+
}

0 commit comments

Comments
 (0)