Skip to content

Commit fcf7061

Browse files
authored
fix: unhandled error variants in session ops (#230)
1 parent 2d2146b commit fcf7061

File tree

2 files changed

+236
-6
lines changed

2 files changed

+236
-6
lines changed

src/api.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -664,11 +664,31 @@ impl From<TerminalMessage> for ApiError {
664664
Ok(s) => s,
665665
Err(e) => return ApiError::S2STerminalDecode(e.into()),
666666
};
667-
let response = match serde_json::from_str::<ApiErrorResponse>(&msg.body) {
668-
Ok(r) => r,
669-
Err(e) => return ApiError::S2STerminalDecode(e.into()),
670-
};
671-
ApiError::Server(status, response)
667+
if status == StatusCode::PRECONDITION_FAILED {
668+
let condition_failed = match serde_json::from_str::<AppendConditionFailed>(&msg.body) {
669+
Ok(condition_failed) => condition_failed,
670+
Err(err) => {
671+
return ApiError::S2STerminalDecode(err.into());
672+
}
673+
};
674+
ApiError::AppendConditionFailed(condition_failed)
675+
} else if status == StatusCode::RANGE_NOT_SATISFIABLE {
676+
let tail = match serde_json::from_str::<TailResponse>(&msg.body) {
677+
Ok(tail) => tail,
678+
Err(err) => {
679+
return ApiError::S2STerminalDecode(err.into());
680+
}
681+
};
682+
ApiError::ReadBeyondTail(tail)
683+
} else {
684+
let response = match serde_json::from_str::<ApiErrorResponse>(&msg.body) {
685+
Ok(response) => response,
686+
Err(err) => {
687+
return ApiError::S2STerminalDecode(err.into());
688+
}
689+
};
690+
ApiError::Server(status, response)
691+
}
672692
}
673693
}
674694

tests/stream_ops.rs

Lines changed: 211 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::time::Duration;
44

