Skip to content

Commit 05b6f66

Browse files
authored
[server] Extend metadata models to support per-table/partition remote data directory (#2763)
1 parent 649bb41 commit 05b6f66

File tree

59 files changed

+815
-138
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+815
-138
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,9 @@ public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
310310
r.getTableId(),
311311
r.getSchemaId(),
312312
TableDescriptor.fromJsonBytes(r.getTableJson()),
313+
// For backward compatibility, results returned by old
314+
// clusters do not include the remote data dir
315+
r.hasRemoteDataDir() ? r.getRemoteDataDir() : null,
313316
r.getCreatedTime(),
314317
r.getModifiedTime()));
315318
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -598,8 +598,12 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
598598
pbPartitionInfo ->
599599
new PartitionInfo(
600600
pbPartitionInfo.getPartitionId(),
601-
toResolvedPartitionSpec(
602-
pbPartitionInfo.getPartitionSpec())))
601+
toResolvedPartitionSpec(pbPartitionInfo.getPartitionSpec()),
602+
// For backward compatibility, results returned by old
603+
// clusters do not include the remote data dir
604+
pbPartitionInfo.hasRemoteDataDir()
605+
? pbPartitionInfo.getRemoteDataDir()
606+
: null))
603607
.collect(Collectors.toList());
604608
}
605609

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
6868
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
6969
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
70+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
7071
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
7172
import static org.apache.fluss.row.BinaryString.fromString;
7273
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
@@ -169,6 +170,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception {
169170
.distributedBy(3)
170171
.logFormat(logFormat)
171172
.build(),
173+
DEFAULT_REMOTE_DATA_DIR,
172174
System.currentTimeMillis(),
173175
System.currentTimeMillis());
174176
long fetchOffset = 0L;
@@ -313,6 +315,7 @@ void testComplexTypeFetch() throws Exception {
313315
.distributedBy(3)
314316
.logFormat(LogFormat.ARROW)
315317
.build(),
318+
DEFAULT_REMOTE_DATA_DIR,
316319
System.currentTimeMillis(),
317320
System.currentTimeMillis());
318321
long fetchOffset = 0L;

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ void testFetchWithSchemaChange() throws Exception {
172172
DATA1_TABLE_INFO.getNumBuckets(),
173173
DATA1_TABLE_INFO.getProperties(),
174174
DATA1_TABLE_INFO.getCustomProperties(),
175+
DATA1_TABLE_INFO.getRemoteDataDir(),
175176
DATA1_TABLE_INFO.getComment().orElse(null),
176177
DATA1_TABLE_INFO.getCreatedTime(),
177178
DATA1_TABLE_INFO.getModifiedTime()),

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
6262
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
6363
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
64+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
6465
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
6566
import static org.apache.fluss.testutils.DataTestUtils.genLogFile;
6667
import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir;
@@ -218,6 +219,7 @@ void testProjection(String format) throws Exception {
218219
.distributedBy(3)
219220
.logFormat(logFormat)
220221
.build(),
222+
DEFAULT_REMOTE_DATA_DIR,
221223
System.currentTimeMillis(),
222224
System.currentTimeMillis());
223225
long fetchOffset = 0L;

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
7777
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
7878
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
79+
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
7980
import static org.apache.fluss.testutils.DataTestUtils.indexedRow;
8081
import static org.apache.fluss.testutils.DataTestUtils.row;
8182
import static org.assertj.core.api.Assertions.assertThat;
@@ -96,6 +97,7 @@ class RecordAccumulatorTest {
9697
.distributedBy(3)
9798
.property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), "zstd")
9899
.build(),
100+
DEFAULT_REMOTE_DATA_DIR,
99101
System.currentTimeMillis(),
100102
System.currentTimeMillis());
101103

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,35 @@ public static boolean isAlterableTableOption(String key) {
5959
return ALTERABLE_TABLE_OPTIONS.contains(key);
6060
}
6161

