@@ -139,7 +139,7 @@ export class MarQS {
139139 public async dequeueMessageInEnv ( env : AuthenticatedEnvironment ) {
140140 return this . #trace(
141141 "dequeueMessageInEnv" ,
142- async ( span , abort ) => {
142+ async ( span ) => {
143143 const parentQueue = this . keys . envSharedQueueKey ( env ) ;
144144
145145 // Read the parent queue for matching queues
@@ -150,7 +150,6 @@ export class MarQS {
150150 ) ;
151151
152152 if ( ! messageQueue ) {
153- abort ( ) ;
154153 return ;
155154 }
156155
@@ -167,7 +166,6 @@ export class MarQS {
167166 } ) ;
168167
169168 if ( ! messageData ) {
170- abort ( ) ;
171169 return ;
172170 }
173171
@@ -181,8 +179,6 @@ export class MarQS {
181179 [ SemanticAttributes . CONCURRENCY_KEY ] : message . concurrencyKey ,
182180 [ SemanticAttributes . PARENT_QUEUE ] : message . parentQueue ,
183181 } ) ;
184- } else {
185- abort ( ) ;
186182 }
187183
188184 return message ;
@@ -204,7 +200,7 @@ export class MarQS {
204200 public async dequeueMessageInSharedQueue ( ) {
205201 return this . #trace(
206202 "dequeueMessageInSharedQueue" ,
207- async ( span , abort ) => {
203+ async ( span ) => {
208204 const parentQueue = constants . SHARED_QUEUE ;
209205
210206 // Read the parent queue for matching queues
@@ -215,7 +211,6 @@ export class MarQS {
215211 ) ;
216212
217213 if ( ! messageQueue ) {
218- abort ( ) ;
219214 return ;
220215 }
221216
@@ -233,7 +228,6 @@ export class MarQS {
233228 } ) ;
234229
235230 if ( ! messageData ) {
236- abort ( ) ;
237231 return ;
238232 }
239233
@@ -247,8 +241,6 @@ export class MarQS {
247241 [ SemanticAttributes . CONCURRENCY_KEY ] : message . concurrencyKey ,
248242 [ SemanticAttributes . PARENT_QUEUE ] : message . parentQueue ,
249243 } ) ;
250- } else {
251- abort ( ) ;
252244 }
253245
254246 return message ;
@@ -355,17 +347,12 @@ export class MarQS {
355347
356348 async #trace< T > (
357349 name : string ,
358- fn : ( span : Span , abort : ( ) => void ) => Promise < T > ,
359- options ?: SpanOptions
350+ fn : ( span : Span ) => Promise < T > ,
351+ options ?: SpanOptions & { sampleRate ?: number }
360352 ) : Promise < T > {
361353 return tracer . startActiveSpan ( name , options ?? { } , async ( span ) => {
362- let _abort = false ;
363- let aborter = ( ) => {
364- _abort = true ;
365- } ;
366-
367354 try {
368- return await fn ( span , aborter ) ;
355+ return await fn ( span ) ;
369356 } catch ( e ) {
370357 if ( e instanceof Error ) {
371358 span . recordException ( e ) ;
@@ -375,9 +362,7 @@ export class MarQS {
375362
376363 throw e ;
377364 } finally {
378- if ( ! _abort ) {
379- span . end ( ) ;
380- }
365+ span . end ( ) ;
381366 }
382367 } ) ;
383368 }
@@ -480,7 +465,7 @@ export class MarQS {
480465 ) {
481466 return this . #trace(
482467 "getRandomQueueFromParentQueue" ,
483- async ( span , abort ) => {
468+ async ( span ) => {
484469 const { range, selectionId } = await queuePriorityStrategy . nextCandidateSelection (
485470 parentQueue
486471 ) ;
@@ -497,7 +482,6 @@ export class MarQS {
497482 ) ;
498483
499484 if ( typeof choice !== "string" ) {
500- abort ( ) ;
501485 return ;
502486 }
503487
0 commit comments