Skip to content

Commit aecdbeb

Browse files
committed
♻️ refactor: extract column processing logic into dedicated ColumnMapper service
- Create ColumnMapper service to handle column mapping and encryption configuration - Move get_projection_columns, get_param_columns, get_literal_columns from Frontend - Remove ~150 lines of column processing logic from Frontend - Add Context delegation methods for column processing operations - Remove unused imports (Identifier, TableColumn, Type) from Frontend - Fix test configuration with proper CRN format and required environment variables This refactoring achieves clear separation of concerns: - Frontend focuses solely on PostgreSQL message handling - ColumnMapper handles column mapping and encryption configuration - Context serves as coordinator with clean delegation methods - Maintains all existing functionality while improving code organization
1 parent a8d5c59 commit aecdbeb

File tree

4 files changed

+213
-161
lines changed

4 files changed

+213
-161
lines changed
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
use crate::{
2+
eql::Identifier,
3+
error::{EncryptError, Error},
4+
log::MAPPER,
5+
postgresql::Column,
6+
services::SchemaService,
7+
};
8+
use eql_mapper::{EqlTerm, TableColumn, TypeCheckedStatement};
9+
use postgres_types::Type;
10+
use std::sync::Arc;
11+
use tracing::{debug, warn};
12+
13+
/// Service responsible for processing columns from type-checked SQL statements
14+
/// and mapping them to encryption configurations.
15+
#[derive(Clone)]
16+
pub struct ColumnMapper {
17+
schema_service: Arc<dyn SchemaService>,
18+
}
19+
20+
impl ColumnMapper {
21+
/// Create a new ColumnProcessor with the given schema service and client ID
22+
pub fn new(schema_service: Arc<dyn SchemaService>) -> Self {
23+
Self { schema_service }
24+
}
25+
26+
/// Maps typed statement projection columns to an Encrypt column configuration
27+
///
28+
/// The returned `Vec` is of `Option<Column>` because the Projection columns are a mix of native and EQL types.
29+
/// Only EQL colunms will have a configuration. Native types are always None.
30+
///
31+
/// Preserves the ordering and semantics of the projection to reduce the complexity of positional encryption.
32+
pub fn get_projection_columns(
33+
&self,
34+
typed_statement: &TypeCheckedStatement<'_>,
35+
) -> Result<Vec<Option<Column>>, Error> {
36+
let mut projection_columns = vec![];
37+
38+
for col in typed_statement.projection.columns() {
39+
let eql_mapper::ProjectionColumn { ty, .. } = col;
40+
let configured_column = match &**ty {
41+
eql_mapper::Type::Value(eql_mapper::Value::Eql(eql_term)) => {
42+
let TableColumn { table, column } = eql_term.table_column();
43+
let identifier: Identifier = Identifier::from((table, column));
44+
45+
debug!(
46+
target: MAPPER,
47+
msg = "Configured column",
48+
column = ?identifier,
49+
?eql_term,
50+
);
51+
self.get_column(identifier, eql_term)?
52+
}
53+
_ => None,
54+
};
55+
projection_columns.push(configured_column)
56+
}
57+
58+
Ok(projection_columns)
59+
}
60+
61+
/// Maps typed statement param columns to an Encrypt column configuration
62+
///
63+
/// The returned `Vec` is of `Option<Column>` because the Param columns are a mix of native and EQL types.
64+
/// Only EQL colunms will have a configuration. Native types are always None.
65+
///
66+
/// Preserves the ordering and semantics of the projection to reduce the complexity of positional encryption.
67+
pub fn get_param_columns(
68+
&self,
69+
typed_statement: &TypeCheckedStatement<'_>,
70+
) -> Result<Vec<Option<Column>>, Error> {
71+
let mut param_columns = vec![];
72+
73+
for param in typed_statement.params.iter() {
74+
let configured_column = match param {
75+
(_, eql_mapper::Value::Eql(eql_term)) => {
76+
let TableColumn { table, column } = eql_term.table_column();
77+
let identifier = Identifier::from((table, column));
78+
79+
debug!(
80+
target: MAPPER,
81+
msg = "Encrypted parameter",
82+
column = ?identifier,
83+
?eql_term,
84+
);
85+
86+
self.get_column(identifier, eql_term)?
87+
}
88+
_ => None,
89+
};
90+
param_columns.push(configured_column);
91+
}
92+
93+
Ok(param_columns)
94+
}
95+
96+
/// Maps typed statement literal columns to an Encrypt column configuration
97+
pub fn get_literal_columns(
98+
&self,
99+
typed_statement: &TypeCheckedStatement<'_>,
100+
) -> Result<Vec<Option<Column>>, Error> {
101+
let mut literal_columns = vec![];
102+
103+
for (eql_term, _) in typed_statement.literals.iter() {
104+
let TableColumn { table, column } = eql_term.table_column();
105+
let identifier = Identifier::from((table, column));
106+
107+
debug!(
108+
target: MAPPER,
109+
msg = "Encrypted literal",
110+
column = ?identifier,
111+
?eql_term,
112+
);
113+
let col = self.get_column(identifier, eql_term)?;
114+
if col.is_some() {
115+
literal_columns.push(col);
116+
}
117+
}
118+
119+
Ok(literal_columns)
120+
}
121+
122+
/// Get the column configuration for the Identifier
123+
/// Returns `EncryptError::UnknownColumn` if configuration cannot be found for the Identified column
124+
/// if mapping enabled, and None if mapping is disabled. It'll log a warning either way.
125+
fn get_column(
126+
&self,
127+
identifier: Identifier,
128+
eql_term: &EqlTerm,
129+
) -> Result<Option<Column>, Error> {
130+
match self.schema_service.get_column_config(&identifier) {
131+
Some(config) => {
132+
debug!(
133+
target: MAPPER,
134+
msg = "Configured column",
135+
column = ?identifier
136+
);
137+
138+
// IndexTerm::SteVecSelector
139+
let postgres_type = if matches!(eql_term, EqlTerm::JsonPath(_)) {
140+
Some(Type::JSONPATH)
141+
} else {
142+
None
143+
};
144+
145+
let eql_term = eql_term.variant();
146+
Ok(Some(Column::new(
147+
identifier,
148+
config,
149+
postgres_type,
150+
eql_term,
151+
)))
152+
}
153+
None => {
154+
warn!(
155+
target: MAPPER,
156+
msg = "Configured column not found. Encryption configuration may have been deleted.",
157+
?identifier,
158+
);
159+
Err(EncryptError::UnknownColumn {
160+
table: identifier.table.to_owned(),
161+
column: identifier.column.to_owned(),
162+
}
163+
.into())
164+
}
165+
}
166+
}
167+
}

