File tree Expand file tree Collapse file tree 1 file changed +3
-3
lines changed
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink Expand file tree Collapse file tree 1 file changed +3
-3
lines changed Original file line number Diff line number Diff line change 66
66
import org .apache .flink .testutils .junit .SharedReference ;
67
67
import org .apache .flink .util .TestLogger ;
68
68
69
- import org .apache .flink .shaded .guava32 .com .google .common .collect .Lists ;
70
-
71
69
import org .apache .kafka .clients .CommonClientConfigs ;
72
70
import org .apache .kafka .clients .admin .AdminClient ;
73
71
import org .apache .kafka .clients .admin .CreateTopicsResult ;
@@ -551,7 +549,9 @@ public Long map(Long value) {
551
549
public void notifyCheckpointComplete (long checkpointId ) throws Exception {
552
550
// sync with shared object, this is guaranteed to sync eventually because of final
553
551
// checkpoint
554
- checkpointedRecords .get ().addAll (Lists .newArrayList (snapshottedRecords .get ()));
552
+ ArrayList <Long > committedRecords = new ArrayList <>();
553
+ snapshottedRecords .get ().forEach (committedRecords ::add );
554
+ checkpointedRecords .get ().addAll (committedRecords );
555
555
}
556
556
557
557
@ Override
You can’t perform that action at this time.
0 commit comments