Commit d0ee868

Merge pull request #1 from iPeluwa/enterprise-security-features
Enterprise security features
2 parents 547b86f + 5300dbf commit d0ee868

6 files changed: +145 -110 lines changed

README.md

Lines changed: 43 additions & 37 deletions
@@ -2,51 +2,56 @@

 ![Crates.io Version](https://img.shields.io/crates/v/datafusion-postgres?label=datafusion-postgres)

-Serving any [datafusion](https://datafusion.apache.org) `SessionContext` with full PostgreSQL compatibility, including authentication, role-based access control, and SSL/TLS encryption. Available as a library and a CLI tool.
-
-This project adds a comprehensive [PostgreSQL compatible access layer](https://github.com/sunng87/pgwire) to the [Apache DataFusion](https://github.com/apache/arrow-datafusion) query engine, making it a drop-in replacement for PostgreSQL in analytics workloads.
+A PostgreSQL-compatible server for [Apache DataFusion](https://datafusion.apache.org), supporting authentication, role-based access control, and SSL/TLS encryption. Available as both a library and CLI tool.

+Built on [pgwire](https://github.com/sunng87/pgwire) to provide PostgreSQL wire protocol compatibility for analytical workloads.
 It was originally an example of the [pgwire](https://github.com/sunng87/pgwire)
 project.

 ## ✨ Key Features

 - 🔌 **Full PostgreSQL Wire Protocol** - Compatible with all PostgreSQL clients and drivers
-- 🛡️ **Enterprise Security** - Authentication, RBAC, and SSL/TLS encryption
+- 🛡️ **Security Features** - Authentication, RBAC, and SSL/TLS encryption
 - 🏗️ **Complete System Catalogs** - Real `pg_catalog` tables with accurate metadata
 - 📊 **Advanced Data Types** - Comprehensive Arrow ↔ PostgreSQL type mapping
-- 🔄 **Transaction Support** - Full ACID transaction lifecycle (BEGIN/COMMIT/ROLLBACK)
+- 🔄 **Transaction Support** - ACID transaction lifecycle (BEGIN/COMMIT/ROLLBACK)
 - **High Performance** - Apache DataFusion's columnar query execution

-## 🎯 Roadmap & Status
-
-- [x] **Core Features**
-  - [x] datafusion-postgres as a CLI tool
-  - [x] datafusion-postgres as a library
-  - [x] datafusion information schema
-  - [x] Complete `pg_catalog` system tables (pg_type, pg_attribute, pg_proc, pg_class, etc.)
-  - [x] Comprehensive Arrow ↔ PostgreSQL data type mapping
-  - [x] Essential PostgreSQL functions (version(), current_schema(), has_table_privilege(), etc.)
-
-- [x] **Security & Authentication** 🆕
-  - [x] User authentication and management
-  - [x] Role-based access control (RBAC)
-  - [x] Granular permissions (SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, etc.)
-  - [x] Role inheritance and grant management
-  - [x] SSL/TLS connection encryption
-  - [x] Query-level permission checking
-
-- [x] **Transaction Support** 🆕
-  - [x] Full ACID transaction lifecycle
-  - [x] BEGIN/COMMIT/ROLLBACK with all variants
-  - [x] Failed transaction handling and recovery
-  - [x] Transaction state management
-
-- [ ] **Future Enhancements**
-  - [ ] Connection pooling and performance optimizations
-  - [ ] Advanced authentication methods (SCRAM, LDAP)
-  - [ ] More PostgreSQL functions and operators
-  - [ ] COPY protocol for bulk data loading
+## 🎯 Features
+
+### Core Functionality
+- ✅ Library and CLI tool
+- ✅ PostgreSQL wire protocol compatibility
+- ✅ Complete `pg_catalog` system tables
+- ✅ Arrow ↔ PostgreSQL data type mapping
+- ✅ PostgreSQL functions (version, current_schema, has_table_privilege, etc.)
+
+### Security & Authentication
+- ✅ User authentication and RBAC
+- ✅ Granular permissions (SELECT, INSERT, UPDATE, DELETE, CREATE, DROP)
+- ✅ Role inheritance and grant management
+- ✅ SSL/TLS encryption
+- ✅ Query-level permission checking
+
+### Transaction Support
+- ✅ ACID transaction lifecycle
+- ✅ BEGIN/COMMIT/ROLLBACK with all variants
+- ✅ Failed transaction handling and recovery
+
+### Future Enhancements
+- ⏳ Connection pooling optimizations
+- ⏳ Advanced authentication (LDAP, certificates)
+- ⏳ COPY protocol for bulk data loading
+
+## 🔐 Authentication
+
+Supports standard pgwire authentication methods:
+
+- **Cleartext**: `CleartextStartupHandler` for simple password authentication
+- **MD5**: `MD5StartupHandler` for MD5-hashed passwords
+- **SCRAM**: `SASLScramAuthStartupHandler` for secure authentication
+
+See `auth.rs` for complete implementation examples using `DfAuthSource`.

 ## 🚀 Quick Start

@@ -91,12 +96,11 @@ serve(session_context, &server_options).await

 ### The CLI `datafusion-postgres-cli`

-As a command-line application, this tool serves any JSON/CSV/Arrow/Parquet/Avro
-files as tables, and exposes them via PostgreSQL compatible protocol with full security features.
+Command-line tool to serve JSON/CSV/Arrow/Parquet/Avro files as PostgreSQL-compatible tables.

 ```
 datafusion-postgres-cli 0.6.1
-A secure postgres interface for datafusion. Serve any CSV/JSON/Arrow/Parquet files as tables.
+A PostgreSQL interface for DataFusion. Serve CSV/JSON/Arrow/Parquet files as tables.

 USAGE:
     datafusion-postgres-cli [OPTIONS]
@@ -149,6 +153,8 @@ Listening on 127.0.0.1:5432 (unencrypted)

 ### Connect with psql

+> **🔐 Authentication**: The default setup allows connections without authentication for development. For secure deployments, use `DfAuthSource` with standard pgwire authentication handlers (cleartext, MD5, or SCRAM). See `auth.rs` for implementation examples.
+
 ```bash
 psql -h 127.0.0.1 -p 5432 -U postgres
 ```

arrow-pg/src/encoder.rs

Lines changed: 3 additions & 3 deletions
@@ -262,7 +262,7 @@ fn get_numeric_128_value(
     let value = array.value(idx);
     Decimal::try_from_i128_with_scale(value, scale)
         .map_err(|e| {
-            let message = match e {
+            let error_code = match e {
                 rust_decimal::Error::ExceedsMaximumPossibleValue => {
                     "22003" // numeric_value_out_of_range
                 }
@@ -280,8 +280,8 @@ fn get_numeric_128_value(
             };
             PgWireError::UserError(Box::new(ErrorInfo::new(
                 "ERROR".to_string(),
-                message.to_string(),
-                format!("Numeric value conversion failed: {e:?}"),
+                error_code.to_string(),
+                format!("Numeric value conversion failed: {e}"),
             )))
         })
         .map(Some)
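
Review note: the encoder.rs change renames the binding to `error_code` (it holds a SQLSTATE code, not a message) and switches the client-facing text from `{e:?}` (Debug) to `{e}` (Display). A minimal sketch of that difference follows. It uses a hypothetical `DecimalError` enum as a stand-in for `rust_decimal::Error`; the Display wording and the `22000` fallback arm are assumptions, since the diff elides the remaining match arms.

```rust
use std::fmt;

/// Hypothetical stand-in for `rust_decimal::Error`, so the sketch has no
/// external dependencies. Only the first variant name comes from the diff.
#[derive(Debug)]
enum DecimalError {
    ExceedsMaximumPossibleValue,
    Other(String),
}

impl fmt::Display for DecimalError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Invented wording; the real crate's Display text may differ.
            DecimalError::ExceedsMaximumPossibleValue => {
                write!(f, "value exceeds the maximum representable number")
            }
            DecimalError::Other(msg) => write!(f, "{msg}"),
        }
    }
}

/// Picks a SQLSTATE code for the error, mirroring the `error_code` binding.
fn sqlstate_for(e: &DecimalError) -> &'static str {
    match e {
        DecimalError::ExceedsMaximumPossibleValue => "22003", // numeric_value_out_of_range
        DecimalError::Other(_) => "22000", // data_exception (assumed fallback)
    }
}

/// Builds the (severity, code, message) triple in the order the hunk
/// passes them to `ErrorInfo::new`.
fn error_fields(e: &DecimalError) -> (String, String, String) {
    (
        "ERROR".to_string(),
        sqlstate_for(e).to_string(),
        // `{e}` uses Display, so clients see prose, not a variant name.
        format!("Numeric value conversion failed: {e}"),
    )
}
```

With Debug formatting the message would end in the bare variant name (`ExceedsMaximumPossibleValue`); with Display it ends in whatever human-readable text the error type provides, which is usually what a psql user should see.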

datafusion-postgres/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -25,6 +25,6 @@ pgwire = { workspace = true, features = ["server-api-ring", "scram"] }
 postgres-types.workspace = true
 rust_decimal.workspace = true
 tokio = { version = "1.45", features = ["sync", "net"] }
-tokio-rustls = "0.26"
+tokio-rustls = { version = "0.26", features = ["ring"] }
 rustls-pemfile = "2.0"
 rustls-pki-types = "1.0"

datafusion-postgres/src/auth.rs

Lines changed: 74 additions & 51 deletions
@@ -2,15 +2,8 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use async_trait::async_trait;
-use futures::sink::Sink;
-use pgwire::api::auth::{
-    finish_authentication, save_startup_parameters_to_metadata, AuthSource,
-    DefaultServerParameterProvider, LoginInfo, Password, StartupHandler,
-};
-use pgwire::api::ClientInfo;
+use pgwire::api::auth::{AuthSource, LoginInfo, Password};
 use pgwire::error::{PgWireError, PgWireResult};
-use pgwire::messages::{PgWireBackendMessage, PgWireFrontendMessage};
-use std::fmt::Debug;
 use tokio::sync::RwLock;

 /// User information stored in the authentication system
@@ -575,67 +568,97 @@ impl AuthManager {
     }
 }

-/// Custom startup handler that performs authentication
-pub struct AuthStartupHandler {
-    auth_manager: Arc<AuthManager>,
+/// AuthSource implementation for integration with pgwire authentication
+/// Provides proper password-based authentication instead of custom startup handler
+#[derive(Clone)]
+pub struct DfAuthSource {
+    pub auth_manager: Arc<AuthManager>,
 }

-impl AuthStartupHandler {
+impl DfAuthSource {
     pub fn new(auth_manager: Arc<AuthManager>) -> Self {
-        AuthStartupHandler { auth_manager }
+        DfAuthSource { auth_manager }
     }
 }

 #[async_trait]
-impl StartupHandler for AuthStartupHandler {
-    async fn on_startup<C>(
-        &self,
-        client: &mut C,
-        message: PgWireFrontendMessage,
-    ) -> PgWireResult<()>
-    where
-        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send,
-        C::Error: Debug,
-        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
-    {
-        if let PgWireFrontendMessage::Startup(ref startup) = message {
-            save_startup_parameters_to_metadata(client, startup);
-
-            // Extract username from startup message
-            let username = startup
-                .parameters
-                .get("user")
-                .unwrap_or(&"anonymous".to_string())
-                .clone();
-
-            // For now, we'll do basic authentication
-            // In a full implementation, this would involve password authentication
-            let is_authenticated = if username == "postgres" {
-                // Always allow postgres user for compatibility
-                true
+impl AuthSource for DfAuthSource {
+    async fn get_password(&self, login: &LoginInfo) -> PgWireResult<Password> {
+        if let Some(username) = login.user() {
+            // Check if user exists in our RBAC system
+            if let Some(user) = self.auth_manager.get_user(username).await {
+                if user.can_login {
+                    // Return the stored password hash for authentication
+                    // The pgwire authentication handlers (cleartext/md5/scram) will
+                    // handle the actual password verification process
+                    Ok(Password::new(None, user.password_hash.into_bytes()))
+                } else {
+                    Err(PgWireError::UserError(Box::new(
+                        pgwire::error::ErrorInfo::new(
+                            "FATAL".to_string(),
+                            "28000".to_string(), // invalid_authorization_specification
+                            format!("User \"{username}\" is not allowed to login"),
+                        ),
+                    )))
+                }
             } else {
-                // Check if user exists in our system
-                self.auth_manager.get_user(&username).await.is_some()
-            };
-
-            if !is_authenticated {
-                return Err(PgWireError::UserError(Box::new(
+                Err(PgWireError::UserError(Box::new(
                     pgwire::error::ErrorInfo::new(
                         "FATAL".to_string(),
                         "28P01".to_string(), // invalid_password
                         format!("password authentication failed for user \"{username}\""),
                     ),
-                )));
+                )))
             }
-
-            // Complete authentication process
-            finish_authentication(client, &DefaultServerParameterProvider::default()).await?;
+        } else {
+            Err(PgWireError::UserError(Box::new(
+                pgwire::error::ErrorInfo::new(
+                    "FATAL".to_string(),
+                    "28P01".to_string(), // invalid_password
+                    "No username provided in login request".to_string(),
+                ),
+            )))
         }
-
-        Ok(())
     }
 }
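
Review note: the new `get_password` has one success path and three failure paths (no username, unknown user, user without login permission). The same decision table can be sketched without pgwire, which makes it easy to unit-test. Everything named here (`User`, `AuthError`, `lookup_password`, the `HashMap` user store) is a hypothetical stand-in for `AuthManager` and `PgWireError`, and the early-return style is a stylistic alternative to the nested `if let` blocks, not what the commit does.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the user record held by `AuthManager`.
struct User {
    password_hash: String,
    can_login: bool,
}

/// Hypothetical stand-in for the fields passed to `ErrorInfo::new`.
#[derive(Debug, PartialEq)]
struct AuthError {
    severity: &'static str,
    sqlstate: &'static str,
    message: String,
}

fn fatal(sqlstate: &'static str, message: String) -> AuthError {
    AuthError { severity: "FATAL", sqlstate, message }
}

/// Returns the stored password bytes for a user who may log in,
/// or an error carrying the same SQLSTATE codes as the diff.
fn lookup_password(
    users: &HashMap<String, User>,
    username: Option<&str>,
) -> Result<Vec<u8>, AuthError> {
    // Missing username: the diff reports 28P01 (invalid_password) here.
    let username = match username {
        Some(name) => name,
        None => {
            return Err(fatal(
                "28P01",
                "No username provided in login request".to_string(),
            ))
        }
    };

    match users.get(username) {
        // Known user allowed to log in: hand the stored hash to the handler.
        Some(user) if user.can_login => Ok(user.password_hash.clone().into_bytes()),
        // Known user with login disabled: 28000 (invalid_authorization_specification).
        Some(_) => Err(fatal(
            "28000",
            format!("User \"{username}\" is not allowed to login"),
        )),
        // Unknown user: same error an incorrect password would produce.
        None => Err(fatal(
            "28P01",
            format!("password authentication failed for user \"{username}\""),
        )),
    }
}
```

Reporting an unknown user with the same code and message shape as a failed password check avoids telling a client which usernames exist, which appears to be the intent of the `28P01` branch in the diff.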

+// REMOVED: Custom startup handler approach
+//
+// Instead of implementing a custom StartupHandler, use the proper pgwire authentication:
+//
+// For cleartext authentication:
+// ```rust
+// use pgwire::api::auth::cleartext::CleartextStartupHandler;
+//
+// let auth_source = Arc::new(DfAuthSource::new(auth_manager));
+// let authenticator = CleartextStartupHandler::new(
+//     auth_source,
+//     Arc::new(DefaultServerParameterProvider::default())
+// );
+// ```
+//
+// For MD5 authentication:
+// ```rust
+// use pgwire::api::auth::md5::MD5StartupHandler;
+//
+// let auth_source = Arc::new(DfAuthSource::new(auth_manager));
+// let authenticator = MD5StartupHandler::new(
+//     auth_source,
+//     Arc::new(DefaultServerParameterProvider::default())
+// );
+// ```
+//
+// For SCRAM authentication (requires the pgwire "scram" feature, as enabled in Cargo.toml):
+// ```rust
+// use pgwire::api::auth::scram::SASLScramAuthStartupHandler;
+//
+// let auth_source = Arc::new(DfAuthSource::new(auth_manager));
+// let authenticator = SASLScramAuthStartupHandler::new(
+//     auth_source,
+//     Arc::new(DefaultServerParameterProvider::default())
+// );
+// ```
+
 /// Simple AuthSource implementation that accepts any user with empty password
 pub struct SimpleAuthSource {
     auth_manager: Arc<AuthManager>,
