Skip to content

Commit 5924728

Browse files
authored
[arrow] Introduce Arrow native read & write interface (#6855)
1 parent ee095bc commit 5924728

File tree

4 files changed

+339
-0
lines changed

4 files changed

+339
-0
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.arrow.reader;
20+
21+
import java.io.IOException;
22+
23+
/**
 * Native reader definition.
 *
 * <p>Implements {@link AutoCloseable} so that readers can be managed with try-with-resources;
 * {@link #close()} declares no checked exception.
 */
public abstract class NativeReader implements AutoCloseable {

    // Not read or written by any Java code in this class.
    // NOTE(review): presumably accessed from native code by field name - confirm before
    // renaming or removing.
    private long nativeReaderPtr;

    /**
     * Reads one batch into the structures located at the given addresses.
     *
     * @param arrayAddress native address of the array structure (presumably an Arrow C Data
     *     Interface {@code ArrowArray} - confirm against implementations)
     * @param schemaAddress native address of the schema structure (presumably an Arrow C Data
     *     Interface {@code ArrowSchema} - confirm against implementations)
     * @throws IOException if the batch cannot be read
     */
    public abstract void readBatch(long arrayAddress, long schemaAddress) throws IOException;

    /** Closes this reader. */
    @Override
    public abstract void close();
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.arrow.writer;
20+
21+
import org.apache.paimon.arrow.ArrowBundleRecords;
22+
import org.apache.paimon.arrow.ArrowUtils;
23+
import org.apache.paimon.arrow.vector.ArrowCStruct;
24+
import org.apache.paimon.arrow.vector.ArrowFormatCWriter;
25+
import org.apache.paimon.data.InternalRow;
26+
import org.apache.paimon.format.BundleFormatWriter;
27+
import org.apache.paimon.fs.PositionOutputStream;
28+
import org.apache.paimon.io.BundleRecords;
29+
30+
import org.apache.arrow.c.ArrowArray;
31+
import org.apache.arrow.c.ArrowSchema;
32+
import org.apache.arrow.memory.BufferAllocator;
33+
import org.apache.arrow.vector.VectorSchemaRoot;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import java.io.IOException;
38+
39+
/** Arrow bundle writer. */
40+
public class ArrowBundleWriter implements BundleFormatWriter {
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(ArrowBundleWriter.class);
43+
44+
private final PositionOutputStream underlyingStream;
45+
private final NativeWriter nativeWriter;
46+
47+
private final ArrowFormatCWriter arrowFormatWriter;
48+
49+
private long serializeCost = 0;
50+
private long jniCost = 0;
51+
52+
public ArrowBundleWriter(
53+
PositionOutputStream positionOutputStream,
54+
ArrowFormatCWriter arrowFormatWriter,
55+
NativeWriter nativeWriter) {
56+
this.underlyingStream = positionOutputStream;
57+
this.arrowFormatWriter = arrowFormatWriter;
58+
this.nativeWriter = nativeWriter;
59+
}
60+
61+
@Override
62+
public void addElement(InternalRow internalRow) {
63+
if (!arrowFormatWriter.write(internalRow)) {
64+
flush();
65+
if (!arrowFormatWriter.write(internalRow)) {
66+
throw new RuntimeException("Exception happens while write to orc file");
67+
}
68+
}
69+
}
70+
71+
@Override
72+
public void writeBundle(BundleRecords bundleRecords) throws IOException {
73+
if (bundleRecords instanceof ArrowBundleRecords) {
74+
add(((ArrowBundleRecords) bundleRecords).getVectorSchemaRoot());
75+
} else {
76+
for (InternalRow row : bundleRecords) {
77+
addElement(row);
78+
}
79+
}
80+
}
81+
82+
public void add(VectorSchemaRoot vsr) {
83+
long t1 = System.currentTimeMillis();
84+
BufferAllocator bufferAllocator = vsr.getVector(0).getAllocator();
85+
try (ArrowArray array = ArrowArray.allocateNew(bufferAllocator);
86+
ArrowSchema schema = ArrowSchema.allocateNew(bufferAllocator)) {
87+
ArrowCStruct struct =
88+
ArrowUtils.serializeToCStruct(vsr, array, schema, bufferAllocator);
89+
long t2 = System.currentTimeMillis();
90+
serializeCost += (t2 - t1);
91+
this.nativeWriter.writeIpcBytes(struct.arrayAddress(), struct.schemaAddress());
92+
array.release();
93+
schema.release();
94+
jniCost += (System.currentTimeMillis() - t2);
95+
} catch (RuntimeException e) {
96+
LOG.error("Exception happens while add vsr", e);
97+
throw e;
98+
}
99+
}
100+
101+
@Override
102+
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
103+
return suggestedCheck && (underlyingStream.getPos() > targetSize);
104+
}
105+
106+
@Override
107+
public void close() throws IOException {
108+
flush();
109+
System.out.println("Serialize vsr cost: " + serializeCost + "ms");
110+
System.out.println("Jni cost: " + jniCost + "ms");
111+
this.nativeWriter.close();
112+
this.arrowFormatWriter.close();
113+
}
114+
115+
private void flush() {
116+
try {
117+
if (!arrowFormatWriter.empty()) {
118+
long t1 = System.currentTimeMillis();
119+
arrowFormatWriter.flush();
120+
ArrowCStruct struct = arrowFormatWriter.toCStruct();
121+
long t2 = System.currentTimeMillis();
122+
serializeCost += (t2 - t1);
123+
this.nativeWriter.writeIpcBytes(struct.arrayAddress(), struct.schemaAddress());
124+
jniCost += (System.currentTimeMillis() - t2);
125+
arrowFormatWriter.reset();
126+
}
127+
} catch (RuntimeException e) {
128+
LOG.error("Exception happens while flush to file", e);
129+
throw e;
130+
}
131+
}
132+
133+
public NativeWriter getNativeWriter() {
134+
return nativeWriter;
135+
}
136+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.arrow.writer;
20+
21+
/**
 * Native writer definition.
 *
 * <p>Implements {@link AutoCloseable} so that writers can be managed with try-with-resources;
 * {@link #close()} declares no checked exception.
 */
public abstract class NativeWriter implements AutoCloseable {

    // Not read or written by any Java code in this class.
    // NOTE(review): presumably accessed from native code by field name - confirm before
    // renaming or removing.
    private long nativeWriterPtr;

    /**
     * Returns the amount of native memory used by this writer.
     *
     * <p>NOTE(review): the unit is presumably bytes - confirm against implementations.
     */
    public abstract long nativeMemoryUsed();

    /**
     * Writes the batch described by the structures located at the given addresses.
     *
     * @param arrayAddress native address of the exported array structure
     * @param schemaAddress native address of the exported schema structure
     */
    public abstract void writeIpcBytes(long arrayAddress, long schemaAddress);

    /** Closes this writer. */
    @Override
    public abstract void close();
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.utils;
20+
21+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.logging.Logger;
28+
29+
/**
 * Utils for loading jni. The jni file is placed at {@code /osName/osArch/jniName.libExtension}.
 * User should pass jniName through {@code load}.
 *
 * <p>Each library name is loaded at most once; {@link #load(String)} is synchronized.
 */
public class JNIUtils {

    private static final Logger LOG = Logger.getLogger(JNIUtils.class.getName());

    /** Names already loaded through {@link #load(String)}; guarded by the class lock. */
    private static final Set<String> LOADED_LIBRARIES = new HashSet<>();

    /**
     * Returns the OS directory name: {@code win} for Windows, {@code darwin} for macOS, otherwise
     * the lower-cased {@code os.name} with spaces replaced by underscores.
     */
    private static String osName() {
        // Locale.ROOT keeps the result independent of the default locale's casing rules.
        String osName =
                System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(' ', '_');
        if (osName.startsWith("win")) {
            return "win";
        }
        return osName.startsWith("mac") ? "darwin" : osName;
    }

    /** Returns the {@code os.arch} system property unchanged. */
    private static String osArch() {
        return System.getProperty("os.arch");
    }

    /** Returns the shared-library file extension for the current OS, without the dot. */
    private static String libExtension() {
        String os = osName();
        if (os.contains("os_x") || os.contains("darwin")) {
            return "dylib";
        }
        return os.contains("win") ? "dll" : "so";
    }

    /** Builds the classpath resource path {@code /osName/osArch/jniName.libExtension}. */
    private static String resourceName(String jniName) {
        return "/" + osName() + "/" + osArch() + "/" + jniName + "." + libExtension();
    }

    /** Calls {@link System#load(String)} inside a privileged action. */
    private static void loadLibraryFile(final String fileName) {
        AccessController.doPrivileged(
                (PrivilegedAction<Void>)
                        () -> {
                            System.load(fileName);
                            return null;
                        });
    }

    /**
     * Checks whether a jni file for the current OS and architecture is available as a classpath
     * resource. Nothing is loaded.
     *
     * @param jniName library name without directory or extension
     * @return {@code true} if the resource exists
     */
    public static boolean supportCurrentPlatform(String jniName) {
        String jniFileName = resourceName(jniName);
        try (InputStream jniFileInput = JNIUtils.class.getResourceAsStream(jniFileName)) {
            LOG.info("Try to load " + jniFileName + " found jni file: " + (jniFileInput != null));
            return jniFileInput != null;
        } catch (IOException e) {
            LOG.warning("Failed to load " + jniFileName + " due to " + e.getMessage());
            // Only closing the probe stream can fail here; report the platform as unsupported.
            return false;
        }
    }

    /**
     * Unpacks the jni file for the current platform into a temporary file and loads it. Calling
     * this again with a name that was already loaded successfully is a no-op.
     *
     * @param jniName library name without directory or extension
     * @throws UnsatisfiedLinkError if no jni file exists for the current platform
     * @throws ExceptionInInitializerError if the jni file cannot be unpacked
     */
    public static synchronized void load(String jniName) {
        if (LOADED_LIBRARIES.contains(jniName)) {
            return;
        }

        String jniFileName = resourceName(jniName);
        InputStream jniFileInput = JNIUtils.class.getResourceAsStream(jniFileName);
        if (jniFileInput == null) {
            throw new UnsatisfiedLinkError("Can't find " + jniFileName + " to link arrow file io");
        }

        File tempFile = null;
        try {
            try (InputStream in = jniFileInput) {
                tempFile = File.createTempFile(jniName, "." + libExtension());
                tempFile.deleteOnExit();
                // The output stream is closed (and thereby flushed) before the library is
                // loaded; a failure to do so is reported instead of loading a truncated file.
                try (FileOutputStream out = new FileOutputStream(tempFile)) {
                    byte[] buffer = new byte[4096];
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                    }
                }
            }

            loadLibraryFile(tempFile.getAbsolutePath());
            LOADED_LIBRARIES.add(jniName);
        } catch (IOException ioException) {
            // ExceptionInInitializerError(String) does not accept a cause, so the original
            // stack trace is copied over instead.
            ExceptionInInitializerError exceptionInInitializerError =
                    new ExceptionInInitializerError(
                            "Cannot unpack " + jniName + ": " + ioException.getMessage());
            exceptionInInitializerError.setStackTrace(ioException.getStackTrace());
            throw exceptionInInitializerError;
        } finally {
            // Best-effort cleanup; deleteOnExit() above is the fallback if this fails.
            if (tempFile != null && tempFile.exists()) {
                tempFile.delete();
            }
        }
    }
}

0 commit comments

Comments
 (0)