|
6 | 6 | package multiregionccl_test |
7 | 7 |
|
8 | 8 | import ( |
| 9 | + "context" |
9 | 10 | "testing" |
| 11 | + "time" |
10 | 12 |
|
11 | 13 | "github.com/cockroachdb/cockroach/pkg/base" |
12 | 14 | "github.com/cockroachdb/cockroach/pkg/ccl" |
13 | 15 | "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils" |
| 16 | + "github.com/cockroachdb/cockroach/pkg/keys" |
| 17 | + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" |
| 18 | + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" |
| 19 | + "github.com/cockroachdb/cockroach/pkg/roachpb" |
| 20 | + "github.com/cockroachdb/cockroach/pkg/settings/cluster" |
| 21 | + "github.com/cockroachdb/cockroach/pkg/testutils" |
14 | 22 | "github.com/cockroachdb/cockroach/pkg/testutils/skip" |
15 | 23 | "github.com/cockroachdb/cockroach/pkg/util/leaktest" |
16 | 24 | "github.com/cockroachdb/cockroach/pkg/util/log" |
| 25 | + "github.com/cockroachdb/errors" |
17 | 26 | "github.com/stretchr/testify/require" |
18 | 27 | ) |
19 | 28 |
|
@@ -150,3 +159,104 @@ func TestGlobalReadsAfterEnterpriseDisabled(t *testing.T) { |
150 | 159 | _, err = sqlDB.Exec(`ALTER TABLE t2 CONFIGURE ZONE USING global_reads = false`) |
151 | 160 | require.NoError(t, err) |
152 | 161 | } |
| 162 | + |
 | 163 | + // TestReplicaClosedTSPolicyWithPolicyRefresher verifies closed timestamp policy |
 | 164 | + // behavior with and without auto-tuning. With auto-tuning enabled, we expect |
 | 165 | + // latency-based policies on global tables; with it disabled, none should exist. |
| 166 | +func TestReplicaClosedTSPolicyWithPolicyRefresher(t *testing.T) { |
| 167 | + defer leaktest.AfterTest(t)() |
| 168 | + defer log.Scope(t).Close(t) |
| 169 | + ctx := context.Background() |
| 170 | + st := cluster.MakeTestingClusterSettings() |
| 171 | + |
| 172 | + // Helper function to check if a policy is a newly introduced latency-based policy. |
| 173 | + isLatencyBasedPolicy := func(policy ctpb.RangeClosedTimestampPolicy) bool { |
| 174 | + return policy >= ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS && |
| 175 | + policy <= ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_EQUAL_OR_GREATER_THAN_300MS |
| 176 | + } |
| 177 | + |
 | 178 | + // Enable auto-tuning and set small refresh intervals for faster testing. |
| 179 | + closedts.LeadForGlobalReadsAutoTuneEnabled.Override(ctx, &st.SV, true) |
| 180 | + closedts.RangeClosedTimestampPolicyRefreshInterval.Override(ctx, &st.SV, 5*time.Millisecond) |
| 181 | + closedts.RangeClosedTimestampPolicyLatencyRefreshInterval.Override(ctx, &st.SV, 5*time.Millisecond) |
| 182 | + |
| 183 | + // Create a multi-region cluster with manual replication. |
| 184 | + numServers := 3 |
| 185 | + tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( |
| 186 | + t, numServers, base.TestingKnobs{}, multiregionccltestutils.WithReplicationMode(base.ReplicationManual), |
| 187 | + multiregionccltestutils.WithSettings(st), |
| 188 | + ) |
| 189 | + defer cleanup() |
| 190 | + |
| 191 | + // Create a multi-region database and global table. |
| 192 | + _, err := sqlDB.Exec(`CREATE DATABASE t PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"`) |
| 193 | + require.NoError(t, err) |
| 194 | + _, err = sqlDB.Exec(`CREATE TABLE t.test_table (k INT PRIMARY KEY) LOCALITY GLOBAL`) |
| 195 | + require.NoError(t, err) |
| 196 | + |
| 197 | + // Look up the table ID and get its key prefix. |
| 198 | + var tableID uint32 |
 | 199 | + err = sqlDB.QueryRow(`SELECT id FROM system.namespace WHERE name='test_table'`).Scan(&tableID) |
| 200 | + require.NoError(t, err) |
| 201 | + tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID)) |
| 202 | + // Split the range at the table prefix and replicate it across all nodes. |
| 203 | + tc.SplitRangeOrFatal(t, tablePrefix.AsRawKey()) |
| 204 | + tc.AddVotersOrFatal(t, tablePrefix.AsRawKey(), tc.Target(1), tc.Target(2)) |
| 205 | + |
| 206 | + // Get the store and replica for testing. |
| 207 | + store := tc.GetFirstStoreFromServer(t, 0) |
| 208 | + replica := store.LookupReplica(roachpb.RKey(tablePrefix.AsRawKey())) |
| 209 | + require.NotNil(t, replica) |
| 210 | + |
| 211 | + // Wait for the closed timestamp policies to stabilize and verify the |
| 212 | + // expected behavior. |
| 213 | + testutils.SucceedsSoon(t, func() error { |
| 214 | + snapshot := store.GetStoreConfig().ClosedTimestampSender.GetSnapshot() |
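 | | + // The sender's snapshot should carry one closed timestamp entry per policy bucket. |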
| 215 | + if len(snapshot.ClosedTimestamps) != int(ctpb.MAX_CLOSED_TIMESTAMP_POLICY) { |
| 216 | + return errors.Errorf("expected %d closed timestamps, got %d", |
| 217 | + ctpb.MAX_CLOSED_TIMESTAMP_POLICY, len(snapshot.ClosedTimestamps)) |
| 218 | + } |
| 219 | + |
| 220 | + // Ensure there are ranges to check. |
| 221 | + if len(snapshot.AddedOrUpdated) == 0 { |
| 222 | + return errors.Errorf("no ranges") |
| 223 | + } |
| 224 | + |
 | 225 | + // With auto-tuning enabled, expect the test table's range to carry a latency-based policy. |
| 226 | + for _, policy := range snapshot.AddedOrUpdated { |
| 227 | + if policy.RangeID != replica.GetRangeID() { |
| 228 | + continue |
| 229 | + } |
| 230 | + if isLatencyBasedPolicy(policy.Policy) { |
| 231 | + return nil |
| 232 | + } |
| 233 | + } |
 | 234 | + return errors.Errorf("no ranges with latency-based policy") |
| 235 | + }) |
| 236 | + |
 | 237 | + // With auto-tuning disabled, expect no latency-based policies at all. |
| 238 | + _, err = sqlDB.Exec(`SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_auto_tune.enabled = false`) |
| 239 | + require.NoError(t, err) |
| 240 | + |
 | 241 | + // Wait for the disabled setting to take effect and verify that no latency-based policies remain. |
| 242 | + testutils.SucceedsSoon(t, func() error { |
| 243 | + snapshot := store.GetStoreConfig().ClosedTimestampSender.GetSnapshot() |
| 244 | + if len(snapshot.ClosedTimestamps) != int(ctpb.MAX_CLOSED_TIMESTAMP_POLICY) { |
| 245 | + return errors.Errorf("expected %d closed timestamps, got %d", |
| 246 | + ctpb.MAX_CLOSED_TIMESTAMP_POLICY, len(snapshot.ClosedTimestamps)) |
| 247 | + } |
| 248 | + |
| 249 | + // Ensure there are ranges to check. |
| 250 | + if len(snapshot.AddedOrUpdated) == 0 { |
| 251 | + return errors.Errorf("no ranges") |
| 252 | + } |
| 253 | + |
| 254 | + // Verify no latency-based policies remain. |
| 255 | + for _, policy := range snapshot.AddedOrUpdated { |
| 256 | + if isLatencyBasedPolicy(policy.Policy) { |
 | 257 | + return errors.Errorf("range %d has latency-based policy %s", policy.RangeID, policy.Policy) |
| 258 | + } |
| 259 | + } |
| 260 | + return nil |
| 261 | + }) |
| 262 | +} |