@@ -55,9 +55,9 @@ internal Listing(Reddit reddit, string url, IWebAgent webAgent)
55
55
/// <param name="limitPerRequest">The number of listings to be returned per request</param>
56
56
/// <param name="maximumLimit">The maximum number of listings to return</param>
57
57
/// <returns></returns>
58
- public IEnumerator < T > GetEnumerator ( int limitPerRequest , int maximumLimit = - 1 )
58
+ public IEnumerator < T > GetEnumerator ( int limitPerRequest , int maximumLimit = - 1 , bool stream = false )
59
59
{
60
- return new ListingEnumerator < T > ( this , limitPerRequest , maximumLimit ) ;
60
+ return new ListingEnumerator < T > ( this , limitPerRequest , maximumLimit , stream ) ;
61
61
}
62
62
63
63
/// <summary>
@@ -69,6 +69,7 @@ public IEnumerator<T> GetEnumerator()
69
69
return GetEnumerator ( DefaultListingPerRequest ) ;
70
70
}
71
71
72
+
72
73
/// <summary>
73
74
/// Returns an enumerator that iterates through a collection
74
75
/// </summary>
@@ -103,6 +104,24 @@ public IEnumerable<T> GetListing(int maximumLimit, int limitPerRequest)
103
104
return GetEnumerator ( enumerator ) ;
104
105
}
105
106
107
+ /// <summary>
108
+ /// Returns an IEnumerable instance which will infinitely yield new <see cref="Thing"/>
109
+ /// </summary>
110
+ /// <param name="limitPerRequest">
111
+ /// Number of records to return in each request to the reddit api. Defaults to using the reddit
112
+ /// standard of 25 records of requests.
113
+ /// Adjusting this up or down based on the size of your subreddit and the rate at which new content
114
+ /// is created.
115
+ /// </param>
116
+ /// <param name="maximumLimit">maximum number of records to return</param>
117
+ /// <returns></returns>
118
+ public IEnumerable < T > GetListingStream ( int limitPerRequest = - 1 , int maximumLimit = - 1 )
119
+ {
120
+ // Get the enumerator with the specified maximum and per request limits
121
+ var enumerator = GetEnumerator ( limitPerRequest , maximumLimit , true ) ;
122
+ return GetEnumerator ( enumerator ) ;
123
+ }
124
+
106
125
/// <summary>
107
126
/// Converts an IEnumerator instance to an IEnumerable
108
127
/// </summary>
@@ -119,6 +138,7 @@ private static IEnumerable<T> GetEnumerator(IEnumerator<T> enumerator)
119
138
#pragma warning disable 0693
120
139
private class ListingEnumerator < T > : IEnumerator < T > where T : Thing
121
140
{
141
+ private bool stream = false ;
122
142
private Listing < T > Listing { get ; set ; }
123
143
private int CurrentPageIndex { get ; set ; }
124
144
private string After { get ; set ; }
@@ -128,17 +148,22 @@ private class ListingEnumerator<T> : IEnumerator<T> where T : Thing
128
148
private int LimitPerRequest { get ; set ; }
129
149
private int MaximumLimit { get ; set ; }
130
150
151
+ private List < string > done ;
152
+
131
153
/// <summary>
132
154
/// Creates a new ListingEnumerator instance
133
155
/// </summary>
134
156
/// <param name="listing"></param>
135
157
/// <param name="limitPerRequest">The number of listings to be returned per request. -1 will exclude this parameter and use the Reddit default (25)</param>
136
158
/// <param name="maximumLimit">The maximum number of listings to return, -1 will not add a limit</param>
137
- public ListingEnumerator ( Listing < T > listing , int limitPerRequest , int maximumLimit )
159
+ /// <param name="stream">yield new <see cref="Thing"/> as they are created</param>
160
+ public ListingEnumerator ( Listing < T > listing , int limitPerRequest , int maximumLimit , bool stream = false )
138
161
{
139
162
Listing = listing ;
140
163
CurrentPageIndex = - 1 ;
141
164
CurrentPage = new Thing [ 0 ] ;
165
+ done = new List < string > ( ) ;
166
+ this . stream = stream ;
142
167
143
168
// Set the listings per page (if not specified, use the Reddit default of 25) and the maximum listings
144
169
LimitPerRequest = ( limitPerRequest <= 0 ? DefaultListingPerRequest : limitPerRequest ) ;
@@ -147,13 +172,24 @@ public ListingEnumerator(Listing<T> listing, int limitPerRequest, int maximumLim
147
172
148
173
public T Current
149
174
{
150
- get
175
+ get
151
176
{
152
177
return ( T ) CurrentPage [ CurrentPageIndex ] ;
153
178
}
154
179
}
155
180
156
181
private void FetchNextPage ( )
182
+ {
183
+ if ( stream )
184
+ PageForward ( ) ;
185
+ else
186
+ PageBack ( ) ;
187
+ }
188
+
189
+ /// <summary>
190
+ /// Standard behavior. Page from newest to oldest - "backward" in time.
191
+ /// </summary>
192
+ private void PageBack ( )
157
193
{
158
194
var url = Listing . Url ;
159
195
@@ -200,14 +236,83 @@ private void FetchNextPage()
200
236
Parse ( json ) ;
201
237
}
202
238
239
+
240
+ /// <summary>
241
+ /// Page from oldest to newest - "forward" in time.
242
+ /// </summary>
243
+ private void PageForward ( )
244
+ {
245
+ var url = Listing . Url ;
246
+
247
+ if ( Before != null )
248
+ {
249
+ url += ( url . Contains ( "?" ) ? "&" : "?" ) + "before=" + Before ;
250
+ }
251
+
252
+ if ( LimitPerRequest != - 1 )
253
+ {
254
+ int limit = LimitPerRequest ;
255
+
256
+ if ( limit > MaximumLimit )
257
+ {
258
+ // If the limit is more than the maximum number of listings, adjust
259
+ limit = MaximumLimit ;
260
+ }
261
+ else if ( Count + limit > MaximumLimit )
262
+ {
263
+ // If a smaller subset of listings are needed, adjust the limit
264
+ limit = MaximumLimit - Count ;
265
+ }
266
+
267
+ if ( limit > 0 )
268
+ {
269
+ // Add the limit, the maximum number of items to be returned per page
270
+ url += ( url . Contains ( "?" ) ? "&" : "?" ) + "limit=" + limit ;
271
+ }
272
+ }
273
+
274
+ if ( Count > 0 )
275
+ {
276
+ // Add the count, the number of items already seen in this listingStream
277
+ // The Reddit API uses this to determine when to give values for before and after fields
278
+ url += ( url . Contains ( "?" ) ? "&" : "?" ) + "count=" + Count ;
279
+ }
280
+
281
+ var request = Listing . WebAgent . CreateGet ( url ) ;
282
+ var response = request . GetResponse ( ) ;
283
+ var data = Listing . WebAgent . GetResponseString ( response . GetResponseStream ( ) ) ;
284
+ var json = JToken . Parse ( data ) ;
285
+ if ( json [ "kind" ] . ValueOrDefault < string > ( ) != "Listing" )
286
+ throw new FormatException ( "Reddit responded with an object that is not a listingStream." ) ;
287
+ Parse ( json ) ;
288
+ }
289
+
290
+
203
291
private void Parse ( JToken json )
204
292
{
205
293
var children = json [ "data" ] [ "children" ] as JArray ;
206
- CurrentPage = new Thing [ children . Count ] ;
207
-
208
- for ( int i = 0 ; i < CurrentPage . Length ; i ++ )
209
- CurrentPage [ i ] = Thing . Parse < T > ( Listing . Reddit , children [ i ] , Listing . WebAgent ) ;
294
+ var things = new List < Thing > ( ) ;
210
295
296
+ for ( int i = 0 ; i < children . Count ; i ++ )
297
+ {
298
+ if ( ! stream )
299
+ things . Add ( Thing . Parse < T > ( Listing . Reddit , children [ i ] , Listing . WebAgent ) ) ;
300
+ else
301
+ {
302
+ // we only want to see new items.
303
+ var id = children [ i ] [ "data" ] [ "id" ] . ValueOrDefault < string > ( ) ;
304
+ if ( String . IsNullOrEmpty ( id ) || done . Contains ( id ) )
305
+ continue ;
306
+
307
+ things . Add ( Thing . Parse < T > ( Listing . Reddit , children [ i ] , Listing . WebAgent ) ) ;
308
+ done . Add ( id ) ;
309
+ }
310
+ }
311
+
312
+ if ( stream )
313
+ things . Reverse ( ) ;
314
+
315
+ CurrentPage = things . ToArray ( ) ;
211
316
// Increase the total count of items returned
212
317
Count += CurrentPage . Length ;
213
318
@@ -226,6 +331,14 @@ object System.Collections.IEnumerator.Current
226
331
}
227
332
228
333
public bool MoveNext ( )
334
+ {
335
+ if ( stream )
336
+ return MoveNextForward ( ) ;
337
+ else
338
+ return MoveNextBack ( ) ;
339
+ }
340
+
341
+ private bool MoveNextBack ( )
229
342
{
230
343
CurrentPageIndex ++ ;
231
344
if ( CurrentPageIndex == CurrentPage . Length )
@@ -255,6 +368,63 @@ public bool MoveNext()
255
368
return true ;
256
369
}
257
370
371
+ private bool MoveNextForward ( )
372
+ {
373
+ CurrentPageIndex ++ ;
374
+ if ( CurrentPageIndex == CurrentPage . Length )
375
+ {
376
+ int tries = 0 ;
377
+ while ( true )
378
+ {
379
+ if ( MaximumLimit != - 1 && Count >= MaximumLimit )
380
+ return false ;
381
+
382
+ tries ++ ;
383
+ // Get the next page
384
+ try
385
+ {
386
+ FetchNextPage ( ) ;
387
+ }
388
+ catch ( Exception ex )
389
+ {
390
+ // sleep for a while to see if we can recover
391
+ // Sleep() will rethrow after waiting a bit
392
+ // todo: make this smarter
393
+ Sleep ( tries , ex ) ;
394
+ }
395
+
396
+ CurrentPageIndex = 0 ;
397
+
398
+ if ( CurrentPage . Length == 0 )
399
+ {
400
+ // No listings were returned in the page
401
+ // sleep for a while
402
+ Sleep ( tries ) ;
403
+ }
404
+ else
405
+ {
406
+ tries = 0 ;
407
+ break ;
408
+ }
409
+ }
410
+ }
411
+ return true ;
412
+ }
413
+
414
+ private void Sleep ( int tries , Exception ex = null )
415
+ {
416
+ // wait up to 3 minutes between tries
417
+ int seconds = 180 ;
418
+
419
+ if ( tries > 36 )
420
+ if ( ex != null )
421
+ throw ex ;
422
+ else
423
+ seconds = tries * 5 ;
424
+
425
+ System . Threading . Thread . Sleep ( seconds * 1000 ) ;
426
+ }
427
+
258
428
public void Reset ( )
259
429
{
260
430
After = Before = null ;
0 commit comments