Skip to content

Commit 9af8b11

Browse files
committed
refactor: optimize panic capture handling and direct call paths in subscriber and observer implementations
1 parent d17e7a4 commit 9af8b11

File tree

3 files changed

+154
-12
lines changed

3 files changed

+154
-12
lines changed

observable.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,19 +351,40 @@ func (s *observableImpl[T]) Subscribe(destination Observer[T]) Subscription {
351351
// synchronization.
352352
func (s *observableImpl[T]) SubscribeWithContext(ctx context.Context, destination Observer[T]) Subscription {
353353
subscription := NewSubscriberWithConcurrencyMode(destination, s.mode)
354+
// Compute effective panic-capture policy once at subscription time and
355+
// configure the subscriber hot-path helpers to avoid per-notification
356+
// context lookups. If the destination is not an internal observerImpl,
357+
// we conservatively assume capture is enabled.
358+
capture := true
359+
if oi, ok := destination.(*observerImpl[T]); ok {
360+
capture = oi.capturePanics && !isObserverPanicCaptureDisabled(ctx)
361+
} else {
362+
// For external observer implementations, respect context opt-out.
363+
if isObserverPanicCaptureDisabled(ctx) {
364+
capture = false
365+
}
366+
}
367+
368+
// If subscription is our concrete subscriberImpl, set direct call helpers.
369+
if ssub, ok := subscription.(*subscriberImpl[T]); ok {
370+
// Avoid configuring directors when NewSubscriber returned the input
371+
// subscriber itself (subscription == destination) — in that case the
372+
// destination is already a Subscriber and it is responsible for its
373+
// own hot-path wiring.
374+
if subscription != destination {
375+
ssub.setDirectors(destination, capture)
376+
}
377+
}
354378

355379
// If panic capture is explicitly disabled on the subscription context and
356380
// the observable is in an unsafe/single-producer mode, skip the TryCatch
357-
// wrapper to avoid extra allocations on the hot path. Callers should use
358-
// `WithObserverPanicCaptureDisabled(ctx)` when subscribing in
359-
// performance-sensitive code
381+
// wrapper to avoid extra allocations on the subscribe path. Callers should
382+
// use `WithObserverPanicCaptureDisabled(ctx)` when subscribing in
383+
// performance-sensitive code.
360384
if isObserverPanicCaptureDisabled(ctx) && (s.mode == ConcurrencyModeUnsafe || s.mode == ConcurrencyModeSingleProducer) {
361385
subscription.Add(s.subscribe(ctx, subscription))
362386
return subscription
363387
}
364-
subscription.Add(s.subscribe(ctx, subscription))
365-
return subscription
366-
}
367388

368389
lo.TryCatchWithErrorValue(
369390
func() error {

observer.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ func (o *observerImpl[T]) CompleteWithContext(ctx context.Context) {
203203
}
204204

205205
func (o *observerImpl[T]) tryNext(ctx context.Context, value T) {
206+
// Preserve existing behavior for callers that use this method directly.
207+
// This method still checks the context-based opt-out on each call.
206208
if !o.capturePanics || isObserverPanicCaptureDisabled(ctx) {
207209
o.onNext(ctx, value)
208210
return
@@ -225,6 +227,34 @@ func (o *observerImpl[T]) tryNext(ctx context.Context, value T) {
225227
)
226228
}
227229

230+
// tryNextWithCapture is similar to tryNext but uses the provided `capture` flag
231+
// instead of consulting the subscription context. This allows callers that
232+
// already computed the effective panic-capture policy at subscription time to
233+
// avoid a context lookup on the hot path.
234+
func (o *observerImpl[T]) tryNextWithCapture(ctx context.Context, value T, capture bool) {
235+
if !capture {
236+
o.onNext(ctx, value)
237+
return
238+
}
239+
240+
lo.TryCatchWithErrorValue(
241+
func() error {
242+
o.onNext(ctx, value)
243+
return nil
244+
},
245+
func(e any) {
246+
err := newObserverError(recoverValueToError(e))
247+
248+
if o.onError == nil {
249+
OnUnhandledError(ctx, err)
250+
} else {
251+
// Use tryErrorWithCapture to ensure consistent panic handling.
252+
o.tryErrorWithCapture(ctx, err, capture)
253+
}
254+
},
255+
)
256+
}
257+
228258
func (o *observerImpl[T]) tryError(ctx context.Context, err error) {
229259
if !o.capturePanics || isObserverPanicCaptureDisabled(ctx) {
230260
o.onError(ctx, err)
@@ -243,6 +273,28 @@ func (o *observerImpl[T]) tryError(ctx context.Context, err error) {
243273
)
244274
}
245275

276+
// tryErrorWithCapture behaves like tryError but takes a precomputed capture flag
277+
// rather than checking the subscription context. This avoids one context lookup
278+
// on the hot notification path when the capture policy is known at
279+
// subscription time.
280+
func (o *observerImpl[T]) tryErrorWithCapture(ctx context.Context, err error, capture bool) {
281+
if !capture {
282+
o.onError(ctx, err)
283+
return
284+
}
285+
286+
lo.TryCatchWithErrorValue(
287+
func() error {
288+
o.onError(ctx, err)
289+
return nil
290+
},
291+
func(e any) {
292+
err := newObserverError(recoverValueToError(e))
293+
OnUnhandledError(ctx, err)
294+
},
295+
)
296+
}
297+
246298
func (o *observerImpl[T]) tryComplete(ctx context.Context) {
247299
if !o.capturePanics || isObserverPanicCaptureDisabled(ctx) {
248300
o.onComplete(ctx)
@@ -261,6 +313,26 @@ func (o *observerImpl[T]) tryComplete(ctx context.Context) {
261313
)
262314
}
263315

316+
// tryCompleteWithCapture behaves like tryComplete but uses the provided capture
317+
// flag instead of consulting the context.
318+
func (o *observerImpl[T]) tryCompleteWithCapture(ctx context.Context, capture bool) {
319+
if !capture {
320+
o.onComplete(ctx)
321+
return
322+
}
323+
324+
lo.TryCatchWithErrorValue(
325+
func() error {
326+
o.onComplete(ctx)
327+
return nil
328+
},
329+
func(e any) {
330+
err := newObserverError(recoverValueToError(e))
331+
OnUnhandledError(ctx, err)
332+
},
333+
)
334+
}
335+
264336
func (o *observerImpl[T]) IsClosed() bool {
265337
return atomic.LoadInt32(&o.status) != 0
266338
}

subscriber.go

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,13 @@ type subscriberImpl[T any] struct {
198198

199199
mode ConcurrencyMode
200200
lockless bool
201+
// Per-subscription direct call helpers. When non-nil these are used in the
202+
// hot path to call the destination without additional interface dispatch
203+
// or context lookups. They are set once at subscription time by the
204+
// Observable (see observable.SubscribeWithContext).
205+
nextDirect func(context.Context, T)
206+
errorDirect func(context.Context, error)
207+
completeDirect func(context.Context)
201208
}
202209

203210
// Implements Observer.
@@ -213,7 +220,11 @@ func (s *subscriberImpl[T]) NextWithContext(ctx context.Context, v T) {
213220

214221
if s.lockless {
215222
if atomic.LoadInt32(&s.status) == 0 {
216-
s.destination.NextWithContext(ctx, v)
223+
if s.nextDirect != nil {
224+
s.nextDirect(ctx, v)
225+
} else {
226+
s.destination.NextWithContext(ctx, v)
227+
}
217228
} else {
218229
OnDroppedNotification(ctx, NewNotificationNext(v))
219230
}
@@ -231,7 +242,11 @@ func (s *subscriberImpl[T]) NextWithContext(ctx context.Context, v T) {
231242
}
232243

233244
if atomic.LoadInt32(&s.status) == 0 {
234-
s.destination.NextWithContext(ctx, v)
245+
if s.nextDirect != nil {
246+
s.nextDirect(ctx, v)
247+
} else {
248+
s.destination.NextWithContext(ctx, v)
249+
}
235250
} else {
236251
OnDroppedNotification(ctx, NewNotificationNext(v))
237252
}
@@ -249,7 +264,11 @@ func (s *subscriberImpl[T]) ErrorWithContext(ctx context.Context, err error) {
249264
if s.lockless {
250265
if atomic.CompareAndSwapInt32(&s.status, 0, 1) {
251266
if s.destination != nil {
252-
s.destination.ErrorWithContext(ctx, err)
267+
if s.errorDirect != nil {
268+
s.errorDirect(ctx, err)
269+
} else {
270+
s.destination.ErrorWithContext(ctx, err)
271+
}
253272
}
254273
} else {
255274
OnDroppedNotification(ctx, NewNotificationError[T](err))
@@ -264,7 +283,11 @@ func (s *subscriberImpl[T]) ErrorWithContext(ctx context.Context, err error) {
264283

265284
if atomic.CompareAndSwapInt32(&s.status, 0, 1) {
266285
if s.destination != nil {
267-
s.destination.ErrorWithContext(ctx, err)
286+
if s.errorDirect != nil {
287+
s.errorDirect(ctx, err)
288+
} else {
289+
s.destination.ErrorWithContext(ctx, err)
290+
}
268291
}
269292
} else {
270293
OnDroppedNotification(ctx, NewNotificationError[T](err))
@@ -285,7 +308,11 @@ func (s *subscriberImpl[T]) CompleteWithContext(ctx context.Context) {
285308
if s.lockless {
286309
if atomic.CompareAndSwapInt32(&s.status, 0, 2) {
287310
if s.destination != nil {
288-
s.destination.CompleteWithContext(ctx)
311+
if s.completeDirect != nil {
312+
s.completeDirect(ctx)
313+
} else {
314+
s.destination.CompleteWithContext(ctx)
315+
}
289316
}
290317
} else {
291318
OnDroppedNotification(ctx, NewNotificationComplete[T]())
@@ -300,7 +327,11 @@ func (s *subscriberImpl[T]) CompleteWithContext(ctx context.Context) {
300327

301328
if atomic.CompareAndSwapInt32(&s.status, 0, 2) {
302329
if s.destination != nil {
303-
s.destination.CompleteWithContext(ctx)
330+
if s.completeDirect != nil {
331+
s.completeDirect(ctx)
332+
} else {
333+
s.destination.CompleteWithContext(ctx)
334+
}
304335
}
305336
} else {
306337
OnDroppedNotification(ctx, NewNotificationComplete[T]())
@@ -337,3 +368,21 @@ func (s *subscriberImpl[T]) unsubscribe() {
337368
// s.Subscription.Unsubscribe() is protected against concurrent calls.
338369
s.Subscription.Unsubscribe()
339370
}
371+
372+
// setDirectors configures per-subscription direct call helpers based on the
373+
// concrete destination type and the precomputed capture flag. This avoids
374+
// per-notification context lookups and type assertions on the hot path.
375+
func (s *subscriberImpl[T]) setDirectors(destination Observer[T], capture bool) {
376+
// Default to interface-based calls.
377+
s.nextDirect = func(ctx context.Context, v T) { destination.NextWithContext(ctx, v) }
378+
s.errorDirect = func(ctx context.Context, err error) { destination.ErrorWithContext(ctx, err) }
379+
s.completeDirect = func(ctx context.Context) { destination.CompleteWithContext(ctx) }
380+
381+
// If destination is an *observerImpl[T], we can call internal helpers that
382+
// accept a precomputed capture flag and therefore avoid context lookups.
383+
if oi, ok := destination.(*observerImpl[T]); ok {
384+
s.nextDirect = func(ctx context.Context, v T) { oi.tryNextWithCapture(ctx, v, capture) }
385+
s.errorDirect = func(ctx context.Context, err error) { oi.tryErrorWithCapture(ctx, err, capture) }
386+
s.completeDirect = func(ctx context.Context) { oi.tryCompleteWithCapture(ctx, capture) }
387+
}
388+
}

0 commit comments

Comments
 (0)