Skip to content

Commit 07e2869

Browse files
committed
use regionId
1 parent db92748 commit 07e2869

15 files changed

+62
-72
lines changed

pkg/db/deployment_topology_list_by_versions.sql_generated.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/db/instances_find_by_deployment_id.sql_generated.go

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/db/instances_find_by_deployment_id_and_region_id.sql_generated.go

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/db/instances_find_by_pod_name.sql_generated.go

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/db/models_generated.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/db/querier_generated.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/db/queries/deployment_topology_list_by_versions.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ FROM `deployment_topology` dt
99
INNER JOIN `deployments` d ON dt.deployment_id = d.id
1010
INNER JOIN `workspaces` w ON d.workspace_id = w.id
1111
INNER JOIN `regions` r ON dt.region_id = r.id
12-
WHERE r.name = sqlc.arg(region) AND dt.version > sqlc.arg(afterVersion)
12+
WHERE r.id = sqlc.arg(region_id) AND dt.version > sqlc.arg(afterVersion)
1313
ORDER BY dt.version ASC
1414
LIMIT ?;

pkg/db/schema.sql

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -533,15 +533,14 @@ CREATE TABLE `deployment_topology` (
533533
`pk` bigint unsigned AUTO_INCREMENT NOT NULL,
534534
`workspace_id` varchar(64) NOT NULL,
535535
`deployment_id` varchar(64) NOT NULL,
536-
`region` varchar(64),
536+
`region` varchar(255) DEFAULT 'DELETE ME',
537537
`region_id` varchar(64) NOT NULL,
538538
`desired_replicas` int NOT NULL,
539539
`version` bigint unsigned NOT NULL,
540540
`desired_status` enum('stopped','running') NOT NULL,
541541
`created_at` bigint NOT NULL,
542542
`updated_at` bigint,
543-
CONSTRAINT `deployment_topology_pk` PRIMARY KEY(`pk`),
544-
CONSTRAINT `unique_version_per_region` UNIQUE(`region_id`,`version`)
543+
CONSTRAINT `deployment_topology_pk` PRIMARY KEY(`pk`)
545544
);
546545

