Skip to content

Commit 4d86cf2

Browse files
committed
Fix Partitioned SessionWindow bug, where restoring from checkpoint triggers an exception.
Cause of the issue: The PartitionedSessionWindowPipe keeps multiple dictionary states. One of the dictionaries does not have a [DataMember] attribute because its value type is a LinkedList, which does not support serialization. On checkpoint and then restore, this dictionary is re-created from the other data members in the UpdatePointers callback. During this re-creation, the case of an empty LinkedList value was missed, so that entry was not restored. When the next data event appears for the partition, the dictionary is indexed with the partitionKey, resulting in a KeyNotFoundException. Regression: No, this has existed for a long time. The customer hit the bug now because of a workaround suggested (for another bug) to use the 'timestamp by .. over' clause. Repro steps: Added as a test case. Fix: Added code to restore the key with an empty LinkedList. Verified that this was the exact state just before the checkpoint. Added a test case that triggers this bug; with the fix, the test case passes.
1 parent dc69f56 commit 4d86cf2

File tree

2 files changed

+110
-1
lines changed

2 files changed

+110
-1
lines changed

Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,15 +249,23 @@ public override int CurrentlyBufferedInputCount
249249

250250
protected override void UpdatePointers()
251251
{
252+
// Restore orderedKeysDictionary, as it is not Serialized/Deserialized
252253
int iter = FastDictionary<TKey, long>.IteratorStart;
253254
var temp = new List<Tuple<TKey, long, TPartitionKey>>();
254255
while (this.lastDataTimeDictionary.Iterate(ref iter))
255256
{
257+
var partitionKey = this.getPartitionKey(this.lastDataTimeDictionary.entries[iter].key);
258+
256259
if (this.stateDictionary.entries[iter].value.Any())
257260
{
258261
temp.Add(Tuple.Create(
259262
this.lastDataTimeDictionary.entries[iter].key,
260-
Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value), this.getPartitionKey(this.lastDataTimeDictionary.entries[iter].key)));
263+
Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value),
264+
partitionKey));
265+
}
266+
else if (!this.orderedKeysDictionary.ContainsKey(partitionKey))
267+
{
268+
this.orderedKeysDictionary.Add(partitionKey, new LinkedList<TKey>());
261269
}
262270
}
263271
foreach (var item in temp.OrderBy(o => o.Item2))
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// *********************************************************************
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
// Licensed under the MIT License
4+
// *********************************************************************
5+
using System;
6+
using System.Collections.Generic;
7+
using System.IO;
8+
using System.Linq;
9+
using System.Reactive.Linq;
10+
using System.Reactive.Subjects;
11+
using Microsoft.StreamProcessing;
12+
using Microsoft.VisualStudio.TestTools.UnitTesting;
13+
14+
namespace SimpleTesting
{
    /// <summary>
    /// Regression test for Bug 944724: exception in Trill SessionWindow when partitions (substreams)
    /// are used and the query is restored from a checkpoint.
    /// </summary>
    /// <remarks>
    /// Cause of the bug:
    /// The PartitionedSessionWindowPipe keeps multiple dictionary states.
    /// One of the dictionaries does not have a [DataMember] attribute because its value type is a
    /// LinkedList, which does not support serialization.
    /// On checkpoint and then restore, this dictionary is re-created from the other data members in the
    /// UpdatePointers callback. During this re-creation, the case of an empty LinkedList value was
    /// missed, so that entry was not restored.
    /// When the next data event appears for the partition, the dictionary is indexed with the
    /// partitionKey, resulting in a KeyNotFoundException.
    /// </remarks>
    [TestClass]
    public class PartitionedStreamCheckpointTests : TestWithConfigSettingsWithoutMemoryLeakDetection
    {
        /// <summary>
        /// Feeds a single-partition stream through a session-timeout window, checkpoints and restores
        /// the query immediately before the second data event, and verifies that the combined output
        /// matches the expected event sequence.
        /// </summary>
        [TestMethod, TestCategory("Gated")]
        public void CheckpointPartitionedSessionWindow()
        {
            // The batch size is a shared setting; remember the current value so this test
            // does not leak its override into tests that run afterwards.
            int previousBatchSize = Config.DataBatchSize;
            Config.DataBatchSize = 1;

            try
            {
                var data = new PartitionedStreamEvent<int, double>[]
                {
                    PartitionedStreamEvent.CreatePoint(0, 5, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 8),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 11),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 14),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 17),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 21),
                    PartitionedStreamEvent.CreatePoint(0, 24, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 100),
                };

                var expected = new PartitionedStreamEvent<int, double>[]
                {
                    PartitionedStreamEvent.CreateStart(0, 5, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 8),
                    PartitionedStreamEvent.CreateEnd(0, 9, 5, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 11),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 14),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 17),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 21),
                    PartitionedStreamEvent.CreateStart(0, 24, 1.0),
                    PartitionedStreamEvent.CreateEnd(0, 28, 24, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 100),
                };

                // This index represents the point when the checkpoint restore needs to happen to trigger the bug:
                // data[6] is the second point event (time 24), so the restore happens right before it is pushed.
                const int checkpointIndex = 6;

                var subject = new Subject<PartitionedStreamEvent<int, double>>();
                var output = new List<PartitionedStreamEvent<int, double>>();
                var process = CreateQueryContainerForPartitionedStream(subject, output);

                for (int i = 0; i < data.Length; i++)
                {
                    if (i == checkpointIndex)
                    {
                        using (var ms = new MemoryStream())
                        {
                            process.Checkpoint(ms);
                            ms.Seek(0, SeekOrigin.Begin);

                            // Rebuild the whole query on a fresh subject and restore it from the checkpoint;
                            // the same output list keeps collecting results across the restore.
                            subject = new Subject<PartitionedStreamEvent<int, double>>();
                            process = CreateQueryContainerForPartitionedStream(subject, output, ms);
                        }
                    }

                    subject.OnNext(data[i]);
                }

                Assert.IsTrue(
                    expected.SequenceEqual(output),
                    "Unexpected output after checkpoint/restore. Expected: ["
                        + string.Join(", ", expected)
                        + "] Actual: ["
                        + string.Join(", ", output)
                        + "]");
            }
            finally
            {
                Config.DataBatchSize = previousBatchSize;
            }
        }

        /// <summary>
        /// Builds a query container that reads from <paramref name="subject"/>, applies
        /// <c>SessionTimeoutWindow(4, 5)</c> followed by a sum, and appends every output event
        /// to <paramref name="output"/>.
        /// </summary>
        /// <param name="subject">Source of the partitioned input events.</param>
        /// <param name="output">List that receives every event produced by the query.</param>
        /// <param name="stream">
        /// Checkpoint stream passed to <c>QueryContainer.Restore</c>; <c>null</c> (the default) is passed
        /// through unchanged for the initial, non-restored query.
        /// </param>
        /// <returns>The process returned by <c>QueryContainer.Restore</c>.</returns>
        private static Process CreateQueryContainerForPartitionedStream(
            Subject<PartitionedStreamEvent<int, double>> subject,
            List<PartitionedStreamEvent<int, double>> output,
            Stream stream = null)
        {
            var qc = new QueryContainer();
            var input = qc.RegisterInput(subject);
            var streamableOutput = input.SessionTimeoutWindow(4, 5).Sum(o => o);

            // The value returned by ForEachAsync is intentionally not kept; the test only
            // inspects the events collected in the output list.
            qc.RegisterOutput(streamableOutput).ForEachAsync(o => output.Add(o));

            return qc.Restore(stream);
        }
    }
}

0 commit comments

Comments
 (0)