1
1
const Minio = require ( 'minio' ) ;
2
2
const { assign, toNumber} = require ( 'lodash/fp' ) ;
3
3
4
- const log = require ( '@serverless/utils/log' ) . log ;
5
-
6
4
const S3EventDefinition = require ( './s3-event-definition' ) ;
7
5
const S3Event = require ( './s3-event' ) ;
8
6
@@ -11,15 +9,23 @@ const delay = timeout =>
11
9
setTimeout ( resolve , timeout ) ;
12
10
} ) ;
13
11
12
+ const defaultLog = {
13
+ debug : console . debug . bind ( console ) ,
14
+ notice : console . log . bind ( console ) ,
15
+ warning : console . warn . bind ( console )
16
+ } ;
17
+
14
18
class S3 {
15
- constructor ( lambda , resources , options ) {
19
+ constructor ( lambda , resources , options , log = defaultLog ) {
16
20
this . lambda = null ;
17
21
this . resources = null ;
18
22
this . options = null ;
23
+ this . log = null ;
19
24
20
25
this . lambda = lambda ;
21
26
this . resources = resources ;
22
27
this . options = options ;
28
+ this . log = log || defaultLog ;
23
29
24
30
const s3Endpoint = this . options . endpoint ? new URL ( this . options . endpoint ) : { } ;
25
31
this . client = new Minio . Client (
@@ -34,6 +40,14 @@ class S3 {
34
40
this . listeners = [ ] ;
35
41
}
36
42
43
+ _safeLog ( level , message ) {
44
+ if ( this . log && typeof this . log [ level ] === 'function' ) {
45
+ this . log [ level ] ( message ) ;
46
+ } else if ( console [ level ] ) {
47
+ console [ level ] ( message ) ;
48
+ }
49
+ }
50
+
37
51
create ( events ) {
38
52
this . events = events ;
39
53
return Promise . all (
@@ -48,35 +62,45 @@ class S3 {
48
62
return Promise . all (
49
63
this . events . map ( async ( { functionKey, s3} ) => {
50
64
const { event, bucket, rules} = s3 ;
51
- await this . _waitFor ( bucket ) ;
65
+ this . log . debug ( `Setting up listener for bucket: ${ bucket } , event: ${ event } ` ) ;
52
66
67
+ await this . _waitFor ( bucket ) ;
53
68
const eventRules = rules || [ ] ;
54
69
const prefix = ( eventRules . find ( rule => rule . prefix ) || { prefix : '*' } ) . prefix ;
55
70
const suffix = ( eventRules . find ( rule => rule . suffix ) || { suffix : '*' } ) . suffix ;
56
-
57
71
const listener = this . client . listenBucketNotification ( bucket , prefix , suffix , [ event ] ) ;
58
72
59
73
listener . on ( 'notification' , async record => {
60
74
if ( record ) {
61
75
try {
76
+ this . log . debug (
77
+ `Received S3 notification for bucket ${ bucket } : ${ JSON . stringify ( record ) } `
78
+ ) ;
62
79
const lambdaFunction = this . lambda . get ( functionKey ) ;
63
80
64
81
const s3Notification = new S3Event ( record ) ;
65
82
lambdaFunction . setEvent ( s3Notification ) ;
66
83
67
84
await lambdaFunction . runHandler ( ) ;
68
85
} catch ( err ) {
69
- log . warn ( err . stack ) ;
86
+ this . log . warning (
87
+ `Error processing S3 notification for bucket ${ bucket } : ${ err . stack } `
88
+ ) ;
70
89
}
71
90
}
72
91
} ) ;
73
92
93
+ listener . on ( 'error' , err => {
94
+ this . log . warning ( `Error in S3 listener for bucket ${ bucket } : ${ err . message } ` ) ;
95
+ } ) ;
96
+
74
97
this . listeners = [ ...this . listeners , listener ] ;
98
+ this . log . debug ( `Listener set up successfully for bucket: ${ bucket } ` ) ;
75
99
} )
76
100
) ;
77
101
}
78
102
79
- stop ( timeout ) {
103
+ stop ( _timeout ) {
80
104
this . listeners . forEach ( listener => listener . stop ( ) ) ;
81
105
this . listeners = [ ] ;
82
106
}
@@ -114,7 +138,7 @@ class S3 {
114
138
115
139
await lambdaFunction . runHandler ( ) ;
116
140
} catch ( err ) {
117
- log . warn ( err . stack ) ;
141
+ this . _safeLog ( 'warning' , err . stack ) ;
118
142
}
119
143
}
120
144
} ) ;
0 commit comments