@@ -56,22 +56,31 @@ TEST(ConcurrentReadableArrayTest, MultiThreaded) {
56
56
};
57
57
ConcurrentReadableArray<Value> array;
58
58
59
+ // The writers will append values with their thread number and increasing
60
+ // values of x.
59
61
auto writer = [&](int threadNumber) {
60
62
for (int i = 0 ; i < insertCount; i++)
61
63
array.push_back ({ threadNumber, i });
62
64
};
63
65
64
66
auto reader = [&] {
67
+ // Track the maximum value we've seen for each writer thread.
65
68
int maxByThread[writerCount];
66
69
bool done = false ;
67
70
while (!done) {
68
71
for (int i = 0 ; i < writerCount; i++)
69
72
maxByThread[i] = -1 ;
70
73
for (auto element : array.snapshot ()) {
71
74
ASSERT_LT (element.threadNumber , writerCount);
75
+ // Each element we see must be larger than the maximum element we've
76
+ // previously seen for that writer thread, otherwise that means that
77
+ // we're seeing mutations out of order.
72
78
ASSERT_GT (element.x , maxByThread[element.threadNumber ]);
73
79
maxByThread[element.threadNumber ] = element.x ;
74
80
}
81
+
82
+ // If the max for each thread is the max that'll be inserted, then we're
83
+ // done and should exit.
75
84
done = true ;
76
85
for (int i = 0 ; i < writerCount; i++) {
77
86
if (maxByThread[i] < insertCount - 1 )
@@ -87,5 +96,61 @@ TEST(ConcurrentReadableArrayTest, MultiThreaded) {
87
96
reader ();
88
97
});
89
98
90
- ASSERT_EQ (array.snapshot ().count (), writerCount * insertCount);
99
+ ASSERT_EQ (array.snapshot ().count (), (size_t )writerCount * insertCount);
100
+ }
101
+
102
+ TEST (ConcurrentReadableArrayTest, MultiThreaded2) {
103
+ const int writerCount = 16 ;
104
+ const int readerCount = 8 ;
105
+ const int insertCount = 100000 ;
106
+
107
+ struct Value {
108
+ int threadNumber;
109
+ int x;
110
+ };
111
+ ConcurrentReadableArray<Value> array;
112
+
113
+ // The writers will append values with their thread number and increasing
114
+ // values of x.
115
+ auto writer = [&](int threadNumber) {
116
+ for (int i = 0 ; i < insertCount; i++)
117
+ array.push_back ({ threadNumber, i });
118
+ };
119
+
120
+ auto reader = [&] {
121
+ // Track the maximum value we've seen for each writer thread.
122
+ int maxByThread[writerCount];
123
+ for (int i = 0 ; i < writerCount; i++)
124
+ maxByThread[i] = -1 ;
125
+ bool done = false ;
126
+ while (!done) {
127
+ auto snapshot = array.snapshot ();
128
+ // Don't do anything until some data is actually added.
129
+ if (snapshot.count () == 0 )
130
+ continue ;
131
+
132
+ // Grab the last element in the snapshot.
133
+ auto element = snapshot.begin ()[snapshot.count () - 1 ];
134
+ ASSERT_LT (element.threadNumber , writerCount);
135
+ // Each element we see must be equal to or larger than the maximum element
136
+ // we've previously seen for that writer thread, otherwise that means that
137
+ // we're seeing mutations out of order.
138
+ ASSERT_GE (element.x , maxByThread[element.threadNumber ]);
139
+ maxByThread[element.threadNumber ] = element.x ;
140
+
141
+ // We'll eventually see some thread add its maximum value. We'll call it
142
+ // done when we reach that point.
143
+ if (element.x == insertCount - 1 )
144
+ done = true ;
145
+ }
146
+ };
147
+
148
+ threadedExecute (writerCount + readerCount, [&](int i) {
149
+ if (i < writerCount)
150
+ writer (i);
151
+ else
152
+ reader ();
153
+ });
154
+
155
+ ASSERT_EQ (array.snapshot ().count (), (size_t )writerCount * insertCount);
91
156
}
0 commit comments