packages/cipherstash-proxy/src/postgresql/context/mod.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod column;
22

33
use super::{
4+
column_mapper::ColumnMapper,
45
format_code::FormatCode,
56
messages::{describe::Describe, Name, Target},
67
Column,
@@ -44,6 +45,7 @@ pub struct Context {
4445
config: Arc<TandemConfig>,
4546
encryption: Arc<dyn EncryptionService>,
4647
schema: Arc<dyn SchemaService>,
48+
column_processor: ColumnMapper,
4749
statements: Arc<RwLock<HashMap<Name, Arc<Statement>>>>,
4850
portals: Arc<RwLock<HashMap<Name, PortalQueue>>>,
4951
describe: Arc<RwLock<DescribeQueue>>,
@@ -124,6 +126,8 @@ impl Context {
124126
encryption: Arc<dyn EncryptionService>,
125127
schema_service: Arc<dyn SchemaService>,
126128
) -> Context {
129+
let column_processor = ColumnMapper::new(schema_service.clone());
130+
127131
Context {
128132
statements: Arc::new(RwLock::new(HashMap::new())),
129133
portals: Arc::new(RwLock::new(HashMap::new())),
@@ -136,6 +140,7 @@ impl Context {
136140
config,
137141
encryption,
138142
schema: schema_service,
143+
column_processor,
139144
unsafe_disable_mapping: false,
140145
keyset_id: Arc::new(RwLock::new(None)),
141146
}
@@ -532,6 +537,29 @@ impl Context {
532537
self.schema.is_empty_config()
533538
}
534539

540+
// Column processing delegation methods
541+
pub fn get_projection_columns(
542+
&self,
543+
typed_statement: &eql_mapper::TypeCheckedStatement<'_>,
544+
) -> Result<Vec<Option<Column>>, Error> {
545+
self.column_processor
546+
.get_projection_columns(typed_statement)
547+
}
548+
549+
pub fn get_param_columns(
550+
&self,
551+
typed_statement: &eql_mapper::TypeCheckedStatement<'_>,
552+
) -> Result<Vec<Option<Column>>, Error> {
553+
self.column_processor.get_param_columns(typed_statement)
554+
}
555+
556+
pub fn get_literal_columns(
557+
&self,
558+
typed_statement: &eql_mapper::TypeCheckedStatement<'_>,
559+
) -> Result<Vec<Option<Column>>, Error> {
560+
self.column_processor.get_literal_columns(typed_statement)
561+
}
562+
535563
// Direct config access methods
536564
pub fn connection_timeout(&self) -> std::time::Duration {
537565
self.config
@@ -629,10 +657,20 @@ impl Context {
629657
std::env::set_var("CS_DATABASE__NAME", "test");
630658
std::env::set_var("CS_DATABASE__HOST", "localhost");
631659
std::env::set_var("CS_DATABASE__PORT", "5432");
632-
std::env::set_var("CS_AUTH__WORKSPACE_CRN", "crn:ap-southeast-2.aws:test");
660+
std::env::set_var(
661+
"CS_AUTH__WORKSPACE_CRN",
662+
"crn:ap-southeast-2.aws:3KISDURL3ZCWYZ2O",
663+
);
633664
std::env::set_var("CS_AUTH__CLIENT_ACCESS_KEY", "test");
634-
std::env::set_var("CS_ENCRYPT__CLIENT_ID", "test");
635-
std::env::set_var("CS_ENCRYPT__CLIENT_KEY", "test");
665+
std::env::set_var(
666+
"CS_ENCRYPT__CLIENT_ID",
667+
"e40f1692-6bb7-4bbd-a552-4c0f155be073",
668+
);
669+
std::env::set_var("CS_ENCRYPT__CLIENT_KEY", "a4627031a16b7065726d75746174696f6e90090e0805000b0d0c0106040f0a0302076770325f66726f6da16b7065726d75746174696f6e9007060a0b02090d080c00040f0305010e6570325f746fa16b7065726d75746174696f6e900a0206090b04050c070f0e010d030800627033a16b7065726d75746174696f6e98210514181d0818200a18190b1112181809130f15181a0717181e000e0103181f0d181c1602040c181b1006");
670+
std::env::set_var(
671+
"CS_ENCRYPT__KEYSET_ID",
672+
"c50d8463-60e9-41a5-86cd-5782e03a503c",
673+
);
636674

637675
let config = Arc::new(
638676
crate::config::TandemConfig::build("tests/config/unknown.toml")

0 commit comments

Comments
 (0)