Skip to content

Commit 41a202f

Browse files
authored
feat: add inc cdc filtering (#3452)
1 parent 3f1b052 commit 41a202f

File tree

3 files changed

+24
-1
lines changed

3 files changed

+24
-1
lines changed

.infra/application.properties

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ debezium.source.plugin.name=pgoutput
1515
debezium.source.heartbeat.interval.ms=60000
1616
debezium.source.topic.prefix=api
1717
debezium.source.tombstones.on.delete=false
18-
debezium.transforms=Reroute,Notifications,ReadOperationFilter,PostsFilter
18+
debezium.transforms=Reroute,Notifications,ReadOperationFilter,PostsFilter,UserIncFilter
1919
debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
2020
debezium.transforms.Reroute.topic.regex=^((?!\.notification_v2).)*$
2121
debezium.transforms.Reroute.topic.replacement=%topic%
@@ -28,4 +28,7 @@ debezium.transforms.ReadOperationFilter.condition=!(valueSchema.field('op') && v
2828
debezium.transforms.PostsFilter.type=io.debezium.transforms.Filter
2929
debezium.transforms.PostsFilter.language=jsr223.groovy
3030
debezium.transforms.PostsFilter.condition=!(valueSchema.field('op') && value.op == 'u' && value.source.table == 'post' && value.before.views != value.after.views) && !(valueSchema.field('op') && value.op == 'u' && value.source.table == 'user' && value.before.cioRegistered != value.after.cioRegistered)
31+
debezium.transforms.UserIncFilter.type=io.debezium.transforms.Filter
32+
debezium.transforms.UserIncFilter.language=jsr223.groovy
33+
debezium.transforms.UserIncFilter.condition=!(valueSchema.field('op') && value.op == 'u' && value.source.table == 'user' && value.before.inc != value.after.inc)
3134
debezium.sink.type=pubsub

src/entity/user/User.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,4 +365,9 @@ export class User {
365365
})
366366
@Index('IDX_user_locationId')
367367
location: Promise<DatasetLocation>;
368+
369+
// used for diffing changes, eg. filters for cdc during updates
370+
// due to user table having a lot of depenencies
371+
@Column({ type: 'int', default: 0 })
372+
inc: number;
368373
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { MigrationInterface, QueryRunner } from 'typeorm';
2+
3+
export class UserInc1768572789493 implements MigrationInterface {
4+
name = 'UserInc1768572789493';
5+
6+
public async up(queryRunner: QueryRunner): Promise<void> {
7+
await queryRunner.query(
8+
`ALTER TABLE "user" ADD "inc" integer NOT NULL DEFAULT '0'`,
9+
);
10+
}
11+
12+
public async down(queryRunner: QueryRunner): Promise<void> {
13+
await queryRunner.query(`ALTER TABLE "user" DROP COLUMN "inc"`);
14+
}
15+
}

0 commit comments

Comments
 (0)