22
22
23
23
import static org .junit .Assert .assertEquals ;
24
24
25
- import com .uber .m3 .tally .RootScopeBuilder ;
26
- import com .uber .m3 .tally .Scope ;
27
25
import com .uber .m3 .util .ImmutableMap ;
28
26
import io .temporal .activity .ActivityInterface ;
29
27
import io .temporal .activity .ActivityMethod ;
32
30
import io .temporal .client .WorkflowClient ;
33
31
import io .temporal .client .WorkflowOptions ;
34
32
import io .temporal .common .RetryOptions ;
35
- import io .temporal .common .reporter .TestStatsReporter ;
36
33
import io .temporal .testUtils .CountingSlotSupplier ;
37
34
import io .temporal .testing .internal .SDKTestWorkflowRule ;
38
- import io .temporal .worker .MetricsType ;
39
35
import io .temporal .worker .WorkerOptions ;
40
36
import io .temporal .worker .tuning .ActivitySlotInfo ;
41
37
import io .temporal .worker .tuning .CompositeTuner ;
@@ -66,13 +62,9 @@ public class WorkflowSlotsSmallSizeTests {
66
62
new CountingSlotSupplier <>(MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE );
67
63
private final CountingSlotSupplier <LocalActivitySlotInfo > localActivitySlotSupplier =
68
64
new CountingSlotSupplier <>(MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE );
69
- private final TestStatsReporter reporter = new TestStatsReporter ();
70
65
static Semaphore parallelSemRunning = new Semaphore (0 );
71
66
static Semaphore parallelSemBlocked = new Semaphore (0 );
72
67
73
- Scope metricsScope =
74
- new RootScopeBuilder ().reporter (reporter ).reportEvery (com .uber .m3 .util .Duration .ofMillis (1 ));
75
-
76
68
@ Parameterized .Parameter public boolean activitiesAreLocal ;
77
69
78
70
@ Parameterized .Parameters ()
@@ -91,15 +83,13 @@ public static Object[] data() {
91
83
activityTaskSlotSupplier ,
92
84
localActivitySlotSupplier ))
93
85
.build ())
94
- .setMetricsScope (metricsScope )
95
86
.setActivityImplementations (new TestActivitySemaphoreImpl ())
96
87
.setWorkflowTypes (ParallelActivities .class )
97
88
.setDoNotStart (true )
98
89
.build ();
99
90
100
91
@ Before
101
92
public void setup () {
102
- reporter .flush ();
103
93
parallelSemRunning = new Semaphore (0 );
104
94
parallelSemBlocked = new Semaphore (0 );
105
95
}
@@ -116,24 +106,9 @@ public void tearDown() {
116
106
localActivitySlotSupplier .releasedCount .get ());
117
107
}
118
108
119
- private void assertWorkerSlotCount (int worker , int activity , int localActivity ) {
120
- try {
121
- // There can be a delay in metrics emission, another option if this
122
- // is too flaky is to poll the metrics.
123
- Thread .sleep (100 );
124
- } catch (InterruptedException e ) {
125
- throw new RuntimeException (e );
126
- }
127
- reporter .assertGauge (
128
- MetricsType .WORKER_TASK_SLOTS_AVAILABLE , getWorkerTags ("WorkflowWorker" ), worker );
129
- // All slots should be available
130
- reporter .assertGauge (
131
- MetricsType .WORKER_TASK_SLOTS_AVAILABLE , getWorkerTags ("ActivityWorker" ), activity );
132
- // All slots should be available
133
- reporter .assertGauge (
134
- MetricsType .WORKER_TASK_SLOTS_AVAILABLE ,
135
- getWorkerTags ("LocalActivityWorker" ),
136
- localActivity );
109
+ private void assertCurrentUsedCount (int activity , int localActivity ) {
110
+ assertEquals (activity , activityTaskSlotSupplier .currentUsedSet .size ());
111
+ assertEquals (localActivity , localActivitySlotSupplier .currentUsedSet .size ());
137
112
}
138
113
139
114
@ WorkflowInterface
@@ -219,14 +194,11 @@ private void assertIntraWFTSlotCount(int allowedToRun) {
219
194
int runningLAs = activitiesAreLocal ? allowedToRun : 0 ;
220
195
int runningAs = activitiesAreLocal ? 0 : allowedToRun ;
221
196
int runningWFTs = activitiesAreLocal ? 1 : 0 ;
222
- assertWorkerSlotCount (
223
- MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE - runningWFTs ,
224
- MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE - runningAs ,
225
- MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE - runningLAs );
197
+ assertCurrentUsedCount (runningAs , runningLAs );
226
198
}
227
199
228
200
@ Test
229
- public void TestLocalActivitySlotAtLimit () throws InterruptedException {
201
+ public void TestActivitySlotAtLimit () throws InterruptedException {
230
202
testWorkflowRule .getTestEnvironment ().start ();
231
203
WorkflowClient client = testWorkflowRule .getWorkflowClient ();
232
204
TestWorkflow workflow =
@@ -244,14 +216,11 @@ public void TestLocalActivitySlotAtLimit() throws InterruptedException {
244
216
}
245
217
workflow .workflow (true );
246
218
// All slots should be available
247
- assertWorkerSlotCount (
248
- MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE ,
249
- MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE ,
250
- MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE );
219
+ assertCurrentUsedCount (0 , 0 );
251
220
}
252
221
253
222
@ Test
254
- public void TestLocalActivityShutdownWhileWaitingOnSlot () throws InterruptedException {
223
+ public void TestActivityShutdownWhileWaitingOnSlot () throws InterruptedException {
255
224
testWorkflowRule .getTestEnvironment ().start ();
256
225
WorkflowClient client = testWorkflowRule .getWorkflowClient ();
257
226
TestWorkflow workflow =
@@ -267,14 +236,12 @@ public void TestLocalActivityShutdownWhileWaitingOnSlot() throws InterruptedExce
267
236
parallelSemBlocked .release (2 );
268
237
testWorkflowRule .getTestEnvironment ().getWorkerFactory ().awaitTermination (3 , TimeUnit .SECONDS );
269
238
// All slots should be available
270
- assertWorkerSlotCount (
271
- MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE ,
272
- MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE ,
273
- MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE );
239
+ // Used count here is actually -2 since the slots weren't marked used
240
+ assertCurrentUsedCount (0 , 0 );
274
241
}
275
242
276
243
@ Test
277
- public void TestLocalActivitySlotHitsCapacity () throws InterruptedException {
244
+ public void TestActivitySlotHitsCapacity () throws InterruptedException {
278
245
testWorkflowRule .getTestEnvironment ().start ();
279
246
WorkflowClient client = testWorkflowRule .getWorkflowClient ();
280
247
TestWorkflow workflow =
@@ -301,9 +268,6 @@ public void TestLocalActivitySlotHitsCapacity() throws InterruptedException {
301
268
parallelSemBlocked .release (100 );
302
269
workflow .workflow (true );
303
270
// All slots should be available
304
- assertWorkerSlotCount (
305
- MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE ,
306
- MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE ,
307
- MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE );
271
+ assertCurrentUsedCount (0 , 0 );
308
272
}
309
273
}
0 commit comments