Skip to content

Commit 15f0c99

Browse files
committed
Extract SQLExtendedFetch implementation
Fix comments and use `ARROW_UNUSED` Use nullptr explicitly Co-Authored-By: alinalibq <[email protected]>
1 parent 4fc9f9e commit 15f0c99

File tree

4 files changed

+127
-7
lines changed

4 files changed

+127
-7
lines changed

cpp/src/arrow/flight/sql/odbc/odbc_api.cc

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,8 +1066,33 @@ SQLRETURN SQLExtendedFetch(SQLHSTMT stmt, SQLUSMALLINT fetch_orientation,
10661066
<< ", row_count_ptr: " << static_cast<const void*>(row_count_ptr)
10671067
<< ", row_status_array: "
10681068
<< static_cast<const void*>(row_status_array);
1069-
// GH-47714 TODO: Implement SQLExtendedFetch
1070-
return SQL_INVALID_HANDLE;
1069+
1070+
using ODBC::ODBCDescriptor;
1071+
using ODBC::ODBCStatement;
1072+
return ODBCStatement::ExecuteWithDiagnostics(stmt, SQL_ERROR, [=]() {
1073+
// Only SQL_FETCH_NEXT forward-only fetching orientation is supported,
1074+
// meaning the behavior of SQLExtendedFetch is same as SQLFetch.
1075+
if (fetch_orientation != SQL_FETCH_NEXT) {
1076+
throw DriverException("Optional feature not supported.", "HYC00");
1077+
}
1078+
// Ignore fetch_offset as it's not applicable to SQL_FETCH_NEXT
1079+
ARROW_UNUSED(fetch_offset);
1080+
1081+
ODBCStatement* statement = reinterpret_cast<ODBCStatement*>(stmt);
1082+
1083+
// The SQL_ROWSET_SIZE statement attribute specifies the number of rows in the
1084+
// rowset. Retrieve it from GetRowsetSize.
1085+
SQLULEN row_set_size = statement->GetRowsetSize();
1086+
ARROW_LOG(DEBUG) << "SQL_ROWSET_SIZE value for SQLExtendedFetch: " << row_set_size;
1087+
1088+
if (statement->Fetch(static_cast<size_t>(row_set_size), row_count_ptr,
1089+
row_status_array)) {
1090+
return SQL_SUCCESS;
1091+
} else {
1092+
// Reached the end of rowset
1093+
return SQL_NO_DATA;
1094+
}
1095+
});
10711096
}
10721097