62+
/**
63+
* Returns the default remote data directory from the configuration. Used as a fallback for
64+
* tables or partitions that do not contain remote data directory metadata.
65+
* <p>Resolution order: the first entry of {@code remote.data.dirs} is returned when that list is non-empty; otherwise the value of {@code remote.data.dir} is returned.
66+
* @param conf the Fluss configuration
67+
* @return the default remote data directory path, never {@code null} if the configuration is
68+
* valid (i.e., at least one of {@code remote.data.dir} or {@code remote.data.dirs} is set)
69+
* @throws IllegalConfigurationException if the configuration is invalid (i.e., {@code
70+
* remote.data.dirs} is empty and {@code remote.data.dir} is unset)
71+
* @see ConfigOptions#REMOTE_DATA_DIR
72+
* @see ConfigOptions#REMOTE_DATA_DIRS
73+
*/
74+
public static String getDefaultRemoteDataDir(Configuration conf) {
75+
// NOTE(review): assumes this option yields a non-null (possibly empty) list — confirm
// against the option's declared default, otherwise isEmpty() below would throw.
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
76+
// The multi-directory option takes precedence; its first entry is used as the default.
if (!remoteDataDirs.isEmpty()) {
77+
return remoteDataDirs.get(0);
78+
}
79+
80+
// Fall back to the single-directory option.
String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
81+
if (remoteDataDir == null) {
82+
throw new IllegalConfigurationException(
83+
String.format(
84+
"Either %s or %s must be configured.",
85+
ConfigOptions.REMOTE_DATA_DIR.key(),
86+
ConfigOptions.REMOTE_DATA_DIRS.key()));
87+
}
88+
return remoteDataDir;
89+
}
90+
6291
@VisibleForTesting
6392
static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
6493
Map<String, ConfigOption<?>> options = new HashMap<>();

fluss-common/src/main/java/org/apache/fluss/metadata/PartitionInfo.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,27 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121

22+
import javax.annotation.Nullable;
23+
2224
import java.util.Objects;
2325

