Skip to content

Commit f47d16e

Browse files
authored
RUST-1873 Transaction API fluency (#1060)
1 parent 7275afe commit f47d16e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1676
-1649
lines changed

action_macro/src/lib.rs

Lines changed: 48 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
extern crate proc_macro;
22

3+
use proc_macro2::Span;
34
use quote::{quote, ToTokens};
45
use syn::{
56
braced,
@@ -9,6 +10,7 @@ use syn::{
910
parse_quote,
1011
parse_quote_spanned,
1112
spanned::Spanned,
13+
visit_mut::VisitMut,
1214
Block,
1315
Error,
1416
Expr,
@@ -18,6 +20,7 @@ use syn::{
1820
Lifetime,
1921
Lit,
2022
Meta,
23+
PathArguments,
2124
Token,
2225
Type,
2326
};
@@ -27,8 +30,12 @@ use syn::{
2730
/// * an opaque wrapper type for the future in case we want to do something more fancy than
2831
/// BoxFuture.
2932
/// * a `run` method for sync execution, optionally with a wrapper function
30-
#[proc_macro]
31-
pub fn action_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
33+
#[proc_macro_attribute]
34+
pub fn action_impl(
35+
attrs: proc_macro::TokenStream,
36+
input: proc_macro::TokenStream,
37+
) -> proc_macro::TokenStream {
38+
let ActionImplAttrs { sync_type } = parse_macro_input!(attrs as ActionImplAttrs);
3239
let ActionImpl {
3340
generics,
3441
lifetime,
@@ -37,7 +44,6 @@ pub fn action_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
3744
exec_self_mut,
3845
exec_output,
3946
exec_body,
40-
sync_wrap,
4147
} = parse_macro_input!(input as ActionImpl);
4248

4349
let mut unbounded_generics = generics.clone();
@@ -48,14 +54,34 @@ pub fn action_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
4854
ty.bounds.clear();
4955
}
5056

51-
let SyncWrap {
52-
sync_arg_mut,
53-
sync_arg,
54-
sync_output,
55-
sync_body,
56-
} = sync_wrap.unwrap_or_else(|| {
57-
parse_quote! { fn sync_wrap(out) -> #exec_output { out } }
58-
});
57+
let sync_run = if let Some(sync_type) = sync_type {
58+
// In expression position, the type needs to be of the form Foo::<Bar>, not Foo<Bar>
59+
let mut formal = sync_type.clone();
60+
struct Visitor;
61+
impl VisitMut for Visitor {
62+
fn visit_path_segment_mut(&mut self, segment: &mut syn::PathSegment) {
63+
if let PathArguments::AngleBracketed(args) = &mut segment.arguments {
64+
if args.colon2_token.is_none() {
65+
args.colon2_token = Some(Token![::](Span::call_site()));
66+
}
67+
}
68+
}
69+
}
70+
syn::visit_mut::visit_type_mut(&mut Visitor, &mut formal);
71+
quote! {
72+
/// Synchronously execute this action.
73+
pub fn run(self) -> Result<#sync_type> {
74+
crate::sync::TOKIO_RUNTIME.block_on(std::future::IntoFuture::into_future(self)).map(#formal::new)
75+
}
76+
}
77+
} else {
78+
quote! {
79+
/// Synchronously execute this action.
80+
pub fn run(self) -> #exec_output {
81+
crate::sync::TOKIO_RUNTIME.block_on(std::future::IntoFuture::into_future(self))
82+
}
83+
}
84+
};
5985

6086
quote! {
6187
impl #generics crate::action::private::Sealed for #action { }
@@ -85,11 +111,7 @@ pub fn action_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
85111

86112
#[cfg(feature = "sync")]
87113
impl #generics #action {
88-
/// Synchronously execute this action.
89-
pub fn run(self) -> #sync_output {
90-
let #sync_arg_mut #sync_arg = crate::sync::TOKIO_RUNTIME.block_on(std::future::IntoFuture::into_future(self));
91-
#sync_body
92-
}
114+
#sync_run
93115
}
94116
}.into()
95117
}
@@ -107,7 +129,6 @@ struct ActionImpl {
107129
exec_self_mut: Option<Token![mut]>,
108130
exec_output: Type,
109131
exec_body: Block,
110-
sync_wrap: Option<SyncWrap>,
111132
}
112133

113134
impl Parse for ActionImpl {
@@ -155,13 +176,6 @@ impl Parse for ActionImpl {
155176
let exec_output = impl_body.parse()?;
156177
let exec_body = impl_body.parse()?;
157178

158-
// Optional SyncWrap.
159-
let sync_wrap = if impl_body.peek(Token![fn]) {
160-
Some(impl_body.parse()?)
161-
} else {
162-
None
163-
};
164-
165179
if !impl_body.is_empty() {
166180
return Err(exec_args.error("unexpected token"));
167181
}
@@ -174,40 +188,25 @@ impl Parse for ActionImpl {
174188
exec_self_mut,
175189
exec_output,
176190
exec_body,
177-
sync_wrap,
178191
})
179192
}
180193
}
181194

