Skip to content

Commit 2bc27c4

Browse files
authored
Merge pull request #31 from Microsoft/develop
LINQ and Rx core concepts sample
2 parents e08eeab + e5d075f commit 2bc27c4

25 files changed

+991
-58
lines changed

Doc/HotObservables.md

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Both definitions are useful in many real-world situations, but there is a subtle
1818

1919
Let's start with trivial implementation of Hot observable:
2020

21+
```csharp
2122
class HotObservable : IObservable<int>
2223
{
2324
Subject<int> _subject = new Subject<int>();
@@ -33,19 +34,22 @@ Let's start with trivial implementation of Hot observable:
3334
{
3435
return _subject.Subscribe(observer);
3536
}
36-
}
37+
   }
38+
```
39+
3740
And do a simple query that relies on order:
3841

39-
var o = new HotObservable();
42+
```csharp
43+
   var o = new HotObservable();
4044

4145
var pairs =
4246
from f in o
4347
from n in o.Take(1)
4448
select new { First = f, Next = n };
4549

4650
pairs.Subscribe(Console.WriteLine);
47-
o.Run();
48-
51+
   o.Run();
52+
```
4953

5054
The output is:
5155

@@ -68,16 +72,17 @@ Here:
6872

6973
Now let's try the same query with Cold observable:
7074

71-
var data = new int[] { 0, 1, 2, 3, 4 };
75+
```csharp
76+
   var data = new int[] { 0, 1, 2, 3, 4 };
7277
var o = data.ToObservable();
7378

7479
var pairs =
7580
from f in o
7681
from n in o.Take(1)
7782
select new { First = f, Next = n };
7883

79-
pairs.Subscribe(Console.WriteLine);
80-
84+
   pairs.Subscribe(Console.WriteLine);
85+
```
8186

8287
The output is:
8388

@@ -89,13 +94,15 @@ The output is:
8994

9095
This result is exactly the same as LINQ to Objects on the original data as IEnumerable collection:
9196

97+
```csharp
9298
var data = new int[] { 0, 1, 2, 3, 4 };
9399
var pairs =
94100
from f in data
95101
from n in data.Take(1)
96102
select new { First = f, Next = n };
97103

98-
foreach (var p in pairs) Console.WriteLine(p);
104+
   foreach (var p in pairs) Console.WriteLine(p);
105+
```
99106

100107
Unfortunately, it is not easy to think about it as a marble diagram - it would look like arrows return back in time to start from 0.
101108

@@ -129,14 +136,16 @@ Supporting Cold observables on the other hand requires that the events are store
129136

130137
Let's consider example of Join query on IIS traces:
131138

132-
var requests = from b in begin
139+
```csharp
140+
var requests = from b in begin
133141
from e in end.Where(e=>e.Header.ActivityId == b.Header.ActivityId).Take(1)
134142
select new
135143
{
136144
b.Url,
137145
e.HttpStatus,
138146
Duration = (e.Header.Timestamp - b.Header.Timestamp).TotalMilliseconds
139147
};
148+
```
140149

141150
If "begin" and "end" were small in-memory collections this query could be done using LINQ to Objects. But if they refer to the same trace (say, 1TB etl file on disk):
142151

@@ -174,7 +183,8 @@ This allows users to:
174183

175184
Assuming **single, typed** input sequence the cold observable paradox can be avoided by turning the to hot:
176185

177-
var data = new int[] { 0, 1, 2, 3, 4 };
186+
```
187+
var data = new int[] { 0, 1, 2, 3, 4 };
178188
var o = data.ToObservable().Publish();
179189
180190
var pairs =
@@ -184,6 +194,7 @@ Assuming **single, typed** input sequence the cold observable paradox can be avo
184194
185195
pairs.Subscribe(Console.WriteLine);
186196
o.Connect();
197+
```
187198

188199
Behind the scenes, Publish() uses a Subject.
189200

Samples/LinqRxConcepts/AplogEnumerable.txt

