Skip to content

Commit 9bf4462

Browse files
committed
more
1 parent af8b7fe commit 9bf4462

File tree

13 files changed

+218
-104
lines changed

13 files changed

+218
-104
lines changed

fluss-common/src/main/java/org/apache/fluss/metadata/PartitionInfo.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
package org.apache.fluss.metadata;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.fs.FsPath;
22+
23+
import javax.annotation.Nullable;
2124

2225
import java.util.Objects;
26+
import java.util.Optional;
2327

2428
/**
2529
* Information of a partition metadata, includes the partition's name and the partition id that
@@ -31,10 +35,17 @@
3135
public class PartitionInfo {
3236
private final long partitionId;
3337
private final ResolvedPartitionSpec partitionSpec;
38+
private final @Nullable FsPath remoteDataDir;
3439

3540
public PartitionInfo(long partitionId, ResolvedPartitionSpec partitionSpec) {
41+
this(partitionId, partitionSpec, null);
42+
}
43+
44+
public PartitionInfo(
45+
long partitionId, ResolvedPartitionSpec partitionSpec, @Nullable FsPath remoteDataDir) {
3646
this.partitionId = partitionId;
3747
this.partitionSpec = partitionSpec;
48+
this.remoteDataDir = remoteDataDir;
3849
}
3950

4051
/** Get the partition id. The id is globally unique in the Fluss cluster. */
@@ -58,6 +69,10 @@ public PartitionSpec getPartitionSpec() {
5869
return partitionSpec.toPartitionSpec();
5970
}
6071

72+
public Optional<FsPath> getRemoteDataDir() {
73+
return Optional.ofNullable(remoteDataDir);
74+
}
75+
6176
@Override
6277
public boolean equals(Object o) {
6378
if (this == o) {
@@ -67,16 +82,25 @@ public boolean equals(Object o) {
6782
return false;
6883
}
6984
PartitionInfo that = (PartitionInfo) o;
70-
return partitionId == that.partitionId && Objects.equals(partitionSpec, that.partitionSpec);
85+
return partitionId == that.partitionId
86+
&& Objects.equals(partitionSpec, that.partitionSpec)
87+
&& Objects.equals(remoteDataDir, that.remoteDataDir);
7188
}
7289

7390
@Override
7491
public int hashCode() {
75-
return Objects.hash(partitionId, partitionSpec);
92+
return Objects.hash(partitionId, partitionSpec, remoteDataDir);
7693
}
7794

7895
@Override
7996
public String toString() {
80-
return "Partition{name='" + getPartitionName() + '\'' + ", id=" + partitionId + '}';
97+
return "Partition{name='"
98+
+ getPartitionName()
99+
+ '\''
100+
+ ", id="
101+
+ partitionId
102+
+ ", remoteDataDir="
103+
+ remoteDataDir
104+
+ '}';
81105
}
82106
}

fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.config.Configuration;
2222
import org.apache.fluss.config.TableConfig;
23+
import org.apache.fluss.fs.FsPath;
2324
import org.apache.fluss.types.RowType;
2425