182-
// fn sync_wrap([mut] out) -> OutType { <out body> }
183-
struct SyncWrap {
184-
sync_arg_mut: Option<Token![mut]>,
185-
sync_arg: Ident,
186-
sync_output: Type,
187-
sync_body: Block,
195+
struct ActionImplAttrs {
196+
sync_type: Option<Type>,
188197
}
189198

190-
impl Parse for SyncWrap {
199+
impl Parse for ActionImplAttrs {
191200
fn parse(input: ParseStream) -> syn::Result<Self> {
192-
input.parse::<Token![fn]>()?;
193-
parse_name(input, "sync_wrap")?;
194-
let args_input;
195-
parenthesized!(args_input in input);
196-
let sync_arg_mut = args_input.parse()?;
197-
let sync_arg = args_input.parse()?;
198-
if !args_input.is_empty() {
199-
return Err(args_input.error("unexpected token"));
201+
let mut out = Self { sync_type: None };
202+
if input.is_empty() {
203+
return Ok(out);
200204
}
201-
input.parse::<Token![->]>()?;
202-
let sync_output = input.parse()?;
203-
let sync_body = input.parse()?;
204205

205-
Ok(SyncWrap {
206-
sync_arg_mut,
207-
sync_arg,
208-
sync_output,
209-
sync_body,
210-
})
206+
parse_name(input, "sync")?;
207+
input.parse::<Token![=]>()?;
208+
out.sync_type = Some(input.parse()?);
209+
Ok(out)
211210
}
212211
}
213212

src/action.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod run_command;
2424
mod search_index;
2525
mod session;
2626
mod shutdown;
27+
pub(crate) mod transaction;
2728
mod update;
2829
mod watch;
2930

@@ -51,6 +52,7 @@ pub use run_command::{RunCommand, RunCursorCommand};
5152
pub use search_index::{CreateSearchIndex, DropSearchIndex, ListSearchIndexes, UpdateSearchIndex};
5253
pub use session::StartSession;
5354
pub use shutdown::Shutdown;
55+
pub use transaction::{AbortTransaction, CommitTransaction, StartTransaction};
5456
pub use update::Update;
5557
pub use watch::Watch;
5658

src/action/aggregate.rs

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -134,45 +134,49 @@ impl<'a> Aggregate<'a, ImplicitSession> {
134134
}
135135
}
136136

137-
action_impl! {
138-
impl<'a> Action for Aggregate<'a, ImplicitSession> {
139-
type Future = AggregateFuture;
140-
141-
async fn execute(mut self) -> Result<Cursor<Document>> {
142-
resolve_options!(
143-
self.target,
144-
self.options,
145-
[read_concern, write_concern, selection_criteria]
146-
);
147-
148-
let aggregate = crate::operation::aggregate::Aggregate::new(self.target.target(), self.pipeline, self.options);
149-
let client = self.target.client();
150-
client.execute_cursor_operation(aggregate).await
151-
}
152-
153-
fn sync_wrap(out) -> Result<crate::sync::Cursor<Document>> {
154-
out.map(crate::sync::Cursor::new)
155-
}
137+
#[action_impl(sync = crate::sync::Cursor<Document>)]
138+
impl<'a> Action for Aggregate<'a, ImplicitSession> {
139+
type Future = AggregateFuture;
140+
141+
async fn execute(mut self) -> Result<Cursor<Document>> {
142+
resolve_options!(
143+
self.target,
144+
self.options,
145+
[read_concern, write_concern, selection_criteria]
146+
);
147+
148+
let aggregate = crate::operation::aggregate::Aggregate::new(
149+
self.target.target(),
150+
self.pipeline,
151+
self.options,
152+
);
153+
let client = self.target.client();
154+
client.execute_cursor_operation(aggregate).await
156155
}
157156
}
158157

159-
action_impl! {
160-
impl<'a> Action for Aggregate<'a, ExplicitSession<'a>> {
161-
type Future = AggregateSessionFuture;
162-
163-
async fn execute(mut self) -> Result<SessionCursor<Document>> {
164-
resolve_read_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
165-
resolve_write_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
166-
resolve_selection_criteria_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
167-
168-
let aggregate = crate::operation::aggregate::Aggregate::new(self.target.target(), self.pipeline, self.options);
169-
let client = self.target.client();
170-
client.execute_session_cursor_operation(aggregate, self.session.0).await
171-
}
172-
173-
fn sync_wrap(out) -> Result<crate::sync::SessionCursor<Document>> {
174-
out.map(crate::sync::SessionCursor::new)
175-
}
158+
#[action_impl(sync = crate::sync::SessionCursor<Document>)]
159+
impl<'a> Action for Aggregate<'a, ExplicitSession<'a>> {
160+
type Future = AggregateSessionFuture;
161+
162+
async fn execute(mut self) -> Result<SessionCursor<Document>> {
163+
resolve_read_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
164+
resolve_write_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
165+
resolve_selection_criteria_with_session!(
166+
self.target,
167+
self.options,
168+
Some(&mut *self.session.0)
169+
)?;
170+
171+
let aggregate = crate::operation::aggregate::Aggregate::new(
172+
self.target.target(),
173+
self.pipeline,
174+
self.options,
175+
);
176+
let client = self.target.client();
177+
client
178+
.execute_session_cursor_operation(aggregate, self.session.0)
179+
.await
176180
}
177181
}
178182

src/action/count.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,14 @@ impl<'a> EstimatedDocumentCount<'a> {
100100
);
101101
}
102102

103-
action_impl! {
104-
impl<'a> Action for EstimatedDocumentCount<'a> {
105-
type Future = EstimatedDocumentCountFuture;
106-
107-
async fn execute(mut self) -> Result<u64> {
108-
resolve_options!(self.cr, self.options, [read_concern, selection_criteria]);
109-
let op = crate::operation::count::Count::new(self.cr.namespace(), self.options);
110-
self.cr.client().execute_operation(op, None).await
111-
}
103+
#[action_impl]
104+
impl<'a> Action for EstimatedDocumentCount<'a> {
105+
type Future = EstimatedDocumentCountFuture;
106+
107+
async fn execute(mut self) -> Result<u64> {
108+
resolve_options!(self.cr, self.options, [read_concern, selection_criteria]);
109+
let op = crate::operation::count::Count::new(self.cr.namespace(), self.options);
110+
self.cr.client().execute_operation(op, None).await
112111
}
113112
}
114113

@@ -140,16 +139,19 @@ impl<'a> CountDocuments<'a> {
140139
}
141140
}
142141

143-
action_impl! {
144-
impl<'a> Action for CountDocuments<'a> {
145-
type Future = CountDocumentsFuture;
142+
#[action_impl]
143+
impl<'a> Action for CountDocuments<'a> {
144+
type Future = CountDocumentsFuture;
146145

147-
async fn execute(mut self) -> Result<u64> {
148-
resolve_read_concern_with_session!(self.cr, self.options, self.session.as_ref())?;
149-
resolve_selection_criteria_with_session!(self.cr, self.options, self.session.as_ref())?;
146+
async fn execute(mut self) -> Result<u64> {
147+
resolve_read_concern_with_session!(self.cr, self.options, self.session.as_ref())?;
148+
resolve_selection_criteria_with_session!(self.cr, self.options, self.session.as_ref())?;
150149

151-
let op = crate::operation::count_documents::CountDocuments::new(self.cr.namespace(), self.filter, self.options)?;
152-
self.cr.client().execute_operation(op, self.session).await
153-
}
150+
let op = crate::operation::count_documents::CountDocuments::new(
151+
self.cr.namespace(),
152+
self.filter,
153+
self.options,
154+
)?;
155+
self.cr.client().execute_operation(op, self.session).await
154156
}
155157
}

src/action/create_index.rs

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -102,33 +102,31 @@ impl<'a, M> CreateIndex<'a, M> {
102102
}
103103
}
104104

105-
action_impl! {
106-
impl<'a> Action for CreateIndex<'a, Single> {
107-
type Future = CreateIndexFuture;
105+
#[action_impl]
106+
impl<'a> Action for CreateIndex<'a, Single> {
107+
type Future = CreateIndexFuture;
108108

109-
async fn execute(self) -> Result<CreateIndexResult> {
110-
let inner: CreateIndex<'a, Multiple> = CreateIndex {
111-
coll: self.coll,
112-
indexes: self.indexes,
113-
options: self.options,
114-
session: self.session,
115-
_mode: PhantomData,
116-
};
117-
let response = inner.await?;
118-
Ok(response.into_create_index_result())
119-
}
109+
async fn execute(self) -> Result<CreateIndexResult> {
110+
let inner: CreateIndex<'a, Multiple> = CreateIndex {
111+
coll: self.coll,
112+
indexes: self.indexes,
113+
options: self.options,
114+
session: self.session,
115+
_mode: PhantomData,
116+
};
117+
let response = inner.await?;
118+
Ok(response.into_create_index_result())
120119
}
121120
}
122121

123-
action_impl! {
124-
impl<'a> Action for CreateIndex<'a, Multiple> {
125-
type Future = CreateIndexesFuture;
122+
#[action_impl]
123+
impl<'a> Action for CreateIndex<'a, Multiple> {
124+
type Future = CreateIndexesFuture;
126125

127-
async fn execute(mut self) -> Result<CreateIndexesResult> {
128-
resolve_write_concern_with_session!(self.coll, self.options, self.session.as_ref())?;
126+
async fn execute(mut self) -> Result<CreateIndexesResult> {
127+
resolve_write_concern_with_session!(self.coll, self.options, self.session.as_ref())?;
129128

130-
let op = Op::new(self.coll.namespace(), self.indexes, self.options);
131-
self.coll.client().execute_operation(op, self.session).await
132-
}
129+
let op = Op::new(self.coll.namespace(), self.indexes, self.options);
130+
self.coll.client().execute_operation(op, self.session).await
133131
}
134132
}

0 commit comments

Comments
 (0)