547546
CREATE TABLE `acme_users` (
@@ -605,7 +604,7 @@ CREATE TABLE `sentinels` (
605604
`environment_id` varchar(255) NOT NULL,
606605
`k8s_name` varchar(64) NOT NULL,
607606
`k8s_address` varchar(255) NOT NULL,
608-
`region` varchar(64),
607+
`region` varchar(255) DEFAULT 'DELETE ME',
609608
`region_id` varchar(255) NOT NULL DEFAULT 'TODO',
610609
`image` varchar(255) NOT NULL,
611610
`desired_state` enum('running','standby','archived') NOT NULL DEFAULT 'running',
@@ -620,8 +619,7 @@ CREATE TABLE `sentinels` (
620619
CONSTRAINT `sentinels_pk` PRIMARY KEY(`pk`),
621620
CONSTRAINT `sentinels_id_unique` UNIQUE(`id`),
622621
CONSTRAINT `sentinels_k8s_name_unique` UNIQUE(`k8s_name`),
623-
CONSTRAINT `sentinels_k8s_address_unique` UNIQUE(`k8s_address`),
624-
CONSTRAINT `unique_version_per_region` UNIQUE(`region_id`,`version`)
622+
CONSTRAINT `sentinels_k8s_address_unique` UNIQUE(`k8s_address`)
625623
);
626624

627625
CREATE TABLE `instances` (
@@ -631,6 +629,7 @@ CREATE TABLE `instances` (
631629
`workspace_id` varchar(255) NOT NULL,
632630
`project_id` varchar(255) NOT NULL,
633631
`app_id` varchar(64) NOT NULL,
632+
`region` varchar(255) DEFAULT 'DELETE ME',
634633
`region_id` varchar(64) NOT NULL DEFAULT 'TODO',
635634
`k8s_name` varchar(255) NOT NULL,
636635
`address` varchar(255) NOT NULL,
@@ -706,16 +705,14 @@ CREATE TABLE `cilium_network_policies` (
706705
`deployment_id` varchar(128) NOT NULL,
707706
`k8s_name` varchar(64) NOT NULL,
708707
`k8s_namespace` varchar(255) NOT NULL,
709-
`region` varchar(255),
708+
`region` varchar(255) DEFAULT 'DELETE ME',
710709
`region_id` varchar(64) NOT NULL DEFAULT 'TODO',
711710
`policy` json NOT NULL,
712711
`version` bigint unsigned NOT NULL,
713712
`created_at` bigint NOT NULL,
714713
`updated_at` bigint,
715714
CONSTRAINT `cilium_network_policies_pk` PRIMARY KEY(`pk`),
716-
CONSTRAINT `cilium_network_policies_id_unique` UNIQUE(`id`),
717-
CONSTRAINT `one_deployment_per_region` UNIQUE(`deployment_id`,`region_id`,`k8s_name`),
718-
CONSTRAINT `unique_version_per_region` UNIQUE(`region_id`,`version`)
715+
CONSTRAINT `cilium_network_policies_id_unique` UNIQUE(`id`)
719716
);
720717

721718
CREATE TABLE `clusters` (

svc/ctrl/services/cluster/rpc_heartbeat.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package cluster
22

33
import (
44
"context"
5-
"fmt"
65
"time"
76

87
"connectrpc.com/connect"
@@ -37,9 +36,7 @@ func (s *Service) Heartbeat(ctx context.Context, req *connect.Request[ctrlv1.Hea
3736
now := time.Now().UnixMilli()
3837

3938
err := db.Query.UpsertRegion(ctx, s.db.RW(), db.UpsertRegionParams{
40-
// using a readable id here to make debugging significantly easier
41-
// do not rely on this schema though. treat ids as opaque strings.
42-
ID: fmt.Sprintf("%s::%s", platform, regionName),
39+
ID: uid.New(uid.RegionPrefix),
4340
Name: regionName,
4441
Platform: platform,
4542
})

svc/ctrl/services/cluster/rpc_watch_deployments.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,28 @@ func (s *Service) WatchDeployments(
3737
return err
3838
}
3939

40-
region := req.Header().Get("X-Krane-Region")
40+
regionName := req.Header().Get("X-Krane-Region")
4141
platform := req.Header().Get("X-Krane-Platform")
4242
if err := assert.All(
43-
assert.NotEmpty(region, "region is required"),
43+
assert.NotEmpty(regionName, "region is required"),
4444
assert.NotEmpty(platform, "platform is required"),
4545
); err != nil {
4646
return connect.NewError(connect.CodeInvalidArgument, err)
4747
}
4848

49+
logger.Info("starting WatchDeployments stream", "region_name", regionName, "platform", platform)
50+
51+
region, err := db.Query.FindRegionByNameAndPlatform(ctx, s.db.RO(), db.FindRegionByNameAndPlatformParams{
52+
Name: regionName,
53+
Platform: platform,
54+
})
55+
if err != nil {
56+
logger.Error("failed to find region for WatchDeployments", "error", err, "region_name", regionName, "platform", platform)
57+
return connect.NewError(connect.CodeInternal, err)
58+
}
59+
60+
logger.Info("found region for WatchDeployments", "region_id", region.ID)
61+
4962
versionCursor := req.Msg.GetVersionLastSeen()
5063

5164
for {
@@ -55,7 +68,7 @@ func (s *Service) WatchDeployments(
5568
default:
5669
}
5770

58-
states, err := s.fetchDeploymentStates(ctx, region, versionCursor)
71+
states, err := s.fetchDeploymentStates(ctx, region.ID, versionCursor)
5972
if err != nil {
6073
logger.Error("failed to fetch deployment states", "error", err)
6174
return connect.NewError(connect.CodeInternal, err)
@@ -79,9 +92,9 @@ func (s *Service) WatchDeployments(
7992
// fetchDeploymentStates queries the database for deployment topologies in the given region
8093
// with versions greater than afterVersion, returning up to 100 results. Rows that fail
8194
// conversion are logged and skipped rather than failing the entire batch.
82-
func (s *Service) fetchDeploymentStates(ctx context.Context, region string, afterVersion uint64) ([]*ctrlv1.DeploymentState, error) {
95+
func (s *Service) fetchDeploymentStates(ctx context.Context, regionID string, afterVersion uint64) ([]*ctrlv1.DeploymentState, error) {
8396
rows, err := db.Query.ListDeploymentTopologyByRegion(ctx, s.db.RO(), db.ListDeploymentTopologyByRegionParams{
84-
Region: region,
97+
RegionID: regionID,
8598
Afterversion: afterVersion,
8699
Limit: 100,
87100
})
@@ -99,6 +112,8 @@ func (s *Service) fetchDeploymentStates(ctx context.Context, region string, afte
99112
states = append(states, state)
100113
}
101114

115+
logger.Info("fetched deployment states", "count", len(states), "region_id", regionID, "after_version", afterVersion)
116+
102117
return states, nil
103118
}
104119

0 commit comments

Comments
 (0)