Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.sql.SQLException;
import java.util.Calendar;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
Expand All @@ -36,19 +36,28 @@
import org.apache.calcite.avatica.util.AbstractCursor;
import org.apache.calcite.avatica.util.ArrayImpl;

@RequiredArgsConstructor
@Slf4j
class ArrowStreamReaderCursor extends AbstractCursor {

private static final int INIT_ROW_NUMBER = -1;

private final ArrowStreamReader reader;
private final Properties connectionProperties;

@lombok.Getter
private int rowsSeen = 0;

private final AtomicInteger currentIndex = new AtomicInteger(INIT_ROW_NUMBER);

public ArrowStreamReaderCursor(ArrowStreamReader reader) {
this(reader, null);
}

public ArrowStreamReaderCursor(ArrowStreamReader reader, Properties connectionProperties) {
this.reader = reader;
this.connectionProperties = connectionProperties;
}

private void wasNullConsumer(boolean wasNull) {
this.wasNull[0] = wasNull;
}
Expand All @@ -68,7 +77,8 @@ public List<Accessor> createAccessors(
}

private Accessor createAccessor(FieldVector vector) throws SQLException {
return QueryJDBCAccessorFactory.createAccessor(vector, currentIndex::get, this::wasNullConsumer);
return QueryJDBCAccessorFactory.createAccessor(
vector, currentIndex::get, this::wasNullConsumer, connectionProperties);
}

private boolean loadNextBatch() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public boolean executeAsyncQuery() throws SQLException {
val queryTimeout = QueryTimeout.of(
statementProperties.getQueryTimeout(), statementProperties.getQueryTimeoutLocalEnforcementDelay());
val client = getQueryExecutor(queryTimeout);
listener = AsyncQueryStatusListener.of(sql, client, queryTimeout);
val connectionProperties = connection.getConnectionProperties().toProperties();
listener = AsyncQueryStatusListener.of(sql, client, queryTimeout, connectionProperties);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ private DataCloudResultSet executeAdaptiveQuery(String sql) throws SQLException
val queryTimeout = QueryTimeout.of(
statementProperties.getQueryTimeout(), statementProperties.getQueryTimeoutLocalEnforcementDelay());
val client = getQueryExecutor(queryTimeout);
val connectionProperties = connection.getConnectionProperties().toProperties();
listener = targetMaxRows > 0
? AdaptiveQueryStatusListener.of(sql, client, queryTimeout, targetMaxRows, targetMaxBytes)
: AdaptiveQueryStatusListener.of(sql, client, queryTimeout);
? AdaptiveQueryStatusListener.of(
sql, client, queryTimeout, targetMaxRows, targetMaxBytes, connectionProperties)
: AdaptiveQueryStatusListener.of(sql, client, queryTimeout, connectionProperties);

resultSet = listener.generateResultSet();
log.info("executeAdaptiveQuery completed. queryId={}", listener.getQueryId());
Expand All @@ -127,7 +129,8 @@ public DataCloudStatement executeAsyncQuery(String sql) throws SQLException {
val queryTimeout = QueryTimeout.of(
statementProperties.getQueryTimeout(), statementProperties.getQueryTimeoutLocalEnforcementDelay());
val client = getQueryExecutor(queryTimeout);
listener = AsyncQueryStatusListener.of(sql, client, queryTimeout);
val connectionProperties = connection.getConnectionProperties().toProperties();
listener = AsyncQueryStatusListener.of(sql, client, queryTimeout, connectionProperties);
log.info("executeAsyncQuery completed. queryId={}", listener.getQueryId());
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.TimeZone;
import java.util.stream.Stream;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -68,17 +69,29 @@ private StreamingResultSet(
@SneakyThrows
public static StreamingResultSet of(
String queryId, HyperGrpcClientExecutor client, Iterator<QueryResult> iterator) {
return of(queryId, client, iterator, null);
}

@SneakyThrows
public static StreamingResultSet of(
String queryId,
HyperGrpcClientExecutor client,
Iterator<QueryResult> iterator,
Properties connectionProperties) {
try {
val channel = ExecuteQueryResponseChannel.of(StreamUtilities.toStream(iterator));
val reader = new ArrowStreamReader(channel, new RootAllocator(ROOT_ALLOCATOR_MB_FROM_V2));
val schemaRoot = reader.getVectorSchemaRoot();
val columns = toColumnMetaData(schemaRoot.getSchema().getFields());
val timezone = TimeZone.getDefault();

// Use session timezone from connection properties instead of system default
val timezone = getSessionTimeZone(connectionProperties);

val state = new QueryState();
val signature = new Meta.Signature(
columns, null, Collections.emptyList(), Collections.emptyMap(), null, Meta.StatementType.SELECT);
val metadata = new AvaticaResultSetMetaData(null, null, signature);
val cursor = new ArrowStreamReaderCursor(reader);
val cursor = new ArrowStreamReaderCursor(reader, connectionProperties);
val result =
new StreamingResultSet(client, cursor, queryId, null, state, signature, metadata, timezone, null);
result.execute2(cursor, columns);
Expand Down Expand Up @@ -124,4 +137,28 @@ public int getFetchDirection() {
public int getRow() {
return cursor.getRowsSeen();
}

/**
* Gets the session timezone from connection properties.
*
* @param connectionProperties Connection properties, may be null
* @return Session timezone, falls back to system default if not specified
*/
private static TimeZone getSessionTimeZone(Properties connectionProperties) {
if (connectionProperties == null) {
return TimeZone.getDefault();
}

String timezoneProp = connectionProperties.getProperty("querySetting.timezone");
if (timezoneProp == null || timezoneProp.trim().isEmpty()) {
return TimeZone.getDefault();
}

try {
return TimeZone.getTimeZone(timezoneProp);
} catch (Exception e) {
// If timezone parsing fails, fall back to default
return TimeZone.getDefault();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.salesforce.datacloud.jdbc.core.accessor.impl.TimeVectorAccessor;
import com.salesforce.datacloud.jdbc.core.accessor.impl.VarCharVectorAccessor;
import java.sql.SQLException;
import java.util.Properties;
import java.util.function.IntSupplier;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
Expand Down Expand Up @@ -71,6 +72,15 @@ public interface WasNullConsumer {
public static QueryJDBCAccessor createAccessor(
ValueVector vector, IntSupplier getCurrentRow, QueryJDBCAccessorFactory.WasNullConsumer wasNullConsumer)
throws SQLException {
return createAccessor(vector, getCurrentRow, wasNullConsumer, null);
}

public static QueryJDBCAccessor createAccessor(
ValueVector vector,
IntSupplier getCurrentRow,
QueryJDBCAccessorFactory.WasNullConsumer wasNullConsumer,
Properties connectionProperties)
throws SQLException {
Types.MinorType arrowType =
Types.getMinorTypeForArrowType(vector.getField().getType());
if (arrowType.equals(Types.MinorType.VARCHAR)) {
Expand Down Expand Up @@ -114,21 +124,29 @@ public static QueryJDBCAccessor createAccessor(
} else if (arrowType.equals(Types.MinorType.TIMESEC)) {
return new TimeVectorAccessor((TimeSecVector) vector, getCurrentRow, wasNullConsumer);
} else if (arrowType.equals(Types.MinorType.TIMESTAMPSECTZ)) {
return new TimeStampVectorAccessor((TimeStampSecTZVector) vector, getCurrentRow, wasNullConsumer);
return new TimeStampVectorAccessor(
(TimeStampSecTZVector) vector, getCurrentRow, wasNullConsumer, connectionProperties);
} else if (arrowType.equals(Types.MinorType.TIMESTAMPSEC)) {
return new TimeStampVectorAccessor((TimeStampSecVector) vector, getCurrentRow, wasNullConsumer);
return new TimeStampVectorAccessor(
(TimeStampSecVector) vector, getCurrentRow, wasNullConsumer, connectionProperties);
} else if (arrowType.equals(Types.MinorType.TIMESTAMPMILLITZ)) {
return new TimeStampVectorAccessor((TimeStampMilliTZVector) vector, getCurrentRow, wasNullConsumer);
return new TimeStampVectorAccessor(
(TimeStampMilliTZVector) vector, getCurrentRow, wasNullConsumer, connectionProperties);
} else if (arrowType.equals(Types.MinorType.TIMESTAMPMILLI)) {
return new TimeStampVectorAccessor((TimeStampMilliVector) vector, getCurrentRow, wasNullConsumer);
return new TimeStampVectorAccessor(
(TimeStampMilliVector) vector, getCurrentRow, wasNullConsumer, connectionProperties);
} else if (arrowType.equals(Types.MinorType.TIMESTAMPMICROTZ)) {
return new TimeStampVectorAccessor((TimeStampMicroTZVector) vector, getCurrentRow, wasNullConsumer);
return new TimeStampVectorAccessor(
(TimeStampMicroTZVector) vector, getCurrentRow, wasNullConsumer, connectionProperties);
} else if (arrowType.equals(Types.MinorType.TIMESTAMPMICRO)) {
return new TimeStampVectorAccessor((TimeStampMicroVector) vector, getCurrentRow, wasNullConsumer);
return new TimeStampVectorAccessor(
(TimeStampMicroVector) vector, getCurrentRow, wasNullConsumer, connectionProperties);
} else if (arrowType.equals(Types.MinorType.TIMESTAMPNANOTZ)) {
return new TimeStampVectorAccessor((TimeStampNanoTZVector) vector, getCurrentRow, wasNullConsumer);
return new TimeStampVectorAccessor(
(TimeStampNanoTZVector) vector, getCurrentRow, wasNullConsumer, connectionProperties);
} else if (arrowType.equals(Types.MinorType.TIMESTAMPNANO)) {
return new TimeStampVectorAccessor((TimeStampNanoVector) vector, getCurrentRow, wasNullConsumer);
return new TimeStampVectorAccessor(
(TimeStampNanoVector) vector, getCurrentRow, wasNullConsumer, connectionProperties);
} else if (arrowType.equals(Types.MinorType.LIST)) {
return new ListVectorAccessor((ListVector) vector, getCurrentRow, wasNullConsumer);
} else if (arrowType.equals(Types.MinorType.LARGELIST)) {
Expand Down
Loading
Loading