Skip to content

Commit d4c3ba2

Browse files
authored
[spark] spark: lucene vector index write (#6876)
1 parent 6f46484 commit d4c3ba2

File tree

8 files changed

+236
-8
lines changed

8 files changed

+236
-8
lines changed

.github/workflows/utitcase-spark-4.x.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ jobs:
5151
distribution: 'temurin'
5252

5353
- name: Build Spark
54-
run: mvn -T 2C -B clean install -DskipTests -Pspark4,flink1
54+
run: mvn -T 2C -B clean install -DskipTests -Pspark4,flink1,paimon-lucene
5555

5656
- name: Test Spark
5757
timeout-minutes: 60
@@ -65,6 +65,6 @@ jobs:
6565
test_modules+="org.apache.paimon:paimon-spark-${suffix}_2.13,"
6666
done
6767
test_modules="${test_modules%,}"
68-
mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
68+
mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1,paimon-lucene
6969
env:
7070
MAVEN_OPTS: -Xmx4096m

paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.lucene.index;
2020

21+
import org.apache.paimon.data.InternalArray;
2122
import org.apache.paimon.types.ArrayType;
2223
import org.apache.paimon.types.DataType;
2324
import org.apache.paimon.types.FloatType;
@@ -38,20 +39,38 @@ public static LuceneVectorIndexFactory init(DataType dataType) {
3839
}
3940
}
4041

41-
public abstract LuceneVectorIndex<?> create(long rowId, Object vector);
42+
public abstract LuceneVectorIndex create(long rowId, Object vector);
4243

