Skip to content

Commit 3aaccfd

Browse files
committed
[server] Extend metadata models to support per-table/partition remote data directory
1 parent bafba42 commit 3aaccfd

File tree

55 files changed

+678
-87
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+678
-87
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,9 @@ public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
298298
r.getTableId(),
299299
r.getSchemaId(),
300300
TableDescriptor.fromJsonBytes(r.getTableJson()),
301+
// For backward compatibility, results returned by old
302+
// clusters do not include the remote data dir
303+
r.hasRemoteDataDir() ? r.getRemoteDataDir() : null,
301304
r.getCreatedTime(),
302305
r.getModifiedTime()));
303306
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,12 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
555555
pbPartitionInfo ->
556556
new PartitionInfo(
557557
pbPartitionInfo.getPartitionId(),
558-
toResolvedPartitionSpec(
559-
pbPartitionInfo.getPartitionSpec())))
558+
toResolvedPartitionSpec(pbPartitionInfo.getPartitionSpec()),
559+
// For backward compatibility, results returned by old
560+
// clusters do not include the remote data dir
561+
pbPartitionInfo.hasRemoteDataDir()
562+
? pbPartitionInfo.getRemoteDataDir()
563+
: null))
560564
.collect(Collectors.toList());
561565
}
562566

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
6868
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
6969
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
70+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
7071
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
7172
import static org.apache.fluss.row.BinaryString.fromString;
7273
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
@@ -169,6 +170,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception {
169170
.distributedBy(3)
170171
.logFormat(logFormat)
171172
.build(),
173+
DEFAULT_REMOTE_DATA_DIR,
172174
System.currentTimeMillis(),
173175
System.currentTimeMillis());
174176
long fetchOffset = 0L;
@@ -313,6 +315,7 @@ void testComplexTypeFetch() throws Exception {
313315
.distributedBy(3)
314316
.logFormat(LogFormat.ARROW)
315317
.build(),
318+
DEFAULT_REMOTE_DATA_DIR,
316319
System.currentTimeMillis(),
317320
System.currentTimeMillis());
318321
long fetchOffset = 0L;

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ void testFetchWithSchemaChange() throws Exception {
172172
DATA1_TABLE_INFO.getNumBuckets(),
173173
DATA1_TABLE_INFO.getProperties(),
174174
DATA1_TABLE_INFO.getCustomProperties(),
175+
DATA1_TABLE_INFO.getRemoteDataDir(),
175176
DATA1_TABLE_INFO.getComment().orElse(null),
176177
DATA1_TABLE_INFO.getCreatedTime(),
177178
DATA1_TABLE_INFO.getModifiedTime()),

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
6262
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
6363
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
64+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
6465
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
6566
import static org.apache.fluss.testutils.DataTestUtils.genLogFile;
6667
import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir;
@@ -218,6 +219,7 @@ void testProjection(String format) throws Exception {
218219
.distributedBy(3)
219220
.logFormat(logFormat)
220221
.build(),
222+
DEFAULT_REMOTE_DATA_DIR,
221223
System.currentTimeMillis(),
222224
System.currentTimeMillis());
223225
long fetchOffset = 0L;

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
7777
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
7878
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
79+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
7980
import static org.apache.fluss.testutils.DataTestUtils.indexedRow;
8081
import static org.apache.fluss.testutils.DataTestUtils.row;
8182
import static org.assertj.core.api.Assertions.assertThat;
@@ -96,6 +97,7 @@ class RecordAccumulatorTest {
9697
.distributedBy(3)
9798
.property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), "zstd")
9899
.build(),
100+
DEFAULT_REMOTE_DATA_DIR,
99101
System.currentTimeMillis(),
100102
System.currentTimeMillis());
101103

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,25 @@ public static boolean isAlterableTableOption(String key) {
5959
return ALTERABLE_TABLE_OPTIONS.contains(key);
6060
}
6161

