Skip to content

Commit 8437629

Browse files
authored
Add Begin frame to Session (#81)
1 parent 2c8f24c commit 8437629

File tree

7 files changed

+41
-31
lines changed

7 files changed

+41
-31
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [5.3.0] - 2026-01-xx
4+
5+
* Add `Begin` frame to `Session`
6+
37
## [5.2.1] - 2026-01-16
48

59
* Fix link credit handling

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-amqp"
3-
version = "5.2.1"
3+
version = "5.3.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "AMQP 1.0 Client/Server framework"
66
documentation = "https://docs.rs/ntex-amqp"
@@ -27,7 +27,7 @@ frame-trace = []
2727
[dependencies]
2828
ntex-amqp-codec = "2.0.0"
2929

30-
ntex-bytes = "1.2"
30+
ntex-bytes = "1.4"
3131
ntex-router = "1"
3232
ntex-rt = "3.4"
3333
ntex-io = "3.3"

codec/src/types/variant.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ impl DescribedCompound {
141141
/// ```text
142142
/// 0xc0 0x02 0x01 0x50 0x03
143143
/// ```
144-
pub fn decode<T: Decode>(self) -> Result<T, AmqpParseError> {
144+
pub fn decode<T: Decode>(&self) -> Result<T, AmqpParseError> {
145145
let mut buf = self.data.clone();
146146
let result = T::decode(&mut buf)?;
147147
if buf.is_empty() {

src/connection.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl ConnectionInner {
245245
pub(crate) fn register_remote_session(
246246
&mut self,
247247
remote_channel_id: u16,
248-
begin: &Begin,
248+
begin: Begin,
249249
cell: &Cell<ConnectionInner>,
250250
) -> Result<(), AmqpProtocolError> {
251251
log::trace!(
@@ -256,24 +256,23 @@ impl ConnectionInner {
256256

257257
let entry = self.sessions.vacant_entry();
258258
let local_token = entry.key();
259+
let outgoing_window = begin.incoming_window();
259260

260261
let session = Cell::new(SessionInner::new(
261262
local_token,
262263
false,
263264
ConnectionRef(cell.clone()),
264265
remote_channel_id,
265-
begin.next_outgoing_id(),
266-
begin.incoming_window(),
267-
begin.outgoing_window(),
266+
begin,
268267
));
269268
entry.insert(SessionState::Established(session));
270269
self.sessions_map.insert(remote_channel_id, local_token);
271270

272271
let begin = Begin(Box::new(codec::BeginInner {
272+
outgoing_window,
273273
remote_channel: Some(remote_channel_id),
274274
next_outgoing_id: 1,
275275
incoming_window: u32::MAX,
276-
outgoing_window: begin.incoming_window(),
277276
handle_max: u32::MAX,
278277
offered_capabilities: None,
279278
desired_capabilities: None,
@@ -293,7 +292,7 @@ impl ConnectionInner {
293292
&mut self,
294293
local_channel_id: u16,
295294
remote_channel_id: u16,
296-
begin: &Begin,
295+
begin: Begin,
297296
) {
298297
log::trace!(
299298
"{}: Begin response received: local {:?} remote {:?}",
@@ -312,9 +311,7 @@ impl ConnectionInner {
312311
true,
313312
ConnectionRef(cell.clone()),
314313
remote_channel_id,
315-
begin.next_outgoing_id(),
316-
begin.incoming_window(),
317-
begin.outgoing_window(),
314+
begin,
318315
));
319316
self.sessions_map.insert(remote_channel_id, local_token);
320317

@@ -381,9 +378,9 @@ impl ConnectionInner {
381378
// the remote-channel property in the frame is the local channel id
382379
// we previously sent to the remote
383380
if let Some(local_channel_id) = begin.remote_channel() {
384-
self.complete_session_creation(local_channel_id, channel_id, &begin);
381+
self.complete_session_creation(local_channel_id, channel_id, begin);
385382
} else {
386-
self.register_remote_session(channel_id, &begin, inner)?;
383+
self.register_remote_session(channel_id, begin, inner)?;
387384
}
388385
return Ok(Action::None);
389386
}

src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ pub enum AmqpProtocolError {
6666
ConnectionDropped,
6767
}
6868

69+
impl From<AmqpParseError> for AmqpProtocolError {
70+
fn from(err: AmqpParseError) -> Self {
71+
Self::Codec(err.into())
72+
}
73+
}
74+
6975
#[derive(Clone, Debug, thiserror::Error)]
7076
#[error("Amqp error: {:?} {:?} ({:?})", err, description, info)]
7177
pub struct AmqpError {

src/rcvlink.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,8 +390,7 @@ impl ReceiverLinkInner {
390390
.connection()
391391
.config()
392392
.write_buf()
393-
.buf_with_capacity(msg.encoded_size())
394-
.into();
393+
.buf_with_capacity(msg.encoded_size());
395394
msg.encode(&mut buf);
396395
buf
397396
}
@@ -402,7 +401,6 @@ impl ReceiverLinkInner {
402401
.config()
403402
.write_buf()
404403
.buf_with_capacity(16)
405-
.into()
406404
};
407405
self.partial_body = Some(body);
408406

src/session.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ use ntex_util::{HashMap, future::Either, future::Ready};
66
use slab::Slab;
77

88
use ntex_amqp_codec::protocol::{
9-
self as codec, Accepted, Attach, DeliveryNumber, DeliveryState, Detach, Disposition, End,
10-
Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, Source,
11-
Transfer, TransferBody, TransferNumber,
9+
self as codec, Accepted, Attach, Begin, DeliveryNumber, DeliveryState, Detach, Disposition,
10+
End, Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode,
11+
Source, Transfer, TransferBody, TransferNumber,
1212
};
1313
use ntex_amqp_codec::{AmqpFrame, Encode};
1414

@@ -33,6 +33,7 @@ pub(crate) struct SessionInner {
3333
sink: ConnectionRef,
3434
next_outgoing_id: TransferNumber,
3535
flags: Flags,
36+
begin: Begin,
3637

3738
remote_channel_id: u16,
3839
next_incoming_id: TransferNumber,
@@ -64,6 +65,12 @@ impl Session {
6465
Session { inner }
6566
}
6667

68+
#[inline]
69+
/// Get begin frame reference
70+
pub fn frame(&self) -> &Begin {
71+
&self.inner.get_ref().begin
72+
}
73+
6774
#[inline]
6875
/// Get io tag for current connection
6976
pub fn tag(&self) -> &'static str {
@@ -282,17 +289,12 @@ impl SessionInner {
282289
local: bool,
283290
sink: ConnectionRef,
284291
remote_channel_id: u16,
285-
next_incoming_id: DeliveryNumber,
286-
remote_incoming_window: u32,
287-
remote_outgoing_window: u32,
292+
begin: Begin,
288293
) -> SessionInner {
289294
SessionInner {
290-
id,
291-
sink,
292-
next_incoming_id,
293-
remote_channel_id,
294-
remote_incoming_window,
295-
remote_outgoing_window,
295+
next_incoming_id: begin.next_outgoing_id(),
296+
remote_incoming_window: begin.incoming_window(),
297+
remote_outgoing_window: begin.outgoing_window(),
296298
flags: if local { Flags::LOCAL } else { Flags::empty() },
297299
next_outgoing_id: INITIAL_NEXT_OUTGOING_ID,
298300
unsettled_snd_deliveries: HashMap::default(),
@@ -305,6 +307,10 @@ impl SessionInner {
305307
pool_notify: pool::new(),
306308
pool_credit: pool::new(),
307309
closed: condition::Condition::new(),
310+
id,
311+
sink,
312+
begin,
313+
remote_channel_id,
308314
}
309315
}
310316

@@ -1255,8 +1261,7 @@ impl SessionInner {
12551261
.sink
12561262
.config()
12571263
.write_buf()
1258-
.buf_with_capacity(msg.encoded_size())
1259-
.into();
1264+
.buf_with_capacity(msg.encoded_size());
12601265
msg.encode(&mut buf);
12611266
buf.freeze()
12621267
}

0 commit comments

Comments
 (0)