Skip to content

Commit 6f214d9

Browse files
feat: respect iox::column_type::field metadata when mapping query
1 parent d098ff7 commit 6f214d9

File tree

8 files changed

+469
-11
lines changed

8 files changed

+469
-11
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 0.10.0 [unreleased]
22

3+
### Features
4+
1. [#197](https://github.com/InfluxCommunity/influxdb3-java/pull/197): Respect iox::column_type::field metadata when mapping query results into values
5+
36
## 0.9.0 [2024-08-12]
47

58
### Features

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.io.ByteArrayOutputStream;
2525
import java.io.IOException;
26+
import java.math.BigInteger;
2627
import java.nio.charset.StandardCharsets;
2728
import java.util.ArrayList;
2829
import java.util.Collections;
@@ -41,6 +42,7 @@
4142
import io.netty.handler.codec.http.HttpMethod;
4243
import org.apache.arrow.vector.FieldVector;
4344
import org.apache.arrow.vector.VectorSchemaRoot;
45+
import org.apache.arrow.vector.util.Text;
4446

4547
import com.influxdb.v3.client.InfluxDBApiException;
4648
import com.influxdb.v3.client.InfluxDBClient;
@@ -183,16 +185,52 @@ public Stream<Object[]> query(@Nonnull final String query,
183185
return queryData(query, parameters, options)
184186
.flatMap(vector -> {
185187
List<FieldVector> fieldVectors = vector.getFieldVectors();
186-
return IntStream
187-
.range(0, vector.getRowCount())
188-
.mapToObj(rowNumber -> {
189-
190-
ArrayList<Object> row = new ArrayList<>();
191-
for (FieldVector fieldVector : fieldVectors) {
192-
row.add(fieldVector.getObject(rowNumber));
193-
}
194-
return row.toArray();
195-
});
188+
return IntStream.range(0, vector.getRowCount())
189+
.mapToObj(rowNumber -> {
190+
ArrayList<Object> row = new ArrayList<>();
191+
for (int i = 0; i < fieldVectors.size(); i++) {
192+
var schema = vector.getSchema().getFields().get(i);
193+
var metaType = schema.getMetadata().get("iox::column::type");
194+
String valueType = metaType != null ? metaType.split("::")[2] : null;
195+
196+
if ("field".equals(valueType)) {
197+
switch (metaType) {
198+
case "iox::column_type::field::integer":
199+
case "iox::column_type::field::uinteger":
200+
var intValue = (Long) fieldVectors.get(i)
201+
.getObject(rowNumber);
202+
row.add(intValue);
203+
break;
204+
case "iox::column_type::field::float":
205+
var doubleValue = (Double) fieldVectors.get(i)
206+
.getObject(rowNumber);
207+
row.add(doubleValue);
208+
break;
209+
case "iox::column_type::field::string":
210+
var textValue = (Text) fieldVectors.get(i)
211+
.getObject(rowNumber);
212+
row.add(textValue.toString());
213+
break;
214+
case "iox::column_type::field::boolean":
215+
var boolValue = (Boolean) fieldVectors.get(i)
216+
.getObject(rowNumber);
217+
row.add(boolValue);
218+
break;
219+
default:
220+
}
221+
} else if ("timestamp".equals(valueType)
222+
|| Objects.equals(schema.getName(), "time")) {
223+
var timestamp = fieldVectors.get(i).getObject(rowNumber);
224+
BigInteger time = NanosecondConverter.getTimestampNano(timestamp, schema);
225+
row.add(time);
226+
} else {
227+
Object value = fieldVectors.get(i).getObject(rowNumber);
228+
row.add(value);
229+
}
230+
}
231+
232+
return row.toArray();
233+
});
196234
});
197235
}
198236

src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,18 @@
2424
import java.math.BigDecimal;
2525
import java.math.BigInteger;
2626
import java.time.Instant;
27+
import java.time.LocalDateTime;
28+
import java.time.ZoneOffset;
2729
import java.util.HashMap;
2830
import java.util.Map;
31+
import java.util.concurrent.TimeUnit;
2932
import java.util.function.Function;
33+
import javax.annotation.Nonnull;
3034
import javax.annotation.Nullable;
3135

36+
import org.apache.arrow.vector.types.pojo.ArrowType;
37+
import org.apache.arrow.vector.types.pojo.Field;
38+
3239
import com.influxdb.v3.client.write.WritePrecision;
3340

3441
import static java.util.function.Function.identity;
@@ -111,4 +118,49 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre
111118

112119
return FROM_NANOS.get(precision).apply(nanos);
113120
}
121+
122+
public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull final Field schema) {
123+
BigInteger result = null;
124+
125+
if (value instanceof Long) {
126+
if (schema.getFieldType().getType() instanceof ArrowType.Timestamp) {
127+
ArrowType.Timestamp type = (ArrowType.Timestamp) schema.getFieldType().getType();
128+
TimeUnit timeUnit;
129+
switch (type.getUnit()) {
130+
case SECOND:
131+
timeUnit = TimeUnit.SECONDS;
132+
break;
133+
case MILLISECOND:
134+
timeUnit = TimeUnit.MILLISECONDS;
135+
break;
136+
case MICROSECOND:
137+
timeUnit = TimeUnit.MICROSECONDS;
138+
break;
139+
default:
140+
case NANOSECOND:
141+
timeUnit = TimeUnit.NANOSECONDS;
142+
break;
143+
}
144+
long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit);
145+
Instant instant = Instant.ofEpochSecond(0, nanoseconds);
146+
result = convertInstantToNano(instant, WritePrecision.NS);
147+
} else {
148+
Instant instant = Instant.ofEpochMilli((Long) value);
149+
result = convertInstantToNano(instant, WritePrecision.NS);
150+
}
151+
} else if (value instanceof LocalDateTime) {
152+
Instant instant = ((LocalDateTime) value).toInstant(ZoneOffset.UTC);
153+
result = convertInstantToNano(instant, WritePrecision.NS);
154+
}
155+
return result;
156+
}
157+
158+
private static BigInteger convertInstantToNano(final Instant instant, final WritePrecision precision) {
159+
var writePrecision = WritePrecision.NS;
160+
if (precision != null) {
161+
writePrecision = precision;
162+
}
163+
BigInteger convertedTime = NanosecondConverter.convert(instant, writePrecision);
164+
return NanosecondConverter.convertToNanos(convertedTime, writePrecision);
165+
}
114166
}

