Skip to content

Commit fe80113

Browse files
[Flink] Fix flink source enum (lakesoul-io#573)
* add more log Signed-off-by: chenxu <chenxu@dmetasoul.com> * fix task assign logics Signed-off-by: chenxu <chenxu@dmetasoul.com> --------- Signed-off-by: chenxu <chenxu@dmetasoul.com> Co-authored-by: chenxu <chenxu@dmetasoul.com>
1 parent ebb8f45 commit fe80113

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class LakeSoulAllPartitionDynamicSplitEnumerator implements SplitEnumerat
4848
private long startTime;
4949
private long nextStartTime;
5050
private int hashBucketNum = -1;
51+
String fullTableName;
5152

5253
public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulPartitionSplit> context, LakeSoulDynSplitAssigner splitAssigner, RowType rowType, long discoveryInterval, long startTime, String tableId, String hashBucketNum, List<String> partitionColumns, Plan partitionFilters) {
5354
this.context = context;
@@ -66,6 +67,9 @@ public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSou
6667
this.partitionArrowSchema = new Schema(partitionFields);
6768
this.partitionFilters = partitionFilters;
6869
tableInfo = DataOperation.dbManager().getTableInfoByTableId(tableId);
70+
fullTableName = tableInfo.getTableNamespace() + "." + tableInfo.getTableName();
71+
LOG.info("Create Dyn enumerator for table name {}, tableId {}, context {}",
72+
fullTableName, tableId, System.identityHashCode(context));
6973
}
7074

7175
@Override
@@ -83,6 +87,8 @@ public synchronized void handleSplitRequest(int subtaskId, @Nullable String requ
8387
}
8488
int tasksSize = context.registeredReaders().size();
8589
if (tasksSize == 0) {
90+
LOG.info("handleSplitRequest: Task size is 0 for subtaskId {} for table {}", subtaskId, fullTableName);
91+
taskIdsAwaitingSplit.add(subtaskId);
8692
return;
8793
}
8894
Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
@@ -134,10 +140,10 @@ private synchronized void processDiscoveredSplits(
134140
LOG.info("Process discovered splits {}, taskSize {}, oid {}, tid {}", splits,
135141
tasksSize, System.identityHashCode(this),
136142
Thread.currentThread().getId());
143+
this.splitAssigner.addSplits(splits);
137144
if (tasksSize == 0) {
138145
return;
139146
}
140-
this.splitAssigner.addSplits(splits);
141147
Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
142148
while (iter.hasNext()) {
143149
int taskId = iter.next();

0 commit comments

Comments
 (0)