Skip to content

Commit b8b4ea5

Browse files
authored
feat: post analytics history (#2982)
1 parent 88001ff commit b8b4ea5

File tree

8 files changed

+192
-0
lines changed

8 files changed

+192
-0
lines changed

.infra/crons.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,8 @@ export const crons: Cron[] = [
126126
name: 'post-analytics-clickhouse',
127127
schedule: '*/5 * * * *',
128128
},
129+
{
130+
name: 'post-analytics-history-day-clickhouse',
131+
schedule: '3-59/5 * * * *',
132+
},
129133
];
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { crons } from '../../src/cron/index';
2+
import { postAnalyticsHistoryDayClickhouseCron as cron } from '../../src/cron/postAnalyticsHistoryDayClickhouse';
3+
4+
// Guards against the cron being defined but never added to the exported `crons` list.
describe('postAnalyticsHistoryDayClickhouse cron', () => {
  it('should be registered', () => {
    // Look the cron up by its name in the registry of all crons.
    const match = crons.find(({ name }) => name === cron.name);

    expect(match).toBeDefined();
  });
});

src/common/schema/postAnalytics.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { format } from 'date-fns';
12
import { z } from 'zod';
23

34
export const postAnalyticsClickhouseSchema = z
@@ -14,3 +15,12 @@ export const postAnalyticsClickhouseSchema = z
1415
sharesInternal: z.coerce.number().nonnegative(),
1516
})
1617
.strict();
18+
19+
/**
 * Renders a Date as a `yyyy-MM-dd` day string using date-fns `format`.
 *
 * NOTE(review): date-fns `format` renders in the process-local timezone, so
 * the resulting day presumably only matches the source day when the process
 * runs in UTC — confirm against the deployment environment.
 */
const toDayString = (value: Date): string => format(value, 'yyyy-MM-dd');

/**
 * Schema for a single post analytics history row.
 *
 * - `id` must be a string.
 * - `updatedAt` is coerced to a `Date`.
 * - `impressions` is coerced to a number and must be non-negative.
 * - `date` is coerced to a `Date` and then normalised to a `yyyy-MM-dd` string.
 *
 * The object is strict: unknown keys make validation fail.
 */
export const postAnalyticsHistoryClickhouseSchema = z
  .object({
    id: z.string(),
    updatedAt: z.coerce.date(),
    impressions: z.coerce.number().nonnegative(),
    date: z.coerce.date().transform(toDayString),
  })
  .strict();

src/cron/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { calculateTopReaders } from './calculateTopReaders';
2020
import cleanGiftedPlus from './cleanGiftedPlus';
2121
import { cleanStaleUserTransactions } from './cleanStaleUserTransactions';
2222
import { postAnalyticsClickhouseCron } from './postAnalyticsClickhouse';
23+
import { postAnalyticsHistoryDayClickhouseCron } from './postAnalyticsHistoryDayClickhouse';
2324

