Skip to content

Commit 5c79d73

Browse files
authored
feat(persistence): make DeleteFromHistoryNode page size a dynamic config (#7484)
<!-- Describe what has changed in this PR --> **What changed?** Made HistoryNodeDeleteBatchSize configurable through a dynamic config. Also allowed unbounded deletion request for HistoryNodeDeleteBatchSize when the batch size less or equal to zero. <!-- Tell your future self why have you made these changes --> **Why?** Make the HistoryNodeDeleteBatchSize configurable for SQL implementations. Right now we use a fixed value of 1000 for page size when using SQL. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** HistoryNodeDeleteBatchSize is now configurable using the property history.historyNodeDeleteBatchSize. The default remains the same as before, 1000. Note that using a value <= 0 makes the request page size unbounded. <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: fimanishi <[email protected]>
1 parent e2eef59 commit 5c79d73

File tree

14 files changed

+225
-14
lines changed

14 files changed

+225
-14
lines changed

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,12 @@ const (
10821082
// Default value: 4000
10831083
// Allowed filters: N/A
10841084
ReplicatorTaskDeleteBatchSize
1085+
// HistoryNodeDeleteBatchSize is batch size for deleting history nodes
1086+
// KeyName: history.historyNodeDeleteBatchSize
1087+
// Value type: Int
1088+
// Default value: 1000
1089+
// Allowed filters: N/A
1090+
HistoryNodeDeleteBatchSize
10851091
// ReplicatorReadTaskMaxRetryCount is the number of read replication task retry time
10861092
// KeyName: history.replicatorReadTaskMaxRetryCount
10871093
// Value type: Int
@@ -3736,6 +3742,11 @@ var IntKeys = map[IntKey]DynamicInt{
37363742
Description: "ReplicatorTaskDeleteBatchSize is batch size for ReplicatorProcessor to delete replication tasks",
37373743
DefaultValue: 4000,
37383744
},
3745+
HistoryNodeDeleteBatchSize: {
3746+
KeyName: "history.historyNodeDeleteBatchSize",
3747+
Description: "HistoryNodeDeleteBatchSize is batch size for deleting history nodes",
3748+
DefaultValue: 1000,
3749+
},
37393750
ReplicatorReadTaskMaxRetryCount: {
37403751
KeyName: "history.replicatorReadTaskMaxRetryCount",
37413752
Description: "ReplicatorReadTaskMaxRetryCount is the number of read replication task retry time",

common/persistence/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type (
3737
ReadNoSQLShardFromDataBlob dynamicproperties.BoolPropertyFn
3838
SerializationEncoding dynamicproperties.StringPropertyFn
3939
DomainAuditLogTTL dynamicproperties.DurationPropertyFnWithDomainIDFilter
40+
HistoryNodeDeleteBatchSize dynamicproperties.IntPropertyFn
4041
}
4142
)
4243

@@ -52,5 +53,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration
5253
ReadNoSQLShardFromDataBlob: dc.GetBoolProperty(dynamicproperties.ReadNoSQLShardFromDataBlob),
5354
SerializationEncoding: dc.GetStringProperty(dynamicproperties.SerializationEncoding),
5455
DomainAuditLogTTL: dc.GetDurationPropertyFilteredByDomainID(dynamicproperties.DomainAuditLogTTL),
56+
HistoryNodeDeleteBatchSize: dc.GetIntProperty(dynamicproperties.HistoryNodeDeleteBatchSize),
5557
}
5658
}

