Skip to content

Commit b9f19f3

Browse files
authored
Clarify request construction methods for sender and receiver (payjoin#814)
closes payjoin#813 Improve method naming consistency across the sender and receiver components, particularly around request creation. These changes help better reflect the behavior of these methods and align with OHTTP semantics
2 parents 3cd1c3c + 5ec16de commit b9f19f3

File tree

16 files changed

+201
-188
lines changed

16 files changed

+201
-188
lines changed

payjoin-cli/src/app/v1.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl AppTrait for App {
6969
let (req, ctx) = SenderBuilder::new(psbt, uri.clone())
7070
.build_recommended(fee_rate)
7171
.with_context(|| "Failed to build payjoin request")?
72-
.extract_v1();
72+
.create_v1_post_request();
7373
let http = http_agent()?;
7474
let body = String::from_utf8(req.body.clone()).unwrap();
7575
println!("Sending fallback request to {}", &req.url);

payjoin-cli/src/app/v2/mod.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl App {
182182
match self.post_original_proposal(context.clone(), persister).await {
183183
Ok(()) => (),
184184
Err(_) => {
185-
let (req, v1_ctx) = context.extract_v1();
185+
let (req, v1_ctx) = context.create_v1_post_request();
186186
let response = post_request(req).await?;
187187
let psbt = Arc::new(
188188
v1_ctx.process_response(response.bytes().await?.to_vec().as_slice())?,
@@ -208,8 +208,9 @@ impl App {
208208
sender: Sender<WithReplyKey>,
209209
persister: &SenderPersister,
210210
) -> Result<()> {
211-
let (req, ctx) = sender
212-
.extract_v2(self.unwrap_relay_or_else_fetch(Some(sender.endpoint().clone())).await?)?;
211+
let (req, ctx) = sender.create_v2_post_request(
212+
self.unwrap_relay_or_else_fetch(Some(sender.endpoint().clone())).await?,
213+
)?;
213214
let response = post_request(req).await?;
214215
println!("Posted original proposal...");
215216
let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
@@ -224,7 +225,7 @@ impl App {
224225
let mut session = sender.clone();
225226
// Long poll until we get a response
226227
loop {
227-
let (req, ctx) = session.extract_req(
228+
let (req, ctx) = session.create_poll_request(
228229
self.unwrap_relay_or_else_fetch(Some(session.endpoint().clone())).await?,
229230
)?;
230231
let response = post_request(req).await?;
@@ -260,11 +261,11 @@ impl App {
260261

261262
let mut session = session;
262263
loop {
263-
let (req, context) = session.extract_req(&ohttp_relay)?;
264+
let (req, context) = session.create_poll_request(&ohttp_relay)?;
264265
println!("Polling receive request...");
265266
let ohttp_response = post_request(req).await?;
266267
let state_transition = session
267-
.process_res(ohttp_response.bytes().await?.to_vec().as_slice(), context)
268+
.process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
268269
.save(persister);
269270
match state_transition {
270271
Ok(OptionalTransitionOutcome::Progress(next_state)) => {
@@ -439,11 +440,11 @@ impl App {
439440
persister: &ReceiverPersister,
440441
) -> Result<()> {
441442
let (req, ohttp_ctx) = proposal
442-
.extract_req(&self.unwrap_relay_or_else_fetch(None).await?)
443+
.create_post_request(&self.unwrap_relay_or_else_fetch(None).await?)
443444
.map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
444445
let res = post_request(req).await?;
445446
let payjoin_psbt = proposal.psbt().clone();
446-
proposal.process_res(&res.bytes().await?, ohttp_ctx).save(persister)?;
447+
proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
447448
println!(
448449
"Response successful. Watch mempool for successful Payjoin. TXID: {}",
449450
payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()

payjoin-directory/src/db.rs

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -54,42 +54,33 @@ impl DbPool {
5454
}
5555

5656
/// Peek using [`DEFAULT_COLUMN`] as the channel type.
57-
pub async fn push_default(&self, subdirectory_id: &ShortId, data: Vec<u8>) -> Result<()> {
58-
self.push(subdirectory_id, DEFAULT_COLUMN, data).await
57+
pub async fn push_default(&self, mailbox_id: &ShortId, data: Vec<u8>) -> Result<()> {
58+
self.push(mailbox_id, DEFAULT_COLUMN, data).await
5959
}
6060

61-
pub async fn peek_default(&self, subdirectory_id: &ShortId) -> Result<Vec<u8>> {
62-
self.peek_with_timeout(subdirectory_id, DEFAULT_COLUMN).await
61+
pub async fn peek_default(&self, mailbox_id: &ShortId) -> Result<Vec<u8>> {
62+
self.peek_with_timeout(mailbox_id, DEFAULT_COLUMN).await
6363
}
6464

65-
pub async fn push_v1(&self, subdirectory_id: &ShortId, data: Vec<u8>) -> Result<()> {
66-
self.push(subdirectory_id, PJ_V1_COLUMN, data).await
65+
pub async fn push_v1(&self, mailbox_id: &ShortId, data: Vec<u8>) -> Result<()> {
66+
self.push(mailbox_id, PJ_V1_COLUMN, data).await
6767
}
6868

6969
/// Peek using [`PJ_V1_COLUMN`] as the channel type.
70-
pub async fn peek_v1(&self, subdirectory_id: &ShortId) -> Result<Vec<u8>> {
71-
self.peek_with_timeout(subdirectory_id, PJ_V1_COLUMN).await
70+
pub async fn peek_v1(&self, mailbox_id: &ShortId) -> Result<Vec<u8>> {
71+
self.peek_with_timeout(mailbox_id, PJ_V1_COLUMN).await
7272
}
7373

74-
async fn push(
75-
&self,
76-
subdirectory_id: &ShortId,
77-
channel_type: &str,
78-
data: Vec<u8>,
79-
) -> Result<()> {
74+
async fn push(&self, mailbox_id: &ShortId, channel_type: &str, data: Vec<u8>) -> Result<()> {
8075
let mut conn = self.client.get_async_connection().await?;
81-
let key = channel_name(subdirectory_id, channel_type);
76+
let key = channel_name(mailbox_id, channel_type);
8277
() = conn.set(&key, data.clone()).await?;
8378
() = conn.publish(&key, "updated").await?;
8479
Ok(())
8580
}
8681

87-
async fn peek_with_timeout(
88-
&self,
89-
subdirectory_id: &ShortId,
90-
channel_type: &str,
91-
) -> Result<Vec<u8>> {
92-
match tokio::time::timeout(self.timeout, self.peek(subdirectory_id, channel_type)).await {
82+
async fn peek_with_timeout(&self, mailbox_id: &ShortId, channel_type: &str) -> Result<Vec<u8>> {
83+
match tokio::time::timeout(self.timeout, self.peek(mailbox_id, channel_type)).await {
9384
Ok(redis_result) => match redis_result {
9485
Ok(result) => Ok(result),
9586
Err(redis_err) => Err(Error::Redis(redis_err)),
@@ -98,11 +89,11 @@ impl DbPool {
9889
}
9990
}
10091

101-
async fn peek(&self, subdirectory_id: &ShortId, channel_type: &str) -> RedisResult<Vec<u8>> {
92+
async fn peek(&self, mailbox_id: &ShortId, channel_type: &str) -> RedisResult<Vec<u8>> {
10293
let mut conn = self.client.get_async_connection().await?;
103-
let key = channel_name(subdirectory_id, channel_type);
94+
let key = channel_name(mailbox_id, channel_type);
10495

105-
// Attempt to fetch existing content for the given subdirectory_id and channel_type
96+
// Attempt to fetch existing content for the given mailbox_id and channel_type
10697
if let Ok(data) = conn.get::<_, Vec<u8>>(&key).await {
10798
if !data.is_empty() {
10899
return Ok(data);
@@ -112,7 +103,7 @@ impl DbPool {
112103

113104
// Set up a temporary listener for changes
114105
let mut pubsub_conn = self.client.get_async_connection().await?.into_pubsub();
115-
let channel_name = channel_name(subdirectory_id, channel_type);
106+
let channel_name = channel_name(mailbox_id, channel_type);
116107
pubsub_conn.subscribe(&channel_name).await?;
117108

118109
// Use a block to limit the scope of the mutable borrow
@@ -146,6 +137,6 @@ impl DbPool {
146137
}
147138
}
148139

149-
fn channel_name(subdirectory_id: &ShortId, channel_type: &str) -> Vec<u8> {
150-
(subdirectory_id.to_string() + channel_type).into_bytes()
140+
fn channel_name(mailbox_id: &ShortId, channel_type: &str) -> Vec<u8> {
141+
(mailbox_id.to_string() + channel_type).into_bytes()
151142
}

payjoin-directory/src/lib.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,8 @@ async fn handle_v2(
257257
let path_segments: Vec<&str> = path.split('/').collect();
258258
debug!("handle_v2: {:?}", &path_segments);
259259
match (parts.method, path_segments.as_slice()) {
260-
(Method::POST, &["", id]) => post_subdir(id, body, pool).await,
261-
(Method::GET, &["", id]) => get_subdir(id, pool).await,
260+
(Method::POST, &["", id]) => post_mailbox(id, body, pool).await,
261+
(Method::GET, &["", id]) => get_mailbox(id, pool).await,
262262
(Method::PUT, &["", id]) => put_payjoin_v1(id, body, pool).await,
263263
_ => Ok(not_found()),
264264
}
@@ -371,7 +371,7 @@ impl From<hyper::http::Error> for HandlerError {
371371

372372
impl From<ShortIdError> for HandlerError {
373373
fn from(_: ShortIdError) -> Self {
374-
HandlerError::BadRequest(anyhow::anyhow!("subdirectory ID must be 13 bech32 characters"))
374+
HandlerError::BadRequest(anyhow::anyhow!("mailbox ID must be 13 bech32 characters"))
375375
}
376376
}
377377

@@ -443,13 +443,13 @@ async fn put_payjoin_v1(
443443
}
444444
}
445445

446-
async fn post_subdir(
446+
async fn post_mailbox(
447447
id: &str,
448448
body: BoxBody<Bytes, hyper::Error>,
449449
pool: DbPool,
450450
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, HandlerError> {
451451
let none_response = Response::builder().status(StatusCode::OK).body(empty())?;
452-
trace!("post_subdir");
452+
trace!("post_mailbox");
453453

454454
let id = ShortId::from_str(id)?;
455455

@@ -465,11 +465,11 @@ async fn post_subdir(
465465
}
466466
}
467467

468-
async fn get_subdir(
468+
async fn get_mailbox(
469469
id: &str,
470470
pool: DbPool,
471471
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, HandlerError> {
472-
trace!("get_subdir");
472+
trace!("get_mailbox");
473473
let id = ShortId::from_str(id)?;
474474
let timeout_response = Response::builder().status(StatusCode::ACCEPTED).body(empty())?;
475475
handle_peek(pool.peek_default(&id).await, timeout_response)

payjoin-ffi/python/test/test_payjoin_integration_test.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async def process_receiver_proposal(self, receiver: ReceiveSession, recv_persist
6161
if res is None:
6262
return None
6363
return res
64-
64+
6565
if receiver.is_UNCHECKED_PROPOSAL():
6666
return await self.process_unchecked_proposal(receiver.inner, recv_persister)
6767
if receiver.is_MAYBE_INPUTS_OWNED():
@@ -78,56 +78,55 @@ async def process_receiver_proposal(self, receiver: ReceiveSession, recv_persist
7878
return await self.process_provisional_proposal(receiver.inner, recv_persister)
7979
if receiver.is_PAYJOIN_PROPOSAL():
8080
return receiver
81-
81+
8282
raise Exception(f"Unknown receiver state: {receiver}")
83-
84-
83+
8584
def create_receiver_context(self, receiver_address: bitcoinffi.Address, directory: Url, ohttp_keys: OhttpKeys, recv_persister: InMemoryReceiverSessionEventLog) -> Initialized:
8685
receiver = UninitializedReceiver().create_session(address=receiver_address, directory=directory.as_string(), ohttp_keys=ohttp_keys, expire_after=None).save(recv_persister)
8786
return receiver
88-
87+
8988
async def retrieve_receiver_proposal(self, receiver: Initialized, recv_persister: InMemoryReceiverSessionEventLog, ohttp_relay: Url):
9089
agent = httpx.AsyncClient()
91-
request: RequestResponse = receiver.extract_req(ohttp_relay.as_string())
90+
request: RequestResponse = receiver.create_poll_request(ohttp_relay.as_string())
9291
response = await agent.post(
9392
url=request.request.url.as_string(),
9493
headers={"Content-Type": request.request.content_type},
9594
content=request.request.body
9695
)
97-
res = receiver.process_res(response.content, request.client_response).save(recv_persister)
96+
res = receiver.process_response(response.content, request.client_response).save(recv_persister)
9897
if res.is_none():
9998
return None
10099
proposal = res.success()
101100
return await self.process_unchecked_proposal(proposal, recv_persister)
102-
101+
103102
async def process_unchecked_proposal(self, proposal: UncheckedProposal, recv_persister: InMemoryReceiverSessionEventLog) :
104103
receiver = proposal.check_broadcast_suitability(None, MempoolAcceptanceCallback(self.receiver)).save(recv_persister)
105104
return await self.process_maybe_inputs_owned(receiver, recv_persister)
106-
105+
107106
async def process_maybe_inputs_owned(self, proposal: MaybeInputsOwned, recv_persister: InMemoryReceiverSessionEventLog):
108107
maybe_inputs_owned = proposal.check_inputs_not_owned(IsScriptOwnedCallback(self.receiver)).save(recv_persister)
109108
return await self.process_maybe_inputs_seen(maybe_inputs_owned, recv_persister)
110-
109+
111110
async def process_maybe_inputs_seen(self, proposal: MaybeInputsSeen, recv_persister: InMemoryReceiverSessionEventLog):
112111
outputs_unknown = proposal.check_no_inputs_seen_before(CheckInputsNotSeenCallback(self.receiver)).save(recv_persister)
113112
return await self.process_outputs_unknown(outputs_unknown, recv_persister)
114-
113+
115114
async def process_outputs_unknown(self, proposal: OutputsUnknown, recv_persister: InMemoryReceiverSessionEventLog):
116115
wants_outputs = proposal.identify_receiver_outputs(IsScriptOwnedCallback(self.receiver)).save(recv_persister)
117116
return await self.process_wants_outputs(wants_outputs, recv_persister)
118-
117+
119118
async def process_wants_outputs(self, proposal: WantsOutputs, recv_persister: InMemoryReceiverSessionEventLog):
120119
wants_inputs = proposal.commit_outputs().save(recv_persister)
121120
return await self.process_wants_inputs(wants_inputs, recv_persister)
122-
121+
123122
async def process_wants_inputs(self, proposal: WantsInputs, recv_persister: InMemoryReceiverSessionEventLog):
124123
provisional_proposal = proposal.contribute_inputs(get_inputs(self.receiver)).commit_inputs().save(recv_persister)
125124
return await self.process_provisional_proposal(provisional_proposal, recv_persister)
126-
125+
127126
async def process_provisional_proposal(self, proposal: ProvisionalProposal, recv_persister: InMemoryReceiverSessionEventLog):
128127
payjoin_proposal = proposal.finalize_proposal(ProcessPsbtCallback(self.receiver), 1, 10).save(recv_persister)
129128
return ReceiveSession.PAYJOIN_PROPOSAL(payjoin_proposal)
130-
129+
131130
async def test_integration_v2_to_v2(self):
132131
try:
133132
receiver_address = bitcoinffi.Address(json.loads(self.receiver.call("getnewaddress", [])), bitcoinffi.Network.REGTEST)
@@ -154,7 +153,7 @@ async def test_integration_v2_to_v2(self):
154153
pj_uri = session.pj_uri()
155154
psbt = build_sweep_psbt(self.sender, pj_uri)
156155
req_ctx: WithReplyKey = SenderBuilder(psbt, pj_uri).build_recommended(1000).save(sender_persister)
157-
request: RequestV2PostContext = req_ctx.extract_v2(ohttp_relay.as_string())
156+
request: RequestV2PostContext = req_ctx.create_v2_post_request(ohttp_relay.as_string())
158157
response = await agent.post(
159158
url=request.request.url.as_string(),
160159
headers={"Content-Type": request.request.content_type},
@@ -172,7 +171,7 @@ async def test_integration_v2_to_v2(self):
172171
self.assertEqual(payjoin_proposal.is_PAYJOIN_PROPOSAL(), True)
173172

174173
payjoin_proposal = payjoin_proposal.inner
175-
request: RequestResponse = payjoin_proposal.extract_req(ohttp_relay.as_string())
174+
request: RequestResponse = payjoin_proposal.create_post_request(ohttp_relay.as_string())
176175
response = await agent.post(
177176
url=request.request.url.as_string(),
178177
headers={"Content-Type": request.request.content_type},
@@ -184,7 +183,7 @@ async def test_integration_v2_to_v2(self):
184183
# Inside the Sender:
185184
# Sender checks, signs, finalizes, extracts, and broadcasts
186185
# Replay post fallback to get the response
187-
request: RequestOhttpContext = send_ctx.extract_req(ohttp_relay.as_string())
186+
request: RequestOhttpContext = send_ctx.create_poll_request(ohttp_relay.as_string())
188187
response = await agent.post(
189188
url=request.request.url.as_string(),
190189
headers={"Content-Type": request.request.content_type},
@@ -254,7 +253,7 @@ def callback(self, tx):
254253
return res
255254
except Exception as e:
256255
print(f"An error occurred: {e}")
257-
return None
256+
return None
258257

259258
class IsScriptOwnedCallback(IsScriptOwned):
260259
def __init__(self, connection: RpcClient):
@@ -280,7 +279,7 @@ def __init__(self, connection: RpcClient):
280279
self.connection = connection
281280

282281
def callback(self, psbt: str):
283-
res = json.loads(self.connection.call("walletprocesspsbt", [psbt]))
282+
res = json.loads(self.connection.call("walletprocesspsbt", [psbt]))
284283
return res['psbt']
285284

286285
if __name__ == "__main__":

payjoin-ffi/src/receive/mod.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -214,21 +214,22 @@ impl
214214
}
215215

216216
impl Initialized {
217-
pub fn extract_req(
217+
/// Construct an OHTTP encapsulated GET request, polling the mailbox for the Original PSBT
218+
pub fn create_poll_request(
218219
&self,
219220
ohttp_relay: String,
220221
) -> Result<(Request, ClientResponse), ReceiverError> {
221222
self.0
222223
.clone()
223-
.extract_req(ohttp_relay)
224+
.create_poll_request(ohttp_relay)
224225
.map(|(req, ctx)| (req.into(), ctx.into()))
225226
.map_err(Into::into)
226227
}
227228

228-
///The response can either be an UncheckedProposal or an ACCEPTED message indicating no UncheckedProposal is available yet.
229-
pub fn process_res(&self, body: &[u8], ctx: &ClientResponse) -> InitializedTransition {
229+
/// The response can either be an UncheckedProposal or an ACCEPTED message indicating no UncheckedProposal is available yet.
230+
pub fn process_response(&self, body: &[u8], ctx: &ClientResponse) -> InitializedTransition {
230231
InitializedTransition(Arc::new(RwLock::new(Some(
231-
self.0.clone().process_res(body, ctx.into()),
232+
self.0.clone().process_response(body, ctx.into()),
232233
))))
233234
}
234235

@@ -845,30 +846,30 @@ impl PayjoinProposal {
845846
.to_string()
846847
}
847848

848-
/// Extract an OHTTP Encapsulated HTTP POST request for the Proposal PSBT
849-
pub fn extract_req(
849+
/// Construct an OHTTP Encapsulated HTTP POST request for the Proposal PSBT
850+
pub fn create_post_request(
850851
&self,
851852
ohttp_relay: String,
852853
) -> Result<(Request, ClientResponse), ReceiverError> {
853854
self.0
854855
.clone()
855-
.extract_req(ohttp_relay)
856+
.create_post_request(ohttp_relay)
856857
.map_err(Into::into)
857858
.map(|(req, ctx)| (req.into(), ctx.into()))
858859
}
859860

860-
///Processes the response for the final POST message from the receiver client in the v2 Payjoin protocol.
861+
/// Processes the response for the final POST message from the receiver client in the v2 Payjoin protocol.
861862
///
862863
/// This function decapsulates the response using the provided OHTTP context. If the response status is successful, it indicates that the Payjoin proposal has been accepted. Otherwise, it returns an error with the status code.
863864
///
864865
/// After this function is called, the receiver can either wait for the Payjoin transaction to be broadcast or choose to broadcast the original PSBT.
865-
pub fn process_res(
866+
pub fn process_response(
866867
&self,
867868
body: &[u8],
868869
ohttp_context: &ClientResponse,
869870
) -> PayjoinProposalTransition {
870871
PayjoinProposalTransition(Arc::new(RwLock::new(Some(
871-
self.0.clone().process_res(body, ohttp_context.into()),
872+
self.0.clone().process_response(body, ohttp_context.into()),
872873
))))
873874
}
874875
}

0 commit comments

Comments
 (0)