Skip to content

Commit 3210a29

Browse files
committed
refactor: port async_json_stream_reader implementation
1 parent c108a8d commit 3210a29

File tree

6 files changed

+1474
-530
lines changed

6 files changed

+1474
-530
lines changed

Cargo.toml

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,25 @@ name = "asyncjsonparser"
33
version = "0.1.0"
44
edition = "2021"
55
rust-version = "1.74"
6-
description = "Async JSON value streaming parser for AsyncRead sources"
6+
description = "Async JSON stream reader for selective parsing of large payloads"
77
readme = "README.md"
88
license = "MIT OR Apache-2.0"
99
repository = "https://github.com/singular-labs/async-json-streamer"
1010
homepage = "https://github.com/singular-labs/async-json-streamer"
1111
documentation = "https://docs.rs/asyncjsonparser"
12-
keywords = ["json", "async", "streaming", "parser", "ndjson"]
12+
keywords = ["json", "async", "streaming", "parser"]
1313
categories = ["asynchronous", "parser-implementations", "encoding"]
1414

1515
[dependencies]
16-
bytes = "1.6"
17-
futures = "0.3"
18-
serde = { version = "1.0", features = ["derive"] }
19-
serde_json = "1.0"
20-
thiserror = "1.0"
16+
bytes = "1.4.0"
17+
serde_json = "1.0.96"
18+
thiserror = "2.0.11"
19+
tokio = { version = "1.34.0", features = ["fs", "io-util", "macros", "rt-multi-thread"] }
20+
tracing = "0.1.40"
2121

2222
[dev-dependencies]
23-
futures = { version = "0.3", features = ["executor"] }
24-
tokio = { version = "1.37", features = ["fs", "macros", "rt-multi-thread", "io-util"] }
25-
tokio-util = { version = "0.7", features = ["compat"] }
23+
assert_matches = "1.5.0"
24+
serde = { version = "1.0.199", features = ["derive"] }
2625

2726
[package.metadata.docs.rs]
2827
all-features = true

README.md

Lines changed: 36 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# asyncjsonparser
22

3-
Async JSON value streaming parser for `AsyncRead` sources.
3+
Async JSON stream reader for selective parsing of large payloads. This is the
4+
standalone home of Extract's `AsyncJsonStreamReader` implementation.
45

