forked from Sunbird-Obsrv/obsrv-api-service
-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathConnectorRegisterController.ts
More file actions
139 lines (128 loc) · 5.84 KB
/
ConnectorRegisterController.ts
File metadata and controls
139 lines (128 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import { Request, Response } from "express";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import _ from "lodash";
import logger from "../../logger";
import axios from "axios";
import httpStatus from "http-status";
import busboy from "busboy";
import { PassThrough } from "stream";
import { registerConnector } from "../../connections/commandServiceConnection";
import { generatePreSignedUrl } from "../GenerateSignedURL/helper";
import { obsrvError } from "../../types/ObsrvError";
import { config } from "../../configs/Config";
export const apiId = "api.connector.register";
export const code = "FAILED_TO_REGISTER_CONNECTOR";
let resmsgid: string | any;
const connectorRegisterController = async (req: Request, res: Response) => {
resmsgid = _.get(res, "resmsgid");
try {
const uploadStreamResponse: any = await uploadStream(req);
const payload = {
relative_path: uploadStreamResponse[0]
}
logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` })
const downloadUrls = await generatePreSignedUrl("read", [payload.relative_path], "connector")
const urlPayload = {
download_url: _.get(downloadUrls, [0, "preSignedUrl"]),
file_name: _.get(downloadUrls, [0, "fileName"])
}
if (!urlPayload.download_url) {
throw obsrvError("", "SIGNED_URL_NOT_FOUND", `Failed to generate signed url for path ${payload.relative_path}`, "BAD_REQUEST", 400)
}
const userToken = req.get('authorization') as string;
const registryResponse = await registerConnector(urlPayload, userToken);
logger.info({ apiId, resmsgid, message: `Connector registered successfully` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } })
} catch (error: any) {
const errMessage = _.get(error, "response.data.error.message")
logger.error(error, apiId, resmsgid, code);
let errorMessage = error;
const statusCode = _.get(error, "statusCode")
if (!statusCode || statusCode == 500) {
errorMessage = { code, message: errMessage || "Failed to register connector" }
}
ResponseHandler.errorResponse(errorMessage, req, res);
}
};
const uploadStream = async (req: Request) => {
return new Promise((resolve, reject) => {
const filePromises: Promise<void>[] = [];
const busboyClient = busboy({ headers: req.headers });
const relative_path: any[] = [];
let fileCount = 0;
busboyClient.on("file", async (name: any, file: any, info: any) => {
if (fileCount > 0) {
// If more than one file is detected, reject the request
busboyClient.emit("error", reject({
code: "FAILED_TO_UPLOAD",
message: "Uploading multiple files are not allowed",
statusCode: 400,
errCode: "BAD_REQUEST"
}));
return
}
fileCount++;
const processFile = async () => {
const fileName = info?.filename;
try {
const preSignedUrl: any = await generatePreSignedUrl("write", [fileName], "connector")
const filePath = preSignedUrl[0]?.filePath
const fileNameExtracted = extractFileNameFromPath(filePath);
relative_path.push(...fileNameExtracted);
const pass = new PassThrough();
file.pipe(pass);
const fileBuffer = await streamToBuffer(pass);
const uploadHeaders: any = {
"Content-Type": info.mimeType,
"Content-Length": fileBuffer.length,
};
if (config.cloud_config.cloud_storage_provider === "azure") {
uploadHeaders["x-ms-blob-type"] = config.cloud_config.azure_blob_type;
}
await axios.put(preSignedUrl[0]?.preSignedUrl, fileBuffer, {
headers: uploadHeaders
});
}
catch (err) {
logger.error({ apiId, err, resmsgid, message: "Failed to generate sample urls", code: "FILES_GENERATE_URL_FAILURE" })
reject({
code: "FILES_GENERATE_URL_FAILURE",
message: "Failed to generate sample urls",
statusCode: 500,
errCode: "INTERNAL_SERVER_ERROR"
})
}
};
filePromises.push(processFile());
});
busboyClient.on("close", async () => {
try {
await Promise.all(filePromises);
resolve(relative_path);
} catch (error) {
logger.error({ apiId, error, resmsgid, message: "Fail to upload a file", code: "FAILED_TO_UPLOAD" })
reject({
code: "FAILED_TO_UPLOAD",
message: "Fail to upload a file",
statusCode: 400,
errCode: "BAD_REQUEST"
});
}
});
busboyClient.on("error", reject);
req.pipe(busboyClient);
})
}
const streamToBuffer = (stream: PassThrough): Promise<Buffer> => {
return new Promise<Buffer>((resolve, reject) => {
const chunks: Buffer[] = [];
stream.on("data", (chunk) => chunks.push(chunk));
stream.on("end", () => resolve(Buffer.concat(chunks)));
stream.on("error", reject);
});
};
const extractFileNameFromPath = (filePath: string): string[] => {
const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g;
return filePath.match(regex) || [];
};
export default connectorRegisterController;