-
Notifications
You must be signed in to change notification settings - Fork 546
BIP 2: Support upserting oracle table
Status: MERGED
Author: @chncaesar
Contributor: @AdmondGuo
Date: 2022.03.23
Issue: #1772
Pull Requests: #1722
数据处理时,更新(upsert) RDBMS 表是比较普遍的场景。目前,Byzer-lang 能够更新 MySQL 表。代码save append result_set as jdbc.db.table_1 where idCol="c1"。
result_set 有如下数据
| c1 | c2 |
|---|---|
| 1 | "a_updated" |
| 2 | "b" |
table_1 有如下记录
| c1 | c2 |
|---|---|
| 1 | "a" |
执行时,比较 c1 字段,若值已经存在,则更新其他字段;否则插入一条新记录。完成后,table_1 变为
| c1 | c2 |
|---|---|
| 1 | "a_updated" |
| 2 | "b" |
我们希望 Byzer-lang 具备更新(upsert) Oracle 表的能力。
各家 RDBMS 提供了不同 sql 实现 upsert 语义。Oracle 提供了 merge into 语句。
因而,byzer-lang 将结果集转化为 n 条 merge into 语句,通过 JDBC 协议提交至Oracle,提交或者回滚事务。
代码实现上,DataFrameWriterExtensions 利用 scala 隐式能力扩展 DataFrameWriter 功能。UpsertUtils 已经实现了 upsert 时 不同数据分片 JDBC 连接和事务控制。
我们需要扩展 UpsertBuilder,生成 Oracle merge into, 并注册该类。类图如下:

这里有 2 点要注意:
- 事务控制
- 结果集和表 schema 对齐
Byzer-lang 是一个分布式应用,每个数据分片有独立数据库连接,可能导致脏数据。强事务要求的场景,将数据分片改为1,牺牲一点性能。
对于每个数据分片,batchSize 条数据提交一次事务,若有异常,则回滚。为了避免脏数据,batchSize 需要大于等于结果集条数。
要求结果集字段名称和数据类型与目标表一致,否则更新失败。例如以下语句执行报错,因为 concat(c2, "upd") 没有显式声明字段名。
select c1, concat(c2,"upd") as updated;
save append updated as jdbc.`db1.table1`;修改为
select c1, concat(c2,"upd") c2 as updated;
save append updated as jdbc.`db1.table1`;