2425
export const crons: Cron[] = [
2526
updateViews,
@@ -43,4 +44,5 @@ export const crons: Cron[] = [
4344
cleanGiftedPlus,
4445
cleanStaleUserTransactions,
4546
postAnalyticsClickhouseCron,
47+
postAnalyticsHistoryDayClickhouseCron,
4648
];
src/cron/postAnalyticsHistoryDayClickhouse.ts

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import { format, startOfToday } from 'date-fns';
2+
import { Cron } from './cron';
3+
import { getClickHouseClient } from '../common/clickhouse';
4+
import { postAnalyticsHistoryClickhouseSchema } from '../common/schema/postAnalytics';
5+
import { z } from 'zod';
6+
import { getRedisHash, setRedisHash } from '../redis';
7+
import { generateStorageKey, StorageTopic } from '../config';
8+
import { PostAnalyticsHistory } from '../entity/posts/PostAnalyticsHistory';
9+
10+
/**
 * Shape of the cron's persisted configuration hash.
 * Every field is optional because the hash may not have been written yet.
 */
type PostAnalyticsHistoryClickhouseCronConfig = {
  /** Timestamp string of the previous run; parsed with `new Date(...)` by the cron. */
  lastRunAt?: string;
};
13+
14+
/**
 * Splits `items` into consecutive arrays holding at most `size` elements each.
 * Returns an empty array when `items` is empty.
 */
const splitIntoChunks = <T>(items: readonly T[], size: number): T[][] => {
  const chunks: T[][] = [];

  for (let start = 0; start < items.length; start += size) {
    chunks.push(items.slice(start, start + size));
  }

  return chunks;
};

/**
 * Cron that copies today's per-post impressions from the ClickHouse
 * `api.post_analytics_history` table into the `PostAnalyticsHistory` entity.
 *
 * Flow:
 * 1. Read the `lastRunAt` watermark from the cron's Redis config hash
 *    (falls back to the start of today when it is not set).
 * 2. Query ClickHouse for today's rows created after that watermark.
 * 3. Validate the rows with `postAnalyticsHistoryClickhouseSchema`.
 * 4. Upsert the rows (conflict target: `id`) in chunks inside one transaction.
 * 5. Persist the new watermark to Redis.
 *
 * @throws Error when the stored `lastRunAt` is not a parseable date, or when
 * the ClickHouse response does not match the schema.
 */
export const postAnalyticsHistoryDayClickhouseCron: Cron = {
  name: 'post-analytics-history-day-clickhouse',
  handler: async (con, logger) => {
    const redisStorageKey = generateStorageKey(
      StorageTopic.Cron,
      postAnalyticsHistoryDayClickhouseCron.name,
      'config',
    );

    const cronConfig: PostAnalyticsHistoryClickhouseCronConfig =
      await getRedisHash(redisStorageKey);

    const lastRunAt = cronConfig.lastRunAt
      ? new Date(cronConfig.lastRunAt)
      : startOfToday(); // for now use start of today if no last run time is set

    if (Number.isNaN(lastRunAt.getTime())) {
      throw new Error('Invalid last run time');
    }

    // Capture the next watermark BEFORE querying. Taking it after the query
    // would skip rows created while the query/parsing was in flight, because
    // the next run only selects rows with created_at > lastRunAt. Rows that
    // fall in the overlap are simply upserted again on the next run.
    const currentRunAt = new Date();

    const clickhouseClient = getClickHouseClient();

    // NOTE(review): only rows whose `date` equals the current day are read, so
    // rows for the previous day created after the last run before midnight are
    // presumably not picked up — confirm whether that is acceptable.
    const response = await clickhouseClient.query({
      query: /* sql */ `
        SELECT
          post_id AS id,
          date,
          max(created_at) AS "updatedAt",
          sum(impressions) AS impressions
        FROM api.post_analytics_history
        FINAL
        WHERE date = {date: Date}
        AND created_at > {lastRunAt: DateTime}
        GROUP BY date, post_id
      `,
      format: 'JSONEachRow',
      query_params: {
        lastRunAt: format(lastRunAt, 'yyyy-MM-dd HH:mm:ss'),
        date: format(currentRunAt, 'yyyy-MM-dd'),
      },
    });

    const result = z
      .array(postAnalyticsHistoryClickhouseSchema)
      .safeParse(await response.json());

    if (!result.success) {
      // Log only the first schema issue to keep the log entry small.
      logger.error(
        {
          schemaError: result.error.errors[0],
        },
        'Invalid post analytics data',
      );

      throw new Error('Invalid post analytics data');
    }

    // Insert in bounded batches so a single statement never carries more than
    // `chunkSize` rows.
    const chunkSize = 500;
    const chunks = splitIntoChunks(
      result.data as PostAnalyticsHistory[],
      chunkSize,
    );

    // Nothing to write: skip opening a transaction, but still advance the
    // watermark below.
    if (chunks.length > 0) {
      await con.transaction(async (entityManager) => {
        for (const chunk of chunks) {
          // Overwrite every parsed column when a row with the same id exists.
          await entityManager
            .createQueryBuilder()
            .insert()
            .into(PostAnalyticsHistory)
            .values(chunk)
            .orUpdate(Object.keys(chunk[0]), ['id'])
            .execute();
        }
      });
    }

    // Only reached when the upsert succeeded, so a failed run is retried from
    // the previous watermark.
    await setRedisHash<PostAnalyticsHistoryClickhouseCronConfig>(
      redisStorageKey,
      {
        lastRunAt: currentRunAt.toISOString(),
      },
    );
  },
};

src/entity/posts/PostAnalytics.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
Column,
33
CreateDateColumn,
44
Entity,
5+
JoinColumn,
56
OneToOne,
67
PrimaryColumn,
78
UpdateDateColumn,
@@ -61,9 +62,11 @@ export class PostAnalytics {
6162
@Column({ default: 0 })
6263
awards: number;
6364

65+
// not added to migration because raw events data has some invalid post ids
6466
@OneToOne('Post', {
6567
lazy: true,
6668
onDelete: 'CASCADE',
6769
})
70+
@JoinColumn({ name: 'id' })
6871
post: Promise<Post>;
6972
}
src/entity/posts/PostAnalyticsHistory.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import {
2+
Column,
3+
CreateDateColumn,
4+
Entity,
5+
JoinColumn,
6+
OneToOne,
7+
PrimaryColumn,
8+
UpdateDateColumn,
9+
} from 'typeorm';
10+
import type { Post } from './Post';
11+
12+
/**
 * TypeORM entity holding the impressions recorded for a post on a given day.
 *
 * NOTE(review): the primary key is `id` alone, so there is one row per post
 * rather than one per post and day — confirm this is intended for a history
 * table.
 */
@Entity()
export class PostAnalyticsHistory {
  /** Primary key; also used as the join column for `post`. */
  @PrimaryColumn('text')
  id: string;

  /** Creation timestamp column (TypeORM `@CreateDateColumn`). */
  @CreateDateColumn()
  createdAt: Date;

  /** Update timestamp column (TypeORM `@UpdateDateColumn`). */
  @UpdateDateColumn()
  updatedAt: Date;

  /** Day the row refers to, stored as a string. */
  @Column()
  date: string;

  /** Impressions counter; defaults to 0. */
  @Column({ default: 0 })
  impressions: number;

  // not added to migration because raw events data has some invalid post ids
  @OneToOne('Post', { lazy: true, onDelete: 'CASCADE' })
  @JoinColumn({ name: 'id' })
  post: Promise<Post>;
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { MigrationInterface, QueryRunner } from 'typeorm';
2+
3+
/**
 * Migration that creates the `post_analytics_history` table and drops it
 * again on rollback.
 */
export class PostAnalyticsHistory1755090244241 implements MigrationInterface {
  name = 'PostAnalyticsHistory1755090244241';

  /** Creates the table with `id` as its primary key. */
  public async up(queryRunner: QueryRunner): Promise<void> {
    const createTableSql = `CREATE TABLE "post_analytics_history" ("id" text NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), "date" character varying NOT NULL, "impressions" integer NOT NULL DEFAULT '0', CONSTRAINT "PK_6bb82232882b9a8bfa54a034c5f" PRIMARY KEY ("id"))`;

    await queryRunner.query(createTableSql);
  }

  /** Reverts `up` by dropping the table. */
  public async down(queryRunner: QueryRunner): Promise<void> {
    const dropTableSql = `DROP TABLE "post_analytics_history"`;

    await queryRunner.query(dropTableSql);
  }
}

0 commit comments

Comments
 (0)