@@ -20,9 +20,7 @@ export class GCPSConsumer implements Process {
20
20
keyId : string ,
21
21
privateKey : string ,
22
22
public subscriptionName : string ,
23
- public taskHandlers : {
24
- [ eventName : string ] : ConstructorOf < QueueHandler < any > >
25
- } ,
23
+ public taskHandler : ConstructorOf < QueueHandler < any > > ,
26
24
public prefetchCount : number = 15
27
25
) {
28
26
this . client = new GCPSClient ( serviceAccountEmail , keyId , privateKey )
@@ -98,21 +96,9 @@ export class GCPSConsumer implements Process {
98
96
99
97
await Promise . all (
100
98
messages . map ( async ( m ) => {
101
- if ( m . message . attributes === undefined ) {
102
- if ( this . logger ) {
103
- this . logger . info (
104
- `[GCPS - TASK - START] Event discarded due to corrupted attributes.`
105
- )
106
- }
107
-
108
- return this . ack ( m . ackId )
109
- }
110
-
111
99
return this . executeTask (
112
100
m . ackId ,
113
101
{
114
- eventName :
115
- m . message . attributes . STRONTIUM_EVENT_NAME ,
116
102
message : m . message . data ,
117
103
} ,
118
104
container
@@ -136,21 +122,23 @@ export class GCPSConsumer implements Process {
136
122
requestContainer . parent = applicationContainer
137
123
138
124
// Spawn a handler for the Task type
139
- let handlerType = this . taskHandlers [ task . eventName ]
125
+ let handlerType = this . taskHandler
140
126
if ( this . logger ) {
141
127
this . logger . info (
142
- `[GCPS - TASK - START] Event of type ${
143
- task . eventName
144
- } received by Consumer.`
128
+ `[GCPS - TASK - START] Event received by Consumer for topic.` ,
129
+ {
130
+ subscription : this . subscriptionName
131
+ }
145
132
)
146
133
}
147
134
148
135
if ( handlerType === undefined ) {
149
136
if ( this . logger ) {
150
137
this . logger . error (
151
- `[GCPS - TASK - NO_IMPLEMENTATION_FAIL] No implementation found for event ${
152
- task . eventName
153
- } `
138
+ `[GCPS - TASK - NO_IMPLEMENTATION_FAIL] No implementation found for topic.` ,
139
+ {
140
+ subscription : this . subscriptionName
141
+ }
154
142
)
155
143
}
156
144
await this . nack ( ackId )
@@ -176,9 +164,10 @@ export class GCPSConsumer implements Process {
176
164
await this . ack ( ackId )
177
165
if ( this . logger ) {
178
166
this . logger . info (
179
- `[GCPS - TASK - SUCCESS] Event of type ${
180
- task . eventName
181
- } successfully completed by Consumer.`
167
+ `[GCPS - TASK - SUCCESS] Event successfully completed by Consumer.` ,
168
+ {
169
+ subscription : this . subscriptionName
170
+ }
182
171
)
183
172
}
184
173
} catch ( e ) {
0 commit comments