@@ -1054,37 +1054,42 @@ public SourceEnumeratorState snapshotState(long checkpointId) {
10541054 public void notifyCheckpointComplete (long checkpointId ) throws Exception {
10551055 checkpointTriggeredBefore = true ;
10561056
1057- // lower than this checkpoint id.
1058- Set <TableBucket > consumedKvSnapshots = getAndRemoveConsumedBucketsUpTo (checkpointId );
1057+ if (hasPrimaryKey ) {
1058+ // lower than this checkpoint id.
1059+ Set <TableBucket > consumedKvSnapshots = getAndRemoveConsumedBucketsUpTo (checkpointId );
10591060
1060- LOG .info (
1061- "kv snapshot has already consumed and try to release kv snapshot lease for: {}, checkpoint id: {}" ,
1062- consumedKvSnapshots ,
1063- checkpointId );
1061+ if (!consumedKvSnapshots .isEmpty ()) {
1062+ LOG .info (
1063+ "kv snapshot has already consumed and try to release kv snapshot lease for: {}, checkpoint id: {}" ,
1064+ consumedKvSnapshots ,
1065+ checkpointId );
10641066
1065- // send request to fluss to unregister the kv snapshot lease.
1066- try {
1067- flussAdmin
1068- .createKvSnapshotLease (
1069- leaseContext .getKvSnapshotLeaseId (),
1070- leaseContext .getKvSnapshotLeaseDurationMs ())
1071- .releaseSnapshots (consumedKvSnapshots )
1072- .get ();
1073- } catch (Exception e ) {
1074- if (ExceptionUtils .findThrowable (e , UnsupportedVersionException .class ).isPresent ()) {
1075- LOG .warn (
1076- "Failed to release kv snapshot lease because the server does not support "
1077- + "kv snapshot lease API. Snapshots may remain in storage longer "
1078- + "than necessary. Please upgrade the Fluss server to version 0.9 "
1079- + "or later." ,
1080- e );
1081- } else {
1082- LOG .error (
1083- "Failed to release kv snapshot lease. These snapshots need to re-enqueue" ,
1084- e );
1085- // use the current checkpoint id to re-enqueue the buckets
1086- consumedKvSnapshots .forEach (
1087- tableBucket -> addConsumedBucket (checkpointId , tableBucket ));
1067+ // send request to fluss to unregister the kv snapshot lease.
1068+ try {
1069+ flussAdmin
1070+ .createKvSnapshotLease (
1071+ leaseContext .getKvSnapshotLeaseId (),
1072+ leaseContext .getKvSnapshotLeaseDurationMs ())
1073+ .releaseSnapshots (consumedKvSnapshots )
1074+ .get ();
1075+ } catch (Exception e ) {
1076+ if (ExceptionUtils .findThrowable (e , UnsupportedVersionException .class )
1077+ .isPresent ()) {
1078+ LOG .warn (
1079+ "Failed to release kv snapshot lease because the server does not support "
1080+ + "kv snapshot lease API. Snapshots may remain in storage longer "
1081+ + "than necessary. Please upgrade the Fluss server to version 0.9 "
1082+ + "or later." ,
1083+ e );
1084+ } else {
1085+ LOG .error (
1086+ "Failed to release kv snapshot lease. These snapshots need to re-enqueue" ,
1087+ e );
1088+ // use the current checkpoint id to re-enqueue the buckets
1089+ consumedKvSnapshots .forEach (
1090+ tableBucket -> addConsumedBucket (checkpointId , tableBucket ));
1091+ }
1092+ }
10881093 }
10891094 }
10901095 }
0 commit comments