common/persistence/persistence-tests/persistenceTestBase.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func NewTestBaseWithNoSQL(t *testing.T, options *TestBaseOptions) *TestBase {
161161
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
162162
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(true),
163163
DomainAuditLogTTL: func(domainID string) time.Duration { return time.Hour * 24 * 365 }, // 1 year default
164+
HistoryNodeDeleteBatchSize: dynamicproperties.GetIntPropertyFn(1000),
164165
}
165166
params := TestBaseParams{
166167
DefaultTestCluster: testCluster,
@@ -194,6 +195,7 @@ func NewTestBaseWithSQL(t *testing.T, options *TestBaseOptions) *TestBase {
194195
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
195196
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(true),
196197
DomainAuditLogTTL: func(domainID string) time.Duration { return time.Hour * 24 * 365 }, // 1 year default
198+
HistoryNodeDeleteBatchSize: dynamicproperties.GetIntPropertyFn(1000),
197199
}
198200
params := TestBaseParams{
199201
DefaultTestCluster: testCluster,

common/persistence/sql/factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type (
4343
dc *p.DynamicConfiguration
4444
}
4545

46-
// dbConn represents a logical mysql connection - its a
46+
// dbConn represents a logical mysql connection - it's a
4747
// wrapper around the standard sql connection pool with
4848
// additional reference counting
4949
dbConn struct {
@@ -98,7 +98,7 @@ func (f *Factory) NewHistoryStore() (p.HistoryStore, error) {
9898
if err != nil {
9999
return nil, err
100100
}
101-
return NewHistoryV2Persistence(conn, f.logger, f.parser)
101+
return NewHistoryV2Persistence(conn, f.logger, f.parser, f.dc)
102102
}
103103

104104
// NewDomainStore returns a new metadata store

common/persistence/sql/sql_history_store.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,15 @@ func NewHistoryV2Persistence(
5555
db sqlplugin.DB,
5656
logger log.Logger,
5757
parser serialization.Parser,
58+
dc *persistence.DynamicConfiguration,
5859
) (persistence.HistoryStore, error) {
5960

6061
return &sqlHistoryStore{
6162
sqlStore: sqlStore{
6263
db: db,
6364
logger: logger,
6465
parser: parser,
66+
dc: dc,
6567
},
6668
}, nil
6769
}
@@ -421,11 +423,15 @@ func (m *sqlHistoryStore) DeleteHistoryBranch(
421423
for i := len(brsToDelete) - 1; i >= 0; i-- {
422424
br := brsToDelete[i]
423425
maxReferredEndNodeID, ok := validBRsMaxEndNode[br.BranchID]
426+
batchSize := _defaultHistoryNodeDeleteBatch
427+
if m.dc != nil {
428+
batchSize = m.dc.HistoryNodeDeleteBatchSize()
429+
}
424430
nodeFilter := &sqlplugin.HistoryNodeFilter{
425431
TreeID: serialization.MustParseUUID(treeID),
426432
BranchID: serialization.MustParseUUID(br.BranchID),
427433
ShardID: request.ShardID,
428-
PageSize: _defaultHistoryNodeDeleteBatch,
434+
PageSize: batchSize,
429435
}
430436

431437
if ok {
@@ -445,9 +451,10 @@ func (m *sqlHistoryStore) DeleteHistoryBranch(
445451
if err != nil {
446452
return err
447453
}
448-
if rowsAffected < _defaultHistoryNodeDeleteBatch ||
454+
if batchSize <= 0 ||
455+
rowsAffected < int64(batchSize) ||
449456
rowsAffected == persistence.UnknownNumRowsAffected ||
450-
rowsAffected > _defaultHistoryNodeDeleteBatch {
457+
rowsAffected > int64(batchSize) {
451458
break
452459
}
453460
}

common/persistence/sql/sql_history_store_test.go

Lines changed: 178 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
"github.com/uber/cadence/common"
3434
"github.com/uber/cadence/common/constants"
35+
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3536
"github.com/uber/cadence/common/persistence"
3637
"github.com/uber/cadence/common/persistence/serialization"
3738
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
@@ -154,7 +155,7 @@ func TestGetHistoryTree(t *testing.T) {
154155

155156
mockDB := sqlplugin.NewMockDB(ctrl)
156157
mockParser := serialization.NewMockParser(ctrl)
157-
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser)
158+
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser, nil)
158159
require.NoError(t, err, "Failed to create sql history store")
159160

160161
tc.mockSetup(mockDB, mockParser)
@@ -366,7 +367,7 @@ func TestDeleteHistoryBranch(t *testing.T) {
366367
mockDB := sqlplugin.NewMockDB(ctrl)
367368
mockTx := sqlplugin.NewMockTx(ctrl)
368369
mockParser := serialization.NewMockParser(ctrl)
369-
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser)
370+
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser, nil)
370371
require.NoError(t, err, "Failed to create sql history store")
371372

372373
tc.mockSetup(mockDB, mockTx, mockParser)
@@ -546,7 +547,7 @@ func TestForkHistoryBranch(t *testing.T) {
546547

547548
mockDB := sqlplugin.NewMockDB(ctrl)
548549
mockParser := serialization.NewMockParser(ctrl)
549-
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser)
550+
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser, nil)
550551
require.NoError(t, err, "Failed to create sql history store")
551552

552553
tc.mockSetup(mockDB, mockParser)
@@ -808,7 +809,7 @@ func TestAppendHistoryNodes(t *testing.T) {
808809
mockDB := sqlplugin.NewMockDB(ctrl)
809810
mockTx := sqlplugin.NewMockTx(ctrl)
810811
mockParser := serialization.NewMockParser(ctrl)
811-
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser)
812+
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser, nil)
812813
require.NoError(t, err, "Failed to create sql history store")
813814

814815
tc.mockSetup(mockDB, mockTx, mockParser)
@@ -1026,7 +1027,7 @@ func TestReadHistoryBranch(t *testing.T) {
10261027
defer ctrl.Finish()
10271028

10281029
mockDB := sqlplugin.NewMockDB(ctrl)
1029-
store, err := NewHistoryV2Persistence(mockDB, nil, nil)
1030+
store, err := NewHistoryV2Persistence(mockDB, nil, nil, nil)
10301031
require.NoError(t, err, "Failed to create sql history store")
10311032

10321033
tc.mockSetup(mockDB)
@@ -1043,3 +1044,175 @@ func TestReadHistoryBranch(t *testing.T) {
10431044
})
10441045
}
10451046
}
1047+
1048+
func TestDeleteHistoryBranch_CustomBatchSize(t *testing.T) {
1049+
ctrl := gomock.NewController(t)
1050+
defer ctrl.Finish()
1051+
1052+
mockDB := sqlplugin.NewMockDB(ctrl)
1053+
mockTx := sqlplugin.NewMockTx(ctrl)
1054+
mockParser := serialization.NewMockParser(ctrl)
1055+
1056+
customBatchSize := 500
1057+
dc := &persistence.DynamicConfiguration{
1058+
HistoryNodeDeleteBatchSize: func(...dynamicproperties.FilterOption) int { return customBatchSize },
1059+
}
1060+
1061+
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser, dc)
1062+
require.NoError(t, err)
1063+
1064+
req := &persistence.InternalDeleteHistoryBranchRequest{
1065+
BranchInfo: types.HistoryBranch{
1066+
TreeID: "530ec3d3-f74b-423f-a138-3b35494fe691",
1067+
BranchID: "630ec3d3-f74b-423f-a138-3b35494fe691",
1068+
},
1069+
ShardID: 1,
1070+
}
1071+
1072+
mockDB.EXPECT().SelectFromHistoryTree(gomock.Any(), &sqlplugin.HistoryTreeFilter{
1073+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1074+
ShardID: 1,
1075+
}).Return([]sqlplugin.HistoryTreeRow{
1076+
{
1077+
ShardID: 1,
1078+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1079+
BranchID: serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691"),
1080+
Data: []byte(`aaaa`),
1081+
DataEncoding: "json",
1082+
},
1083+
}, nil)
1084+
mockParser.EXPECT().HistoryTreeInfoFromBlob([]byte(`aaaa`), "json").Return(&serialization.HistoryTreeInfo{}, nil)
1085+
1086+
mockDB.EXPECT().GetTotalNumDBShards().Return(1)
1087+
mockDB.EXPECT().BeginTx(gomock.Any(), gomock.Any()).Return(mockTx, nil)
1088+
mockTx.EXPECT().DeleteFromHistoryTree(gomock.Any(), &sqlplugin.HistoryTreeFilter{
1089+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1090+
BranchID: serialization.UUIDPtr(serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691")),
1091+
ShardID: 1,
1092+
}).Return(nil, nil)
1093+
mockTx.EXPECT().DeleteFromHistoryNode(gomock.Any(), &sqlplugin.HistoryNodeFilter{
1094+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1095+
BranchID: serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691"),
1096+
ShardID: 1,
1097+
PageSize: customBatchSize,
1098+
MinNodeID: common.Int64Ptr(1),
1099+
}).Return(&sqlResult{rowsAffected: 1}, nil)
1100+
mockTx.EXPECT().Commit().Return(nil)
1101+
1102+
err = store.DeleteHistoryBranch(context.Background(), req)
1103+
assert.NoError(t, err)
1104+
}
1105+
1106+
func TestDeleteHistoryBranch_UnboundedBatchSize(t *testing.T) {
1107+
ctrl := gomock.NewController(t)
1108+
defer ctrl.Finish()
1109+
1110+
mockDB := sqlplugin.NewMockDB(ctrl)
1111+
mockTx := sqlplugin.NewMockTx(ctrl)
1112+
mockParser := serialization.NewMockParser(ctrl)
1113+
1114+
dc := &persistence.DynamicConfiguration{
1115+
HistoryNodeDeleteBatchSize: func(...dynamicproperties.FilterOption) int { return 0 },
1116+
}
1117+
1118+
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser, dc)
1119+
require.NoError(t, err)
1120+
1121+
req := &persistence.InternalDeleteHistoryBranchRequest{
1122+
BranchInfo: types.HistoryBranch{
1123+
TreeID: "530ec3d3-f74b-423f-a138-3b35494fe691",
1124+
BranchID: "630ec3d3-f74b-423f-a138-3b35494fe691",
1125+
},
1126+
ShardID: 1,
1127+
}
1128+
1129+
mockDB.EXPECT().SelectFromHistoryTree(gomock.Any(), &sqlplugin.HistoryTreeFilter{
1130+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1131+
ShardID: 1,
1132+
}).Return([]sqlplugin.HistoryTreeRow{
1133+
{
1134+
ShardID: 1,
1135+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1136+
BranchID: serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691"),
1137+
Data: []byte(`aaaa`),
1138+
DataEncoding: "json",
1139+
},
1140+
}, nil)
1141+
mockParser.EXPECT().HistoryTreeInfoFromBlob([]byte(`aaaa`), "json").Return(&serialization.HistoryTreeInfo{}, nil)
1142+
1143+
mockDB.EXPECT().GetTotalNumDBShards().Return(1)
1144+
mockDB.EXPECT().BeginTx(gomock.Any(), gomock.Any()).Return(mockTx, nil)
1145+
mockTx.EXPECT().DeleteFromHistoryTree(gomock.Any(), &sqlplugin.HistoryTreeFilter{
1146+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1147+
BranchID: serialization.UUIDPtr(serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691")),
1148+
ShardID: 1,
1149+
}).Return(nil, nil)
1150+
mockTx.EXPECT().DeleteFromHistoryNode(gomock.Any(), &sqlplugin.HistoryNodeFilter{
1151+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1152+
BranchID: serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691"),
1153+
ShardID: 1,
1154+
PageSize: 0,
1155+
MinNodeID: common.Int64Ptr(1),
1156+
}).Return(&sqlResult{rowsAffected: 100}, nil)
1157+
mockTx.EXPECT().Commit().Return(nil)
1158+
1159+
err = store.DeleteHistoryBranch(context.Background(), req)
1160+
assert.NoError(t, err)
1161+
}
1162+
1163+
func TestDeleteHistoryBranch_NegativeBatchSize(t *testing.T) {
1164+
ctrl := gomock.NewController(t)
1165+
defer ctrl.Finish()
1166+
1167+
mockDB := sqlplugin.NewMockDB(ctrl)
1168+
mockTx := sqlplugin.NewMockTx(ctrl)
1169+
mockParser := serialization.NewMockParser(ctrl)
1170+
1171+
dc := &persistence.DynamicConfiguration{
1172+
HistoryNodeDeleteBatchSize: func(...dynamicproperties.FilterOption) int { return -100 },
1173+
}
1174+
1175+
store, err := NewHistoryV2Persistence(mockDB, nil, mockParser, dc)
1176+
require.NoError(t, err)
1177+
1178+
req := &persistence.InternalDeleteHistoryBranchRequest{
1179+
BranchInfo: types.HistoryBranch{
1180+
TreeID: "530ec3d3-f74b-423f-a138-3b35494fe691",
1181+
BranchID: "630ec3d3-f74b-423f-a138-3b35494fe691",
1182+
},
1183+
ShardID: 1,
1184+
}
1185+
1186+
mockDB.EXPECT().SelectFromHistoryTree(gomock.Any(), &sqlplugin.HistoryTreeFilter{
1187+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1188+
ShardID: 1,
1189+
}).Return([]sqlplugin.HistoryTreeRow{
1190+
{
1191+
ShardID: 1,
1192+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1193+
BranchID: serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691"),
1194+
Data: []byte(`aaaa`),
1195+
DataEncoding: "json",
1196+
},
1197+
}, nil)
1198+
mockParser.EXPECT().HistoryTreeInfoFromBlob([]byte(`aaaa`), "json").Return(&serialization.HistoryTreeInfo{}, nil)
1199+
1200+
mockDB.EXPECT().GetTotalNumDBShards().Return(1)
1201+
mockDB.EXPECT().BeginTx(gomock.Any(), gomock.Any()).Return(mockTx, nil)
1202+
mockTx.EXPECT().DeleteFromHistoryTree(gomock.Any(), &sqlplugin.HistoryTreeFilter{
1203+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1204+
BranchID: serialization.UUIDPtr(serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691")),
1205+
ShardID: 1,
1206+
}).Return(nil, nil)
1207+
mockTx.EXPECT().DeleteFromHistoryNode(gomock.Any(), &sqlplugin.HistoryNodeFilter{
1208+
TreeID: serialization.MustParseUUID("530ec3d3-f74b-423f-a138-3b35494fe691"),
1209+
BranchID: serialization.MustParseUUID("630ec3d3-f74b-423f-a138-3b35494fe691"),
1210+
ShardID: 1,
1211+
PageSize: -100,
1212+
MinNodeID: common.Int64Ptr(1),
1213+
}).Return(&sqlResult{rowsAffected: 50}, nil)
1214+
mockTx.EXPECT().Commit().Return(nil)
1215+
1216+
err = store.DeleteHistoryBranch(context.Background(), req)
1217+
assert.NoError(t, err)
1218+
}

common/persistence/sql/sqlplugin/mysql/events.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ const (
3636
getHistoryNodesQuery = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
3737
`WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? and node_id < ? ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? `
3838

39-
deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? `
39+
deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? `
40+
41+
deleteHistoryNodesByBatchQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? `
4042

4143
// below are templates for history_tree table
4244
addHistoryTreeQuery = `INSERT INTO history_tree (` +
@@ -76,7 +78,10 @@ func (mdb *DB) SelectFromHistoryNode(ctx context.Context, filter *sqlplugin.Hist
7678
// DeleteFromHistoryNode deletes one or more rows from history_node table
7779
func (mdb *DB) DeleteFromHistoryNode(ctx context.Context, filter *sqlplugin.HistoryNodeFilter) (sql.Result, error) {
7880
dbShardID := sqlplugin.GetDBShardIDFromTreeID(filter.TreeID, mdb.GetTotalNumDBShards())
79-
return mdb.driver.ExecContext(ctx, dbShardID, deleteHistoryNodesQuery, filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID, filter.PageSize)
81+
if filter.PageSize > 0 {
82+
return mdb.driver.ExecContext(ctx, dbShardID, deleteHistoryNodesByBatchQuery, filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID, filter.PageSize)
83+
}
84+
return mdb.driver.ExecContext(ctx, dbShardID, deleteHistoryNodesQuery, filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID)
8085
}
8186

8287
// For history_tree table:

common/persistence/sql/sqlplugin/postgres/events.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ const (
3636
getHistoryNodesQuery = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
3737
`WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 and node_id < $5 ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT $6 `
3838

39-
deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND (node_id,txn_id) IN (SELECT node_id,txn_id FROM
39+
deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4`
40+
41+
deleteHistoryNodesByBatchQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND (node_id,txn_id) IN (SELECT node_id,txn_id FROM
4042
history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 LIMIT $5)`
4143

4244
// below are templates for history_tree table
@@ -77,7 +79,10 @@ func (pdb *db) SelectFromHistoryNode(ctx context.Context, filter *sqlplugin.Hist
7779
// DeleteFromHistoryNode deletes one or more rows from history_node table
7880
func (pdb *db) DeleteFromHistoryNode(ctx context.Context, filter *sqlplugin.HistoryNodeFilter) (sql.Result, error) {
7981
dbShardID := sqlplugin.GetDBShardIDFromTreeID(filter.TreeID, pdb.GetTotalNumDBShards())
80-
return pdb.driver.ExecContext(ctx, dbShardID, deleteHistoryNodesQuery, filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID, filter.PageSize)
82+
if filter.PageSize > 0 {
83+
return pdb.driver.ExecContext(ctx, dbShardID, deleteHistoryNodesByBatchQuery, filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID, filter.PageSize)
84+
}
85+
return pdb.driver.ExecContext(ctx, dbShardID, deleteHistoryNodesQuery, filter.ShardID, filter.TreeID, filter.BranchID, *filter.MinNodeID)
8186
}
8287

8388
// For history_tree table:

0 commit comments

Comments
 (0)