Skip to content

Commit 482ba06

Browse files
authored
feat: update pgwire to 0.37 (#265)
* feat: update pgwire to 0.37 * chore: update msrv
1 parent c03227b commit 482ba06

File tree

6 files changed

+64
-67
lines changed

6 files changed

+64
-67
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,6 @@ jobs:
9191
- uses: actions/checkout@v4
9292
- uses: actions-rs/toolchain@v1
9393
with:
94-
toolchain: "1.88.0"
94+
toolchain: "1.89"
9595
override: true
9696
- run: cargo build --all-features

Cargo.lock

Lines changed: 12 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ members = ["datafusion-postgres", "datafusion-postgres-cli", "arrow-pg", "datafu
55
[workspace.package]
66
edition = "2021"
77
license = "Apache-2.0"
8-
rust-version = "1.88.0"
8+
rust-version = "1.89"
99
authors = ["Ning Sun <[email protected]>"]
1010
keywords = ["database", "postgresql", "datafusion"]
1111
homepage = "https://github.com/datafusion-contrib/datafusion-postgres/"
@@ -18,7 +18,7 @@ bytes = "1.11.0"
1818
chrono = { version = "0.4", features = ["std"] }
1919
datafusion = { version = "51", default-features = false }
2020
futures = "0.3"
21-
pgwire = { version = "0.36.3", default-features = false }
21+
pgwire = { version = "0.37", default-features = false }
2222
postgres-types = "0.2"
2323
rust_decimal = { version = "1.39", features = ["db-postgres"] }
2424
tokio = { version = "1", default-features = false }

arrow-pg/src/row_encoder.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,31 @@ pub struct RowEncoder {
1717
rb: RecordBatch,
1818
curr_idx: usize,
1919
fields: Arc<Vec<FieldInfo>>,
20+
row_encoder: DataRowEncoder,
2021
}
2122

2223
impl RowEncoder {
2324
pub fn new(rb: RecordBatch, fields: Arc<Vec<FieldInfo>>) -> Self {
2425
assert_eq!(rb.num_columns(), fields.len());
2526
Self {
2627
rb,
27-
fields,
28+
fields: fields.clone(),
2829
curr_idx: 0,
30+
row_encoder: DataRowEncoder::new(fields),
2931
}
3032
}
3133

3234
pub fn next_row(&mut self) -> Option<PgWireResult<DataRow>> {
3335
if self.curr_idx == self.rb.num_rows() {
3436
return None;
3537
}
36-
let mut encoder = DataRowEncoder::new(self.fields.clone());
3738
for col in 0..self.rb.num_columns() {
3839
let array = self.rb.column(col);
3940
let field = &self.fields[col];
4041

41-
encode_value(&mut encoder, array, self.curr_idx, field).unwrap();
42+
encode_value(&mut self.row_encoder, array, self.curr_idx, field).unwrap();
4243
}
4344
self.curr_idx += 1;
44-
Some(encoder.finish())
45+
Some(Ok(self.row_encoder.take_row()))
4546
}
4647
}

datafusion-postgres/src/handlers.rs

Lines changed: 43 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,8 @@ use pgwire::api::auth::noop::NoopStartupHandler;
1313
use pgwire::api::auth::StartupHandler;
1414
use pgwire::api::portal::{Format, Portal};
1515
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
16-
use pgwire::api::results::{
17-
DescribePortalResponse, DescribeResponse, DescribeStatementResponse, Response, Tag,
18-
};
16+
use pgwire::api::results::{FieldInfo, Response, Tag};
1917
use pgwire::api::stmt::QueryParser;
20-
use pgwire::api::stmt::StoredStatement;
2118
use pgwire::api::{ClientInfo, ErrorHandler, PgWireServerHandlers, Type};
2219
use pgwire::error::{PgWireError, PgWireResult};
2320
use pgwire::types::format::FormatOptions;
@@ -202,58 +199,6 @@ impl ExtendedQueryHandler for DfSessionService {
202199
self.parser.clone()
203200
}
204201

205-
async fn do_describe_statement<C>(
206-
&self,
207-
_client: &mut C,
208-
target: &StoredStatement<Self::Statement>,
209-
) -> PgWireResult<DescribeStatementResponse>
210-
where
211-
C: ClientInfo + Unpin + Send + Sync,
212-
{
213-
if let (_, Some((_, plan))) = &target.statement {
214-
let schema = plan.schema();
215-
let fields =
216-
arrow_schema_to_pg_fields(schema.as_arrow(), &Format::UnifiedBinary, None)?;
217-
let params = plan
218-
.get_parameter_types()
219-
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
220-
221-
let mut param_types = Vec::with_capacity(params.len());
222-
for param_type in ordered_param_types(&params).iter() {
223-
// Fixed: Use &params
224-
if let Some(datatype) = param_type {
225-
let pgtype = into_pg_type(datatype)?;
226-
param_types.push(pgtype);
227-
} else {
228-
param_types.push(Type::UNKNOWN);
229-
}
230-
}
231-
232-
Ok(DescribeStatementResponse::new(param_types, fields))
233-
} else {
234-
Ok(DescribeStatementResponse::no_data())
235-
}
236-
}
237-
238-
async fn do_describe_portal<C>(
239-
&self,
240-
_client: &mut C,
241-
target: &Portal<Self::Statement>,
242-
) -> PgWireResult<DescribePortalResponse>
243-
where
244-
C: ClientInfo + Unpin + Send + Sync,
245-
{
246-
if let (_, Some((_, plan))) = &target.statement.statement {
247-
let format = &target.result_column_format;
248-
let schema = plan.schema();
249-
let fields = arrow_schema_to_pg_fields(schema.as_arrow(), format, None)?;
250-
251-
Ok(DescribePortalResponse::new(fields))
252-
} else {
253-
Ok(DescribePortalResponse::no_data())
254-
}
255-
}
256-
257202
async fn do_query<C>(
258203
&self,
259204
client: &mut C,
@@ -433,6 +378,48 @@ impl QueryParser for Parser {
433378
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
434379
Ok((query, Some((statement, logical_plan))))
435380
}
381+
382+
fn get_parameter_types(&self, stmt: &Self::Statement) -> PgWireResult<Vec<Type>> {
383+
if let (_, Some((_, plan))) = stmt {
384+
let params = plan
385+
.get_parameter_types()
386+
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
387+
388+
let mut param_types = Vec::with_capacity(params.len());
389+
for param_type in ordered_param_types(&params).iter() {
390+
// Fixed: Use &params
391+
if let Some(datatype) = param_type {
392+
let pgtype = into_pg_type(datatype)?;
393+
param_types.push(pgtype);
394+
} else {
395+
param_types.push(Type::UNKNOWN);
396+
}
397+
}
398+
399+
Ok(param_types)
400+
} else {
401+
Ok(vec![])
402+
}
403+
}
404+
405+
fn get_result_schema(
406+
&self,
407+
stmt: &Self::Statement,
408+
column_format: Option<&Format>,
409+
) -> PgWireResult<Vec<FieldInfo>> {
410+
if let (_, Some((_, plan))) = stmt {
411+
let schema = plan.schema();
412+
let fields = arrow_schema_to_pg_fields(
413+
schema.as_arrow(),
414+
column_format.unwrap_or(&Format::UnifiedBinary),
415+
None,
416+
)?;
417+
418+
Ok(fields)
419+
} else {
420+
Ok(vec![])
421+
}
422+
}
436423
}
437424

438425
fn ordered_param_types(types: &HashMap<String, Option<DataType>>) -> Vec<Option<&DataType>> {

datafusion-postgres/src/hooks/set_show.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ fn mock_show_response(name: &str, value: &str) -> PgWireResult<QueryResponse> {
111111
let row = {
112112
let mut encoder = DataRowEncoder::new(Arc::new(fields.clone()));
113113
encoder.encode_field(&Some(value))?;
114-
encoder.finish()
114+
Ok(encoder.take_row())
115115
};
116116

117117
let row_stream = futures::stream::once(async move { row });

0 commit comments

Comments
 (0)