Skip to content

Commit 18f82b4

Browse files
authored
Ensure all materialized views have correct TTL (#6607)
1 parent bc52694 commit 18f82b4

File tree

3 files changed

+162
-0
lines changed

3 files changed

+162
-0
lines changed

.changeset/six-jeans-greet.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'hive': patch
3+
---
4+
5+
Ensure all materialized views have the correct TTL
packages/migrations/src/clickhouse-actions/013-apply-ttl.ts

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import { z } from 'zod';
2+
import type { Action } from '../clickhouse';
3+
4+
// Rows selected from `system.tables` when looking a table up by name:
// the table's name together with its uuid.
const SystemTablesModel = z
  .object({
    name: z.string(),
    uuid: z.string(),
  })
  .array();
10+
11+
// Rows selected from `system.tables` for an `.inner_id.<uuid>` table:
// its name and the full engine definition string.
const InnerTablesModel = z
  .object({
    name: z.string(),
    engine_full: z.string(),
  })
  .array();
17+
18+
// Rows selected from `default.migration_apply_ttl`: the tracked table name
// and whether its old rows have already been deleted.
const StateTableModel = z
  .object({
    table: z.string(),
    cleaned: z.boolean(),
  })
  .array();
24+
25+
/**
 * ClickHouse migration that puts a TTL on the inner storage table of each
 * materialized view listed in `targets` and deletes rows older than that TTL.
 *
 * Progress is kept in the temporary table `default.migration_apply_ttl`, which
 * is created at the start and dropped once every view has been processed.
 *
 * @param exec - executes a statement; an optional second argument carries query settings.
 * @param query - runs a SELECT and resolves to a result whose `data` holds the rows.
 * @throws Error when a view, its inner table, or its migration state row cannot be found.
 */
export const action: Action = async (exec, query) => {
  // Single source of truth for the views to process and the retention of each one.
  // Both the state-table seed and the TTL changes below are derived from this list,
  // so the two can never drift apart.
  const targets = [
    // daily
    { table: 'operations_daily', ttl: 'INTERVAL 1 YEAR' },
    { table: 'coordinates_daily', ttl: 'INTERVAL 1 YEAR' },
    { table: 'clients_daily', ttl: 'INTERVAL 1 YEAR' },
    { table: 'subscription_operations_daily', ttl: 'INTERVAL 1 YEAR' },
    // hourly
    { table: 'operations_hourly', ttl: 'INTERVAL 30 DAY' },
    { table: 'coordinates_hourly', ttl: 'INTERVAL 30 DAY' },
    { table: 'clients_hourly', ttl: 'INTERVAL 30 DAY' },
    // minutely
    { table: 'operations_minutely', ttl: 'INTERVAL 24 HOUR' },
    { table: 'coordinates_minutely', ttl: 'INTERVAL 24 HOUR' },
    { table: 'clients_minutely', ttl: 'INTERVAL 24 HOUR' },
  ];

  // Create a table to store the state of the migration
  await exec(`
    CREATE TABLE IF NOT EXISTS default.migration_apply_ttl (
      table String,
      cleaned Bool DEFAULT false,
      version UInt8
    ) ENGINE = ReplacingMergeTree(version) ORDER BY table
  `);

  // Seed one "not cleaned" row (version 1) per view. Re-running is safe: a finished
  // cleanup is recorded with version 2 and the state lookup reads the highest
  // version first, so these rows never mask a completed cleanup.
  const seedRows = targets.map(target => `('${target.table}', 1)`).join(',\n      ');
  await exec(`
    INSERT INTO default.migration_apply_ttl (table, version) VALUES
      ${seedRows}
  `);

  // Processed strictly one after another, in the listed order.
  for (const target of targets) {
    await applyTTL(target.table, target.ttl);
  }

  console.log('Dropping migration state table');
  await exec(`
    DROP TABLE default.migration_apply_ttl
  `);

  /**
   * Adds `TTL timestamp + <interval>` to the inner table of the given view and
   * then deletes the rows that are already older than the interval.
   * Does nothing when the inner table's engine definition already has a TTL.
   */
  async function applyTTL(tableName: string, interval: string) {
    const table = await querySystemTable(tableName);
    const innerTable = await queryInnerTable(table.uuid, table.name);

    if (hasTTL(innerTable.engine_full)) {
      // NOTE(review): this early return also skips dropOldRows, so a re-run after a
      // failure between the ALTER and the DELETE never retries the delete and the
      // `cleaned` flag is never consulted for that table. Confirm this is intended.
      console.log('TTL already applied to:', tableName);
      return;
    }

    await exec(`
      ALTER TABLE "${innerTable.name}" MODIFY TTL timestamp + ${interval};
    `);

    await dropOldRows(table.uuid, tableName, interval);
  }

  /** Looks up name and uuid of a table in the `default` database; throws when it is missing. */
  async function querySystemTable(tableName: string) {
    const result = await query(`
      SELECT uuid, name
      FROM system.tables
      WHERE
        database = 'default'
        AND name = '${tableName}'
      LIMIT 1
    `);
    const [table] = SystemTablesModel.parse(result.data);

    if (!table) {
      throw new Error(`Table ${tableName} not found`);
    }

    return table;
  }

  /**
   * Looks up the `.inner_id.<uuid>` table that belongs to the given uuid and
   * returns its name and engine definition; throws when it is missing.
   * `tableName` is used for the error message only.
   */
  async function queryInnerTable(uuid: string, tableName: string) {
    const result = await query(`
      SELECT name, engine_full
      FROM system.tables
      WHERE
        database = 'default'
        AND name = '.inner_id.${uuid}'
      LIMIT 1
    `);
    const [table] = InnerTablesModel.parse(result.data);

    if (!table) {
      throw new Error(`Inner table of ${tableName} not found`);
    }

    return table;
  }

  /**
   * Deletes rows older than `interval` from the inner table of the view, unless
   * the migration state already marks the view as cleaned, and then records the
   * view as cleaned (version 2) in the state table.
   */
  async function dropOldRows(uuid: string, tableName: string, interval: string) {
    const result = await query(`
      SELECT table, cleaned FROM default.migration_apply_ttl WHERE table = '${tableName}' ORDER BY version DESC LIMIT 1
    `);
    const [state] = StateTableModel.parse(result.data);

    // Fail with a descriptive error instead of a TypeError on `state.cleaned`
    // when the state table has no row for this view.
    if (!state) {
      throw new Error(`Migration state of ${tableName} not found`);
    }

    if (state.cleaned) {
      console.log('Old rows already deleted from:', tableName);
      return;
    }

    console.log('Deleting old rows from:', tableName);

    await exec(
      `
        DELETE FROM ".inner_id.${uuid}" WHERE timestamp < now() - ${interval}
      `,
      {
        // execute asynchronously
        lightweight_deletes_sync: '0',
      },
    );

    // The delete above runs asynchronously, so at this point it has been
    // submitted but has not necessarily finished.
    console.log('Deleted old rows from', tableName);
    await exec(`
      INSERT INTO default.migration_apply_ttl (table, cleaned, version) VALUES ('${tableName}', true, 2);
    `);
    console.log('Marked as cleaned:', tableName);
  }
};
150+
151+
/**
 * Reports whether an `engine_full` definition contains a TTL clause.
 *
 * The upper-case keyword `TTL` only counts when it is delimited on both sides
 * by a space, a newline or a tab.
 */
function hasTTL(engineFull: string) {
  const delimitedTtlKeyword = /[ \n\t]TTL[ \n\t]/;
  return delimitedTtlKeyword.test(engineFull);
}

packages/migrations/src/clickhouse.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ export async function migrateClickHouse(
4343

4444
const endpoint = `${clickhouse.protocol}://${clickhouse.host}:${clickhouse.port}`;
4545

46+
// Ignore the GRAPHQL_HIVE_ENVIRONMENT environment variable outside of Hive Cloud, so accidentally defining it has no effect
47+
hiveCloudEnvironment = isHiveCloud ? hiveCloudEnvironment : null;
48+
4649
console.log('Migrating ClickHouse');
4750
console.log('Endpoint: ', endpoint);
4851
console.log('Username: ', clickhouse.username);
@@ -170,6 +173,7 @@ export async function migrateClickHouse(
170173
import('./clickhouse-actions/010-app-deployment-operations'),
171174
import('./clickhouse-actions/011-audit-logs'),
172175
import('./clickhouse-actions/012-coordinates-typename-index'),
176+
import('./clickhouse-actions/013-apply-ttl'),
173177
]);
174178

175179
async function actionRunner(action: Action, index: number) {

0 commit comments

Comments
 (0)