Skip to content

Commit 8bb5713

Browse files
committed
refactor: rewrite Insert to use InsertFormatted
1 parent 850814b commit 8bb5713

File tree

3 files changed

+111
-267
lines changed

3 files changed

+111
-267
lines changed

src/insert.rs

Lines changed: 36 additions & 257 deletions
Original file line number | Diff line number | Diff line change
@@ -1,24 +1,14 @@
1-
use crate::headers::{with_authentication, with_request_headers};
1+
use crate::insert_formatted::BufInsertFormatted;
22
use crate::row_metadata::RowMetadata;
33
use crate::rowbinary::{serialize_row_binary, serialize_with_validation};
44
use crate::{
5-
Client, Compression, RowWrite,
6-
error::{Error, Result},
5+
Client, RowWrite,
6+
error::Result,
77
formats,
8-
request_body::{ChunkSender, RequestBody},
9-
response::Response,
108
row::{self, Row},
11-
settings,
129
};
13-
use bytes::{Bytes, BytesMut};
1410
use clickhouse_types::put_rbwnat_columns_header;
15-
use hyper::{self, Request};
16-
use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Duration};
17-
use tokio::{
18-
task::JoinHandle,
19-
time::{Instant, Sleep},
20-
};
21-
use url::Url;
11+
use std::{future::Future, marker::PhantomData, time::Duration};
2212

