Skip to content

Commit 7ce8ede

Browse files
AramAlsabtinlg
andauthored
Optimize bulk import and the load on chirpstack (#65)
* Batch bulk import create. Update missing * Reworked update many and cleanup * Comment on bulk import * Added device model error codes Co-authored-by: nlg <[email protected]>
1 parent f31a6f3 commit 7ce8ede

File tree

8 files changed

+284
-114
lines changed

8 files changed

+284
-114
lines changed

src/app/applications/bulk-import/bulk-import.component.ts

Lines changed: 184 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,36 @@ import { HttpErrorResponse } from '@angular/common/http';
22
import { Component, OnInit } from '@angular/core';
33
import { Title } from '@angular/platform-browser';
44
import { ActivatedRoute } from '@angular/router';
5+
import {
6+
IotDeviceImportRequest,
7+
IotDevicesImportResponse,
8+
} from '@applications/iot-devices/iot-device.model';
59
import { IoTDeviceService } from '@applications/iot-devices/iot-device.service';
610
import { faDownload, faTrash } from '@fortawesome/free-solid-svg-icons';
711
import { TranslateService } from '@ngx-translate/core';
812
import { ErrorMessageService } from '@shared/error-message.service';
13+
import { splitList } from '@shared/helpers/array.helper';
914
import { Download } from '@shared/helpers/download.helper';
15+
import { BulkImportService } from '@shared/services/bulk-import.service';
1016
import { DownloadService } from '@shared/services/download.service';
1117
import { Papa } from 'ngx-papaparse';
12-
import { Observable } from 'rxjs';
18+
import { Observable, Subject } from 'rxjs';
19+
import { takeWhile } from 'rxjs/operators';
1320
import { BulkImport } from './bulk-import.model';
1421
import { BulkMapping } from './bulkMapping';
1522

1623
@Component({
1724
selector: 'app-bulk-import',
1825
templateUrl: './bulk-import.component.html',
19-
styleUrls: ['./bulk-import.component.scss']
26+
styleUrls: ['./bulk-import.component.scss'],
2027
})
2128
export class BulkImportComponent implements OnInit {
22-
displayedColumns: string[] = ['name', 'type', 'importStatus', 'errorMessages'];
29+
displayedColumns: string[] = [
30+
'name',
31+
'type',
32+
'importStatus',
33+
'errorMessages',
34+
];
2335
isLoading = false;
2436
bulkImport: BulkImport[];
2537
bulkImportResult: BulkImport[];
@@ -28,10 +40,19 @@ export class BulkImportComponent implements OnInit {
2840
faTrash = faTrash;
2941
faDownload = faDownload;
3042
samples = [
31-
{ name: 'generic-http-sample.csv', url: '../../../assets/docs/iotdevice_generichttp.csv' },
32-
{ name: 'lorawan-otaa-sample.csv', url: '../../../assets/docs/iotdevice_lorawan_otaa.csv' },
33-
{ name: 'lorawan-abp-sample.csv', url: '../../../assets/docs/iotdevice_lorawan_abp.csv' },
34-
]
43+
{
44+
name: 'generic-http-sample.csv',
45+
url: '../../../assets/docs/iotdevice_generichttp.csv',
46+
},
47+
{
48+
name: 'lorawan-otaa-sample.csv',
49+
url: '../../../assets/docs/iotdevice_lorawan_otaa.csv',
50+
},
51+
{
52+
name: 'lorawan-abp-sample.csv',
53+
url: '../../../assets/docs/iotdevice_lorawan_abp.csv',
54+
},
55+
];
3556
download$: Observable<Download>;
3657
private bulkMapper = new BulkMapping();
3758
public backButtonTitle: string;
@@ -44,25 +65,23 @@ export class BulkImportComponent implements OnInit {
4465
private titleService: Title,
4566
private translate: TranslateService,
4667
private downloads: DownloadService,
47-
private errorMessageService: ErrorMessageService
68+
private errorMessageService: ErrorMessageService,
69+
private bulkImportService: BulkImportService
4870
) {
4971
this.translate.use('da');
50-
}
72+
}
5173

5274
ngOnInit(): void {
53-
this.translate.get(['TITLE.BULKIMPORT'])
54-
.subscribe(translations => {
55-
this.titleService.setTitle(translations['TITLE.BULKIMPORT']);
56-
});
75+
this.translate.get(['TITLE.BULKIMPORT']).subscribe((translations) => {
76+
this.titleService.setTitle(translations['TITLE.BULKIMPORT']);
77+
});
5778
this.applicationId = +this.route.snapshot.paramMap.get('id');
58-
5979
}
6080

61-
download({ name, url }: { name: string, url: string }) {
81+
download({ name, url }: { name: string; url: string }) {
6282
this.download$ = this.downloads.download(url, name);
6383
}
6484

65-
6685
deleteAttachment(index) {
6786
this.files.splice(index, 1);
6887
}
@@ -76,13 +95,14 @@ export class BulkImportComponent implements OnInit {
7695
}
7796
this.bulkImport = [];
7897
this.bulkImportResult = [];
79-
for (let index = 0; index < evt.length; index++) {
80-
const element = evt[index];
98+
99+
for (const element of evt) {
81100
this.files.push(element.name);
82101
}
102+
83103
// handle csv data
84104
this.isLoading = true;
85-
const files = evt; // File List object
105+
const files = evt; // File List object
86106
const file = files[0];
87107
const reader = new FileReader();
88108
reader.readAsText(file);
@@ -91,7 +111,7 @@ export class BulkImportComponent implements OnInit {
91111
this.papa.parse(csv, {
92112
skipEmptyLines: true,
93113
header: true,
94-
complete: results => {
114+
complete: (results) => {
95115
this.mapData(results.data);
96116
// this step ensures material can read from the array - should be fixed.
97117
this.bulkImportResult = this.bulkImport;
@@ -100,9 +120,8 @@ export class BulkImportComponent implements OnInit {
100120
} else {
101121
return this.bulkImport;
102122
}
103-
}
104-
}
105-
);
123+
},
124+
});
106125
this.isLoading = false;
107126
};
108127
}
@@ -118,43 +137,162 @@ export class BulkImportComponent implements OnInit {
118137

119138
private mapData(data: any[]) {
120139
data.forEach((device) => {
121-
const mappedDevice = this.bulkMapper.dataMapper(device, this.applicationId);
140+
const mappedDevice = this.bulkMapper.dataMapper(
141+
device,
142+
this.applicationId
143+
);
122144
if (mappedDevice) {
123145
this.bulkImport.push(new BulkImport(mappedDevice));
124146
} else {
125-
this.translate.get(['ERROR.SEMANTIC'])
126-
.subscribe(translations => {
127-
this.bulkImport.push(new BulkImport(null, [translations['ERROR.SEMANTIC']]));
128-
});
147+
this.translate.get(['ERROR.SEMANTIC']).subscribe((translations) => {
148+
this.bulkImport.push(
149+
new BulkImport(null, [translations['ERROR.SEMANTIC']])
150+
);
151+
});
129152
}
130153
});
131154
}
132155

133156
addIoTDevice() {
134-
this.bulkImportResult.forEach((requestItem) => {
135-
if (requestItem.device?.id) {
136-
this.iotDeviceService.updateIoTDevice(requestItem.device, requestItem.device.id).subscribe(
157+
// Subscribe to subject in service, Emit the index of next item in the array to be previous
158+
// The emit will activate the subscription which should call the updateIoTDevice
159+
const { newDevices, updatedDevices } = this.splitDevices();
160+
161+
this.postBulkImportPayload(
162+
newDevices,
163+
this.bulkImportService.nextCreateIotDeviceBatchIndex$,
164+
this.iotDeviceService.createIoTDevices.bind(this.iotDeviceService)
165+
);
166+
this.postBulkImportPayload(
167+
updatedDevices,
168+
this.bulkImportService.nextUpdateDeviceBatchIndex$,
169+
this.iotDeviceService.updateIoTDevices.bind(this.iotDeviceService)
170+
);
171+
}
172+
173+
private postBulkImportPayload(
174+
bulkDevices: BulkImport[][],
175+
batchIndex$: Subject<void>,
176+
importDevices: (
177+
payload: IotDeviceImportRequest
178+
) => Observable<IotDevicesImportResponse[]>
179+
): void {
180+
if (!bulkDevices.length) {
181+
return;
182+
}
183+
184+
let batchIndex = 0;
185+
186+
// takeWhile() will unsubscribe once the condition is false
187+
batchIndex$.pipe(takeWhile(() => batchIndex in bulkDevices)).subscribe(
188+
() => {
189+
const requestItems = bulkDevices[batchIndex];
190+
const devices: IotDeviceImportRequest = {
191+
data: requestItems.map((bulkResult) => bulkResult.device),
192+
};
193+
importDevices(devices).subscribe(
137194
(response) => {
138-
console.log(response);
139-
requestItem.importStatus = 'success';
195+
this.onSuccessfulImport(response, requestItems);
196+
++batchIndex;
197+
batchIndex$.next();
140198
},
141199
(error: HttpErrorResponse) => {
142-
requestItem.errorMessages = this.errorMessageService.handleErrorMessageWithFields(error).errorMessages;
143-
requestItem.importStatus = 'Failed';
144-
}
145-
);
146-
} else if (requestItem.device) {
147-
this.iotDeviceService.createIoTDevice(requestItem.device).subscribe(
148-
(res: any) => {
149-
console.log(res);
150-
requestItem.importStatus = 'success';
151-
},
152-
(error) => {
153-
requestItem.errorMessages = this.errorMessageService.handleErrorMessage(error);
154-
requestItem.importStatus = 'Failed';
200+
requestItems.forEach((item) => {
201+
item.errorMessages = this.errorMessageService.handleErrorMessageWithFields(
202+
error
203+
).errorMessages;
204+
item.importStatus = 'Failed';
205+
});
206+
// Continue processing the next batches
207+
++batchIndex;
208+
batchIndex$.next();
155209
}
156210
);
211+
},
212+
(_error: HttpErrorResponse) => {
213+
// Should not happen
214+
},
215+
() => {
216+
// Process any devices whose status hasn't been set and mark them as errors.
217+
this.onCompleteImport(bulkDevices);
218+
}
219+
);
220+
221+
// Trigger our listener
222+
batchIndex$.next();
223+
}
224+
225+
private onSuccessfulImport(
226+
response: IotDevicesImportResponse[],
227+
requestItems: BulkImport[]
228+
) {
229+
response.forEach((responseItem) => {
230+
const match = requestItems.find(
231+
({ device }) =>
232+
device.name === responseItem.idMetadata.name &&
233+
device.applicationId === responseItem.idMetadata.applicationId
234+
);
235+
if (!match) {
236+
return;
237+
}
238+
239+
if (responseItem.error && match) {
240+
match.errorMessages = this.errorMessageService.handleErrorMessageWithFields(
241+
{ error: responseItem.error }
242+
).errorMessages;
243+
match.importStatus = 'Failed';
244+
} else {
245+
match.errorMessages = [];
246+
match.importStatus = 'Success';
157247
}
158248
});
159249
}
250+
251+
private onCompleteImport(devicesBulk: BulkImport[][]) {
252+
for (const bulk of devicesBulk) {
253+
for (const device of bulk) {
254+
if (!device.importStatus) {
255+
device.importStatus = 'Failed';
256+
device.errorMessages = this.errorMessageService.handleErrorMessageWithFields(
257+
{
258+
error: {
259+
message: 'MESSAGE.FAILED-TO-CREATE-OR-UPDATE-IOT-DEVICE',
260+
},
261+
}
262+
).errorMessages;
263+
}
264+
}
265+
}
266+
}
267+
268+
private splitDevices(): {
269+
newDevices: BulkImport[][];
270+
updatedDevices: BulkImport[][];
271+
} {
272+
if (!this.bulkImportResult) {
273+
return { newDevices: [], updatedDevices: [] };
274+
}
275+
276+
const { updatedDevices, newDevices } = this.bulkImportResult.reduce(
277+
(
278+
res: {
279+
newDevices: BulkImport[];
280+
updatedDevices: BulkImport[];
281+
},
282+
curr
283+
) => {
284+
if (curr.device.id) {
285+
res.updatedDevices.push(curr);
286+
} else if (curr.device) {
287+
res.newDevices.push(curr);
288+
}
289+
return res;
290+
},
291+
{ updatedDevices: [], newDevices: [] }
292+
);
293+
return {
294+
newDevices: splitList(newDevices),
295+
updatedDevices: splitList(updatedDevices),
296+
};
297+
}
160298
}

src/app/applications/bulk-import/bulk-import.model.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { IotDevice } from '@applications/iot-devices/iot-device.model';
22

33
export class BulkImport {
44
public device: IotDevice;
5-
public errorMessages = [];
5+
public errorMessages: unknown[] = [];
66
public importStatus = '';
77
constructor(device: IotDevice, errorMessages = [], importStatus = '') {
88
this.device = device;

0 commit comments

Comments
 (0)