Skip to content

Commit efbdbb3

Browse files
authored
Merge pull request #223 from graphcareful/wasm-engine-logging
Wasm engine logging
2 parents c27fb3c + 951d741 commit efbdbb3

File tree

8 files changed

+300
-58
lines changed

8 files changed

+300
-58
lines changed

src/js/build-package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
"ts-node": "^9.0.0",
1717
"webpack": "^4.44.2",
1818
"xxhash": "0.3.0",
19-
"js-yaml": "^3.14.0"
19+
"js-yaml": "^3.14.0",
20+
"winston": "3.3.3"
2021
},
2122
"devDependencies": {
2223
"sinon": "^9.0.2",

src/js/modules/rpc/server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export class ProcessBatchServer extends SupervisorServer {
2727

2828
constructor(activeDir: string, inactiveDir: string, submitDir: string) {
2929
super();
30+
// TODO Can lookup the port redpanda is listening for copros on in the redpanda.yaml file
3031
this.managementClient = new ManagementClient(43118);
3132
this.applyCoprocessor = this.applyCoprocessor.bind(this);
3233
this.repository = new Repository();

src/js/modules/rpc/service.ts

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@ import { safeLoadAll } from "js-yaml";
1313
import { join, resolve } from "path";
1414
import * as fs from "fs";
1515
import { promisify } from "util";
16+
import { newLogger } from "../utilities/Logging";
17+
18+
const logger = newLogger("service");
1619

1720
// read yaml config file
1821
const readConfigFile = (confPath: string): Promise<Record<string, any>> => {
1922
try {
23+
logger.info(`Reading from config file: ${confPath}`);
2024
return fs.promises.readFile(confPath).then((file) => safeLoadAll(file)[0]);
2125
} catch (e) {
22-
return Promise.reject(new Error(`Error reading config file: ${e}`));
26+
return Promise.reject(new Error(`Error reading config file: ${e.message}`));
2327
}
2428
};
2529

@@ -33,30 +37,38 @@ const validateOrCreateScaffolding = (directoryPath: string): Promise<void> => {
3337
const path = join(directoryPath, folder);
3438
return exists(path).then((exist) => {
3539
if (!exist) {
36-
console.log(path);
40+
logger.info(`Creating part of scaffolding ${path}`);
3741
return fs.promises.mkdir(path, { recursive: true });
3842
}
3943
});
4044
});
4145
return Promise.all(validations).then(() => null);
4246
};
4347

44-
// read config file path argument
45-
const configPathArg = process.argv.splice(2)[0];
46-
const defaultConfigPath = "/var/lib/redpanda/conf/redpanda.yaml";
47-
const defaultCoprocessorPath = "/var/lib/redpanda/coprocessor";
48-
// resolve config path or assign default value
49-
const configPath = configPathArg ? resolve(configPathArg) : defaultConfigPath;
48+
function main() {
49+
// read config file path argument
50+
logger.info("Starting redpanda wasm service...");
51+
const configPathArg = process.argv.splice(2)[0];
52+
const defaultConfigPath = "/var/lib/redpanda/conf/redpanda.yaml";
53+
const defaultCoprocessorPath = "/var/lib/redpanda/coprocessor";
54+
// resolve config path or assign default value
55+
const configPath = configPathArg ? resolve(configPathArg) : defaultConfigPath;
56+
logger.info(`Using redpanda configuration path: ${configPath}`);
5057

51-
readConfigFile(configPath).then((config) => {
52-
const port = config?.redpanda?.coproc_supervisor_server || 43189;
53-
const path = config?.coproc_engine?.path || defaultCoprocessorPath;
54-
validateOrCreateScaffolding(path).then(() => {
55-
const service = new ProcessBatchServer(
56-
join(path, "active"),
57-
join(path, "inactive"),
58-
join(path, "submit")
59-
);
60-
service.listen(port);
58+
readConfigFile(configPath).then((config) => {
59+
const port = config?.redpanda?.coproc_supervisor_server || 43189;
60+
const path = config?.coproc_engine?.path || defaultCoprocessorPath;
61+
logger.info(`Using root scaffolding path: ${path}`);
62+
validateOrCreateScaffolding(path).then(() => {
63+
const service = new ProcessBatchServer(
64+
join(path, "active"),
65+
join(path, "inactive"),
66+
join(path, "submit")
67+
);
68+
logger.info(`Starting redpanda wasm service on port: ${port}`);
69+
service.listen(port);
70+
});
6171
});
62-
});
72+
}
73+
74+
main();

src/js/modules/supervisors/FileManager.ts

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { rename, readdir } from "fs";
1313
import { promisify } from "util";
1414
import Repository from "./Repository";
1515
import { Handle } from "../domain/Handle";
16+
import { newLogger } from "../utilities/Logging";
1617
import { getChecksumFromFile } from "../utilities/Checksum";
1718
import { Coprocessor, PolicyInjection } from "../public/Coprocessor";
1819
import { Script_ManagerClient as ManagementClient } from "../rpc/serverAndClients/server";
@@ -24,6 +25,8 @@ import {
2425
validateEnableResponseCode,
2526
} from "./HandleError";
2627

28+
const logger = newLogger("FileManager");
29+
2730
/**
2831
* FileManager class is an inotify implementation, it receives a
2932
* Repository and updates this object when to add a new file in
@@ -46,8 +49,8 @@ class FileManager {
4649
.then(() => this.readCoprocessorFolder(repository, this.submitDir))
4750
.then(() => this.startWatchers(repository));
4851
} catch (e) {
49-
console.error(e);
50-
//TODO: implement winston for loggin information and error handler
52+
// TODO: This should be a FATAL error
53+
logger.error(`Failed to register watch(es): ${e.message}`);
5154
}
5255
}
5356

@@ -73,7 +76,7 @@ class FileManager {
7376
return this.moveCoprocessorFile(prevHandle, this.inactiveDir)
7477
.then(() =>
7578
this.deregisterCoprocessor(prevHandle.coprocessor).catch((e) => {
76-
console.error(e.message);
79+
logger.error(e.message);
7780
return Promise.resolve();
7881
})
7982
)
@@ -130,7 +133,7 @@ class FileManager {
130133
path.join(folder, file),
131134
repository,
132135
validatePrevExist
133-
).catch((e) => console.error(e.message))
136+
).catch((e) => logger.error(e.message))
134137
);
135138
});
136139
//TODO: implement winston for loggin information and error handler
@@ -150,22 +153,23 @@ class FileManager {
150153
const id = hash64(Buffer.from(name), 0).readBigUInt64LE();
151154
const [handle] = repository.getHandlesByCoprocessorIds([id]);
152155
if (!handle) {
153-
console.error(
154-
"error: trying to disable a removed coprocessor from " +
155-
"'active' folder but it wasn't loaded in memory, file name " +
156-
`${name}.js`
156+
logger.error(
157+
`Trying to disable a removed coprocessor from 'active' folder but it` +
158+
`wasn't loaded in memory, file name: ${name}.js`
157159
);
158160
return Promise.resolve();
159161
} else {
160162
return this.disableCoprocessors([handle.coprocessor])
161163
.then(() => this.repository.remove(handle))
162-
.then(() =>
163-
console.log(
164+
.then(() => {
165+
logger.info(
164166
`disabled coprocessor: ID ${handle.coprocessor.globalId} ` +
165167
`filename: '${name}.js'`
166-
)
167-
)
168-
.catch((error) => console.log(error.message));
168+
);
169+
})
170+
.catch((error) => {
171+
logger.error(error.message);
172+
});
169173
}
170174
}
171175

@@ -221,12 +225,20 @@ class FileManager {
221225
* @param coprocessors
222226
*/
223227
disableCoprocessors(coprocessors: Coprocessor[]): Promise<void> {
228+
logger.info(
229+
`Initiating RPC call to redpandas coprocessor service at endpoint - ` +
230+
`disable_coprocessors with data: ${coprocessors}`
231+
);
224232
return this.managementClient
225233
.disable_copros({ inputs: coprocessors.map((coproc) => coproc.globalId) })
226234
.then((response) => {
227235
const errors = validateDisableResponseCode(response, coprocessors);
228236
if (errors.length > 0) {
229-
return Promise.reject(this.compactErrors([errors]));
237+
const compactedErrors = this.compactErrors([errors]);
238+
logger.error(
239+
`disable_coprocessors RPC returned with errors: ${compactedErrors}`
240+
);
241+
return Promise.reject(compactedErrors);
230242
} else {
231243
return Promise.resolve();
232244
}
@@ -256,6 +268,10 @@ class FileManager {
256268
if (coprocessors.length == 0) {
257269
return Promise.resolve();
258270
} else {
271+
logger.info(
272+
`Initiating RPC call to redpandas coprocessor service at endpoint - ` +
273+
`enable_coprocessors with data: ${coprocessors}`
274+
);
259275
return this.managementClient
260276
.enable_copros({
261277
coprocessors: coprocessors.map((coproc) => ({
@@ -282,7 +298,12 @@ class FileManager {
282298
coprocessors
283299
);
284300
if (errors.find((errors) => errors.length > 0)) {
285-
return Promise.reject(this.compactErrors(errors));
301+
const compactedErrors = this.compactErrors(errors);
302+
logger.error(
303+
`enable_coprocessors RPC returned with some error: ` +
304+
`${compactedErrors}`
305+
);
306+
return Promise.reject(compactedErrors);
286307
} else {
287308
return Promise.resolve();
288309
}
@@ -363,13 +384,15 @@ class FileManager {
363384
*/
364385
private startWatchers(repository: Repository): void {
365386
this.submitDirWatcher = watch(this.submitDir).on("add", (filePath) => {
366-
this.addCoprocessor(filePath, repository).catch((e) =>
367-
console.error(e.message)
368-
);
387+
logger.info(`Detected new file in submit dir: ${filePath}`);
388+
this.addCoprocessor(filePath, repository).catch((e) => {
389+
logger.error(`addCoprocessor failed with exception: ${e.message}`);
390+
});
391+
});
392+
this.activeDirWatcher = watch(this.activeDir).on("unlink", (filePath) => {
393+
logger.info(`Detected removed file from active dir: ${filePath}`);
394+
this.removeHandleFromFilePath(filePath, repository);
369395
});
370-
this.activeDirWatcher = watch(this.activeDir).on("unlink", (filePath) =>
371-
this.removeHandleFromFilePath(filePath, repository)
372-
);
373396
}
374397
}
375398

src/js/modules/supervisors/HandleError.ts

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export const validateDisableResponseCode = (
2626
): Error[] => {
2727
if (responses.inputs.length !== coprocessors.length) {
2828
throw new Error(
29-
"error: inconsistent response for disable coprocessors, " +
29+
"inconsistent response for disable coprocessors, " +
3030
`the disabled coprocessor response for ${coprocessors.join(", ")} ` +
3131
"doesn't have the same number item results, expected: " +
3232
`${coprocessors.length} result: ${responses.inputs.length}`
@@ -36,17 +36,14 @@ export const validateDisableResponseCode = (
3636
switch (code) {
3737
case DisableResponseCode.internalError: {
3838
errors.push(
39-
new Error(
40-
`internal error: wasm function ID: ${coprocessors[index].globalId}`
41-
)
39+
new Error(`wasm function ID: ${coprocessors[index].globalId}`)
4240
);
4341
break;
4442
}
4543
case DisableResponseCode.scriptDoesNotExist: {
4644
errors.push(
4745
new Error(
48-
`error: script with ID ${coprocessors[index].globalId} ` +
49-
"doesn't exist."
46+
`script with ID ${coprocessors[index].globalId} ` + "doesn't exist."
5047
)
5148
);
5249
}
@@ -61,7 +58,7 @@ export const validateEnableResponseCode = (
6158
): Error[][] => {
6259
if (responses.inputs.length !== coprocessors.length) {
6360
throw new Error(
64-
"error: inconsistent response for enable coprocessors, " +
61+
"inconsistent response for enable coprocessors, " +
6562
`the enable coprocessor response for ${coprocessors.join(", ")} ` +
6663
"doesn't have the same number item results, expected: " +
6764
`${coprocessors.length} result: ${responses.inputs.length}`
@@ -73,25 +70,23 @@ export const validateEnableResponseCode = (
7370
const coprocessor = coprocessors[index];
7471
switch (code) {
7572
case EnableResponseCode.internalError: {
76-
errors.push(new Error(`internal error: wasm function ID: ${id}`));
73+
errors.push(new Error(`wasm function ID: ${id}`));
7774
break;
7875
}
7976
case EnableResponseCode.invalidIngestionPolicy: {
80-
errors.push(new Error(`error: invalid ingestion policy`));
77+
errors.push(new Error(`invalid ingestion policy`));
8178
break;
8279
}
8380
case EnableResponseCode.invalidTopic: {
8481
errors.push(
85-
new Error(
86-
`error: invalid topic "${coprocessor.inputTopics[topicIndex]}"`
87-
)
82+
new Error(`invalid topic "${coprocessor.inputTopics[topicIndex]}"`)
8883
);
8984
break;
9085
}
9186
case EnableResponseCode.topicDoesNotExist: {
9287
errors.push(
9388
new Error(
94-
`error: topic "${coprocessor.inputTopics[topicIndex]}" ` +
89+
`topic "${coprocessor.inputTopics[topicIndex]}" ` +
9590
`doesn't exist`
9691
)
9792
);
@@ -100,14 +95,13 @@ export const validateEnableResponseCode = (
10095
case EnableResponseCode.scriptIdAlreadyExist: {
10196
errors.push(
10297
new Error(
103-
"error: wasm function already register, ID: " +
104-
coprocessor.globalId
98+
"wasm function already register, ID: " + coprocessor.globalId
10599
)
106100
);
107101
break;
108102
}
109103
case EnableResponseCode.materializedTopic: {
110-
errors.push(new Error("error: materialized topic"));
104+
errors.push(new Error("materialized topic"));
111105
break;
112106
}
113107
case EnableResponseCode.success: {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright 2020 Vectorized, Inc.
3+
*
4+
* Licensed as a Redpanda Enterprise file under the Redpanda Community
5+
* License (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://github.com/vectorizedio/redpanda/blob/master/licenses/rcl.md
9+
*/
10+
11+
import * as winston from "winston";
12+
13+
export function newLogger(
14+
module_name: string,
15+
logLevel = "info",
16+
simplify = false
17+
) {
18+
const rpWasmFormatter = () => {
19+
const logLineFormatter = winston.format.printf(
20+
({ level, message, label, timestamp }) => {
21+
return `${timestamp} [${label}] ${level}: ${message}`;
22+
}
23+
);
24+
const simpleLogLineFormatter = winston.format.printf(
25+
({ level, message, label }) => {
26+
return `[${label}] ${level}: ${message}`;
27+
}
28+
);
29+
30+
return (simplify ? [] : [winston.format.timestamp()]).concat([
31+
winston.format.splat(),
32+
winston.format.label({ label: module_name }),
33+
simplify ? simpleLogLineFormatter : logLineFormatter,
34+
]);
35+
};
36+
37+
const logger = winston.createLogger({
38+
level: logLevel,
39+
format: winston.format.combine(...rpWasmFormatter()),
40+
defaultMeta: { service: module_name },
41+
transports: [
42+
new winston.transports.Console({
43+
level: logLevel,
44+
}),
45+
],
46+
});
47+
return logger;
48+
}

0 commit comments

Comments
 (0)