2313
// The desired max frame size.
2414
const BUFFER_SIZE: usize = 256 * 1024;
@@ -35,7 +25,7 @@ const MIN_CHUNK_SIZE: usize = const {
3525
/// The [`Insert::end`] must be called to finalize the `INSERT`.
3626
/// Otherwise, the whole `INSERT` will be aborted.
3727
///
38-
/// Rows are being sent progressively to spread network load.
28+
/// Rows are sent progressively to spread network load.
3929
///
4030
/// # Note: Metadata is Cached
4131
/// If [validation is enabled][Client::with_validation],
@@ -48,92 +38,11 @@ const MIN_CHUNK_SIZE: usize = const {
4838
/// after any changes to the current database schema.
4939
#[must_use]
5040
pub struct Insert<T> {
51-
state: InsertState,
52-
buffer: BytesMut,
41+
insert: BufInsertFormatted,
5342
row_metadata: Option<RowMetadata>,
54-
#[cfg(feature = "lz4")]
55-
compression: Compression,
56-
send_timeout: Option<Duration>,
57-
end_timeout: Option<Duration>,
58-
// Use boxed `Sleep` to reuse a timer entry, it improves performance.
59-
// Also, `tokio::time::timeout()` significantly increases a future's size.
60-
sleep: Pin<Box<Sleep>>,
6143
_marker: PhantomData<fn() -> T>, // TODO: test contravariance.
6244
}
6345

64-
enum InsertState {
65-
NotStarted {
66-
client: Box<Client>,
67-
sql: String,
68-
},
69-
Active {
70-
sender: ChunkSender,
71-
handle: JoinHandle<Result<()>>,
72-
},
73-
Terminated {
74-
handle: JoinHandle<Result<()>>,
75-
},
76-
Completed,
77-
}
78-
79-
impl InsertState {
80-
fn sender(&mut self) -> Option<&mut ChunkSender> {
81-
match self {
82-
InsertState::Active { sender, .. } => Some(sender),
83-
_ => None,
84-
}
85-
}
86-
87-
fn handle(&mut self) -> Option<&mut JoinHandle<Result<()>>> {
88-
match self {
89-
InsertState::Active { handle, .. } | InsertState::Terminated { handle } => Some(handle),
90-
_ => None,
91-
}
92-
}
93-
94-
fn client_with_sql(&self) -> Option<(&Client, &str)> {
95-
match self {
96-
InsertState::NotStarted { client, sql } => Some((client, sql)),
97-
_ => None,
98-
}
99-
}
100-
101-
#[inline]
102-
fn expect_client_mut(&mut self) -> &mut Client {
103-
let Self::NotStarted { client, .. } = self else {
104-
panic!("cannot modify client options while an insert is in-progress")
105-
};
106-
107-
client
108-
}
109-
110-
fn terminated(&mut self) {
111-
match mem::replace(self, InsertState::Completed) {
112-
InsertState::NotStarted { .. } | InsertState::Completed => (),
113-
InsertState::Active { handle, .. } => {
114-
*self = InsertState::Terminated { handle };
115-
}
116-
InsertState::Terminated { handle } => {
117-
*self = InsertState::Terminated { handle };
118-
}
119-
}
120-
}
121-
}
122-
123-
// It should be a regular function, but it decreases performance.
124-
macro_rules! timeout {
125-
($self:expr, $timeout:ident, $fut:expr) => {{
126-
if let Some(timeout) = $self.$timeout {
127-
$self.sleep.as_mut().reset(Instant::now() + timeout);
128-
}
129-
130-
tokio::select! {
131-
res = $fut => Some(res),
132-
_ = &mut $self.sleep, if $self.$timeout.is_some() => None,
133-
}
134-
}};
135-
}
136-
13746
impl<T> Insert<T> {
13847
pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<RowMetadata>) -> Self
13948
where
@@ -152,18 +61,11 @@ impl<T> Insert<T> {
15261
let sql = format!("INSERT INTO {table}({fields}) FORMAT {format}");
15362

15463
Self {
155-
state: InsertState::NotStarted {
156-
client: Box::new(client.clone()),
157-
sql,
158-
},
159-
buffer: BytesMut::with_capacity(BUFFER_SIZE),
160-
#[cfg(feature = "lz4")]
161-
compression: client.compression,
162-
send_timeout: None,
163-
end_timeout: None,
164-
sleep: Box::pin(tokio::time::sleep(Duration::new(0, 0))),
165-
_marker: PhantomData,
64+
insert: client
65+
.insert_formatted_with(sql)
66+
.buffered_with_capacity(BUFFER_SIZE),
16667
row_metadata,
68+
_marker: PhantomData,
16769
}
16870
}
16971

@@ -202,7 +104,7 @@ impl<T> Insert<T> {
202104
/// # Panics
203105
/// If called after the request is started, e.g., after [`Insert::write`].
204106
pub fn with_roles(mut self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
205-
self.state.expect_client_mut().set_roles(roles);
107+
self.insert.expect_client_mut().set_roles(roles);
206108
self
207109
}
208110

@@ -216,7 +118,7 @@ impl<T> Insert<T> {
216118
/// # Panics
217119
/// If called after the request is started, e.g., after [`Insert::write`].
218120
pub fn with_default_roles(mut self) -> Self {
219-
self.state.expect_client_mut().clear_roles();
121+
self.insert.expect_client_mut().clear_roles();
220122
self
221123
}
222124

@@ -227,7 +129,7 @@ impl<T> Insert<T> {
227129
/// If called after the request is started, e.g., after [`Insert::write`].
228130
#[track_caller]
229131
pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
230-
self.state.expect_client_mut().add_option(name, value);
132+
self.insert.expect_client_mut().add_option(name, value);
231133
self
232134
}
233135

@@ -236,8 +138,7 @@ impl<T> Insert<T> {
236138
send_timeout: Option<Duration>,
237139
end_timeout: Option<Duration>,
238140
) {
239-
self.send_timeout = send_timeout;
240-
self.end_timeout = end_timeout;
141+
self.insert.set_timeouts(send_timeout, end_timeout);
241142
}
242143

243144
/// Serializes the provided row into an internal buffer.
@@ -270,8 +171,8 @@ impl<T> Insert<T> {
270171

271172
async move {
272173
result?;
273-
if self.buffer.len() >= MIN_CHUNK_SIZE {
274-
self.send_chunk().await?;
174+
if self.insert.buf_len() >= MIN_CHUNK_SIZE {
175+
self.insert.flush().await?;
275176
}
276177
Ok(())
277178
}
@@ -282,18 +183,27 @@ impl<T> Insert<T> {
282183
where
283184
T: RowWrite,
284185
{
285-
match self.state {
286-
InsertState::NotStarted { .. } => self.init_request(),
287-
InsertState::Active { .. } => Ok(()),
288-
_ => panic!("write() after error"),
289-
}?;
186+
// We don't want to wait for the buffer to be full before we start the request,
187+
// in the event of an error.
188+
let fresh_request = self.insert.init_request_if_required()?;
189+
190+
// The following calls need an `impl BufMut`
191+
let buffer = self.insert.buffer_mut();
290192

291-
let old_buf_size = self.buffer.len();
193+
let old_buf_size = buffer.len();
292194
let result = match &self.row_metadata {
293-
Some(metadata) => serialize_with_validation(&mut self.buffer, row, metadata),
294-
None => serialize_row_binary(&mut self.buffer, row),
195+
Some(metadata) => {
196+
let res = if fresh_request {
197+
put_rbwnat_columns_header(&metadata.columns, &mut *buffer).map_err(Into::into)
198+
} else {
199+
Ok(())
200+
};
201+
202+
res.and_then(|_| serialize_with_validation(&mut *buffer, row, metadata))
203+
}
204+
None => serialize_row_binary(&mut *buffer, row),
295205
};
296-
let written = self.buffer.len() - old_buf_size;
206+
let written = buffer.len() - old_buf_size;
297207

298208
if result.is_err() {
299209
self.abort();
@@ -309,141 +219,10 @@ impl<T> Insert<T> {
309219
///
310220
/// NOTE: If it isn't called, the whole `INSERT` is aborted.
311221
pub async fn end(mut self) -> Result<()> {
312-
if !self.buffer.is_empty() {
313-
self.send_chunk().await?;
314-
}
315-
self.state.terminated();
316-
self.wait_handle().await
317-
}
318-
319-
async fn send_chunk(&mut self) -> Result<()> {
320-
debug_assert!(matches!(self.state, InsertState::Active { .. }));
321-
322-
// Hyper uses non-trivial and inefficient schema of buffering chunks.
323-
// It's difficult to determine when allocations occur.
324-
// So, instead we control it manually here and rely on the system allocator.
325-
let chunk = self.take_and_prepare_chunk()?;
326-
let sender = self.state.sender().unwrap(); // checked above
327-
328-
let is_timed_out = match timeout!(self, send_timeout, sender.send(chunk)) {
329-
Some(true) => return Ok(()),
330-
Some(false) => false, // an actual error will be returned from `wait_handle`
331-
None => true,
332-
};
333-
334-
// Error handling.
335-
336-
self.abort();
337-
338-
// TODO: is it required to wait the handle in the case of timeout?
339-
let res = self.wait_handle().await;
340-
341-
if is_timed_out {
342-
Err(Error::TimedOut)
343-
} else {
344-
res?; // a real error should be here.
345-
Err(Error::Network("channel closed".into()))
346-
}
347-
}
348-
349-
async fn wait_handle(&mut self) -> Result<()> {
350-
match self.state.handle() {
351-
Some(handle) => {
352-
let result = match timeout!(self, end_timeout, &mut *handle) {
353-
Some(Ok(res)) => res,
354-
Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
355-
Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
356-
None => {
357-
// We can do nothing useful here, so just shut down the background task.
358-
handle.abort();
359-
Err(Error::TimedOut)
360-
}
361-
};
362-
self.state = InsertState::Completed;
363-
result
364-
}
365-
_ => Ok(()),
366-
}
367-
}
368-
369-
#[cfg(feature = "lz4")]
370-
fn take_and_prepare_chunk(&mut self) -> Result<Bytes> {
371-
Ok(if self.compression.is_lz4() {
372-
let compressed = crate::compression::lz4::compress(&self.buffer)?;
373-
self.buffer.clear();
374-
compressed
375-
} else {
376-
mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze()
377-
})
378-
}
379-
380-
#[cfg(not(feature = "lz4"))]
381-
fn take_and_prepare_chunk(&mut self) -> Result<Bytes> {
382-
Ok(mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze())
383-
}
384-
385-
#[cold]
386-
#[track_caller]
387-
#[inline(never)]
388-
fn init_request(&mut self) -> Result<()> {
389-
debug_assert!(matches!(self.state, InsertState::NotStarted { .. }));
390-
let (client, sql) = self.state.client_with_sql().unwrap(); // checked above
391-
392-
let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
393-
let mut pairs = url.query_pairs_mut();
394-
pairs.clear();
395-
396-
if let Some(database) = &client.database {
397-
pairs.append_pair(settings::DATABASE, database);
398-
}
399-
400-
pairs.append_pair(settings::QUERY, sql);
401-
402-
if client.compression.is_lz4() {
403-
pairs.append_pair(settings::DECOMPRESS, "1");
404-
}
405-
406-
for (name, value) in &client.options {
407-
pairs.append_pair(name, value);
408-
}
409-
410-
drop(pairs);
411-
412-
let mut builder = Request::post(url.as_str());
413-
builder = with_request_headers(builder, &client.headers, &client.products_info);
414-
builder = with_authentication(builder, &client.authentication);
415-
416-
let (sender, body) = RequestBody::chunked();
417-
418-
let request = builder
419-
.body(body)
420-
.map_err(|err| Error::InvalidParams(Box::new(err)))?;
421-
422-
let future = client.http.request(request);
423-
// TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
424-
let handle =
425-
tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
426-
427-
match self.row_metadata {
428-
None => (), // RowBinary is used, no header is required.
429-
Some(ref metadata) => {
430-
put_rbwnat_columns_header(&metadata.columns, &mut self.buffer)?;
431-
}
432-
}
433-
434-
self.state = InsertState::Active { handle, sender };
435-
Ok(())
222+
self.insert.end().await
436223
}
437224

438225
fn abort(&mut self) {
439-
if let Some(sender) = self.state.sender() {
440-
sender.abort();
441-
}
442-
}
443-
}
444-
445-
impl<T> Drop for Insert<T> {
446-
fn drop(&mut self) {
447-
self.abort();
226+
self.insert.abort();
448227
}
449228
}

0 commit comments

Comments
 (0)