55
use assert_matches::assert_matches;
66
use common::S2Stream;
7+
use futures::StreamExt;
78
use s2::{
89
append_session::AppendSessionConfig,
910
batching::BatchingConfig,
@@ -403,7 +404,38 @@ async fn append_with_mismatched_seq_num_errors(stream: &S2Stream) -> Result<(),
403404

404405
#[test_context(S2Stream)]
405406
#[tokio_shared_rt::test(shared)]
406-
async fn append_with_mismatched_fencing_token(stream: &S2Stream) -> Result<(), S2Error> {
407+
async fn append_session_with_mismatched_seq_num_errors(stream: &S2Stream) -> Result<(), S2Error> {
408+
let session = stream.append_session(AppendSessionConfig::new());
409+
410+
let input1 = AppendInput::new(AppendRecordBatch::try_from_iter([AppendRecord::new(
411+
"lorem",
412+
)?])?);
413+
414+
let ack = session.submit(input1).await?.await?;
415+
416+
assert_eq!(ack.start.seq_num, 0);
417+
assert_eq!(ack.end.seq_num, 1);
418+
419+
let input2 = AppendInput::new(AppendRecordBatch::try_from_iter([AppendRecord::new(
420+
"ipsum",
421+
)?])?)
422+
.with_match_seq_num(0);
423+
424+
let result = session.submit(input2).await?.await;
425+
426+
assert_matches!(
427+
result,
428+
Err(S2Error::AppendConditionFailed(
429+
AppendConditionFailed::SeqNumMismatch(1)
430+
))
431+
);
432+
433+
Ok(())
434+
}
435+
436+
#[test_context(S2Stream)]
437+
#[tokio_shared_rt::test(shared)]
438+
async fn append_with_mismatched_fencing_token_errors(stream: &S2Stream) -> Result<(), S2Error> {
407439
let fencing_token_1 = FencingToken::generate(30).expect("valid fencing token");
408440
let input1 = AppendInput::new(AppendRecordBatch::try_from_iter([CommandRecord::fence(
409441
fencing_token_1.clone(),
@@ -433,6 +465,42 @@ async fn append_with_mismatched_fencing_token(stream: &S2Stream) -> Result<(), S
433465
Ok(())
434466
}
435467

468+
#[test_context(S2Stream)]
469+
#[tokio_shared_rt::test(shared)]
470+
async fn append_session_with_mismatched_fencing_token_errors(
471+
stream: &S2Stream,
472+
) -> Result<(), S2Error> {
473+
let session = stream.append_session(AppendSessionConfig::new());
474+
475+
let fencing_token_1 = FencingToken::generate(30).expect("valid fencing token");
476+
let input1 = AppendInput::new(AppendRecordBatch::try_from_iter([CommandRecord::fence(
477+
fencing_token_1.clone(),
478+
)
479+
.into()])?);
480+
481+
let ack = session.submit(input1).await?.await?;
482+
483+
assert_eq!(ack.start.seq_num, 0);
484+
assert_eq!(ack.end.seq_num, 1);
485+
486+
let fencing_token_2 = FencingToken::generate(30).expect("valid fencing token");
487+
let input2 = AppendInput::new(AppendRecordBatch::try_from_iter([AppendRecord::new(
488+
"ipsum",
489+
)?])?)
490+
.with_fencing_token(fencing_token_2);
491+
492+
let result = session.submit(input2).await?.await;
493+
494+
assert_matches!(
495+
result,
496+
Err(S2Error::AppendConditionFailed(AppendConditionFailed::FencingTokenMismatch(fencing_token))) => {
497+
assert_eq!(fencing_token, fencing_token_1)
498+
}
499+
);
500+
501+
Ok(())
502+
}
503+
436504
#[test_context(S2Stream)]
437505
#[tokio_shared_rt::test(shared)]
438506
async fn read_empty_stream_errors(stream: &S2Stream) -> Result<(), S2Error> {
@@ -450,6 +518,148 @@ async fn read_empty_stream_errors(stream: &S2Stream) -> Result<(), S2Error> {
450518
Ok(())
451519
}
452520

521+
#[test_context(S2Stream)]
522+
#[tokio_shared_rt::test(shared)]
523+
async fn read_beyond_tail_errors(stream: &S2Stream) -> Result<(), S2Error> {
524+
let input = AppendInput::new(AppendRecordBatch::try_from_iter([AppendRecord::new(
525+
"lorem",
526+
)?])?);
527+
528+
let ack = stream.append(input).await?;
529+
530+
assert_eq!(ack.start.seq_num, 0);
531+
assert_eq!(ack.end.seq_num, 1);
532+
533+
let result = stream
534+
.read(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(10))))
535+
.await;
536+
537+
assert_matches!(
538+
result,
539+
Err(S2Error::ReadBeyondTail(StreamPosition { seq_num: 1, .. }))
540+
);
541+
542+
Ok(())
543+
}
544+
545+
#[test_context(S2Stream)]
546+
#[tokio_shared_rt::test(shared)]
547+
async fn read_beyond_tail_with_clamp_to_tail_errors(stream: &S2Stream) -> Result<(), S2Error> {
548+
let input = AppendInput::new(AppendRecordBatch::try_from_iter([AppendRecord::new(
549+
"lorem",
550+
)?])?);
551+
552+
let ack = stream.append(input).await?;
553+
554+
assert_eq!(ack.start.seq_num, 0);
555+
assert_eq!(ack.end.seq_num, 1);
556+
557+
let result = stream
558+
.read(
559+
ReadInput::new().with_start(
560+
ReadStart::new()
561+
.with_from(ReadFrom::SeqNum(10))
562+
.with_clamp_to_tail(true),
563+
),
564+
)
565+
.await;
566+
567+
assert_matches!(
568+
result,
569+
Err(S2Error::ReadBeyondTail(StreamPosition { seq_num: 1, .. }))
570+
);
571+
572+
Ok(())
573+
}
574+
575+
#[test_context(S2Stream)]
576+
#[tokio_shared_rt::test(shared)]
577+
async fn read_beyond_tail_with_clamp_to_tail_and_wait_returns_empty_batch(
578+
stream: &S2Stream,
579+
) -> Result<(), S2Error> {
580+
let input = AppendInput::new(AppendRecordBatch::try_from_iter([AppendRecord::new(
581+
"lorem",
582+
)?])?);
583+
584+
let ack = stream.append(input).await?;
585+
586+
assert_eq!(ack.start.seq_num, 0);
587+
assert_eq!(ack.end.seq_num, 1);
588+
589+
let batch = stream
590+
.read(
591+
ReadInput::new()
592+
.with_start(
593+
ReadStart::new()
594+
.with_from(ReadFrom::SeqNum(10))
595+
.with_clamp_to_tail(true),
596+
)
597+
.with_stop(ReadStop::new().with_wait(1)),
598+
)
599+
.await?;
600+
601+
assert!(batch.records.is_empty());
602+
603+
Ok(())
604+
}
605+
606+
#[test_context(S2Stream)]
607+
#[tokio_shared_rt::test(shared)]
608+
async fn read_session_beyond_tail_errors(stream: &S2Stream) -> Result<(), S2Error> {
609+
let input = AppendInput::new(AppendRecordBatch::try_from_iter([AppendRecord::new(
610+
"lorem",
611+
)?])?);
612+
613+
let ack = stream.append(input).await?;
614+
615+
assert_eq!(ack.start.seq_num, 0);
616+
assert_eq!(ack.end.seq_num, 1);
617+
618+
let mut batches = stream
619+
.read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(10))))
620+
.await;
621+
622+
let result = batches.next().await;
623+
624+
assert_matches!(
625+
result,
626+
Some(Err(S2Error::ReadBeyondTail(StreamPosition {
627+
seq_num: 1,
628+
..
629+
})))
630+
);
631+
632+
Ok(())
633+
}
634+
635+
#[test_context(S2Stream)]
636+
#[tokio_shared_rt::test(shared)]
637+
async fn read_session_beyond_tail_with_clamp_to_tail(stream: &S2Stream) -> Result<(), S2Error> {
638+
let input = AppendInput::new(AppendRecordBatch::try_from_iter([AppendRecord::new(
639+
"lorem",
640+
)?])?);
641+
642+
let ack = stream.append(input).await?;
643+
644+
assert_eq!(ack.start.seq_num, 0);
645+
assert_eq!(ack.end.seq_num, 1);
646+
647+
let mut batches = stream
648+
.read_session(
649+
ReadInput::new().with_start(
650+
ReadStart::new()
651+
.with_from(ReadFrom::SeqNum(10))
652+
.with_clamp_to_tail(true),
653+
),
654+
)
655+
.await;
656+
657+
let result = tokio::time::timeout(Duration::from_secs(1), batches.next()).await;
658+
assert_matches!(result, Err(tokio::time::error::Elapsed { .. }));
659+
660+
Ok(())
661+
}
662+
453663
#[test_context(S2Stream)]
454664
#[tokio_shared_rt::test(shared)]
455665
async fn append_with_empty_header_value(stream: &S2Stream) -> Result<(), S2Error> {

0 commit comments

Comments
 (0)