Skip to content

Commit 5e012b4

Browse files
committed
add vector-store with data evolution
1 parent e1f8281 commit 5e012b4

25 files changed

+1867
-110
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1452,6 +1452,24 @@
14521452
<td>String</td>
14531453
<td>The Variant shredding schema for writing.</td>
14541454
</tr>
1455+
<tr>
1456+
<td><h5>vector-field</h5></td>
1457+
<td style="word-wrap: break-word;">(none)</td>
1458+
<td>String</td>
1459+
<td>Specify the vector store fields.</td>
1460+
</tr>
1461+
<tr>
1462+
<td><h5>vector.file.format</h5></td>
1463+
<td style="word-wrap: break-word;">(none)</td>
1464+
<td>String</td>
1465+
<td>Specify the vector store file format.</td>
1466+
</tr>
1467+
<tr>
1468+
<td><h5>vector.target-file-size</h5></td>
1469+
<td style="word-wrap: break-word;">(none)</td>
1470+
<td>MemorySize</td>
1471+
<td>Target size of a vector-store file. Default is 10 * TARGET_FILE_SIZE.</td>
1472+
</tr>
14551473
<tr>
14561474
<td><h5>visibility-callback.check-interval</h5></td>
14571475
<td style="word-wrap: break-word;">10 s</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2230,6 +2230,29 @@ public InlineElement getDescription() {
22302230
.withDescription(
22312231
"The interval for checking visibility when visibility-callback enabled.");
22322232

2233+
public static final ConfigOption<String> VECTOR_STORE_FORMAT =
2234+
key("vector.file.format")
2235+
.stringType()
2236+
.noDefaultValue()
2237+
.withDescription("Specify the vector store file format.");
2238+
2239+
public static final ConfigOption<String> VECTOR_STORE_FIELDS =
2240+
key("vector-field")
2241+
.stringType()
2242+
.noDefaultValue()
2243+
.withDescription("Specify the vector store fields.");
2244+
2245+
public static final ConfigOption<MemorySize> VECTOR_STORE_TARGET_FILE_SIZE =
2246+
key("vector.target-file-size")
2247+
.memoryType()
2248+
.noDefaultValue()
2249+
.withDescription(
2250+
Description.builder()
2251+
.text(
2252+
"Target size of a vector-store file."
2253+
+ " Default is 10 * TARGET_FILE_SIZE.")
2254+
.build());
2255+
22332256
private final Options options;
22342257

22352258
public CoreOptions(Map<String, String> options) {
@@ -3469,6 +3492,26 @@ public Duration visibilityCallbackCheckInterval() {
34693492
return options.get(VISIBILITY_CALLBACK_CHECK_INTERVAL);
34703493
}
34713494

3495+
public String vectorStoreFileFormatString() {
3496+
return normalizeFileFormat(options.get(VECTOR_STORE_FORMAT));
3497+
}
3498+
3499+
public List<String> vectorStoreFieldNames() {
3500+
String vectorStoreFields = options.get(CoreOptions.VECTOR_STORE_FIELDS);
3501+
if (vectorStoreFields == null || vectorStoreFields.trim().isEmpty()) {
3502+
return new ArrayList<>();
3503+
} else {
3504+
return Arrays.asList(vectorStoreFields.split(","));
3505+
}
3506+
}
3507+
3508+
public long vectorStoreTargetFileSize() {
3509+
// Since vectors are large, it would be better to set a larger target size for vectors.
3510+
return options.getOptional(VECTOR_STORE_TARGET_FILE_SIZE)
3511+
.map(MemorySize::getBytes)
3512+
.orElse(10 * targetFileSize(false));
3513+
}
3514+
34723515
/** Specifies the merge engine for table with primary key. */
34733516
public enum MergeEngine implements DescribedEnum {
34743517
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),

paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,17 @@ public static FileFormat fileFormat(CoreOptions options) {
107107
return FileFormat.fromIdentifier(options.fileFormatString(), options.toConfiguration());
108108
}
109109

110+
public static FileFormat vectorStoreFileFormat(CoreOptions options) {
111+
if (options.vectorStoreFieldNames().isEmpty()) {
112+
return null;
113+
}
114+
String vectorStoreFileFormat = options.vectorStoreFileFormatString();
115+
if (vectorStoreFileFormat == null) {
116+
return fileFormat(options);
117+
}
118+
return FileFormat.fromIdentifier(vectorStoreFileFormat, options.toConfiguration());
119+
}
120+
110121
public static FileFormat manifestFormat(CoreOptions options) {
111122
return FileFormat.fromIdentifier(options.manifestFormatString(), options.toConfiguration());
112123
}

paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
5353
import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
5454
import org.apache.paimon.utils.StatsCollectorFactories;
55+
import org.apache.paimon.utils.VectorStoreUtils;
5556

5657
import javax.annotation.Nullable;
5758

@@ -73,8 +74,11 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
7374
private final FileIO fileIO;
7475
private final long schemaId;
7576
private final FileFormat fileFormat;
77+
private final FileFormat vectorStoreFileFormat;
78+
private final List<String> vectorStoreFieldNames;
7679
private final long targetFileSize;
7780
private final long blobTargetFileSize;
81+
private final long vectorStoreTargetFileSize;
7882
private final RowType writeSchema;
7983
@Nullable private final List<String> writeCols;
8084
private final DataFilePathFactory pathFactory;
@@ -105,8 +109,11 @@ public AppendOnlyWriter(
105109
@Nullable IOManager ioManager,
106110
long schemaId,
107111
FileFormat fileFormat,
112+
FileFormat vectorStoreFileFormat,
113+
List<String> vectorStoreFieldNames,
108114
long targetFileSize,
109115
long blobTargetFileSize,
116+
long vectorStoreTargetFileSize,
110117
RowType writeSchema,
111118
@Nullable List<String> writeCols,
112119
long maxSequenceNumber,
@@ -129,8 +136,11 @@ public AppendOnlyWriter(
129136
this.fileIO = fileIO;
130137
this.schemaId = schemaId;
131138
this.fileFormat = fileFormat;
139+
this.vectorStoreFileFormat = vectorStoreFileFormat;
140+
this.vectorStoreFieldNames = vectorStoreFieldNames;
132141
this.targetFileSize = targetFileSize;
133142
this.blobTargetFileSize = blobTargetFileSize;
143+
this.vectorStoreTargetFileSize = vectorStoreTargetFileSize;
134144
this.writeSchema = writeSchema;
135145
this.writeCols = writeCols;
136146
this.pathFactory = pathFactory;
@@ -304,13 +314,25 @@ public void toBufferedWriter() throws Exception {
304314
}
305315

306316
private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
307-
if (writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB))) {
308-
return new RollingBlobFileWriter(
317+
boolean hasNormal =
318+
writeSchema.getFields().stream()
319+
.anyMatch(
320+
f ->
321+
!f.type().is(BLOB)
322+
&& !vectorStoreFieldNames.contains(f.name()));
323+
boolean hasBlob = writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
324+
boolean hasSeparatedVectorStore =
325+
VectorStoreUtils.isDifferentFormat(vectorStoreFileFormat, fileFormat);
326+
if (hasBlob || (hasNormal && hasSeparatedVectorStore)) {
327+
return new DataEvolutionRollingFileWriter(
309328
fileIO,
310329
schemaId,
311330
fileFormat,
331+
vectorStoreFileFormat,
332+
vectorStoreFieldNames,
312333
targetFileSize,
313334
blobTargetFileSize,
335+
vectorStoreTargetFileSize,
314336
writeSchema,
315337
pathFactory,
316338
seqNumCounterProvider,
@@ -326,13 +348,20 @@ private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
326348
statsDenseStore,
327349
blobConsumer);
328350
}
351+
FileFormat realFileFormat = hasNormal ? fileFormat : vectorStoreFileFormat;
352+
long realTargetFileSize = hasNormal ? targetFileSize : vectorStoreTargetFileSize;
353+
DataFilePathFactory realPathFactory =
354+
hasNormal
355+
? pathFactory
356+
: pathFactory.vectorStorePathFactory(
357+
vectorStoreFileFormat.getFormatIdentifier());
329358
return new RowDataRollingFileWriter(
330359
fileIO,
331360
schemaId,
332-
fileFormat,
333-
targetFileSize,
361+
realFileFormat,
362+
realTargetFileSize,
334363
writeSchema,
335-
pathFactory,
364+
realPathFactory,
336365
seqNumCounterProvider,
337366
fileCompression,
338367
statsCollectorFactories.statsCollectors(writeSchema.getFieldNames()),

0 commit comments

Comments
 (0)