Whitespace-only changes.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copied from: https://github.com/Microsoft/Tx/blob/master/Source/Tx.Core/ByteArrayExtensions.cs
2+
3+
using System.Text;
4+
5+
namespace System.Reactive
6+
{
7+
public static class ByteArrayExtensions
8+
{
9+
/// <summary>
10+
/// Displays the byte array as hex-dump, like within a debugger
11+
/// </summary>
12+
/// <param name="bytes">array to display as hex</param>
13+
/// <returns></returns>
14+
public static string ToHexDump(this byte[] bytes)
15+
{
16+
StringBuilder sb = new StringBuilder();
17+
int lineOffset = 0;
18+
19+
while (lineOffset < bytes.Length)
20+
{
21+
// output line offset from the start of the buffer
22+
sb.Append(lineOffset.ToString("x4"));
23+
sb.Append(": ");
24+
25+
// output hex dump
26+
int endOffset = Math.Min(lineOffset + 16, bytes.Length);
27+
int index = 0;
28+
for (int byteOffset = lineOffset; byteOffset < endOffset; byteOffset++)
29+
{
30+
if (index == 8)
31+
sb.Append(' ');
32+
33+
index++;
34+
sb.Append(bytes[byteOffset].ToString("x2"));
35+
sb.Append(' ');
36+
}
37+
38+
// fill in the blanks if we cut off without completing entire line
39+
int lineLength = endOffset - lineOffset;
40+
if (lineLength < 16)
41+
{
42+
for (int i = lineLength; i < 16; i++ )
43+
{
44+
if (index == 8)
45+
sb.Append(' ');
46+
47+
index++;
48+
49+
sb.Append(" ");
50+
}
51+
}
52+
53+
sb.Append(" ");
54+
55+
// output character dump
56+
index = 0;
57+
for (int byteOffset = lineOffset; byteOffset < endOffset; byteOffset++)
58+
{
59+
index++;
60+
byte b = bytes[byteOffset];
61+
if (b > 32)
62+
sb.Append((char)b);
63+
else
64+
sb.Append('.');
65+
66+
if (index == 8)
67+
sb.Append(' ');
68+
}
69+
70+
sb.AppendLine();
71+
lineOffset += 16;
72+
}
73+
74+
return sb.ToString();
75+
}
76+
}
77+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System.Collections;
2+
using System.Collections.Generic;
3+
4+
namespace System.Linq
5+
{
6+
/// <summary>
7+
/// This is mock-up implementation of .Where. It is called Filter to avoid name collision
8+
/// </summary>
9+
class Filter<T> : IEnumerable<T>
10+
{
11+
Func<T, bool> _filter;
12+
IEnumerable<T> _input;
13+
14+
public Filter(IEnumerable<T> input, Func<T, bool> filter)
15+
{
16+
_input = input;
17+
_filter = filter;
18+
}
19+
public IEnumerator<T> GetEnumerator()
20+
{
21+
return new Enumerator(this, _input.GetEnumerator());
22+
}
23+
24+
IEnumerator IEnumerable.GetEnumerator()
25+
{
26+
return new Enumerator(this, _input.GetEnumerator());
27+
}
28+
29+
class Enumerator : IEnumerator<T>
30+
{
31+
Filter<T> _parent;
32+
IEnumerator<T> _input;
33+
public Enumerator(Filter<T> parent, IEnumerator<T> input)
34+
{
35+
_parent = parent;
36+
_input = input;
37+
}
38+
public bool MoveNext()
39+
{
40+
while (true)
41+
{
42+
if (!_input.MoveNext())
43+
return false;
44+
45+
if (_parent._filter(_input.Current))
46+
return true;
47+
}
48+
}
49+
public T Current { get { return _input.Current; } }
50+
object IEnumerator.Current { get { return _input.Current; } }
51+
52+
public void Dispose()
53+
{
54+
}
55+
56+
public void Reset()
57+
{
58+
throw new NotImplementedException();
59+
}
60+
}
61+
}
62+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
using System.Collections;
2+
using System.Collections.Generic;
3+
using System.Reactive;
4+
5+
namespace System.Linq
6+
{
7+
/// <summary>
8+
/// This is operator that executes push pipeline inside pull runtime
9+
/// </summary>
10+
/// <typeparam name="TIn">Input event type</typeparam>
11+
/// <typeparam name="TOut">Potpit event type</typeparam>
12+
class PushInsidePull<TIn, TOut> : IEnumerable<TOut>
13+
{
14+
IEnumerable<TIn> _input;
15+
Func<IObservable<TIn>, IObservable<TOut>> _pushPipe;
16+
public PushInsidePull(IEnumerable<TIn> input, Func<IObservable<TIn>, IObservable<TOut>> pushPipe)
17+
{
18+
_input = input;
19+
_pushPipe = pushPipe;
20+
}
21+
public IEnumerator<TOut> GetEnumerator()
22+
{
23+
return new Enumerator(this, _input.GetEnumerator(), _pushPipe);
24+
}
25+
26+
IEnumerator IEnumerable.GetEnumerator()
27+
{
28+
return new Enumerator(this, _input.GetEnumerator(), _pushPipe);
29+
}
30+
31+
class Enumerator : IEnumerator<TOut>
32+
{
33+
PushInsidePull<TIn, TOut> _parent;
34+
IEnumerator<TIn> _input;
35+
36+
Subject<TIn> _subject = new Subject<TIn>();
37+
IObservable<TOut> _pushOutput;
38+
PushResultHolder<TOut> _result = new PushResultHolder<TOut>();
39+
40+
public Enumerator(PushInsidePull<TIn, TOut> parent, IEnumerator<TIn> input, Func<IObservable<TIn>, IObservable<TOut>> pushPipe)
41+
{
42+
_parent = parent;
43+
_input = input;
44+
_pushOutput = pushPipe(_subject); // this constructs the pipeline, but does not let it go yet
45+
_pushOutput.Subscribe(_result); // this enables the flow... assumind someone pushed events into the _subject
46+
}
47+
public bool MoveNext()
48+
{
49+
while (true)
50+
{
51+
if (!_input.MoveNext())
52+
return false;
53+
54+
_result.HasValue = false;
55+
_subject.OnNext(_input.Current);
56+
57+
if (_result.HasValue)
58+
return true;
59+
}
60+
}
61+
public TOut Current { get { return _result.Value; } }
62+
object IEnumerator.Current { get { return _result.Value; } }
63+
64+
public void Dispose()
65+
{
66+
;
67+
}
68+
69+
public void Reset()
70+
{
71+
throw new NotImplementedException();
72+
}
73+
}
74+
75+
class PushResultHolder<T> : IObserver<T>
76+
{
77+
public bool HasValue = false;
78+
public T Value;
79+
public void OnNext(T value)
80+
{
81+
Value = value;
82+
HasValue = true;
83+
}
84+
public void OnCompleted()
85+
{
86+
throw new NotImplementedException();
87+
}
88+
89+
public void OnError(Exception error)
90+
{
91+
throw new NotImplementedException();
92+
}
93+
}
94+
}
95+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System.Collections.Generic;
2+
3+
namespace System.Linq
4+
{
5+
public static class PullLinq
6+
{
7+
/// <summary>
8+
/// Filter does the same exact thing as .Where, and is intended to show how .Where actually works
9+
/// </summary>
10+
public static IEnumerable<T> Filter<T>(this IEnumerable<T> input, Func<T, bool> filter)
11+
{
12+
return new Filter<T>(input, filter);
13+
}
14+
15+
/// <summary>
16+
/// Invoke a push rule for every element of pull sequence
17+
/// </summary>
18+
/// <typeparam name="TIn">Input event type</typeparam>
19+
/// <typeparam name="TOut">Output event type</typeparam>
20+
/// <param name="input">the input sequence</param>
21+
/// <param name="pushPipe">function that creates IObservable pipeline</param>
22+
/// <returns></returns>
23+
public static IEnumerable<TOut> ReplayRealTimeRule<TIn, TOut>(
24+
this IEnumerable<TIn> input,
25+
Func<IObservable<TIn>, IObservable<TOut>> pushPipe)
26+
{
27+
return new PushInsidePull<TIn, TOut>(input, pushPipe);
28+
}
29+
}
30+
}

0 commit comments

Comments
 (0)