Skip to content

Commit eebc009

Browse files
committed
moving arrow tests around
1 parent 7829ff7 commit eebc009

File tree

4 files changed

+552
-485
lines changed

4 files changed

+552
-485
lines changed

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,6 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi
424424

425425
is StructVector -> vector.values(range)
426426
.withTypeNullable(field.isNullable, nullability)
427-
.also { infer = Infer.Type }
428427

429428
is ListVector -> vector.values(range)
430429
.withTypeNullable(field.isNullable, nullability)
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package org.jetbrains.kotlinx.dataframe.io
2+
3+
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo
4+
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder
5+
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils
6+
import org.apache.arrow.adbc.core.AdbcDriver
7+
import org.apache.arrow.adbc.driver.jdbc.JdbcConnection
8+
import org.apache.arrow.adbc.driver.jdbc.JdbcDriver
9+
import org.apache.arrow.adbc.driver.jdbc.JdbcQuirks
10+
import org.apache.arrow.memory.RootAllocator
11+
import org.apache.arrow.vector.types.DateUnit
12+
import org.apache.arrow.vector.types.pojo.ArrowType
13+
import org.intellij.lang.annotations.Language
14+
import org.jetbrains.kotlinx.dataframe.DataFrame
15+
import org.jetbrains.kotlinx.dataframe.api.print
16+
import org.junit.Ignore
17+
import org.junit.Test
18+
import java.math.BigDecimal
19+
import java.sql.Connection
20+
import java.sql.Date
21+
import java.sql.Time
22+
import java.sql.Timestamp
23+
import java.sql.Types
24+
import java.util.UUID
25+
import kotlin.reflect.full.memberProperties
26+
import kotlin.reflect.jvm.isAccessible
27+
28+
/**
 * Smoke tests that read JDBC query results into a [DataFrame] through Arrow's
 * [ADBC JDBC driver](https://arrow.apache.org/adbc/current/driver/jdbc.html), backed by in-memory H2 databases.
 *
 * Both tests only print the resulting frame; they do not assert on its contents.
 * Every ADBC resource (allocator, database, connection, statements) is released via `use`.
 */
class ArrowAdbcTest {
    /**
     * We can connect to JDBC databases from arrow using [ADBC](https://arrow.apache.org/adbc/current/driver/jdbc.html).
     *
     * Creates and fills the `Customer` and `Sale` tables in an H2 database running in MySQL mode,
     * then reads `SELECT * FROM Customer` into a [DataFrame] and prints it.
     */
    @Test
    fun `JDBC integration H2 MySQL`() {
        val url = "jdbc:h2:mem:test5;DB_CLOSE_DELAY=-1;MODE=MySQL;DATABASE_TO_UPPER=false"

        // Closing order is innermost first: statements, connection, database, allocator.
        // Closing the RootAllocator last also lets Arrow report buffers that were never released.
        val df = RootAllocator().use { allocator ->
            JdbcDriver(allocator)
                .open(
                    buildMap {
                        AdbcDriver.PARAM_URI.set(this, url)
                    },
                ).use { db ->
                    db.connect().use { connection ->
                        // Runs a single DDL/DML statement and closes the ADBC statement afterwards.
                        fun executeUpdate(sql: String) {
                            connection.createStatement().use { statement ->
                                statement.setSqlQuery(sql)
                                statement.executeUpdate()
                            }
                        }

                        // Create table Customer
                        @Language("SQL")
                        val createCustomerTableQuery = """
                            CREATE TABLE Customer (
                                id INT PRIMARY KEY,
                                name VARCHAR(50),
                                age INT
                            )
                        """
                        executeUpdate(createCustomerTableQuery)

                        // Create table Sale
                        @Language("SQL")
                        val createSaleTableQuery = """
                            CREATE TABLE Sale (
                                id INT PRIMARY KEY,
                                customerId INT,
                                amount DECIMAL(10, 2) NOT NULL
                            )
                        """
                        executeUpdate(createSaleTableQuery)

                        // add data to the Customer table
                        listOf(
                            "INSERT INTO Customer (id, name, age) VALUES (1, 'John', 40)",
                            "INSERT INTO Customer (id, name, age) VALUES (2, 'Alice', 25)",
                            "INSERT INTO Customer (id, name, age) VALUES (3, 'Bob', 47)",
                            "INSERT INTO Customer (id, name, age) VALUES (4, NULL, NULL)",
                        ).forEach(::executeUpdate)

                        // add data to the Sale table
                        listOf(
                            "INSERT INTO Sale (id, customerId, amount) VALUES (1, 1, 100.50)",
                            "INSERT INTO Sale (id, customerId, amount) VALUES (2, 2, 50.00)",
                            "INSERT INTO Sale (id, customerId, amount) VALUES (3, 1, 75.25)",
                            "INSERT INTO Sale (id, customerId, amount) VALUES (4, 3, 35.15)",
                        ).forEach(::executeUpdate)

                        // The frame is built while the statement (and therefore its reader) is still open.
                        connection.createStatement().use { statement ->
                            statement.setSqlQuery("SELECT * FROM Customer")
                            DataFrame.readArrow(statement.executeQuery().reader)
                        }
                    }
                }
        }

        df.print(borders = true, columnTypes = true)
    }