62+
/**
63+
* Returns the default remote data directory from the configuration. Used as a fallback for
64+
* tables or partitions that do not contain remote data directory metadata.
65+
*
66+
* @param conf the Fluss configuration
67+
* @return the default remote data directory path, never {@code null} if the configuration is
68+
* valid (i.e., at least one of {@code remote.data.dir} or {@code remote.data.dirs} is set)
69+
* @see ConfigOptions#REMOTE_DATA_DIR
70+
* @see ConfigOptions#REMOTE_DATA_DIRS
71+
*/
72+
public static String getDefaultRemoteDataDir(Configuration conf) {
73+
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
74+
// The multi-directory option takes precedence: when it has any entry, its first entry is
// used as the default.
// NOTE(review): assumes the option never resolves to null (e.g. defaults to an empty
// list) -- confirm against ConfigOptions.REMOTE_DATA_DIRS.
if (!remoteDataDirs.isEmpty()) {
75+
return remoteDataDirs.get(0);
76+
}
77+
78+
// No entry in the multi-directory option: fall back to the single-directory option.
return conf.get(ConfigOptions.REMOTE_DATA_DIR);
79+
}
80+
6281
@VisibleForTesting
6382
static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
6483
Map<String, ConfigOption<?>> options = new HashMap<>();

fluss-common/src/main/java/org/apache/fluss/metadata/PartitionInfo.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,22 @@
2222
import java.util.Objects;
2323