56
[![crates.io](https://img.shields.io/crates/v/asyncjsonparser.svg)](https://crates.io/crates/asyncjsonparser)
67
[![docs.rs](https://docs.rs/asyncjsonparser/badge.svg)](https://docs.rs/asyncjsonparser)
@@ -9,11 +10,11 @@ Async JSON value streaming parser for `AsyncRead` sources.
910

1011
## Why asyncjsonparser
1112

12-
- Parses **consecutive JSON values** from a single async byte stream.
13-
- Handles **NDJSON** and whitespace-separated JSON values.
14-
- Works with **any runtime** via `futures::io::AsyncRead`.
15-
- **Configurable buffering** with a hard limit for safety.
16-
- **No unsafe** code.
13+
- Stream through large JSON without deserializing the full payload.
14+
- Selectively read keys, values, and arrays using a tokenized reader.
15+
- Handles chunk boundaries and escaped strings correctly.
16+
- Built on Tokio `AsyncRead`.
17+
- No unsafe code.
1718

1819
## Install
1920

@@ -25,98 +26,48 @@ cargo add asyncjsonparser
2526

2627
```rust
2728
use asyncjsonparser::AsyncJsonStreamReader;
28-
use futures::executor::block_on;
29-
use futures::io::Cursor;
30-
31-
let data = b"{\"id\":1}\n{\"id\":2} {\"id\":3}";
32-
let mut parser = AsyncJsonStreamReader::new(Cursor::new(data));
33-
34-
let ids = block_on(async {
35-
let mut out = Vec::new();
36-
while let Some(value) = parser.next_value::<serde_json::Value>().await? {
37-
out.push(value["id"].as_i64().unwrap());
38-
}
39-
Ok::<_, asyncjsonparser::Error>(out)
40-
})
41-
.unwrap();
42-
43-
assert_eq!(ids, vec![1, 2, 3]);
44-
```
45-
46-
## Tokio example
47-
48-
`asyncjsonparser` uses `futures::io::AsyncRead`. For Tokio, adapt with
49-
`tokio-util`:
50-
51-
```bash
52-
cargo add tokio --features fs,io-util,macros,rt-multi-thread
53-
cargo add tokio-util --features compat
54-
```
55-
56-
```rust
57-
use asyncjsonparser::AsyncJsonStreamReader;
58-
use tokio_util::compat::TokioAsyncReadCompatExt;
29+
use std::io::Cursor;
5930

6031
#[tokio::main]
61-
async fn main() -> Result<(), asyncjsonparser::Error> {
62-
let file = tokio::fs::File::open("data.ndjson").await?;
63-
let reader = tokio::io::BufReader::new(file).compat();
64-
let mut parser = AsyncJsonStreamReader::new(reader);
65-
66-
while let Some(value) = parser.next_value::<serde_json::Value>().await? {
67-
println!("id={}", value["id"]);
32+
async fn main() -> Result<(), asyncjsonparser::AsyncJsonStreamReaderError> {
33+
let data = r#"{"status":"success","results":[{"id":1},{"id":2}]}"#;
34+
let mut reader = AsyncJsonStreamReader::new(Cursor::new(data.as_bytes().to_vec()));
35+
36+
while let Some(key) = reader.next_object_entry().await? {
37+
match key.as_str() {
38+
"status" => {
39+
let status = reader.read_string().await?;
40+
println!("status={status}");
41+
}
42+
"results" => {
43+
while reader.start_array_item().await? {
44+
let obj = reader.deserialize_object().await?;
45+
println!("id={}", obj["id"]);
46+
}
47+
}
48+
_ => {}
49+
}
6850
}
6951

7052
Ok(())
7153
}
7254
```
7355

74-
## Stream interface
75-
76-
```rust
77-
use asyncjsonparser::AsyncJsonStreamReader;
78-
use futures::executor::block_on;
79-
use futures::io::Cursor;
80-
use futures::stream::TryStreamExt;
81-
82-
let data = b"{\"id\":1} {\"id\":2}";
83-
let parser = AsyncJsonStreamReader::new(Cursor::new(data));
84-
85-
let ids: Vec<i64> = block_on(async {
86-
parser
87-
.into_stream::<serde_json::Value>()
88-
.map_ok(|value| value["id"].as_i64().unwrap())
89-
.try_collect()
90-
.await
91-
})
92-
.unwrap();
93-
94-
assert_eq!(ids, vec![1, 2]);
95-
```
96-
97-
## Configuration
98-
99-
```rust
100-
use asyncjsonparser::{AsyncJsonStreamReader, ParserConfig};
101-
use futures::io::Cursor;
56+
## Common patterns
10257

103-
let config = ParserConfig::new(2 * 1024 * 1024, 16 * 1024);
104-
let parser = AsyncJsonStreamReader::with_config(Cursor::new(b"{}"), config);
105-
```
58+
- Read object entries with `next_object_entry`.
59+
- Skip values by calling `next_object_entry` again without consuming the value.
60+
- Stream arrays with `start_array_item`.
61+
- Parse string/number/bool with `read_string`, `read_number`, `read_boolean`.
62+
- Deserialize a sub-object with `deserialize_object`.
10663

10764
## Error handling
10865

109-
All fallible operations return `asyncjsonparser::Error`:
110-
111-
- `Error::Io` for reader failures
112-
- `Error::Json` for invalid or truncated JSON
113-
- `Error::BufferLimit` when the buffer exceeds `max_buffer_size`
114-
115-
## Performance tips
66+
All fallible operations return `AsyncJsonStreamReaderError`:
11667

117-
- Increase `read_chunk_size` if your reader prefers larger reads.
118-
- Keep `max_buffer_size` large enough for your largest JSON payload.
119-
- For huge payloads, prefer NDJSON over a single giant JSON array.
68+
- `Io` for reader failures
69+
- `JsonError` for malformed JSON
70+
- `UnexpectedToken` when the stream doesn't match the expected structure
12071

12172
## MSRV
12273

examples/ndjson.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,26 @@
11
use asyncjsonparser::AsyncJsonStreamReader;
2-
use futures::executor::block_on;
3-
use futures::io::Cursor;
2+
use std::io::Cursor;
43

5-
fn main() -> Result<(), asyncjsonparser::Error> {
6-
let data = b"{\"id\":1}\n{\"id\":2} {\"id\":3}";
7-
let mut parser = AsyncJsonStreamReader::new(Cursor::new(data));
4+
#[tokio::main]
5+
async fn main() -> Result<(), asyncjsonparser::AsyncJsonStreamReaderError> {
6+
let data = r#"{"status":"ok","results":[{"id":1},{"id":2}]}"#;
7+
let mut reader = AsyncJsonStreamReader::new(Cursor::new(data.as_bytes().to_vec()));
88

9-
block_on(async {
10-
while let Some(value) = parser.next_value::<serde_json::Value>().await? {
11-
println!("id={}", value["id"]);
9+
while let Some(key) = reader.next_object_entry().await? {
10+
match key.as_str() {
11+
"status" => {
12+
let status = reader.read_string().await?;
13+
println!("status={status}");
14+
}
15+
"results" => {
16+
while reader.start_array_item().await? {
17+
let obj = reader.deserialize_object().await?;
18+
println!("id={}", obj["id"]);
19+
}
20+
}
21+
_ => {}
1222
}
13-
Ok(())
14-
})
23+
}
24+
25+
Ok(())
1526
}

examples/tokio.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
use asyncjsonparser::AsyncJsonStreamReader;
2-
use tokio_util::compat::TokioAsyncReadCompatExt;
2+
use tokio::io::BufReader;
33

44
#[tokio::main]
5-
async fn main() -> Result<(), asyncjsonparser::Error> {
6-
let file = tokio::fs::File::open("data.ndjson").await?;
7-
let reader = tokio::io::BufReader::new(file).compat();
5+
async fn main() -> Result<(), asyncjsonparser::AsyncJsonStreamReaderError> {
6+
let file = tokio::fs::File::open("data.json").await?;
7+
let reader = BufReader::new(file);
88
let mut parser = AsyncJsonStreamReader::new(reader);
99

10-
while let Some(value) = parser.next_value::<serde_json::Value>().await? {
11-
println!("id={}", value["id"]);
10+
while let Some(key) = parser.next_object_entry().await? {
11+
if key == "payload" {
12+
let payload = parser.deserialize_object().await?;
13+
println!("payload keys: {}", payload.len());
14+
}
1215
}
1316

1417
Ok(())

0 commit comments

Comments
 (0)