|
| 1 | +//! An asynchronous, pipelined, PostgreSQL client. |
| 2 | +//! |
| 3 | +//! # Example |
| 4 | +//! |
| 5 | +//! ```no_run |
| 6 | +//! use futures::{Future, Stream}; |
| 7 | +//! use tokio_postgres::NoTls; |
| 8 | +//! |
| 9 | +//! # #[cfg(not(feature = "runtime"))] |
| 10 | +//! # let fut = futures::future::ok(()); |
| 11 | +//! # #[cfg(feature = "runtime")] |
| 12 | +//! let fut = |
| 13 | +//! // Connect to the database |
| 14 | +//! tokio_postgres::connect("host=localhost user=postgres", NoTls) |
| 15 | +//! |
| 16 | +//! .map(|(client, connection)| { |
| 17 | +//! // The connection object performs the actual communication with the database, |
| 18 | +//! // so spawn it off to run on its own. |
| 19 | +//! let connection = connection.map_err(|e| eprintln!("connection error: {}", e)); |
| 20 | +//! tokio::spawn(connection); |
| 21 | +//! |
| 22 | +//! // The client is what you use to make requests. |
| 23 | +//! client |
| 24 | +//! }) |
| 25 | +//! |
| 26 | +//! .and_then(|mut client| { |
| 27 | +//! // Now we can prepare a simple statement that just returns its parameter. |
| 28 | +//! client.prepare("SELECT $1::TEXT") |
| 29 | +//! .map(|statement| (client, statement)) |
| 30 | +//! }) |
| 31 | +//! |
| 32 | +//! .and_then(|(mut client, statement)| { |
| 33 | +//! // And then execute it, returning a Stream of Rows which we collect into a Vec |
| 34 | +//! client.query(&statement, &[&"hello world"]).collect() |
| 35 | +//! }) |
| 36 | +//! |
| 37 | +//! // Now we can check that we got back the same string we sent over. |
| 38 | +//! .map(|rows| { |
| 39 | +//! let value: &str = rows[0].get(0); |
| 40 | +//! assert_eq!(value, "hello world"); |
| 41 | +//! }) |
| 42 | +//! |
| 43 | +//! // And report any errors that happened. |
| 44 | +//! .map_err(|e| { |
| 45 | +//! eprintln!("error: {}", e); |
| 46 | +//! }); |
| 47 | +//! |
| 48 | +//! // By default, tokio_postgres uses the tokio crate as its runtime. |
| 49 | +//! tokio::run(fut); |
| 50 | +//! ``` |
| 51 | +//! |
| 52 | +//! # Pipelining |
| 53 | +//! |
| 54 | +//! The client supports *pipelined* requests. Pipelining can improve performance in use cases in which multiple, |
| 55 | +//! independent queries need to be executed. In a traditional workflow, each query is sent to the server after the |
| 56 | +//! previous query completes. In contrast, pipelining allows the client to send all of the queries to the server up |
| 57 | +//! front, eliminating time spent on both sides waiting for the other to finish sending data: |
| 58 | +//! |
| 59 | +//! ```not_rust |
| 60 | +//! Sequential Pipelined |
| 61 | +//! | Client | Server | | Client | Server | |
| 62 | +//! |----------------|-----------------| |----------------|-----------------| |
| 63 | +//! | send query 1 | | | send query 1 | | |
| 64 | +//! | | process query 1 | | send query 2 | process query 1 | |
| 65 | +//! | receive rows 1 | | | send query 3 | process query 2 | |
| 66 | +//! | send query 2 | | | receive rows 1 | process query 3 | |
| 67 | +//! | | process query 2 | | receive rows 2 | | |
| 68 | +//! | receive rows 2 | | | receive rows 3 | | |
| 69 | +//! | send query 3 | | |
| 70 | +//! | | process query 3 | |
| 71 | +//! | receive rows 3 | | |
| 72 | +//! ``` |
| 73 | +//! |
| 74 | +//! In both cases, the PostgreSQL server is executing the queries sequentially - pipelining just allows both sides of |
| 75 | +//! the connection to work concurrently when possible. |
| 76 | +//! |
| 77 | +//! Pipelining happens automatically when futures are polled concurrently (for example, by using the futures `join` |
| 78 | +//! combinator). Say we want to prepare 2 statements: |
| 79 | +//! |
| 80 | +//! ```no_run |
| 81 | +//! use futures::Future; |
| 82 | +//! use tokio_postgres::{Client, Error, Statement}; |
| 83 | +//! |
| 84 | +//! fn prepare_sequential( |
| 85 | +//! client: &mut Client, |
| 86 | +//! ) -> impl Future<Item = (Statement, Statement), Error = Error> |
| 87 | +//! { |
| 88 | +//! client.prepare("SELECT * FROM foo") |
| 89 | +//! .and_then({ |
| 90 | +//! let f = client.prepare("INSERT INTO bar (id, name) VALUES ($1, $2)"); |
| 91 | +//! |s1| f.map(|s2| (s1, s2)) |
| 92 | +//! }) |
| 93 | +//! } |
| 94 | +//! |
| 95 | +//! fn prepare_pipelined( |
| 96 | +//! client: &mut Client, |
| 97 | +//! ) -> impl Future<Item = (Statement, Statement), Error = Error> |
| 98 | +//! { |
| 99 | +//! client.prepare("SELECT * FROM foo") |
| 100 | +//! .join(client.prepare("INSERT INTO bar (id, name) VALUES ($1, $2)")) |
| 101 | +//! } |
| 102 | +//! ``` |
| 103 | +//! |
| 104 | +//! # Runtime |
| 105 | +//! |
| 106 | +//! The client works with arbitrary `AsyncRead + AsyncWrite` streams. Convenience APIs are provided to handle the |
| 107 | +//! connection process, but these are gated by the `runtime` Cargo feature, which is enabled by default. If disabled, |
| 108 | +//! all dependence on the tokio runtime is removed. |
1 | 109 | #![warn(rust_2018_idioms, clippy::all)]
|
2 | 110 |
|
3 | 111 | use bytes::{Bytes, IntoBuf};
|
|
0 commit comments