2424
/**
25-
* Information of a partition metadata, includes the partition's name and the partition id that
26-
* represents the unique identifier of the partition.
25+
* Metadata of a partition, including the partition id (the unique identifier of the partition),
26+
* the partition name, the remote data dir for partitioned data storage, etc.
2727
*
2828
* @since 0.2
2929
*/
3030
@PublicEvolving
3131
public class PartitionInfo {
3232
private final long partitionId;
3333
private final ResolvedPartitionSpec partitionSpec;
34+
private final String remoteDataDir;
3435

35-
public PartitionInfo(long partitionId, ResolvedPartitionSpec partitionSpec) {
36+
public PartitionInfo(
37+
long partitionId, ResolvedPartitionSpec partitionSpec, String remoteDataDir) {
3638
this.partitionId = partitionId;
3739
this.partitionSpec = partitionSpec;
40+
this.remoteDataDir = remoteDataDir;
3841
}
3942

4043
/** Get the partition id. The id is globally unique in the Fluss cluster. */
@@ -58,6 +61,10 @@ public PartitionSpec getPartitionSpec() {
5861
return partitionSpec.toPartitionSpec();
5962
}
6063

64+
/**
 * Returns the remote data directory of the partition.
 *
 * <p>May be {@code null}: the client-side conversion of list-partition responses passes
 * {@code null} when the response does not carry a remote data dir (results returned by old
 * clusters), so callers must handle the absent case.
 */
public String getRemoteDataDir() {
65+
return remoteDataDir;
66+
}
67+
6168
@Override
6269
public boolean equals(Object o) {
6370
if (this == o) {
@@ -67,16 +74,25 @@ public boolean equals(Object o) {
6774
return false;
6875
}
6976
PartitionInfo that = (PartitionInfo) o;
70-
return partitionId == that.partitionId && Objects.equals(partitionSpec, that.partitionSpec);
77+
return partitionId == that.partitionId
78+
&& Objects.equals(partitionSpec, that.partitionSpec)
79+
&& Objects.equals(remoteDataDir, that.remoteDataDir);
7180
}
7281

7382
@Override
7483
public int hashCode() {
75-
return Objects.hash(partitionId, partitionSpec);
84+
return Objects.hash(partitionId, partitionSpec, remoteDataDir);
7685
}
7786

7887
@Override
7988
public String toString() {
80-
return "Partition{name='" + getPartitionName() + '\'' + ", id=" + partitionId + '}';
89+
return "Partition{name='"
90+
+ getPartitionName()
91+
+ '\''
92+
+ ", id="
93+
+ partitionId
94+
+ ", remoteDataDir="
95+
+ remoteDataDir
96+
+ '}';
8197
}
8298
}

fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public final class TableInfo {
5959
private final Configuration properties;
6060
private final TableConfig tableConfig;
6161
private final Configuration customProperties;
62+
private final String remoteDataDir;
6263
private final @Nullable String comment;
6364

6465
private final long createdTime;
@@ -74,6 +75,7 @@ public TableInfo(
7475
int numBuckets,
7576
Configuration properties,
7677
Configuration customProperties,
78+
String remoteDataDir,
7779
@Nullable String comment,
7880
long createdTime,
7981
long modifiedTime) {
@@ -90,6 +92,7 @@ public TableInfo(
9092
this.properties = properties;
9193
this.tableConfig = new TableConfig(properties);
9294
this.customProperties = customProperties;
95+
this.remoteDataDir = remoteDataDir;
9396
this.comment = comment;
9497
this.createdTime = createdTime;
9598
this.modifiedTime = modifiedTime;
@@ -263,6 +266,11 @@ public Configuration getCustomProperties() {
263266
return customProperties;
264267
}
265268

269+
/**
 * Returns the remote data directory of the table.
 *
 * <p>NOTE(review): this appears to be nullable -- the admin client passes {@code null} when
 * the table info response from an old cluster does not include a remote data dir. Callers
 * should handle {@code null}; consider marking the field and this getter {@code @Nullable}.
 */
270+
public String getRemoteDataDir() {
271+
return remoteDataDir;
272+
}
273+
266274
/** Returns the comment/description of the table. */
267275
public Optional<String> getComment() {
268276
return Optional.ofNullable(comment);
@@ -314,6 +322,7 @@ public static TableInfo of(
314322
long tableId,
315323
int schemaId,
316324
TableDescriptor tableDescriptor,
325+
String remoteDataDir,
317326
long createdTime,
318327
long modifiedTime) {
319328
Schema schema = tableDescriptor.getSchema();
@@ -335,6 +344,7 @@ public static TableInfo of(
335344
numBuckets,
336345
Configuration.fromMap(tableDescriptor.getProperties()),
337346
Configuration.fromMap(tableDescriptor.getCustomProperties()),
347+
remoteDataDir,
338348
tableDescriptor.getComment().orElse(null),
339349
createdTime,
340350
modifiedTime);
@@ -358,6 +368,7 @@ public boolean equals(Object o) {
358368
&& Objects.equals(partitionKeys, that.partitionKeys)
359369
&& Objects.equals(properties, that.properties)
360370
&& Objects.equals(customProperties, that.customProperties)
371+
&& Objects.equals(remoteDataDir, that.remoteDataDir)
361372
&& Objects.equals(comment, that.comment);
362373
}
363374

@@ -376,6 +387,7 @@ public int hashCode() {
376387
numBuckets,
377388
properties,
378389
customProperties,
390+
remoteDataDir,
379391
comment);
380392
}
381393

@@ -402,6 +414,8 @@ public String toString() {
402414
+ properties
403415
+ ", customProperties="
404416
+ customProperties
417+
+ ", remoteDataDir="
418+
+ remoteDataDir
405419
+ ", comment='"
406420
+ comment
407421
+ '\''

fluss-common/src/test/java/org/apache/fluss/record/TestData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public final class TestData {
3939
public static final short DEFAULT_SCHEMA_ID = 1;
4040
public static final long BASE_OFFSET = 0L;
4141
public static final byte DEFAULT_MAGIC = CURRENT_LOG_MAGIC_VALUE;
42+
public static final String DEFAULT_REMOTE_DATA_DIR = "/tmp/fluss/remote-data";
4243
// ---------------------------- data1 and related table info begin ---------------------------
4344
public static final List<Object[]> DATA1 =
4445
Arrays.asList(
@@ -93,6 +94,7 @@ public final class TestData {
9394
DATA1_TABLE_ID,
9495
1,
9596
DATA1_TABLE_DESCRIPTOR,
97+
DEFAULT_REMOTE_DATA_DIR,
9698
currentMillis,
9799
currentMillis);
98100

@@ -118,6 +120,7 @@ public final class TestData {
118120
PARTITION_TABLE_ID,
119121
1,
120122
DATA1_PARTITIONED_TABLE_DESCRIPTOR,
123+
DEFAULT_REMOTE_DATA_DIR,
121124
System.currentTimeMillis(),
122125
System.currentTimeMillis());
123126

@@ -148,6 +151,7 @@ public final class TestData {
148151
DATA1_TABLE_ID_PK,
149152
1,
150153
DATA1_TABLE_DESCRIPTOR_PK,
154+
DEFAULT_REMOTE_DATA_DIR,
151155
currentMillis,
152156
currentMillis);
153157

@@ -217,6 +221,7 @@ public final class TestData {
217221
DATA2_TABLE_ID,
218222
1,
219223
DATA2_TABLE_DESCRIPTOR,
224+
DEFAULT_REMOTE_DATA_DIR,
220225
System.currentTimeMillis(),
221226
System.currentTimeMillis());
222227
// -------------------------------- data2 info end ------------------------------------

0 commit comments

Comments
 (0)