Skip to content

Commit 60ba5b3

Browse files
authored
Add support for asynchronous dispatch (#37)
Introduce a configurable scheduler option for NewEventDispatcher and add two asynchronous schedulers, one that is unlimited and one that uses a bounded queue and a fixed number of workers. For many applications, switching to asynchronous handling will be as easy as adding this new option when creating the event dispatcher. Other applications will need to provide a custom ContextDeriver to copy values from the request context into the detached handler context. As a result of error handling code introduced as part of this change, go-githubapp now requires at least Go 1.13.
1 parent 38289a0 commit 60ba5b3

File tree

4 files changed

+475
-4
lines changed

4 files changed

+475
-4
lines changed

README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ logic of your application.
1010
* [Usage](#usage)
1111
+ [Examples](#examples)
1212
+ [Dependencies](#dependencies)
13+
* [Asynchronous Dispatch](#asynchronous-dispatch)
1314
* [Structured Logging](#structured-logging)
1415
* [GitHub Clients](#github-clients)
1516
+ [Metrics](#metrics)
@@ -86,6 +87,38 @@ Logging and metrics are only active when they are configured (see below). This
8687
means you can add your own logging or metrics libraries without conflict, but
8788
will miss out on the free built-in support.
8889

90+
## Asynchronous Dispatch
91+
92+
GitHub imposes timeouts on webhook delivery responses. If an application does
93+
not respond in time, GitHub closes the connection and marks the delivery as
94+
failed. `go-githubapp` optionally supports asynchronous dispatch to help solve
95+
this problem. When enabled, the event dispatcher sends a response to GitHub after
96+
validating the payload and then runs the event handler in a separate goroutine.
97+
98+
To enable, select an appropriate _scheduler_ and configure the event dispatcher
99+
to use it:
100+
101+
```go
102+
dispatcher := githubapp.NewEventDispatcher(handlers, secret, githubapp.WithScheduler(
103+
githubapp.AsyncScheduler(),
104+
))
105+
```
106+
107+
The following schedulers are included in the library:
108+
109+
- `DefaultScheduler` - a synchronous scheduler that runs event handlers in
110+
the current goroutine. This is the default mode.
111+
112+
- `AsyncScheduler` - an asynchronous scheduler that handles each event in a
113+
new goroutine. This is the simplest asynchronous option.
114+
115+
- `QueueAsyncScheduler` - an asynchronous scheduler that queues events and
116+
handles them with a fixed pool of worker goroutines. This is useful to limit
117+
the amount of concurrent work.
118+
119+
`AsyncScheduler` and `QueueAsyncScheduler` support several additional options
120+
and customizations; see the documentation for details.
121+
89122
## Structured Logging
90123

91124
`go-githubapp` uses [rs/zerolog](https://github.com/rs/zerolog) for structured
@@ -186,6 +219,14 @@ middleware.
186219
| `github.rate.limit[installation:<id>]` | `gauge` | the maximum number of requests permitted to make per hour, tagged with the installation id |
187220
| `github.rate.remaining[installation:<id>]` | `gauge` | the number of requests remaining in the current rate limit window, tagged with the installation id |
188221

222+
If using [asynchronous dispatch](#asynchronous-dispatch) and the `githubapp.WithSchedulingMetrics` option
223+
is set, these metrics are emitted:
224+
225+
| metric name | type | definition |
226+
| ----------- | ---- | ---------- |
227+
| `github.event.queue` | `gauge` | the number of queued unprocessed event |
228+
| `github.event.workers` | `gauge` | the number of workers actively processing events |
229+
189230
Note that metrics need to be published in order to be useful. Several
190231
[publishing options][] are available or you can implement your own.
191232

githubapp/dispatcher.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type ResponseCallback func(w http.ResponseWriter, r *http.Request, event string,
5454
// DispatcherOption configures properties of an event dispatcher.
5555
type DispatcherOption func(*eventDispatcher)
5656

57-
// WithErrorCallback sets the error callback for an event dispatcher.
57+
// WithErrorCallback sets the error callback for a dispatcher.
5858
func WithErrorCallback(onError ErrorCallback) DispatcherOption {
5959
return func(d *eventDispatcher) {
6060
if onError != nil {
@@ -72,6 +72,20 @@ func WithResponseCallback(onResponse ResponseCallback) DispatcherOption {
7272
}
7373
}
7474

75+
// WithScheduler sets the scheduler used to process events. Setting a
76+
// non-default scheduler can enable asynchronous processing. When a scheduler
77+
// is asynchronous, the dispatcher validatates event payloads, queues valid
78+
// events for handling, and then responds to GitHub without waiting for the
79+
// handler to complete. This is useful when handlers may take longer than
80+
// GitHub's timeout for webhook deliveries.
81+
func WithScheduler(s Scheduler) DispatcherOption {
82+
return func(d *eventDispatcher) {
83+
if s != nil {
84+
d.scheduler = s
85+
}
86+
}
87+
}
88+
7589
// ValidationError is passed to error callbacks when the webhook payload fails
7690
// validation.
7791
type ValidationError struct {
@@ -88,6 +102,7 @@ type eventDispatcher struct {
88102
handlerMap map[string]EventHandler
89103
secret string
90104

105+
scheduler Scheduler
91106
onError ErrorCallback
92107
onResponse ResponseCallback
93108
}
@@ -118,6 +133,7 @@ func NewEventDispatcher(handlers []EventHandler, secret string, opts ...Dispatch
118133
d := &eventDispatcher{
119134
handlerMap: handlerMap,
120135
secret: secret,
136+
scheduler: DefaultScheduler(),
121137
onError: DefaultErrorCallback,
122138
onResponse: DefaultResponseCallback,
123139
}
@@ -172,7 +188,12 @@ func (d *eventDispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
172188

173189
handler, ok := d.handlerMap[eventType]
174190
if ok {
175-
if err := handler.Handle(ctx, eventType, deliveryID, payloadBytes); err != nil {
191+
if err := d.scheduler.Schedule(ctx, Dispatch{
192+
Handler: handler,
193+
EventType: eventType,
194+
DeliveryID: deliveryID,
195+
Payload: payloadBytes,
196+
}); err != nil {
176197
d.onError(w, r, err)
177198
return
178199
}
@@ -184,13 +205,19 @@ func (d *eventDispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
184205
func DefaultErrorCallback(w http.ResponseWriter, r *http.Request, err error) {
185206
logger := zerolog.Ctx(r.Context())
186207

187-
if ve, ok := err.(ValidationError); ok {
208+
var ve ValidationError
209+
if errors.As(err, &ve) {
188210
logger.Warn().Err(ve.Cause).Msgf("Received invalid webhook headers or payload")
189211
http.Error(w, "Invalid webhook headers or payload", http.StatusBadRequest)
190212
return
191213
}
214+
if errors.Is(err, ErrCapacityExceeded) {
215+
logger.Warn().Msg("Dropping webhook event due to over-capacity scheduler")
216+
http.Error(w, "No capacity available to processes this event", http.StatusServiceUnavailable)
217+
return
218+
}
192219

193-
logger.Error().Err(err).Msg("Unexpected error handling webhook request")
220+
logger.Error().Err(err).Msg("Unexpected error handling webhook")
194221
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
195222
}
196223

githubapp/scheduler.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
// Copyright 2020 Palantir Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package githubapp
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"sync/atomic"
21+
22+
"github.com/pkg/errors"
23+
"github.com/rcrowley/go-metrics"
24+
"github.com/rs/zerolog"
25+
)
26+
27+
const (
28+
MetricsKeyQueueLength = "github.event.queued"
29+
MetricsKeyActiveWorkers = "github.event.workers"
30+
)
31+
32+
var (
33+
ErrCapacityExceeded = errors.New("scheduler: capacity exceeded")
34+
)
35+
36+
// Dispatch is a webhook payload and the handler that handles it.
37+
type Dispatch struct {
38+
Handler EventHandler
39+
40+
EventType string
41+
DeliveryID string
42+
Payload []byte
43+
}
44+
45+
// Execute calls the Dispatch's handler with the stored arguments.
46+
func (d Dispatch) Execute(ctx context.Context) error {
47+
return d.Handler.Handle(ctx, d.EventType, d.DeliveryID, d.Payload)
48+
}
49+
50+
// AsyncErrorCallback is called by an asynchronous scheduler when an event
51+
// handler returns an error. The error from the handler is passed directly as
52+
// the final argument.
53+
type AsyncErrorCallback func(ctx context.Context, err error)
54+
55+
// DefaultAsyncErrorCallback logs errors.
56+
func DefaultAsyncErrorCallback(ctx context.Context, err error) {
57+
zerolog.Ctx(ctx).Error().Err(err).Msg("Unexpected error handling webhook")
58+
}
59+
60+
// ContextDeriver creates a new independent context from a request's context.
61+
// The new context must be based on context.Background(), not the input.
62+
type ContextDeriver func(context.Context) context.Context
63+
64+
// DefaultContextDeriver copies the logger from the request's context to a new
65+
// context.
66+
func DefaultContextDeriver(ctx context.Context) context.Context {
67+
newCtx := context.Background()
68+
69+
// this value is always unused by async schedulers, but is set for
70+
// compatibility with existing handlers that call SetResponder
71+
newCtx = InitializeResponder(newCtx)
72+
73+
return zerolog.Ctx(ctx).WithContext(newCtx)
74+
}
75+
76+
// Scheduler is a strategy for executing event handlers.
77+
//
78+
// The Schedule method takes a Dispatch and executes it by calling the handler
79+
// for the payload. The execution may be asynchronous, but the scheduler must
80+
// create a new context in this case. The dispatcher waits for Schedule to
81+
// return before responding to GitHub, so asynchronous schedulers should only
82+
// return errors that happen during scheduling, not during execution.
83+
//
84+
// Schedule may return ErrCapacityExceeded if it cannot schedule or queue new
85+
// events at the time of the call.
86+
type Scheduler interface {
87+
Schedule(ctx context.Context, d Dispatch) error
88+
}
89+
90+
// SchedulerOption configures properties of a scheduler.
91+
type SchedulerOption func(*scheduler)
92+
93+
// WithAsyncErrorCallback sets the error callback for an asynchronous
94+
// scheduler. If not set, the scheduler uses DefaultAsyncErrorCallback.
95+
func WithAsyncErrorCallback(onError AsyncErrorCallback) SchedulerOption {
96+
return func(s *scheduler) {
97+
if onError != nil {
98+
s.onError = onError
99+
}
100+
}
101+
}
102+
103+
// WithContextDeriver sets the context deriver for an asynchronous scheduler.
104+
// If not set, the scheduler uses DefaultContextDeriver.
105+
func WithContextDeriver(deriver ContextDeriver) SchedulerOption {
106+
return func(s *scheduler) {
107+
if deriver != nil {
108+
s.deriver = deriver
109+
}
110+
}
111+
}
112+
113+
// WithSchedulingMetrics enables metrics reporting for schedulers.
114+
func WithSchedulingMetrics(r metrics.Registry) SchedulerOption {
115+
return func(s *scheduler) {
116+
metrics.NewRegisteredFunctionalGauge(MetricsKeyQueueLength, r, func() int64 {
117+
return int64(len(s.queue))
118+
})
119+
metrics.NewRegisteredFunctionalGauge(MetricsKeyActiveWorkers, r, func() int64 {
120+
return atomic.LoadInt64(&s.activeWorkers)
121+
})
122+
}
123+
}
124+
125+
type queueDispatch struct {
126+
ctx context.Context
127+
d Dispatch
128+
}
129+
130+
// core functionality and options for (async) schedulers
131+
type scheduler struct {
132+
onError AsyncErrorCallback
133+
deriver ContextDeriver
134+
135+
activeWorkers int64
136+
queue chan queueDispatch
137+
}
138+
139+
func (s *scheduler) safeExecute(ctx context.Context, d Dispatch) {
140+
var err error
141+
defer func() {
142+
if r := recover(); r != nil {
143+
if rerr, ok := r.(error); ok {
144+
err = rerr
145+
} else {
146+
err = fmt.Errorf("%v", r)
147+
}
148+
}
149+
if err != nil && s.onError != nil {
150+
s.onError(ctx, err)
151+
}
152+
atomic.AddInt64(&s.activeWorkers, -1)
153+
}()
154+
155+
atomic.AddInt64(&s.activeWorkers, 1)
156+
err = d.Execute(ctx)
157+
}
158+
159+
func (s *scheduler) derive(ctx context.Context) context.Context {
160+
if s.deriver == nil {
161+
return ctx
162+
}
163+
return s.deriver(ctx)
164+
}
165+
166+
// DefaultScheduler returns a scheduler that executes handlers in the go
167+
// routine of the caller and returns any error.
168+
func DefaultScheduler() Scheduler {
169+
return &defaultScheduler{}
170+
}
171+
172+
type defaultScheduler struct{}
173+
174+
func (s *defaultScheduler) Schedule(ctx context.Context, d Dispatch) error {
175+
return d.Execute(ctx)
176+
}
177+
178+
// AsyncScheduler returns a scheduler that executes handlers in new goroutines.
179+
// Goroutines are not reused and there is no limit on the number created.
180+
func AsyncScheduler(opts ...SchedulerOption) Scheduler {
181+
s := &asyncScheduler{
182+
scheduler: scheduler{
183+
deriver: DefaultContextDeriver,
184+
onError: DefaultAsyncErrorCallback,
185+
},
186+
}
187+
for _, opt := range opts {
188+
opt(&s.scheduler)
189+
}
190+
return s
191+
}
192+
193+
type asyncScheduler struct {
194+
scheduler
195+
}
196+
197+
func (s *asyncScheduler) Schedule(ctx context.Context, d Dispatch) error {
198+
go s.safeExecute(s.derive(ctx), d)
199+
return nil
200+
}
201+
202+
// QueueAsyncScheduler returns a scheduler that executes handlers in a fixed
203+
// number of worker goroutines. If no workers are available, events queue until
204+
// the queue is full.
205+
func QueueAsyncScheduler(queueSize int, workers int, opts ...SchedulerOption) Scheduler {
206+
if queueSize < 0 {
207+
panic("QueueAsyncScheduler: queue size must be non-negative")
208+
}
209+
if workers < 1 {
210+
panic("QueueAsyncScheduler: worker count must be positive")
211+
}
212+
213+
s := &queueScheduler{
214+
scheduler: scheduler{
215+
deriver: DefaultContextDeriver,
216+
onError: DefaultAsyncErrorCallback,
217+
queue: make(chan queueDispatch, queueSize),
218+
},
219+
}
220+
for _, opt := range opts {
221+
opt(&s.scheduler)
222+
}
223+
224+
for i := 0; i < workers; i++ {
225+
go func() {
226+
for d := range s.queue {
227+
s.safeExecute(d.ctx, d.d)
228+
}
229+
}()
230+
}
231+
232+
return s
233+
}
234+
235+
type queueScheduler struct {
236+
scheduler
237+
}
238+
239+
func (s *queueScheduler) Schedule(ctx context.Context, d Dispatch) error {
240+
select {
241+
case s.queue <- queueDispatch{ctx: s.derive(ctx), d: d}:
242+
default:
243+
return ErrCapacityExceeded
244+
}
245+
return nil
246+
}

0 commit comments

Comments
 (0)