1
- use std:: {
2
- fs:: { File , OpenOptions } ,
3
- io:: Write ,
4
- path:: PathBuf ,
5
- } ;
1
+ use std:: path:: PathBuf ;
6
2
7
3
use anyhow:: Result ;
8
4
use futures:: stream:: { FuturesUnordered , StreamExt , TryStreamExt } ;
9
5
use mongodb:: { bson:: doc, Client , Collection , Database } ;
10
6
11
- use crate :: bench:: { parse_json_file_to_documents, Benchmark , COLL_NAME , DATABASE_NAME } ;
7
+ use crate :: {
8
+ bench:: { parse_json_file_to_documents, Benchmark , COLL_NAME , DATABASE_NAME } ,
9
+ fs:: File ,
10
+ } ;
12
11
13
12
const TOTAL_FILES : usize = 100 ;
14
13
@@ -42,9 +41,9 @@ impl Benchmark for JsonMultiExportBenchmark {
42
41
43
42
tasks. push ( async move {
44
43
let json_file_name = path. join ( format ! ( "ldjson{:03}.txt" , i) ) ;
45
- let file = spawn_blocking_and_await ! ( File :: open ( & json_file_name) ) ?;
44
+ let file = File :: open_read ( & json_file_name) . await ?;
46
45
47
- let docs = spawn_blocking_and_await ! ( parse_json_file_to_documents( file) ) ?;
46
+ let docs = parse_json_file_to_documents ( file) . await ?;
48
47
49
48
for mut doc in docs {
50
49
doc. insert ( "file" , i as i32 ) ;
@@ -57,7 +56,6 @@ impl Benchmark for JsonMultiExportBenchmark {
57
56
}
58
57
59
58
while let Some ( result) = tasks. next ( ) . await {
60
- println ! ( "done!" ) ;
61
59
result?;
62
60
}
63
61
@@ -77,32 +75,16 @@ impl Benchmark for JsonMultiExportBenchmark {
77
75
// lot of work for little gain since we `unwrap()` in
78
76
// main.rs anyway.
79
77
let file_name = path. join ( format ! ( "ldjson{:03}.txt" , i) ) ;
80
- let mut file = OpenOptions :: new ( )
81
- . create ( true )
82
- . write ( true )
83
- . open ( & file_name)
84
- . unwrap ( ) ;
78
+ let mut file = File :: open_write ( & file_name) . await . unwrap ( ) ;
85
79
86
80
let mut cursor = coll_ref
87
81
. find ( Some ( doc ! { "file" : i as i32 } ) , None )
88
82
. await
89
83
. unwrap ( ) ;
90
84
91
- let ( sender, mut receiver) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
92
-
93
- let send_future = spawn ! ( async move {
94
- while let Some ( doc) = cursor. try_next( ) . await . unwrap( ) {
95
- sender. send( doc. to_string( ) ) . unwrap( ) ;
96
- }
97
- } ) ;
98
-
99
- let rec_future = spawn_blocking_and_await ! ( async move {
100
- while let Some ( s) = receiver. next( ) . await {
101
- writeln!( file, "{}" , s) . unwrap( ) ;
102
- }
103
- } ) ;
104
-
105
- futures:: future:: join ( send_future, rec_future) . await
85
+ while let Some ( doc) = cursor. try_next ( ) . await . unwrap ( ) {
86
+ file. write_line ( & doc. to_string ( ) ) . await . unwrap ( ) ;
87
+ }
106
88
} ) ;
107
89
}
108
90
0 commit comments