Skip to content

Commit 43d9387

Browse files
wip
1 parent 051a47e commit 43d9387

File tree

2 files changed

+82
-10
lines changed

2 files changed

+82
-10
lines changed

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.io.ByteArrayOutputStream;
2525
import java.io.IOException;
26+
import java.math.BigInteger;
2627
import java.nio.charset.StandardCharsets;
2728
import java.util.ArrayList;
2829
import java.util.Collections;
@@ -183,16 +184,45 @@ public Stream<Object[]> query(@Nonnull final String query,
183184
return queryData(query, parameters, options)
184185
.flatMap(vector -> {
185186
List<FieldVector> fieldVectors = vector.getFieldVectors();
186-
return IntStream
187-
.range(0, vector.getRowCount())
188-
.mapToObj(rowNumber -> {
189-
190-
ArrayList<Object> row = new ArrayList<>();
191-
for (FieldVector fieldVector : fieldVectors) {
192-
row.add(fieldVector.getObject(rowNumber));
193-
}
194-
return row.toArray();
195-
});
187+
return IntStream.range(0, vector.getRowCount())
188+
.mapToObj(rowNumber -> {
189+
ArrayList<Object> row = new ArrayList<>();
190+
for (int i = 0; i < fieldVectors.size(); i++) {
191+
var schema = vector.getSchema().getFields().get(i);
192+
var metaType = schema.getMetadata().get("iox::column::type");
193+
String valueType = metaType.split("::")[2];
194+
if ("field".equals(valueType)) {
195+
switch (metaType) {
196+
case "iox::column_type::field::integer":
197+
case "iox::column_type::field::uinteger":
198+
var intValue = (Long) fieldVectors.get(i).getObject(rowNumber);
199+
row.add(intValue);
200+
break;
201+
case "iox::column_type::field::float":
202+
var doubleValue = (Double) fieldVectors.get(i).getObject(rowNumber);
203+
row.add(doubleValue);
204+
break;
205+
case "iox::column_type::field::string":
206+
var stringValue = (String) fieldVectors.get(i).getObject(rowNumber);
207+
row.add(stringValue);
208+
break;
209+
case "iox::column_type::field::boolean":
210+
var boolValue = (Boolean) fieldVectors.get(i).getObject(rowNumber);
211+
row.add(boolValue);
212+
break;
213+
default:
214+
}
215+
} else if ("timestamp".equals(valueType)) {
216+
var timestamp = fieldVectors.get(i).getObject(rowNumber);
217+
BigInteger time = NanosecondConverter.getTimestamp(timestamp, schema);
218+
row.add(time);
219+
} else {
220+
row.add(fieldVectors.get(i).getObject(rowNumber));
221+
}
222+
}
223+
224+
return row.toArray();
225+
});
196226
});
197227
}
198228

src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,18 @@
2424
import java.math.BigDecimal;
2525
import java.math.BigInteger;
2626
import java.time.Instant;
27+
import java.time.LocalDateTime;
28+
import java.time.ZoneOffset;
2729
import java.util.HashMap;
2830
import java.util.Map;
31+
import java.util.concurrent.TimeUnit;
2932
import java.util.function.Function;
33+
import javax.annotation.Nonnull;
3034
import javax.annotation.Nullable;
3135

3236
import com.influxdb.v3.client.write.WritePrecision;
37+
import org.apache.arrow.vector.types.pojo.ArrowType;
38+
import org.apache.arrow.vector.types.pojo.Field;
3339

3440
import static java.util.function.Function.identity;
3541

@@ -111,4 +117,40 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre
111117

112118
return FROM_NANOS.get(precision).apply(nanos);
113119
}
120+
121+
public static BigInteger getTimestamp(@Nonnull final Object value, @Nonnull final Field schema) {
122+
BigInteger result = null;
123+
124+
if (value instanceof Long) {
125+
if (schema.getFieldType().getType() instanceof ArrowType.Timestamp) {
126+
ArrowType.Timestamp type = (ArrowType.Timestamp) schema.getFieldType().getType();
127+
TimeUnit timeUnit;
128+
switch (type.getUnit()) {
129+
case SECOND:
130+
timeUnit = TimeUnit.SECONDS;
131+
break;
132+
case MILLISECOND:
133+
timeUnit = TimeUnit.MILLISECONDS;
134+
break;
135+
case MICROSECOND:
136+
timeUnit = TimeUnit.MICROSECONDS;
137+
break;
138+
default:
139+
case NANOSECOND:
140+
timeUnit = TimeUnit.NANOSECONDS;
141+
break;
142+
}
143+
long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit);
144+
BigInteger convertedTime = NanosecondConverter.convert(Instant.ofEpochSecond(0, nanoseconds), WritePrecision.NS);
145+
result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS);
146+
} else {
147+
BigInteger convertedTime = NanosecondConverter.convert(Instant.ofEpochMilli((Long) value), WritePrecision.NS);
148+
result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS);
149+
}
150+
} else if (value instanceof LocalDateTime) {
151+
BigInteger convertedTime = NanosecondConverter.convert(((LocalDateTime) value).toInstant(ZoneOffset.UTC), WritePrecision.NS);
152+
result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS);
153+
}
154+
return result;
155+
}
114156
}

0 commit comments

Comments
 (0)