@@ -5,6 +5,9 @@ const onExit = require('signal-exit');
5
5
const pump = require ( 'pump' ) ;
6
6
const { delay, getSplitLinesTransform} = require ( './utils' ) ;
7
7
8
+ const EXPECTED_LAMBDA_CALL = 7 ;
9
+ const PATH = './files/test.txt' ;
10
+
8
11
const client = new Minio . Client ( {
9
12
region : 'eu-west-1' ,
10
13
endPoint : 'localhost' ,
@@ -14,45 +17,96 @@ const client = new Minio.Client({
14
17
useSSL : false
15
18
} ) ;
16
19
17
- const path = './files/test.txt' ;
18
20
const uploadFiles = async ( ) => {
19
21
await delay ( 1000 ) ;
20
22
21
- await Promise . all ( [
22
- client . fPutObject ( 'documents' , 'first.txt' , path ) ,
23
- client . fPutObject ( 'pictures' , 'first.txt' , path ) ,
24
- client . fPutObject ( 'files' , 'first.txt' , path ) ,
25
- client . fPutObject ( 'documents' , 'second.txt' , path ) ,
26
- client . fPutObject ( 'pictures' , 'second.txt' , path ) ,
27
- client . fPutObject ( 'files' , 'second.txt' , path ) ,
28
- client . fPutObject ( 'others' , 'correct/test.txt' , path ) ,
29
- client . fPutObject ( 'others' , 'wrong/test.csv' , path ) ,
30
- client . fPutObject ( 'others' , 'correct/test.csv' , path ) ,
31
- client . fPutObject ( 'others' , 'wrong/test.txt' , path )
32
- ] ) ;
23
+ await client . fPutObject ( 'documents' , 'first.txt' , PATH ) ;
24
+ await client . fPutObject ( 'documents' , 'second.txt' , PATH ) ;
25
+ await delay ( 1000 ) ;
26
+
27
+ await client . fPutObject ( 'pictures' , 'first.txt' , PATH ) ;
28
+ await client . fPutObject ( 'pictures' , 'second.txt' , PATH ) ;
29
+ await delay ( 1000 ) ;
30
+
31
+ await client . fPutObject ( 'files' , 'first.txt' , PATH ) ;
32
+ await client . fPutObject ( 'files' , 'second.txt' , PATH ) ;
33
+ await delay ( 1000 ) ;
34
+
35
+ await client . fPutObject ( 'others' , 'correct/test.txt' , PATH ) ;
36
+ await client . fPutObject ( 'others' , 'wrong/test.csv' , PATH ) ;
37
+ await client . fPutObject ( 'others' , 'correct/test.csv' , PATH ) ;
38
+ await client . fPutObject ( 'others' , 'wrong/test.txt' , PATH ) ;
33
39
} ;
34
- const EXPECTED_LAMBDA_CALL = 9 ; // pictures files are consumed twice, by myPromiseHandler and myPythonHandler
35
40
36
41
const serverless = spawn ( 'sls' , [ 'offline' , 'start' , '--config' , 'serverless.s3.yml' ] , {
37
42
stdio : [ 'pipe' , 'pipe' , 'pipe' ] ,
38
43
cwd : __dirname
39
44
} ) ;
40
45
46
+ let lambdaCallCounter = 0 ;
47
+ const processedEvents = new Set ( ) ;
48
+
49
+ function incrementlambdaCallCounter ( eventId ) {
50
+ if ( eventId && processedEvents . has ( eventId ) ) {
51
+ return ;
52
+ }
53
+
54
+ if ( eventId ) {
55
+ processedEvents . add ( eventId ) ;
56
+ }
57
+
58
+ lambdaCallCounter ++ ;
59
+ console . debug ( `lambda call count for s3 test: ${ lambdaCallCounter } /${ EXPECTED_LAMBDA_CALL } ` ) ;
60
+
61
+ if ( lambdaCallCounter >= EXPECTED_LAMBDA_CALL ) {
62
+ console . log ( `${ lambdaCallCounter } /${ EXPECTED_LAMBDA_CALL } lambda calls reached` ) ;
63
+ serverless . kill ( ) ;
64
+ }
65
+ }
66
+
67
+ function processS3Event ( output ) {
68
+ if ( ! output . includes ( 'Records' ) || ! output . includes ( 'eventSource":"minio:s3"' ) ) return ;
69
+
70
+ try {
71
+ output
72
+ . split ( '\n' )
73
+ . filter ( line => line . includes ( 'Records' ) && line . includes ( 'eventSource":"minio:s3"' ) )
74
+ . forEach ( line => {
75
+ const jsonStart = line . indexOf ( '{' ) ;
76
+ if ( jsonStart < 0 ) return ;
77
+
78
+ const jsonEnd = line . lastIndexOf ( '}' ) ;
79
+ if ( jsonEnd <= jsonStart ) return ;
80
+
81
+ const jsonStr = line . slice ( jsonStart , jsonEnd + 1 ) ;
82
+
83
+ const eventData = JSON . parse ( jsonStr ) ;
84
+ if ( ! eventData || ! eventData . Records || eventData . Records . length === 0 ) return ;
85
+ const eventId = `${ eventData . Records [ 0 ] . s3 . bucket . name } -${ eventData . Records [ 0 ] . s3 . object . key } ` ;
86
+ incrementlambdaCallCounter ( eventId ) ;
87
+ } ) ;
88
+ } catch ( err ) {
89
+ console . error ( 'Error in processS3Event:' , { err, output} ) ;
90
+ }
91
+ }
92
+
93
+ serverless . stdout . on ( 'data' , data => {
94
+ processS3Event ( data . toString ( ) ) ;
95
+ } ) ;
96
+
97
+ serverless . stderr . on ( 'data' , data => {
98
+ console . log ( `STDERR: ${ data . toString ( ) . trim ( ) } ` ) ;
99
+ } ) ;
100
+
41
101
pump (
42
102
serverless . stderr ,
43
103
getSplitLinesTransform ( ) ,
44
104
new Writable ( {
45
105
objectMode : true ,
46
- write ( line , enc , cb ) {
106
+ write ( line , _enc , cb ) {
47
107
if ( / S t a r t i n g O f f l i n e S 3 / . test ( line ) ) {
48
108
uploadFiles ( ) ;
49
109
}
50
-
51
- this . count =
52
- ( this . count || 0 ) +
53
- ( line . match ( / \( λ : .* \) R e q u e s t I d : .* D u r a t i o n : .* m s { 2 } B i l l e d D u r a t i o n : .* m s / g) || [ ] )
54
- . length ;
55
- if ( this . count === EXPECTED_LAMBDA_CALL ) serverless . kill ( ) ;
56
110
cb ( ) ;
57
111
}
58
112
} )
0 commit comments