5
5
* GNU General Public License version 2.
6
6
*/
7
7
8
+ use std:: time:: Duration ;
9
+
8
10
use async_trait:: async_trait;
11
+ use cloned:: cloned;
12
+ use context:: PerfCounterType ;
13
+ use edenapi_types:: ServerError ;
9
14
use edenapi_types:: StreamingChangelogRequest ;
10
15
use edenapi_types:: StreamingChangelogResponse ;
16
+ use edenapi_types:: legacy:: Metadata ;
17
+ use edenapi_types:: legacy:: StreamingChangelogBlob ;
18
+ use edenapi_types:: legacy:: StreamingChangelogData ;
19
+ use futures:: future:: FutureExt ;
20
+ use futures:: stream;
21
+ use futures:: stream:: StreamExt ;
22
+ use futures:: stream:: TryStreamExt ;
23
+ use futures_ext:: FbStreamExt ;
24
+ use futures_stats:: TimedFutureExt ;
11
25
use mononoke_api:: Repo ;
26
+ use slog:: debug;
27
+ use streaming_clone:: StreamingCloneArc ;
28
+ use time_ext:: DurationExt ;
12
29
13
30
use super :: HandlerResult ;
14
31
use super :: SaplingRemoteApiHandler ;
15
32
use super :: SaplingRemoteApiMethod ;
16
33
use super :: handler:: SaplingRemoteApiContext ;
17
34
35
+ const TIMEOUT_SECS : Duration = Duration :: from_secs ( 4 * 60 * 60 ) ;
36
+
18
37
/// Legacy streaming changelog handler from wireproto.
19
38
#[ allow( dead_code) ]
20
39
pub struct StreamingCloneHandler ;
@@ -29,9 +48,108 @@ impl SaplingRemoteApiHandler for StreamingCloneHandler {
29
48
const ENDPOINT : & ' static str = "/streaming_clone" ;
30
49
31
50
async fn handler (
32
- _ectx : SaplingRemoteApiContext < Self :: PathExtractor , Self :: QueryStringExtractor , Repo > ,
33
- _request : Self :: Request ,
51
+ ectx : SaplingRemoteApiContext < Self :: PathExtractor , Self :: QueryStringExtractor , Repo > ,
52
+ request : Self :: Request ,
34
53
) -> HandlerResult < ' async_trait , Self :: Response > {
35
- unimplemented ! ( "StreamingCloneHandler is not implemented" )
54
+ let streaming_clone = ectx. repo ( ) . repo ( ) . streaming_clone_arc ( ) ;
55
+ let ctx = ectx. repo ( ) . ctx ( ) . clone ( ) ;
56
+
57
+ let changelog = streaming_clone
58
+ . fetch_changelog ( ctx. clone ( ) , request. tag . as_deref ( ) )
59
+ . await ?;
60
+
61
+ let data_blob_chunk_count = 0 ;
62
+ let data_blobs: Vec < _ > = changelog
63
+ . data_blobs
64
+ . into_iter ( )
65
+ . map ( |fut| {
66
+ cloned ! ( ctx) ;
67
+ async move {
68
+ let ( stats, res) = fut. timed ( ) . await ;
69
+ ctx. perf_counters ( ) . add_to_counter (
70
+ PerfCounterType :: SumManifoldPollTime ,
71
+ stats. poll_time . as_nanos_unchecked ( ) as i64 ,
72
+ ) ;
73
+ if let Ok ( bytes) = res. as_ref ( ) {
74
+ ctx. perf_counters ( )
75
+ . add_to_counter ( PerfCounterType :: BytesSent , bytes. len ( ) as i64 )
76
+ }
77
+
78
+ let data = res. map ( |res| {
79
+ StreamingChangelogData :: DataBlobChunk ( StreamingChangelogBlob {
80
+ chunk : res. into ( ) ,
81
+ chunk_id : data_blob_chunk_count,
82
+ } )
83
+ } ) ;
84
+
85
+ StreamingChangelogResponse {
86
+ data : data. map_err ( |e| ServerError :: generic ( format ! ( "{:?}" , e) ) ) ,
87
+ }
88
+ }
89
+ . boxed ( )
90
+ } )
91
+ . collect ( ) ;
92
+
93
+ let index_blob_chunk_count = 0 ;
94
+ let index_blobs: Vec < _ > = changelog
95
+ . index_blobs
96
+ . into_iter ( )
97
+ . map ( |fut| {
98
+ cloned ! ( ctx) ;
99
+ async move {
100
+ let ( stats, res) = fut. timed ( ) . await ;
101
+ ctx. perf_counters ( ) . add_to_counter (
102
+ PerfCounterType :: SumManifoldPollTime ,
103
+ stats. poll_time . as_nanos_unchecked ( ) as i64 ,
104
+ ) ;
105
+ if let Ok ( bytes) = res. as_ref ( ) {
106
+ ctx. perf_counters ( )
107
+ . add_to_counter ( PerfCounterType :: BytesSent , bytes. len ( ) as i64 )
108
+ }
109
+
110
+ let data = res. map ( |res| {
111
+ StreamingChangelogData :: IndexBlobChunk ( StreamingChangelogBlob {
112
+ chunk : res. into ( ) ,
113
+ chunk_id : index_blob_chunk_count,
114
+ } )
115
+ } ) ;
116
+
117
+ StreamingChangelogResponse {
118
+ data : data. map_err ( |e| ServerError :: generic ( format ! ( "{:?}" , e) ) ) ,
119
+ }
120
+ }
121
+ . boxed ( )
122
+ } )
123
+ . collect ( ) ;
124
+
125
+ debug ! (
126
+ ctx. logger( ) ,
127
+ "streaming changelog {} index bytes, {} data bytes" ,
128
+ changelog. index_size,
129
+ changelog. data_size
130
+ ) ;
131
+
132
+ let metadata = StreamingChangelogData :: Metadata ( Metadata {
133
+ index_size : changelog. index_size as u64 ,
134
+ data_size : changelog. data_size as u64 ,
135
+ } ) ;
136
+ let mut response_header = Vec :: new ( ) ;
137
+ response_header. push ( metadata) ;
138
+
139
+ let response = stream:: iter (
140
+ response_header
141
+ . into_iter ( )
142
+ . map ( |data| StreamingChangelogResponse { data : Ok ( data) } ) ,
143
+ ) ;
144
+
145
+ let res = response
146
+ . chain ( stream:: iter ( index_blobs) . buffered ( 100 ) )
147
+ . chain ( stream:: iter ( data_blobs) . buffered ( 100 ) ) ;
148
+
149
+ Ok ( res
150
+ . whole_stream_timeout ( TIMEOUT_SECS )
151
+ . yield_periodically ( )
152
+ . map_err ( |e| e. into ( ) )
153
+ . boxed ( ) )
36
154
}
37
155
}
0 commit comments