    /**
     * We can connect to JDBC databases from arrow using [ADBC](https://arrow.apache.org/adbc/current/driver/jdbc.html).
     *
     * Creates `table1` and `table2` in an H2 database running in PostgreSQL mode, inserts three rows into each
     * through the underlying JDBC connection, then reads `SELECT * FROM table1` into a [DataFrame] and prints it.
     *
     * TODO hard to define calendar stuff
     */
    @Test
    @Ignore
    fun `JDBC integration H2 PostgreSQL`() {
        val url =
            "jdbc:h2:mem:test3;DB_CLOSE_DELAY=-1;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;DEFAULT_NULL_ORDERING=HIGH"

        // NOTE(review): this config is built but not passed to anything in this test yet;
        // kept as-is because the test is still a work in progress (see @Ignore / TODO above).
        val config = JdbcToArrowConfigBuilder()
            .setArraySubTypeByColumnNameMap(
                mapOf(
                    "dateArrayCol" to JdbcFieldInfo(Types.ARRAY),
                ),
            ).build()

        // Custom type mapping: every JDBC ARRAY column is reported as an Arrow day-precision date,
        // everything else falls back to the default JDBC -> Arrow mapping.
        val quirks = JdbcQuirks.builder("h2")
            .typeConverter {
                if (it.jdbcType == Types.ARRAY) {
                    ArrowType.Date(DateUnit.DAY)
                } else {
                    JdbcToArrowUtils.getArrowTypeFromJdbcType(it.fieldInfo, null)
                }
            }
            .build()

        // Closing order is innermost first: statements, connection, database, allocator.
        val df = RootAllocator().use { allocator ->
            JdbcDriver(allocator)
                .open(
                    buildMap {
                        AdbcDriver.PARAM_URI.set(this, url)
                        put(JdbcDriver.PARAM_JDBC_QUIRKS, quirks)
                    },
                ).use { db ->
                    db.connect().use { connection ->
                        // Runs a single DDL/DML statement and closes the ADBC statement afterwards.
                        fun executeUpdate(sql: String) {
                            connection.createStatement().use { statement ->
                                statement.setSqlQuery(sql)
                                statement.executeUpdate()
                            }
                        }

                        @Language("SQL")
                        val createTableStatement =
                            """
                            CREATE TABLE IF NOT EXISTS table1 (
                            id serial PRIMARY KEY,
                            bigintCol bigint not null,
                            smallintCol smallint not null,
                            bigserialCol bigserial not null,
                            booleanCol boolean not null,
                            byteaCol bytea not null,
                            characterCol character not null,
                            characterNCol character(10) not null,
                            charCol char not null,
                            dateCol date not null,
                            doubleCol double precision not null,
                            integerCol integer,
                            intArrayCol integer array,
                            doubleArrayCol double precision array,
                            dateArrayCol date array,
                            textArrayCol text array,
                            booleanArrayCol boolean array
                            )
                            """.trimIndent()
                        executeUpdate(createTableStatement)

                        @Language("SQL")
                        val createTableQuery =
                            """
                            CREATE TABLE IF NOT EXISTS table2 (
                            id serial PRIMARY KEY,
                            moneyCol money not null,
                            numericCol numeric not null,
                            realCol real not null,
                            smallintCol smallint not null,
                            serialCol serial not null,
                            textCol text,
                            timeCol time not null,
                            timeWithZoneCol time with time zone not null,
                            timestampCol timestamp not null,
                            timestampWithZoneCol timestamp with time zone not null,
                            uuidCol uuid not null
                            )
                            """.trimIndent()
                        executeUpdate(createTableQuery)

                        @Language("SQL")
                        val insertData1 =
                            """
                            INSERT INTO table1 (
                            bigintCol, smallintCol, bigserialCol, booleanCol,
                            byteaCol, characterCol, characterNCol, charCol,
                            dateCol, doubleCol,
                            integerCol, intArrayCol,
                            doubleArrayCol, dateArrayCol, textArrayCol, booleanArrayCol
                            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                            """.trimIndent()

                        @Language("SQL")
                        val insertData2 =
                            """
                            INSERT INTO table2 (
                            moneyCol, numericCol,
                            realCol, smallintCol,
                            serialCol, textCol, timeCol,
                            timeWithZoneCol, timestampCol, timestampWithZoneCol,
                            uuidCol
                            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                            """.trimIndent()

                        // temporary workaround to create arrays:
                        // reach into the ADBC connection via reflection to get the wrapped java.sql.Connection.
                        val connectionProperty = JdbcConnection::class
                            .memberProperties
                            .find { it.name == "connection" }
                            ?: error("JdbcConnection has no 'connection' property; the reflection workaround needs updating")
                        connectionProperty.isAccessible = true
                        val jdbcConnection = connectionProperty.get(connection as JdbcConnection) as Connection

                        val intArray = jdbcConnection.createArrayOf("INTEGER", arrayOf(1, 2, 3))
                        val doubleArray = jdbcConnection.createArrayOf("DOUBLE", arrayOf(1.1, 2.2, 3.3))
                        val dateArray = jdbcConnection.createArrayOf(
                            "DATE",
                            arrayOf(Date.valueOf("2023-08-01"), Date.valueOf("2023-08-02")),
                        )
                        val textArray = jdbcConnection.createArrayOf("TEXT", arrayOf("Hello", "World"))
                        val booleanArray = jdbcConnection.createArrayOf("BOOLEAN", arrayOf(true, false, true))

                        // Insert data into table1
                        jdbcConnection.prepareStatement(insertData1).use { statement ->
                            for (i in 1..3) {
                                statement.setLong(1, i * 1000L)
                                statement.setShort(2, 11.toShort())
                                statement.setLong(3, 1000000000L + i)
                                statement.setBoolean(4, i % 2 == 1)
                                statement.setBytes(5, byteArrayOf(1, 2, 3))
                                statement.setString(6, "A")
                                statement.setString(7, "Hello")
                                statement.setString(8, "A")
                                statement.setDate(9, Date.valueOf("2023-08-01"))
                                statement.setDouble(10, 12.34)
                                statement.setInt(11, 12345 * i)
                                statement.setArray(12, intArray)
                                statement.setArray(13, doubleArray)
                                statement.setArray(14, dateArray)
                                statement.setArray(15, textArray)
                                statement.setArray(16, booleanArray)
                                statement.executeUpdate()
                            }
                        }

                        // Insert data into table2
                        jdbcConnection.prepareStatement(insertData2).use { statement ->
                            for (i in 1..3) {
                                statement.setBigDecimal(1, BigDecimal("123.45"))
                                statement.setBigDecimal(2, BigDecimal("12.34"))
                                statement.setFloat(3, 12.34f)
                                statement.setInt(4, 1000 + i)
                                statement.setInt(5, 1000000 + i)
                                statement.setString(6, null)
                                statement.setTime(7, Time.valueOf("12:34:56"))
                                statement.setTimestamp(8, Timestamp(System.currentTimeMillis()))
                                statement.setTimestamp(9, Timestamp(System.currentTimeMillis()))
                                statement.setTimestamp(10, Timestamp(System.currentTimeMillis()))
                                statement.setObject(11, UUID.randomUUID(), Types.OTHER)
                                statement.executeUpdate()
                            }
                        }

                        // The frame is built while the statement (and therefore its reader) is still open.
                        connection.createStatement().use { statement ->
                            statement.setSqlQuery("SELECT * FROM table1")
                            DataFrame.readArrow(statement.executeQuery().reader)
                        }
                    }
                }
        }

        df.print(borders = true, columnTypes = true)
    }
}

0 commit comments

Comments
 (0)