2526
import javax.annotation.Nullable;
@@ -59,6 +60,7 @@ public final class TableInfo {
5960
private final Configuration properties;
6061
private final TableConfig tableConfig;
6162
private final Configuration customProperties;
63+
private final @Nullable FsPath remoteDataDir;
6264
private final @Nullable String comment;
6365

6466
private final long createdTime;
@@ -74,6 +76,7 @@ public TableInfo(
7476
int numBuckets,
7577
Configuration properties,
7678
Configuration customProperties,
79+
@Nullable FsPath remoteDataDir,
7780
@Nullable String comment,
7881
long createdTime,
7982
long modifiedTime) {
@@ -90,6 +93,7 @@ public TableInfo(
9093
this.properties = properties;
9194
this.tableConfig = new TableConfig(properties);
9295
this.customProperties = customProperties;
96+
this.remoteDataDir = remoteDataDir;
9397
this.comment = comment;
9498
this.createdTime = createdTime;
9599
this.modifiedTime = modifiedTime;
@@ -263,6 +267,11 @@ public Configuration getCustomProperties() {
263267
return customProperties;
264268
}
265269

270+
/** Returns the remote data directory of the table. */
271+
public Optional<FsPath> getRemoteDataDir() {
272+
return Optional.ofNullable(remoteDataDir);
273+
}
274+
266275
/** Returns the comment/description of the table. */
267276
public Optional<String> getComment() {
268277
return Optional.ofNullable(comment);
@@ -335,6 +344,7 @@ public static TableInfo of(
335344
numBuckets,
336345
Configuration.fromMap(tableDescriptor.getProperties()),
337346
Configuration.fromMap(tableDescriptor.getCustomProperties()),
347+
null,
338348
tableDescriptor.getComment().orElse(null),
339349
createdTime,
340350
modifiedTime);
@@ -358,6 +368,7 @@ public boolean equals(Object o) {
358368
&& Objects.equals(partitionKeys, that.partitionKeys)
359369
&& Objects.equals(properties, that.properties)
360370
&& Objects.equals(customProperties, that.customProperties)
371+
&& Objects.equals(remoteDataDir, that.remoteDataDir)
361372
&& Objects.equals(comment, that.comment);
362373
}
363374

@@ -376,6 +387,7 @@ public int hashCode() {
376387
numBuckets,
377388
properties,
378389
customProperties,
390+
remoteDataDir,
379391
comment);
380392
}
381393

@@ -402,6 +414,8 @@ public String toString() {
402414
+ properties
403415
+ ", customProperties="
404416
+ customProperties
417+
+ ", remoteDataDir="
418+
+ remoteDataDir
405419
+ ", comment='"
406420
+ comment
407421
+ '\''

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.cluster.rebalance.ServerTag;
22+
import org.apache.fluss.metadata.PartitionInfo;
2223
import org.apache.fluss.metadata.PhysicalTablePath;
2324
import org.apache.fluss.metadata.TableBucket;
2425
import org.apache.fluss.metadata.TableBucketReplica;
@@ -84,6 +85,7 @@ public class CoordinatorContext {
8485
// a map from partition_id -> physicalTablePath
8586
private final Map<Long, PhysicalTablePath> pathByPartitionId = new HashMap<>();
8687
private final Map<PhysicalTablePath, Long> partitionIdByPath = new HashMap<>();
88+
private final Map<Long, PartitionInfo> partitionInfoById = new HashMap<>();
8789

8890
// a map from table_id to the table path
8991
private final Map<Long, TablePath> tablePathById = new HashMap<>();
@@ -252,9 +254,11 @@ public void putTableInfo(TableInfo tableInfo) {
252254
this.tableInfoById.put(tableInfo.getTableId(), tableInfo);
253255
}
254256

255-
public void putPartition(long partitionId, PhysicalTablePath physicalTablePath) {
257+
public void putPartition(
258+
long partitionId, PhysicalTablePath physicalTablePath, PartitionInfo partitionInfo) {
256259
this.pathByPartitionId.put(partitionId, physicalTablePath);
257260
this.partitionIdByPath.put(physicalTablePath, partitionId);
261+
this.partitionInfoById.put(partitionId, partitionInfo);
258262
}
259263

260264
public TableInfo getTableInfoById(long tableId) {
@@ -294,6 +298,10 @@ public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
294298
return Optional.ofNullable(partitionIdByPath.get(physicalTablePath));
295299
}
296300

301+
public PartitionInfo getPartitionInfoById(long partitionId) {
302+
return partitionInfoById.get(partitionId);
303+
}
304+
297305
public Map<Integer, List<Integer>> getTableAssignment(long tableId) {
298306
return tableAssignments.getOrDefault(tableId, Collections.emptyMap());
299307
}
@@ -427,6 +435,11 @@ public Set<TableBucketReplica> getAllReplicasForPartition(long tableId, long par
427435
return allReplicas;
428436
}
429437

438+
public boolean hasPartitionsToDelete(long tableId) {
439+
return partitionsToBeDeleted.stream()
440+
.anyMatch(partition -> partition.getTableId() == tableId);
441+
}
442+
430443
/**
431444
* Pick up the replicas that should retry delete and replicas that considered as success delete.
432445
*

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
import org.apache.fluss.exception.TabletServerNotAvailableException;
4141
import org.apache.fluss.exception.UnknownServerException;
4242
import org.apache.fluss.exception.UnknownTableOrBucketException;
43+
import org.apache.fluss.metadata.PartitionInfo;
4344
import org.apache.fluss.metadata.PhysicalTablePath;
45+
import org.apache.fluss.metadata.ResolvedPartitionSpec;
4446
import org.apache.fluss.metadata.SchemaInfo;
4547
import org.apache.fluss.metadata.TableBucket;
4648
import org.apache.fluss.metadata.TableBucketReplica;
@@ -108,6 +110,7 @@
108110
import org.apache.fluss.server.zk.data.BucketAssignment;
109111
import org.apache.fluss.server.zk.data.LeaderAndIsr;
110112
import org.apache.fluss.server.zk.data.PartitionAssignment;
113+
import org.apache.fluss.server.zk.data.PartitionRegistration;
111114
import org.apache.fluss.server.zk.data.RebalanceTask;
112115
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
113116
import org.apache.fluss.server.zk.data.ServerTags;
@@ -393,8 +396,8 @@ private void initCoordinatorContext() throws Exception {
393396
.filter(entry -> entry.getValue().isPartitioned())
394397
.map(Map.Entry::getKey)
395398
.collect(Collectors.toList());
396-
Map<TablePath, Map<String, Long>> tablePathMap =
397-
zooKeeperClient.getPartitionNameAndIdsForTables(partitionedTablePathList);
399+
Map<TablePath, Map<String, PartitionRegistration>> tablePathMap =
400+
zooKeeperClient.getPartitionNameAndRegistrationsForTables(partitionedTablePathList);
398401
for (TablePath tablePath : tablePathSet) {
399402
TableInfo tableInfo = tablePath2TableInfoMap.get(tablePath);
400403
coordinatorContext.putTablePath(tableInfo.getTableId(), tablePath);
@@ -405,13 +408,22 @@ private void initCoordinatorContext() throws Exception {
405408
lakeTables.add(Tuple2.of(tableInfo, System.currentTimeMillis()));
406409
}
407410
if (tableInfo.isPartitioned()) {
408-
Map<String, Long> partitions = tablePathMap.get(tablePath);
411+
Map<String, PartitionRegistration> partitions = tablePathMap.get(tablePath);
409412
if (partitions != null) {
410-
for (Map.Entry<String, Long> partition : partitions.entrySet()) {
413+
for (Map.Entry<String, PartitionRegistration> partition :
414+
partitions.entrySet()) {
415+
long partitionId = partition.getValue().getPartitionId();
411416
// put partition info to coordinator context
417+
PartitionInfo partitionInfo =
418+
new PartitionInfo(
419+
partitionId,
420+
ResolvedPartitionSpec.fromPartitionName(
421+
tableInfo.getPartitionKeys(), partition.getKey()),
422+
partition.getValue().getRemoteDataDir());
412423
coordinatorContext.putPartition(
413-
partition.getValue(),
414-
PhysicalTablePath.of(tableInfo.getTablePath(), partition.getKey()));
424+
partitionId,
425+
PhysicalTablePath.of(tableInfo.getTablePath(), partition.getKey()),
426+
partitionInfo);
415427
}
416428
}
417429
// if the table is auto partition, put the partitions info
@@ -712,6 +724,7 @@ private void processSchemaChange(SchemaChangeEvent schemaChangeEvent) {
712724
oldTableInfo.getNumBuckets(),
713725
oldTableInfo.getProperties(),
714726
oldTableInfo.getCustomProperties(),
727+
oldTableInfo.getRemoteDataDir().orElse(null),
715728
oldTableInfo.getComment().orElse(null),
716729
oldTableInfo.getCreatedTime(),
717730
System.currentTimeMillis()));
@@ -792,12 +805,14 @@ private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
792805
TablePath tablePath = createPartitionEvent.getTablePath();
793806
String partitionName = createPartitionEvent.getPartitionName();
794807
PartitionAssignment partitionAssignment = createPartitionEvent.getPartitionAssignment();
795-
tableManager.onCreateNewPartition(
796-
tablePath,
797-
tableId,
798-
createPartitionEvent.getPartitionId(),
799-
partitionName,
800-
partitionAssignment);
808+
TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId);
809+
PartitionInfo partitionInfo =
810+
new PartitionInfo(
811+
partitionId,
812+
ResolvedPartitionSpec.fromPartitionName(
813+
tableInfo.getPartitionKeys(), partitionName),
814+
createPartitionEvent.getRemoteDataDir());
815+
tableManager.onCreateNewPartition(tablePath, tableId, partitionInfo, partitionAssignment);
801816
autoPartitionManager.addPartition(tableId, partitionName);
802817

803818
Set<TableBucket> tableBuckets = new HashSet<>();

0 commit comments

Comments
 (0)