2426
/**
25-
* Information of a partition metadata, includes the partition's name and the partition id that
26-
* represents the unique identifier of the partition.
27+
* Information about a partition's metadata, including the partition id (unique identifier of the
28+
* partition), partition name, remote data dir for partitioned data storage, etc.
2729
*
2830
* @since 0.2
2931
*/
3032
@PublicEvolving
3133
public class PartitionInfo {
3234
private final long partitionId;
3335
private final ResolvedPartitionSpec partitionSpec;
36+
private final @Nullable String remoteDataDir;
3437

35-
public PartitionInfo(long partitionId, ResolvedPartitionSpec partitionSpec) {
38+
/**
 * Creates a partition info from its id, resolved spec and remote data directory.
 *
 * @param partitionId the id of the partition
 * @param partitionSpec the resolved partition spec of the partition
 * @param remoteDataDir the remote data directory of the partition; may be {@code null}, e.g.
 *     when the value is not available from the source the info was built from
 */
public PartitionInfo(
39+
long partitionId, ResolvedPartitionSpec partitionSpec, @Nullable String remoteDataDir) {
3640
this.partitionId = partitionId;
3741
this.partitionSpec = partitionSpec;
42+
this.remoteDataDir = remoteDataDir;
3843
}
3944

4045
/** Get the partition id. The id is globally unique in the Fluss cluster. */
@@ -58,6 +63,11 @@ public PartitionSpec getPartitionSpec() {
5863
return partitionSpec.toPartitionSpec();
5964
}
6065

66+
/**
 * Get the remote data directory of the partition, exactly as passed to the constructor. May be
 * {@code null} when no remote data directory was supplied.
 */
@Nullable
67+
public String getRemoteDataDir() {
68+
return remoteDataDir;
69+
}
70+
6171
@Override
6272
public boolean equals(Object o) {
6373
if (this == o) {
@@ -67,16 +77,25 @@ public boolean equals(Object o) {
6777
return false;
6878
}
6979
PartitionInfo that = (PartitionInfo) o;
70-
return partitionId == that.partitionId && Objects.equals(partitionSpec, that.partitionSpec);
80+
return partitionId == that.partitionId
81+
&& Objects.equals(partitionSpec, that.partitionSpec)
82+
&& Objects.equals(remoteDataDir, that.remoteDataDir);
7183
}
7284

7385
@Override
7486
public int hashCode() {
75-
return Objects.hash(partitionId, partitionSpec);
87+
return Objects.hash(partitionId, partitionSpec, remoteDataDir);
7688
}
7789

7890
@Override
7991
public String toString() {
80-
return "Partition{name='" + getPartitionName() + '\'' + ", id=" + partitionId + '}';
92+
return "Partition{name='"
93+
+ getPartitionName()
94+
+ '\''
95+
+ ", id="
96+
+ partitionId
97+
+ ", remoteDataDir="
98+
+ remoteDataDir
99+
+ '}';
81100
}
82101
}

fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public final class TableInfo {
5959
private final Configuration properties;
6060
private final TableConfig tableConfig;
6161
private final Configuration customProperties;
62+
private final @Nullable String remoteDataDir;
6263
private final @Nullable String comment;
6364

6465
private final long createdTime;
@@ -74,6 +75,7 @@ public TableInfo(
7475
int numBuckets,
7576
Configuration properties,
7677
Configuration customProperties,
78+
@Nullable String remoteDataDir,
7779
@Nullable String comment,
7880
long createdTime,
7981
long modifiedTime) {
@@ -90,6 +92,7 @@ public TableInfo(
9092
this.properties = properties;
9193
this.tableConfig = new TableConfig(properties);
9294
this.customProperties = customProperties;
95+
this.remoteDataDir = remoteDataDir;
9396
this.comment = comment;
9497
this.createdTime = createdTime;
9598
this.modifiedTime = modifiedTime;
@@ -263,6 +266,12 @@ public Configuration getCustomProperties() {
263266
return customProperties;
264267
}
265268

269+
/** Returns the remote data directory of the table, or {@code null} if none was supplied when this table info was created. */
270+
@Nullable
271+
public String getRemoteDataDir() {
272+
return remoteDataDir;
273+
}
274+
266275
/** Returns the comment/description of the table. */
267276
public Optional<String> getComment() {
268277
return Optional.ofNullable(comment);
@@ -314,6 +323,7 @@ public static TableInfo of(
314323
long tableId,
315324
int schemaId,
316325
TableDescriptor tableDescriptor,
326+
String remoteDataDir,
317327
long createdTime,
318328
long modifiedTime) {
319329
Schema schema = tableDescriptor.getSchema();
@@ -335,6 +345,7 @@ public static TableInfo of(
335345
numBuckets,
336346
Configuration.fromMap(tableDescriptor.getProperties()),
337347
Configuration.fromMap(tableDescriptor.getCustomProperties()),
348+
remoteDataDir,
338349
tableDescriptor.getComment().orElse(null),
339350
createdTime,
340351
modifiedTime);
@@ -358,6 +369,7 @@ public boolean equals(Object o) {
358369
&& Objects.equals(partitionKeys, that.partitionKeys)
359370
&& Objects.equals(properties, that.properties)
360371
&& Objects.equals(customProperties, that.customProperties)
372+
&& Objects.equals(remoteDataDir, that.remoteDataDir)
361373
&& Objects.equals(comment, that.comment);
362374
}
363375

@@ -376,6 +388,7 @@ public int hashCode() {
376388
numBuckets,
377389
properties,
378390
customProperties,
391+
remoteDataDir,
379392
comment);
380393
}
381394

@@ -402,6 +415,8 @@ public String toString() {
402415
+ properties
403416
+ ", customProperties="
404417
+ customProperties
418+
+ ", remoteDataDir="
419+
+ remoteDataDir
405420
+ ", comment='"
406421
+ comment
407422
+ '\''

fluss-common/src/test/java/org/apache/fluss/record/TestData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public final class TestData {
3939
public static final short DEFAULT_SCHEMA_ID = 1;
4040
public static final long BASE_OFFSET = 0L;
4141
public static final byte DEFAULT_MAGIC = CURRENT_LOG_MAGIC_VALUE;
42+
public static final String DEFAULT_REMOTE_DATA_DIR = "/tmp/fluss/remote-data";
4243
// ---------------------------- data1 and related table info begin ---------------------------
4344
public static final List<Object[]> DATA1 =
4445
Arrays.asList(
@@ -93,6 +94,7 @@ public final class TestData {
9394
DATA1_TABLE_ID,
9495
1,
9596
DATA1_TABLE_DESCRIPTOR,
97+
DEFAULT_REMOTE_DATA_DIR,
9698
currentMillis,
9799
currentMillis);
98100

@@ -118,6 +120,7 @@ public final class TestData {
118120
PARTITION_TABLE_ID,
119121
1,
120122
DATA1_PARTITIONED_TABLE_DESCRIPTOR,
123+
DEFAULT_REMOTE_DATA_DIR,
121124
System.currentTimeMillis(),
122125
System.currentTimeMillis());
123126

@@ -148,6 +151,7 @@ public final class TestData {
148151
DATA1_TABLE_ID_PK,
149152
1,
150153
DATA1_TABLE_DESCRIPTOR_PK,
154+
DEFAULT_REMOTE_DATA_DIR,
151155
currentMillis,
152156
currentMillis);
153157

@@ -217,6 +221,7 @@ public final class TestData {
217221
DATA2_TABLE_ID,
218222
1,
219223
DATA2_TABLE_DESCRIPTOR,
224+
DEFAULT_REMOTE_DATA_DIR,
220225
System.currentTimeMillis(),
221226
System.currentTimeMillis());
222227
// -------------------------------- data2 info end ------------------------------------

0 commit comments

Comments
 (0)