@@ -3,30 +3,28 @@ const { messages } = require('elasticio-node');
3
3
const { v4 : uuidv4 } = require ( 'uuid' ) ;
4
4
const ObjectStorageWrapperExtended = require ( './utils-wrapper/ObjectStorageWrapperExtended' ) ;
5
5
6
- let timeHandle ;
7
- let groupList = [ ] ;
6
+ const groupList = { } ;
8
7
let delay ;
9
8
10
- async function timer ( this_ ) {
11
- for ( let i = 0 ; i < groupList . length ; i += 1 ) {
9
+ async function timer ( this_ , groupId ) {
10
+ try {
12
11
const storage = new ObjectStorageWrapperExtended ( this_ ) ;
13
- // eslint-disable-next-line no-await-in-loop
14
- const results = await storage . lookupObjectById ( groupList [ i ] ) ;
12
+ const results = await storage . lookupObjectById ( groupId ) ;
15
13
const incomingData = { } ;
16
14
results . messages . forEach ( ( message ) => {
17
15
incomingData [ message . messageId ] = message . messageData ;
18
16
} ) ;
19
-
20
- // eslint-disable-next-line no-await-in-loop
21
17
await this_ . emit ( 'data' , messages . newMessageWithBody ( {
22
18
groupSize : Object . keys ( results . messageIdsSeen ) . length ,
23
19
groupId : results . messages [ 0 ] . groupId ,
24
20
messageData : incomingData ,
25
21
} ) ) ;
26
- // eslint-disable-next-line no-await-in-loop
27
- await storage . deleteObjectById ( groupList [ i ] ) ;
22
+ await storage . deleteObjectById ( groupId ) ;
23
+ delete groupList [ groupId ] ;
24
+ } catch ( e ) {
25
+ this_ . emit ( 'error' , e . message ) ;
26
+ this_ . logger . error ( e ) ;
28
27
}
29
- groupList = [ ] ;
30
28
}
31
29
32
30
async function processAction ( msg , cfg ) {
@@ -65,23 +63,17 @@ async function processAction(msg, cfg) {
65
63
messageGroupSize,
66
64
isCreated,
67
65
} = await storage . createMessageGroupIfNotExists ( groupId , groupSize ) ;
66
+ await storage . createNewObjectInMessageGroup ( object , messageGroupId ) ;
68
67
69
68
if ( isCreated ) {
70
- await storage . createNewObjectInMessageGroup ( object , messageGroupId ) ;
71
69
this . logger . info ( 'New Group created. Added message' ) ;
72
70
}
73
71
if ( ! isCreated ) {
74
- await storage . createNewObjectInMessageGroup ( object , messageGroupId ) ;
75
72
this . logger . info ( 'Existed Group found. Added message' ) ;
76
73
this . logger . info ( `Saved messages: ${ Object . keys ( messageGroup . messageIdsSeen ) . join ( ', ' ) } ` ) ;
77
74
}
78
- const parsedMessageGroup = await storage . lookupObjectById ( messageGroupId ) ;
79
- const filteredMessages = parsedMessageGroup . messages
80
- . filter ( ( message ) => message . messageId !== messageId ) ;
81
- filteredMessages . push ( object ) ;
82
- parsedMessageGroup . messages = filteredMessages ;
83
- await storage . updateObjectById ( messageGroupId , parsedMessageGroup ) ;
84
- const messagesNumberSeen = Object . keys ( parsedMessageGroup . messageIdsSeen ) . length ;
75
+ const updatedMessageGroup = await storage . lookupObjectById ( messageGroupId ) ;
76
+ const messagesNumberSeen = Object . keys ( updatedMessageGroup . messageIdsSeen ) . length ;
85
77
86
78
this . logger . info (
87
79
`Saw message ${ messageId } of group ${ groupId } .
@@ -90,7 +82,7 @@ async function processAction(msg, cfg) {
90
82
// when groupSized option is selected
91
83
if ( mode === 'groupSize' ) {
92
84
if ( messagesNumberSeen >= messageGroupSize ) {
93
- parsedMessageGroup . messages . forEach ( ( message ) => {
85
+ updatedMessageGroup . messages . forEach ( ( message ) => {
94
86
incomingData [ message . messageId ] = message . messageData ;
95
87
} ) ;
96
88
await this . emit ( 'data' , messages . newMessageWithBody ( {
@@ -106,25 +98,30 @@ async function processAction(msg, cfg) {
106
98
// When delay timer option is selected
107
99
if ( mode === 'timeout' ) {
108
100
delay = ( timersec >= 20000 ) ? 20000 : timersec ;
109
- clearTimeout ( timeHandle ) ;
110
- timeHandle = setTimeout ( timer , delay , this ) ;
111
- if ( ! groupList . includes ( messageGroupId ) ) {
112
- groupList . push ( messageGroupId ) ;
101
+ if ( ! groupList [ messageGroupId ] ) {
102
+ groupList [ messageGroupId ] = { } ;
103
+ }
104
+ const group = groupList [ messageGroupId ] ;
105
+ if ( group . timeoutId ) {
106
+ clearTimeout ( group . timeoutId ) ;
113
107
}
108
+ group . timeoutId = setTimeout ( timer , delay , this , messageGroupId ) ;
114
109
}
115
110
116
111
// When both groupSize and delay timer option is selected
117
112
if ( mode === 'groupSize&timeout' ) {
118
113
delay = ( timersec >= 20000 ) ? 20000 : timersec ;
119
- clearTimeout ( timeHandle ) ;
120
- timeHandle = setTimeout ( timer , delay , this ) ;
121
-
122
- if ( ! groupList . includes ( messageGroupId ) ) {
123
- groupList . push ( messageGroupId ) ;
114
+ if ( ! groupList [ messageGroupId ] ) {
115
+ groupList [ messageGroupId ] = { } ;
124
116
}
117
+ const group = groupList [ messageGroupId ] ;
118
+ if ( group . timeoutId ) {
119
+ clearTimeout ( group . timeoutId ) ;
120
+ }
121
+ group . timeoutId = setTimeout ( timer , delay , this , messageGroupId ) ;
125
122
126
123
if ( messagesNumberSeen >= messageGroupSize ) {
127
- parsedMessageGroup . messages . forEach ( ( message ) => {
124
+ updatedMessageGroup . messages . forEach ( ( message ) => {
128
125
incomingData [ message . messageId ] = message . messageData ;
129
126
} ) ;
130
127
@@ -133,9 +130,12 @@ async function processAction(msg, cfg) {
133
130
groupId,
134
131
messageData : incomingData ,
135
132
} ) ) ;
136
- await storage . deleteObjectById ( messageGroupId ) ;
137
- this . logger . info ( `Message group with id ${ messageGroupId } has been deleted` ) ;
138
- groupList = groupList . filter ( ( def ) => def !== messageGroupId ) ;
133
+ if ( groupList [ messageGroupId ] ) {
134
+ clearTimeout ( groupList [ messageGroupId ] . timeoutId ) ;
135
+ await storage . deleteObjectById ( messageGroupId ) ;
136
+ delete groupList [ messageGroupId ] ;
137
+ this . logger . info ( `Message group with id ${ messageGroupId } has been deleted` ) ;
138
+ }
139
139
}
140
140
}
141
141
}
0 commit comments