10731098
SQLRETURN SQLFetchScroll(SQLHSTMT stmt, SQLSMALLINT fetch_orientation,

cpp/src/arrow/flight/sql/odbc/odbc_impl/odbc_statement.cc

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,8 @@ void ODBCStatement::ExecuteDirect(const std::string& query) {
316316
is_prepared_ = false;
317317
}
318318

319-
bool ODBCStatement::Fetch(size_t rows) {
319+
bool ODBCStatement::Fetch(size_t rows, SQLULEN* row_count_ptr,
320+
SQLUSMALLINT* row_status_array) {
320321
if (has_reached_end_of_result_) {
321322
ird_->SetRowsProcessed(0);
322323
return false;
@@ -349,11 +350,24 @@ bool ODBCStatement::Fetch(size_t rows) {
349350
current_ard_->NotifyBindingsHavePropagated();
350351
}
351352

352-
size_t rows_fetched = current_result_->Move(rows, current_ard_->GetBindOffset(),
353-
current_ard_->GetBoundStructOffset(),
354-
ird_->GetArrayStatusPtr());
353+
uint16_t* array_status_ptr;
354+
if (row_status_array) {
355+
// For SQLExtendedFetch only
356+
array_status_ptr = row_status_array;
357+
} else {
358+
array_status_ptr = ird_->GetArrayStatusPtr();
359+
}
360+
361+
size_t rows_fetched =
362+
current_result_->Move(rows, current_ard_->GetBindOffset(),
363+
current_ard_->GetBoundStructOffset(), array_status_ptr);
355364
ird_->SetRowsProcessed(static_cast<SQLULEN>(rows_fetched));
356365

366+
if (row_count_ptr) {
367+
// For SQLExtendedFetch only
368+
*row_count_ptr = rows_fetched;
369+
}
370+
357371
row_number_ += rows_fetched;
358372
has_reached_end_of_result_ = rows_fetched != rows;
359373
return rows_fetched != 0;

cpp/src/arrow/flight/sql/odbc/odbc_impl/odbc_statement.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ class ODBCStatement : public ODBCHandle<ODBCStatement> {
5959
void ExecuteDirect(const std::string& query);
6060

6161
/// \brief Return true if the number of rows fetch was greater than zero.
62-
bool Fetch(size_t rows);
62+
///
63+
/// row_count_ptr and row_status_array are optional arguments, they are only needed for
64+
/// SQLExtendedFetch
65+
bool Fetch(size_t rows, SQLULEN* row_count_ptr = 0, SQLUSMALLINT* row_status_array = 0);
66+
6367
bool IsPrepared() const;
6468

6569
void GetStmtAttr(SQLINTEGER statement_attribute, SQLPOINTER output,

cpp/src/arrow/flight/sql/odbc/tests/statement_test.cc

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1816,6 +1816,83 @@ TYPED_TEST(StatementTest, TestSQLBindColIndicatorOnlySQLUnbind) {
18161816
// EXPECT_EQ(1, char_val_ind);
18171817
}
18181818

1819+
TYPED_TEST(StatementTest, TestSQLExtendedFetchRowFetching) {
1820+
// Set SQL_ROWSET_SIZE to fetch 3 rows at once
1821+
1822+
constexpr SQLULEN rows = 3;
1823+
SQLINTEGER val[rows];
1824+
SQLLEN buf_len = sizeof(val);
1825+
SQLLEN ind[rows];
1826+
1827+
// Same variable will be used for column 1, the value of `val`
1828+
// should be updated after every SQLFetch call.
1829+
ASSERT_EQ(SQL_SUCCESS, SQLBindCol(this->stmt, 1, SQL_C_LONG, val, buf_len, ind));
1830+
1831+
ASSERT_EQ(SQL_SUCCESS, SQLSetStmtAttr(this->stmt, SQL_ROWSET_SIZE,
1832+
reinterpret_cast<SQLPOINTER>(rows), 0));
1833+
1834+
std::wstring wsql =
1835+
LR"(
1836+
SELECT 1 AS small_table
1837+
UNION ALL
1838+
SELECT 2
1839+
UNION ALL
1840+
SELECT 3;
1841+
)";
1842+
std::vector<SQLWCHAR> sql0(wsql.begin(), wsql.end());
1843+
1844+
ASSERT_EQ(SQL_SUCCESS,
1845+
SQLExecDirect(this->stmt, &sql0[0], static_cast<SQLINTEGER>(sql0.size())));
1846+
1847+
// Fetch row 1-3.
1848+
SQLULEN row_count;
1849+
SQLUSMALLINT row_status[rows];
1850+
1851+
ASSERT_EQ(SQL_SUCCESS,
1852+
SQLExtendedFetch(this->stmt, SQL_FETCH_NEXT, 0, &row_count, row_status));
1853+
EXPECT_EQ(3, row_count);
1854+
1855+
for (int i = 0; i < rows; i++) {
1856+
EXPECT_EQ(SQL_SUCCESS, row_status[i]);
1857+
}
1858+
1859+
// Verify 1 is returned for row 1
1860+
EXPECT_EQ(1, val[0]);
1861+
// Verify 2 is returned for row 2
1862+
EXPECT_EQ(2, val[1]);
1863+
// Verify 3 is returned for row 3
1864+
EXPECT_EQ(3, val[2]);
1865+
1866+
// Verify result set has no more data beyond row 3
1867+
SQLULEN row_count2;
1868+
SQLUSMALLINT row_status2[rows];
1869+
EXPECT_EQ(SQL_NO_DATA,
1870+
SQLExtendedFetch(this->stmt, SQL_FETCH_NEXT, 0, &row_count2, row_status2));
1871+
}
1872+
1873+
TEST_F(StatementRemoteTest, DISABLED_TestSQLExtendedFetchQueryNullIndicator) {
1874+
// GH-47110: SQLExtendedFetch should return SQL_SUCCESS_WITH_INFO for 22002
1875+
// Limitation on mock test server prevents null from working properly, so use remote
1876+
// server instead. Mock server has type `DENSE_UNION` for null column data.
1877+
SQLINTEGER val;
1878+
1879+
ASSERT_EQ(SQL_SUCCESS, SQLBindCol(this->stmt, 1, SQL_C_LONG, &val, 0, nullptr));
1880+
1881+
std::wstring wsql = L"SELECT null as null_col;";
1882+
std::vector<SQLWCHAR> sql0(wsql.begin(), wsql.end());
1883+
1884+
ASSERT_EQ(SQL_SUCCESS,
1885+
SQLExecDirect(this->stmt, &sql0[0], static_cast<SQLINTEGER>(sql0.size())));
1886+
1887+
SQLULEN row_count1;
1888+
SQLUSMALLINT row_status1[1];
1889+
1890+
// SQLExtendedFetch should return SQL_SUCCESS_WITH_INFO for 22002 state
1891+
ASSERT_EQ(SQL_SUCCESS_WITH_INFO,
1892+
SQLExtendedFetch(this->stmt, SQL_FETCH_NEXT, 0, &row_count1, row_status1));
1893+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState22002);
1894+
}
1895+
18191896
TYPED_TEST(StatementTest, TestSQLMoreResultsNoData) {
18201897
// Verify SQLMoreResults returns SQL_NO_DATA by default.
18211898
std::wstring wsql = L"SELECT 1;";

0 commit comments

Comments
 (0)