src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ PointValues toPointValues(final int rowNumber,
9595
String valueType = parts[2];
9696

9797
if ("field".equals(valueType)) {
98-
p.setField(name, value);
98+
setFieldWithMetaType(p, name, value, metaType);
9999
} else if ("tag".equals(valueType) && value instanceof String) {
100100
p.setTag(name, (String) value);
101101
} else if ("timestamp".equals(valueType)) {
@@ -105,6 +105,32 @@ PointValues toPointValues(final int rowNumber,
105105
return p;
106106
}
107107

108+
private void setFieldWithMetaType(final PointValues p,
109+
final String name,
110+
final Object value,
111+
final String metaType) {
112+
if (value == null) {
113+
return;
114+
}
115+
116+
switch (metaType) {
117+
case "iox::column_type::field::integer":
118+
case "iox::column_type::field::uinteger":
119+
p.setIntegerField(name, (Long) value);
120+
break;
121+
case "iox::column_type::field::float":
122+
p.setFloatField(name, (Double) value);
123+
break;
124+
case "iox::column_type::field::string":
125+
p.setStringField(name, (String) value);
126+
break;
127+
case "iox::column_type::field::boolean":
128+
p.setBooleanField(name, (Boolean) value);
129+
break;
130+
default:
131+
}
132+
}
133+
108134
private void setTimestamp(@Nonnull final Object value,
109135
@Nonnull final Field schema,
110136
@Nonnull final PointValues pointValues) {

src/main/java/main.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
import com.influxdb.v3.client.InfluxDBClient;
23+
import com.influxdb.v3.client.PointValues;
24+
25+
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
import java.util.List;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
30+
31+
public class main {
32+
public static void main(String[] args) {
33+
34+
String host = "https://us-east-1-1.aws.cloud2.influxdata.com";
35+
char[] token = "xWh3VQCb3pMJPw7T2lnEwFLXO-pb4OWzfNN76UTpmRKtlg83yJlz6maLC3AL0B6M6gMWWZY2QApzSdEeEopWlQ==".toCharArray();
36+
String database = "admin";
37+
38+
// List<PointValues> arrayList = new ArrayList<>();
39+
try (InfluxDBClient client = InfluxDBClient.getInstance(host, token, database)) {
40+
// arrayList = client.queryPoints("SELECT * FROM host2")
41+
// .collect(Collectors.toList());
42+
// System.out.println("arrayList = " + arrayList);
43+
Stream<Object[]> query = client.query("SELECT * FROM host2");
44+
query.forEach(row -> {
45+
System.out.println("row = " + Arrays.toString(row));
46+
});
47+
client.close();
48+
} catch (Exception e) {
49+
throw new RuntimeException(e);
50+
}
51+
}
52+
}

src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,19 @@
2121
*/
2222
package com.influxdb.v3.client;
2323

24+
import java.math.BigInteger;
25+
import java.time.Instant;
2426
import java.util.Map;
2527
import java.util.Properties;
28+
import java.util.UUID;
29+
import java.util.stream.Stream;
2630

2731
import org.assertj.core.api.Assertions;
2832
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
34+
35+
import com.influxdb.v3.client.write.WriteOptions;
36+
import com.influxdb.v3.client.write.WritePrecision;
2937

3038
class InfluxDBClientTest {
3139

@@ -116,4 +124,53 @@ public void unsupportedQueryParams() throws Exception {
116124
+ "class com.influxdb.v3.client.internal.InfluxDBClientImpl");
117125
}
118126
}
127+
128+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
129+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
130+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
131+
@Test
132+
public void testQuery() throws Exception {
133+
try (InfluxDBClient client = InfluxDBClient.getInstance(
134+
System.getenv("TESTING_INFLUXDB_URL"),
135+
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
136+
System.getenv("TESTING_INFLUXDB_DATABASE"),
137+
null)) {
138+
String uuid = UUID.randomUUID().toString();
139+
long timestamp = Instant.now().getEpochSecond();
140+
String record = String.format(
141+
"host10,tag=empty "
142+
+ "name=\"intel\","
143+
+ "mem_total=2048,"
144+
+ "disk_free=100i,"
145+
+ "temperature=100.86,"
146+
+ "isActive=true,"
147+
+ "testId=\"%s\" %d",
148+
uuid,
149+
timestamp
150+
);
151+
client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null));
152+
153+
Map<String, Object> parameters = Map.of("testId", uuid);
154+
String sql = "Select * from host10 where \"testId\"=$testId";
155+
try (Stream<Object[]> stream = client.query(sql, parameters)) {
156+
stream.findFirst()
157+
.ifPresent(objects -> {
158+
Assertions.assertThat(objects[0].getClass()).isEqualTo(Long.class);
159+
Assertions.assertThat(objects[0]).isEqualTo(100L);
160+
161+
Assertions.assertThat(objects[1].getClass()).isEqualTo(Boolean.class);
162+
Assertions.assertThat(objects[1]).isEqualTo(true);
163+
164+
Assertions.assertThat(objects[2].getClass()).isEqualTo(Double.class);
165+
Assertions.assertThat(objects[2]).isEqualTo(2048.0);
166+
167+
Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class);
168+
Assertions.assertThat(objects[3]).isEqualTo("intel");
169+
170+
Assertions.assertThat(objects[7].getClass()).isEqualTo(BigInteger.class);
171+
Assertions.assertThat(objects[7]).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000));
172+
});
173+
}
174+
}
175+
}
119176
}

0 commit comments

Comments
 (0)