
Commit bcb3653

fix: copy from CSV OOM when file is large. (#18830)
* fix: copy from CSV OOM when file is large.
* ci: add logs of loading test data.
1 parent a60af5a commit bcb3653
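The fix drops the prefetching whole-range stream in favor of on-demand ranged reads, so only one batch per file is resident at a time. A minimal sketch of the new read pattern, assuming a recent opendal release and a local-fs operator; the root, path, and batch size here are illustrative, not from the patch:

```rust
use opendal::services::Fs;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Hypothetical setup; builder style varies across opendal versions.
    let op = Operator::new(Fs::default().root("/tmp"))?.finish();
    let path = "tpch_1/lineitem.csv"; // illustrative path

    let file_size = op.stat(path).await?.content_length();
    let reader = op.reader(path).await?;

    // Pull fixed-size batches on demand: each `read` fetches exactly one
    // byte range, so memory stays bounded by `batch_size` regardless of
    // how large the file is.
    let batch_size: u64 = 1024 * 1024;
    let mut offset: u64 = 0;
    while offset < file_size {
        let end = file_size.min(offset + batch_size);
        let batch = reader.read(offset..end).await?.to_vec();
        offset += batch.len() as u64;
        // ... hand `batch` to the CSV decoder ...
    }
    Ok(())
}
```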

File tree

5 files changed: +18 additions, −43 deletions

Cargo.lock

Lines changed: 0 additions & 1 deletion (generated file; diff not rendered by default)

src/query/storages/stage/Cargo.toml

Lines changed: 0 additions & 1 deletion
```diff
@@ -34,7 +34,6 @@ databend-common-storages-parquet = { workspace = true }
 databend-storages-common-stage = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 enum-as-inner = { workspace = true }
-futures = { workspace = true }
 jsonb = { workspace = true }
 lexical-core = { workspace = true }
 log = { workspace = true }
```

src/query/storages/stage/src/read/row_based/processors/reader.rs

Lines changed: 6 additions & 41 deletions
```diff
@@ -23,24 +23,21 @@ use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline_sources::PrefetchAsyncSource;
 use databend_storages_common_stage::SingleFilePartition;
-use futures::AsyncRead;
-use futures::AsyncReadExt;
 use log::debug;
 use opendal::Operator;
 
 use crate::read::row_based::batch::BytesBatch;
 
 struct FileState {
     file: SingleFilePartition,
-    reader: opendal::FuturesAsyncReader,
+    reader: opendal::Reader,
     offset: usize,
 }
 
 pub struct BytesReader {
     table_ctx: Arc<dyn TableContext>,
     op: Operator,
     read_batch_size: usize,
-    io_size: usize,
     file_state: Option<FileState>,
     prefetch_num: usize,
 }
```
```diff
@@ -52,36 +49,26 @@ impl BytesReader {
         read_batch_size: usize,
         prefetch_num: usize,
     ) -> Result<Self> {
-        // TODO: Use 8MiB as default IO size for now, we can extract as a new config.
-        let default_io_size: usize = 8 * 1024 * 1024;
-        // Calculate the IO size, which:
-        //
-        // - is the multiple of read_batch_size.
-        // - is larger or equal to default_io_size.
-        let io_size = default_io_size.div_ceil(read_batch_size) * read_batch_size;
-
         Ok(Self {
             table_ctx,
             op,
             read_batch_size,
-            io_size,
             file_state: None,
             prefetch_num,
         })
     }
 
     pub async fn read_batch(&mut self) -> Result<DataBlock> {
         if let Some(state) = &mut self.file_state {
-            let end = state.file.size.min(self.read_batch_size + state.offset);
-            let mut buffer = vec![0u8; end - state.offset];
-            let n = read_full(&mut state.reader, &mut buffer[..]).await?;
+            let end = state.file.size.min(self.read_batch_size + state.offset) as u64;
+            let buffer = state.reader.read(state.offset as u64..end).await?.to_vec();
+            let n = buffer.len();
             if n == 0 {
                 return Err(ErrorCode::BadBytes(format!(
                     "Unexpected EOF {} expect {} bytes, read only {} bytes.",
                     state.file.path, state.file.size, state.offset
                 )));
             };
-            buffer.truncate(n);
 
             Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, n);
             self.table_ctx
```
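The boundary arithmetic above is the part worth checking near EOF; here is a standalone restatement of the same computation (the free-function form and test values are illustrative, not part of the patch):

```rust
/// Byte range of the next batch, mirroring `read_batch` above.
fn next_batch_range(file_size: usize, read_batch_size: usize, offset: usize) -> std::ops::Range<u64> {
    let end = file_size.min(read_batch_size + offset) as u64;
    offset as u64..end
}

fn main() {
    // Mid-file: a full batch.
    assert_eq!(next_batch_range(10_000, 1_024, 4_096), 4_096..5_120);
    // Near EOF: the final batch is clipped to the file size.
    assert_eq!(next_batch_range(10_000, 1_024, 9_500), 9_500..10_000);
    // At EOF the range is empty; a zero-byte read is what `read_batch`
    // reports as an unexpected EOF.
    assert_eq!(next_batch_range(10_000, 1_024, 10_000), 10_000..10_000);
}
```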
```diff
@@ -129,15 +116,8 @@ impl PrefetchAsyncSource for BytesReader {
         };
         let file = SingleFilePartition::from_part(&part)?.clone();
 
-        let reader = self
-            .op
-            .reader_with(&file.path)
-            .chunk(self.io_size)
-            // TODO: Use 4 concurrent for test, let's extract as a new setting.
-            .concurrent(4)
-            .await?
-            .into_futures_async_read(0..file.size as u64)
-            .await?;
+        let reader = self.op.reader(&file.path).await?;
+
         self.file_state = Some(FileState {
             file,
             reader,
```
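One plausible reason the removed construction hurt on large loads: every file's reader prefetched `concurrent` chunks of `io_size` bytes, multiplied by however many files the source prefetches. A back-of-envelope bound, assuming all in-flight chunks stay resident (the prefetch depth is hypothetical; actual buffering inside opendal may differ):

```rust
fn main() {
    let io_size: usize = 8 * 1024 * 1024; // old default chunk size (8 MiB)
    let concurrent = 4;                   // `.concurrent(4)` from the removed code
    let prefetch_num = 8;                 // hypothetical pipeline prefetch depth

    let per_file = io_size * concurrent;
    println!("per file:   {} MiB", per_file >> 20);                   // 32 MiB
    println!("worst case: {} MiB", (per_file * prefetch_num) >> 20);  // 256 MiB
}
```

The replacement reads exactly the bytes each batch needs, leaving buffering decisions to the caller.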
```diff
@@ -150,18 +130,3 @@
         }
     }
 }
-
-#[async_backtrace::framed]
-pub async fn read_full<R: AsyncRead + Unpin>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
-    let mut buf = &mut buf[0..];
-    let mut n = 0;
-    while !buf.is_empty() {
-        let read = reader.read(buf).await?;
-        if read == 0 {
-            break;
-        }
-        n += read;
-        buf = &mut buf[read..]
-    }
-    Ok(n)
-}
```

tests/sqllogictests/scripts/prepare_tpch_data.sh

Lines changed: 1 addition & 0 deletions
```diff
@@ -120,6 +120,7 @@ echo "CREATE TABLE IF NOT EXISTS ${db}.lineitem
 #import data
 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 python $CURDIR/prepare_duckdb_tpch_data.py 1
+ls -lh /tmp/tpch_1/*
 
 stmt "drop stage if exists s1"
 stmt "create stage s1 url='fs:///tmp/tpch_1/'"
```

tests/sqllogictests/src/util.rs

Lines changed: 11 additions & 0 deletions
```diff
@@ -234,6 +234,17 @@ fn run_script(name: &str, args: &[&str]) -> Result<()> {
             name,
             String::from_utf8(output.stderr).unwrap()
         )));
+    } else {
+        println!(
+            "script stdout:\n {}",
+            String::from_utf8(output.stdout).unwrap()
+        );
+        if !output.stderr.is_empty() {
+            println!(
+                "script stderr:\n {}",
+                String::from_utf8(output.stderr).unwrap()
+            );
+        }
     }
     Ok(())
 }
```
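For context, the addition follows the standard `std::process::Command` capture-and-echo idiom; a self-contained sketch of the same pattern (the script name is hypothetical):

```rust
use std::process::Command;

fn main() -> std::io::Result<()> {
    // Run a data-preparation script and capture both output streams.
    let output = Command::new("bash").arg("prepare_data.sh").output()?;
    if !output.status.success() {
        eprintln!("script failed:\n{}", String::from_utf8_lossy(&output.stderr));
    } else {
        // Echo stdout (and stderr, if any) so CI logs show what the
        // test-data loading step actually did.
        println!("script stdout:\n{}", String::from_utf8_lossy(&output.stdout));
        if !output.stderr.is_empty() {
            println!("script stderr:\n{}", String::from_utf8_lossy(&output.stderr));
        }
    }
    Ok(())
}
```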
