Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
"redis": "^4.6.8",
"ts-node": "^10.9.1",
"typescript": "^4.7.4",
"winston": "^3.8.2"
"winston": "^3.8.2",
"zod": "^3.22.4"
},
"devDependencies": {
"@snapshot-labs/eslint-config": "^0.1.0-beta.15",
Expand Down
154 changes: 106 additions & 48 deletions src/ingestor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ import log from './helpers/log';
import { capture } from '@snapshot-labs/snapshot-sentry';
import { flaggedIps } from './helpers/moderation';
import { timeIngestorProcess } from './helpers/metrics';
import { Envelope, InitialEnvelope } from './schemas';

type Writers = typeof writer;
type WritersKeys = keyof Writers;
type Handler<WriterKey extends WritersKeys> = {
verify: (message: Parameters<Writers[WriterKey]['verify']>[0]) => Promise<any>;
action: (
message: Parameters<Writers[WriterKey]['action']>[0],
ipfs: Parameters<Writers[WriterKey]['action']>[1],
receipt: Parameters<Writers[WriterKey]['action']>[2],
id: Parameters<Writers[WriterKey]['action']>[3],
context: Parameters<Writers[WriterKey]['action']>[4]
) => Promise<any>;
};

const NAME = 'snapshot';
const VERSION = '0.1.4';
Expand All @@ -25,13 +39,11 @@ export default async function ingestor(req) {
const endTimer = timeIngestorProcess.startTimer();

try {
const body = req.body;

if (flaggedIps.includes(sha256(getIp(req)))) {
return Promise.reject('unauthorized');
}

const schemaIsValid = snapshot.utils.validateSchema(envelope, body);
const schemaIsValid = snapshot.utils.validateSchema(envelope, req.body);
if (schemaIsValid !== true) {
log.warn(`[ingestor] Wrong envelope format ${JSON.stringify(schemaIsValid)}`);
return Promise.reject('wrong envelope format');
Expand All @@ -40,14 +52,14 @@ export default async function ingestor(req) {
const ts = Date.now() / 1e3;
const over = 300;
const under = 60 * 60 * 24 * 3; // 3 days
const overTs = (ts + over).toFixed();
const underTs = (ts - under).toFixed();
const { domain, message, types } = body.data;
const overTs = Math.round(ts + over);
const underTs = Math.round(ts - under);

if (JSON.stringify(body).length > 1e5) return Promise.reject('too large message');
if (JSON.stringify(req.body).length > 1e5) return Promise.reject('too large message');

if (message.timestamp > overTs || message.timestamp < underTs)
return Promise.reject('wrong timestamp');
const initialEnvelopeResult = InitialEnvelope.safeParse(req.body);
if (!initialEnvelopeResult.success) return Promise.reject('wrong envelope format');
const { domain, types } = initialEnvelopeResult.data.data;

if (domain.name !== NAME || domain.version !== VERSION) return Promise.reject('wrong domain');

Expand All @@ -58,10 +70,25 @@ export default async function ingestor(req) {
if (!Object.keys(hashTypes).includes(hash)) return Promise.reject('wrong types');
type = hashTypes[hash];

const bodyWithType = {
...req.body,
data: {
...req.body.data,
type
}
};
const envelopeResult = Envelope.safeParse(bodyWithType);
if (!envelopeResult.success) return Promise.reject('wrong envelope format');
const { data: parsed } = envelopeResult;
const { type: messageType, message } = parsed.data;

if (message.timestamp > overTs || message.timestamp < underTs)
return Promise.reject('wrong timestamp');

network = '1';
let aliased = false;
if (!['settings', 'alias', 'profile'].includes(type)) {
if (!message.space) return Promise.reject('unknown space');

if (messageType !== 'settings' && messageType !== 'alias' && messageType !== 'profile') {
const space = await getSpace(message.space);
if (!space) return Promise.reject('unknown space');
network = space.network;
Expand All @@ -78,36 +105,41 @@ export default async function ingestor(req) {
'delete-proposal',
'statement'
];
if (body.address !== message.from) {
if (!aliasTypes.includes(type) && !aliasOptionTypes.includes(type))
if (parsed.address !== message.from) {
if (!aliasTypes.includes(messageType) && !aliasOptionTypes.includes(messageType))
return Promise.reject('wrong from');

if (aliasOptionTypes.includes(type) && !aliased) return Promise.reject('alias not enabled');

if (!(await isValidAlias(message.from, body.address))) return Promise.reject('wrong alias');
if (!(await isValidAlias(message.from, parsed.address))) return Promise.reject('wrong alias');
}

// Check if signature is valid
try {
const isValidSig = await snapshot.utils.verify(body.address, body.sig, body.data, network, {
broviderUrl
});
const isValidSig = await snapshot.utils.verify(
parsed.address,
parsed.sig,
parsed.data,
network,
{
broviderUrl
}
);
if (!isValidSig) throw new Error('invalid signature');
} catch (e: any) {
log.warn(`signature validation failed for ${body.address} ${JSON.stringify(e)}`);
log.warn(`signature validation failed for ${parsed.address} ${JSON.stringify(e)}`);
return Promise.reject('signature validation failed');
}

const id = snapshot.utils.getHash(body.data);
const id = snapshot.utils.getHash(parsed.data);
let payload = {};

if (await doesMessageExist(id)) {
return Promise.reject('duplicate message');
}

if (type === 'settings') payload = JSON.parse(message.settings);

if (type === 'proposal')
if (messageType === 'settings') payload = JSON.parse(message.settings);
if (messageType === 'proposal')
payload = {
name: message.title,
body: message.body,
Expand All @@ -122,10 +154,12 @@ export default async function ingestor(req) {
type: message.type,
app: kebabCase(message.app || '')
};
if (type === 'alias') payload = { alias: message.alias };
if (type === 'statement') payload = { about: message.about, statement: message.statement };
if (type === 'delete-proposal') payload = { proposal: message.proposal };
if (type === 'update-proposal') {
if (messageType === 'alias') payload = { alias: message.alias };
if (messageType === 'statement') {
payload = { about: message.about, statement: message.statement };
}
if (messageType === 'delete-proposal') payload = { proposal: message.proposal };
if (messageType === 'update-proposal') {
payload = {
proposal: message.proposal,
name: message.title,
Expand All @@ -138,14 +172,11 @@ export default async function ingestor(req) {
type: message.type
};
}
if (type === 'flag-proposal') payload = { proposal: message.proposal };

if (['vote', 'vote-array', 'vote-string'].includes(type)) {
if (message.metadata && message.metadata.length > 2000)
return Promise.reject('too large metadata');
if (messageType === 'flag-proposal') payload = { proposal: message.proposal };

if (messageType === 'vote' || messageType === 'vote-array' || messageType === 'vote-string') {
let choice = message.choice;
if (type === 'vote-string') {
if (messageType === 'vote-string') {
const proposal = await getProposal(message.space, message.proposal);
if (!proposal) return Promise.reject('unknown proposal');
if (proposal.privacy !== 'shutter') choice = JSON.parse(message.choice);
Expand All @@ -161,38 +192,51 @@ export default async function ingestor(req) {
type = 'vote';
}

let legacyBody = {
const legacyBody = {
address: message.from,
msg: JSON.stringify({
version: domain.version,
timestamp: message.timestamp,
space: message.space,
space: 'space' in message ? message.space : '',
type,
payload
}),
sig: body.sig
sig: parsed.sig
};
const msg = jsonParse(legacyBody.msg);

if (['follow', 'unfollow', 'subscribe', 'unsubscribe', 'profile'].includes(type)) {
legacyBody = message;
}

let context;
try {
context = await writer[type].verify(legacyBody);
if (
messageType === 'follow' ||
messageType === 'unfollow' ||
messageType === 'subscribe' ||
messageType === 'unsubscribe' ||
messageType === 'profile'
) {
// NOTE: those are only writers that support new format right now, in the future it won't be conditional
// once other writers are updated
const handler = writer[messageType] as Handler<typeof messageType>;
context = await handler.verify(message);
} else {
context = await writer[messageType].verify(legacyBody);
}
} catch (e) {
if (typeof e !== 'string') {
capture(e);
}
log.warn(`[ingestor] [space: ${message?.space}] verify failed ${JSON.stringify(e)}`);
log.warn(
`[ingestor] [space: ${'space' in message && message.space}] verify failed ${JSON.stringify(
e
)}`
);
return Promise.reject(e);
}

let pinned;
let receipt;
try {
const { address, sig, ...restBody } = body;
const { address, sig, ...restBody } = parsed;
const ipfsBody = {
address,
sig,
Expand All @@ -201,7 +245,7 @@ export default async function ingestor(req) {
};
[pinned, receipt] = await Promise.all([
pin(ipfsBody, process.env.PINEAPPLE_URL),
issueReceipt(body.sig)
issueReceipt(parsed.sig)
]);
} catch (e) {
capture(e);
Expand All @@ -210,16 +254,30 @@ export default async function ingestor(req) {
const ipfs = pinned.cid;

try {
await writer[type].action(legacyBody, ipfs, receipt, id, context);
if (
messageType === 'follow' ||
messageType === 'unfollow' ||
messageType === 'subscribe' ||
messageType === 'unsubscribe' ||
messageType === 'profile'
) {
// NOTE: those are only writers that support new format right now, in the future it won't be conditional
// once other writers are updated
const handler = writer[messageType] as Handler<typeof messageType>;
context = await handler.action(message, ipfs, receipt, id, context);
} else {
await writer[messageType].action(legacyBody, ipfs, receipt, id, context);
}

await storeMsg(
id,
ipfs,
body.address,
parsed.address,
msg.version,
msg.timestamp,
msg.space || '',
msg.type,
body.sig,
parsed.sig,
receipt
);
} catch (e) {
Expand All @@ -230,9 +288,9 @@ export default async function ingestor(req) {
}

const shortId = `${id.slice(0, 7)}...`;
const spaceText = message.space ? ` on "${message.space}"` : '';
const spaceText = 'space' in message && message.space ? ` on "${message.space}"` : '';
log.info(
`[ingestor] New "${type}"${spaceText} for "${body.address}", id: ${shortId}, IP: ${sha256(
`[ingestor] New "${type}"${spaceText} for "${parsed.address}", id: ${shortId}, IP: ${sha256(
getIp(req)
)}`
);
Expand Down
Loading