4344
/** Factory for creating LuceneFloatVectorIndex instances. */
4445
public static class LuceneFloatVectorIndexFactory extends LuceneVectorIndexFactory {
4546
@Override
46-
public LuceneVectorIndex<?> create(long rowId, Object vector) {
47-
return new LuceneFloatVectorIndex(rowId, (float[]) vector);
47+
public LuceneVectorIndex create(long rowId, Object fieldData) {
48+
float[] vector;
49+
if (fieldData instanceof float[]) {
50+
vector = (float[]) fieldData;
51+
} else if (fieldData instanceof InternalArray) {
52+
vector = ((InternalArray) fieldData).toFloatArray();
53+
} else {
54+
throw new RuntimeException(
55+
"Unsupported vector type: " + fieldData.getClass().getName());
56+
}
57+
return new LuceneFloatVectorIndex(rowId, vector);
4858
}
4959
}
5060

5161
/** Factory for creating LuceneByteVectorIndex instances. */
5262
public static class LuceneByteVectorIndexFactory extends LuceneVectorIndexFactory {
5363
@Override
54-
public LuceneVectorIndex<?> create(long rowId, Object vector) {
64+
public LuceneVectorIndex create(long rowId, Object fieldData) {
65+
byte[] vector;
66+
if (fieldData instanceof byte[]) {
67+
vector = (byte[]) fieldData;
68+
} else if (fieldData instanceof InternalArray) {
69+
vector = ((InternalArray) fieldData).toByteArray();
70+
} else {
71+
throw new RuntimeException(
72+
"Unsupported vector type: " + fieldData.getClass().getName());
73+
}
5574
return new LuceneByteVectorIndex(rowId, (byte[]) vector);
5675
}
5776
}

paimon-spark/paimon-spark-4.0/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ under the License.
116116
<classifier>tests</classifier>
117117
<scope>test</scope>
118118
</dependency>
119+
120+
<dependency>
121+
<groupId>org.apache.paimon</groupId>
122+
<artifactId>paimon-lucene</artifactId>
123+
<version>${project.version}</version>
124+
<scope>test</scope>
125+
</dependency>
119126
</dependencies>
120127

121128
<build>
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.procedure
20+
21+
import org.apache.paimon.utils.Range
22+
23+
import scala.collection.JavaConverters._
24+
import scala.collection.immutable
25+
26+
class CreateGlobalVectorIndexProcedureTest extends CreateGlobalIndexProcedureTest {
27+
test("create lucene-vector-knn global index") {
28+
import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
29+
import java.util.ServiceLoader
30+
import scala.collection.JavaConverters._
31+
32+
withTable("T") {
33+
spark.sql("""
34+
|CREATE TABLE T (id INT, v ARRAY<FLOAT>)
35+
|TBLPROPERTIES (
36+
| 'bucket' = '-1',
37+
| 'global-index.row-count-per-shard' = '10000',
38+
| 'row-tracking.enabled' = 'true',
39+
| 'data-evolution.enabled' = 'true')
40+
|""".stripMargin)
41+
42+
val values = (0 until 100)
43+
.map(
44+
i => s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))")
45+
.mkString(",")
46+
spark.sql(s"INSERT INTO T VALUES $values")
47+
48+
val output =
49+
spark
50+
.sql("CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => 'lucene-vector-knn', options => 'vector.dim=3')")
51+
.collect()
52+
.head
53+
54+
assert(output.getBoolean(0))
55+
56+
val table = loadTable("T")
57+
val indexEntries = table
58+
.store()
59+
.newIndexFileHandler()
60+
.scanEntries()
61+
.asScala
62+
.filter(_.indexFile().indexType() == "lucene-vector-knn")
63+
64+
assert(indexEntries.nonEmpty)
65+
val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
66+
assert(totalRowCount == 100L)
67+
}
68+
}
69+
70+
test("create lucene-vector-knn global index with partition") {
71+
withTable("T") {
72+
spark.sql("""
73+
|CREATE TABLE T (id INT, v ARRAY<FLOAT>, pt STRING)
74+
|TBLPROPERTIES (
75+
| 'bucket' = '-1',
76+
| 'global-index.row-count-per-shard' = '10000',
77+
| 'row-tracking.enabled' = 'true',
78+
| 'data-evolution.enabled' = 'true')
79+
| PARTITIONED BY (pt)
80+
|""".stripMargin)
81+
82+
var values = (0 until 65000)
83+
.map(
84+
i =>
85+
s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)), 'p0')")
86+
.mkString(",")
87+
spark.sql(s"INSERT INTO T VALUES $values")
88+
89+
values = (0 until 35000)
90+
.map(
91+
i =>
92+
s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)), 'p1')")
93+
.mkString(",")
94+
spark.sql(s"INSERT INTO T VALUES $values")
95+
96+
values = (0 until 22222)
97+
.map(
98+
i =>
99+
s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)), 'p0')")
100+
.mkString(",")
101+
spark.sql(s"INSERT INTO T VALUES $values")
102+
103+
val output =
104+
spark
105+
.sql("CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => 'lucene-vector-knn', options => 'vector.dim=3')")
106+
.collect()
107+
.head
108+
109+
assert(output.getBoolean(0))
110+
111+
val table = loadTable("T")
112+
val indexEntries = table
113+
.store()
114+
.newIndexFileHandler()
115+
.scanEntries()
116+
.asScala
117+
.filter(_.indexFile().indexType() == "lucene-vector-knn")
118+
119+
assert(indexEntries.nonEmpty)
120+
val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
121+
assert(totalRowCount == 122222L)
122+
}
123+
}
124+
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.paimon.globalindex.GlobalIndexWriter;
2424
import org.apache.paimon.globalindex.GlobalIndexer;
2525
import org.apache.paimon.globalindex.IndexedSplit;
26-
import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
2726
import org.apache.paimon.index.GlobalIndexMeta;
2827
import org.apache.paimon.index.IndexFileMeta;
2928
import org.apache.paimon.io.CompactIncrement;
@@ -72,7 +71,7 @@ private static List<IndexFileMeta> convertToIndexMeta(
7271
range.from, range.to, context.indexField().id(), null, entry.meta());
7372
IndexFileMeta indexFileMeta =
7473
new IndexFileMeta(
75-
BitmapGlobalIndexerFactory.IDENTIFIER,
74+
context.indexType(),
7675
fileName,
7776
fileSize,
7877
range.to - range.from + 1,
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.globalindex.lucene;
20+
21+
import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
22+
import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
23+
24+
/**
25+
* Builder for creating lucene-vector-knn global indexes.
26+
*
27+
* <p>This implementation does not apply any custom transformations to the input dataset, allowing
28+
* the data to be processed as-is for lucene index creation.
29+
*/
30+
public class LuceneGlobalIndexBuilder extends GlobalIndexBuilder {
31+
32+
protected LuceneGlobalIndexBuilder(GlobalIndexBuilderContext context) {
33+
super(context);
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.globalindex.lucene;
20+
21+
import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
22+
import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
23+
import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
24+
25+
/**
26+
* Factory for creating lucene-vector-knn global index builders.
27+
*
28+
* <p>This factory is automatically discovered via Java's ServiceLoader mechanism.
29+
*/
30+
public class LuceneGlobalIndexBuilderFactory implements GlobalIndexBuilderFactory {
31+
32+
private static final String IDENTIFIER = "lucene-vector-knn";
33+
34+
@Override
35+
public String identifier() {
36+
return IDENTIFIER;
37+
}
38+
39+
@Override
40+
public GlobalIndexBuilder create(GlobalIndexBuilderContext context) {
41+
return new LuceneGlobalIndexBuilder(context);
42+
}
43+
}

paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@
1616
# limitations under the License.
1717

1818
org.apache.paimon.spark.globalindex.bitmap.BitmapGlobalIndexBuilderFactory
19+
org.apache.paimon.spark.globalindex.lucene.LuceneGlobalIndexBuilderFactory

0 commit comments

Comments
 (0)