Skip to content

Commit d5786df

Browse files
authored
feat(cubesql): Postgres protocol - stream support (#6025)
1 parent 9924a1a commit d5786df

File tree

6 files changed

+466
-273
lines changed

6 files changed

+466
-273
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/Cargo.lock

Lines changed: 17 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/cubesql/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ paste = "1.0.6"
5454
csv = "1.1.6"
5555
tracing = { version = "0.1.35", features = ["async-await"] }
5656
console-subscriber = "0.1.6"
57+
async-stream = "0.3.3"
58+
futures-core = "0.3.23"
59+
futures-util = "0.3.23"
60+
5761

5862
[dev-dependencies]
5963
pretty_assertions = "1.0.0"

rust/cubesql/cubesql/e2e/tests/postgres.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,72 @@ impl PostgresIntegrationTestSuite {
584584
}
585585
).await?;
586586

587+
self.test_simple_query(
588+
r#"declare test_cursor_fetching_less_than_batch_size cursor with hold for SELECT * from information_schema.testing_dataset order by id;"#
589+
.to_string(),
590+
|messages| {
591+
assert_eq!(messages.len(), 1);
592+
}
593+
).await?;
594+
595+
self.test_simple_query(
596+
r#"fetch 800 in test_cursor_fetching_less_than_batch_size; fetch 800 in test_cursor_fetching_less_than_batch_size; fetch 5000 in test_cursor_fetching_less_than_batch_size;"#
597+
.to_string(),
598+
|messages| {
599+
// 5000 rows | 3 completions
600+
assert_eq!(messages.len(), 5003);
601+
602+
self.assert_row(&messages[0], "0".to_string());
603+
self.assert_row(&messages[799], "799".to_string());
604+
605+
self.assert_complete(&messages[800], 800);
606+
607+
self.assert_row(&messages[801], "800".to_string());
608+
self.assert_row(&messages[1600], "1599".to_string());
609+
610+
self.assert_complete(&messages[1601], 800);
611+
612+
self.assert_row(&messages[1602], "1600".to_string());
613+
self.assert_row(&messages[5001], "4999".to_string());
614+
615+
self.assert_complete(&messages[5002], 3400);
616+
},
617+
)
618+
.await?;
619+
620+
self.test_simple_query(
621+
r#"declare test_cursor_fetching_more_than_batch_size cursor with hold for SELECT * from information_schema.testing_dataset order by id;"#
622+
.to_string(),
623+
|messages| {
624+
assert_eq!(messages.len(), 1);
625+
}
626+
).await?;
627+
628+
self.test_simple_query(
629+
r#"fetch 2400 in test_cursor_fetching_more_than_batch_size; fetch 2400 in test_cursor_fetching_more_than_batch_size; fetch 5000 in test_cursor_fetching_more_than_batch_size;"#
630+
.to_string(),
631+
|messages| {
632+
// 5000 rows | 3 completions
633+
assert_eq!(messages.len(), 5003);
634+
635+
self.assert_row(&messages[0], "0".to_string());
636+
self.assert_row(&messages[2399], "2399".to_string());
637+
638+
self.assert_complete(&messages[2400], 2400);
639+
640+
self.assert_row(&messages[2401], "2400".to_string());
641+
self.assert_row(&messages[4800], "4799".to_string());
642+
643+
self.assert_complete(&messages[4801], 2400);
644+
645+
self.assert_row(&messages[4802], "4800".to_string());
646+
self.assert_row(&messages[5001], "4999".to_string());
647+
648+
self.assert_complete(&messages[5002], 200);
649+
},
650+
)
651+
.await?;
652+
587653
Ok(())
588654
}
589655

@@ -857,6 +923,22 @@ impl PostgresIntegrationTestSuite {
857923

858924
Ok(())
859925
}
926+
927+
fn assert_row(&self, message: &SimpleQueryMessage, expected_value: String) {
928+
if let SimpleQueryMessage::Row(row) = message {
929+
assert_eq!(row.get(0), Some(expected_value.as_str()));
930+
} else {
931+
panic!("Must be Row command, {}", expected_value)
932+
}
933+
}
934+
935+
fn assert_complete(&self, message: &SimpleQueryMessage, expected_value: u64) {
936+
if let SimpleQueryMessage::CommandComplete(rows) = message {
937+
assert_eq!(rows, &expected_value);
938+
} else {
939+
panic!("Must be CommandComplete command, {}", expected_value)
940+
}
941+
}
860942
}
861943

862944
#[async_trait]

0 commit comments

Comments
 (0)