diff --git a/package.json b/package.json index 33ea5d71..f1d7a5f1 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,8 @@ "redis": "^4.6.8", "ts-node": "^10.9.1", "typescript": "^4.7.4", - "winston": "^3.8.2" + "winston": "^3.8.2", + "zod": "^3.22.4" }, "devDependencies": { "@snapshot-labs/eslint-config": "^0.1.0-beta.15", diff --git a/src/ingestor.ts b/src/ingestor.ts index f531e82b..4f478210 100644 --- a/src/ingestor.ts +++ b/src/ingestor.ts @@ -13,6 +13,20 @@ import log from './helpers/log'; import { capture } from '@snapshot-labs/snapshot-sentry'; import { flaggedIps } from './helpers/moderation'; import { timeIngestorProcess } from './helpers/metrics'; +import { Envelope, InitialEnvelope } from './schemas'; + +type Writers = typeof writer; +type WritersKeys = keyof Writers; +type Handler = { + verify: (message: Parameters[0]) => Promise; + action: ( + message: Parameters[0], + ipfs: Parameters[1], + receipt: Parameters[2], + id: Parameters[3], + context: Parameters[4] + ) => Promise; +}; const NAME = 'snapshot'; const VERSION = '0.1.4'; @@ -25,13 +39,11 @@ export default async function ingestor(req) { const endTimer = timeIngestorProcess.startTimer(); try { - const body = req.body; - if (flaggedIps.includes(sha256(getIp(req)))) { return Promise.reject('unauthorized'); } - const schemaIsValid = snapshot.utils.validateSchema(envelope, body); + const schemaIsValid = snapshot.utils.validateSchema(envelope, req.body); if (schemaIsValid !== true) { log.warn(`[ingestor] Wrong envelope format ${JSON.stringify(schemaIsValid)}`); return Promise.reject('wrong envelope format'); @@ -40,14 +52,14 @@ export default async function ingestor(req) { const ts = Date.now() / 1e3; const over = 300; const under = 60 * 60 * 24 * 3; // 3 days - const overTs = (ts + over).toFixed(); - const underTs = (ts - under).toFixed(); - const { domain, message, types } = body.data; + const overTs = Math.round(ts + over); + const underTs = Math.round(ts - under); - if (JSON.stringify(body).length > 1e5) return Promise.reject('too large message'); + if (JSON.stringify(req.body).length > 1e5) return Promise.reject('too large message'); - if (message.timestamp > overTs || message.timestamp < underTs) - return Promise.reject('wrong timestamp'); + const initialEnvelopeResult = InitialEnvelope.safeParse(req.body); + if (!initialEnvelopeResult.success) return Promise.reject('wrong envelope format'); + const { domain, types } = initialEnvelopeResult.data.data; if (domain.name !== NAME || domain.version !== VERSION) return Promise.reject('wrong domain'); @@ -58,10 +70,25 @@ export default async function ingestor(req) { if (!Object.keys(hashTypes).includes(hash)) return Promise.reject('wrong types'); type = hashTypes[hash]; + const bodyWithType = { + ...req.body, + data: { + ...req.body.data, + type + } + }; + const envelopeResult = Envelope.safeParse(bodyWithType); + if (!envelopeResult.success) return Promise.reject('wrong envelope format'); + const { data: parsed } = envelopeResult; + const { type: messageType, message } = parsed.data; + + if (message.timestamp > overTs || message.timestamp < underTs) + return Promise.reject('wrong timestamp'); + network = '1'; let aliased = false; - if (!['settings', 'alias', 'profile'].includes(type)) { - if (!message.space) return Promise.reject('unknown space'); + + if (messageType !== 'settings' && messageType !== 'alias' && messageType !== 'profile') { const space = await getSpace(message.space); if (!space) return Promise.reject('unknown space'); network = space.network; @@ -78,36 +105,41 @@ export default async function ingestor(req) { 'delete-proposal', 'statement' ]; - if (body.address !== message.from) { - if (!aliasTypes.includes(type) && !aliasOptionTypes.includes(type)) + if (parsed.address !== message.from) { + if (!aliasTypes.includes(messageType) && !aliasOptionTypes.includes(messageType)) return Promise.reject('wrong from'); if (aliasOptionTypes.includes(type) && !aliased) return Promise.reject('alias not enabled'); - if (!(await isValidAlias(message.from, body.address))) return Promise.reject('wrong alias'); + if (!(await isValidAlias(message.from, parsed.address))) return Promise.reject('wrong alias'); } // Check if signature is valid try { - const isValidSig = await snapshot.utils.verify(body.address, body.sig, body.data, network, { - broviderUrl - }); + const isValidSig = await snapshot.utils.verify( + parsed.address, + parsed.sig, + parsed.data, + network, + { + broviderUrl + } + ); if (!isValidSig) throw new Error('invalid signature'); } catch (e: any) { - log.warn(`signature validation failed for ${body.address} ${JSON.stringify(e)}`); + log.warn(`signature validation failed for ${parsed.address} ${JSON.stringify(e)}`); return Promise.reject('signature validation failed'); } - const id = snapshot.utils.getHash(body.data); + const id = snapshot.utils.getHash(parsed.data); let payload = {}; if (await doesMessageExist(id)) { return Promise.reject('duplicate message'); } - if (type === 'settings') payload = JSON.parse(message.settings); - - if (type === 'proposal') + if (messageType === 'settings') payload = JSON.parse(message.settings); + if (messageType === 'proposal') payload = { name: message.title, body: message.body, @@ -122,10 +154,12 @@ export default async function ingestor(req) { type: message.type, app: kebabCase(message.app || '') }; - if (type === 'alias') payload = { alias: message.alias }; - if (type === 'statement') payload = { about: message.about, statement: message.statement }; - if (type === 'delete-proposal') payload = { proposal: message.proposal }; - if (type === 'update-proposal') { + if (messageType === 'alias') payload = { alias: message.alias }; + if (messageType === 'statement') { + payload = { about: message.about, statement: message.statement }; + } + if (messageType === 'delete-proposal') payload = { proposal: message.proposal }; + if (messageType === 'update-proposal') { payload = { proposal: message.proposal, name: message.title, @@ -138,14 +172,11 @@ export default async function ingestor(req) { type: message.type }; } - if (type === 'flag-proposal') payload = { proposal: message.proposal }; - - if (['vote', 'vote-array', 'vote-string'].includes(type)) { - if (message.metadata && message.metadata.length > 2000) - return Promise.reject('too large metadata'); + if (messageType === 'flag-proposal') payload = { proposal: message.proposal }; + if (messageType === 'vote' || messageType === 'vote-array' || messageType === 'vote-string') { let choice = message.choice; - if (type === 'vote-string') { + if (messageType === 'vote-string') { const proposal = await getProposal(message.space, message.proposal); if (!proposal) return Promise.reject('unknown proposal'); if (proposal.privacy !== 'shutter') choice = JSON.parse(message.choice); @@ -161,38 +192,51 @@ export default async function ingestor(req) { type = 'vote'; } - let legacyBody = { + const legacyBody = { address: message.from, msg: JSON.stringify({ version: domain.version, timestamp: message.timestamp, - space: message.space, + space: 'space' in message ? message.space : '', type, payload }), - sig: body.sig + sig: parsed.sig }; const msg = jsonParse(legacyBody.msg); - if (['follow', 'unfollow', 'subscribe', 'unsubscribe', 'profile'].includes(type)) { - legacyBody = message; - } - let context; try { - context = await writer[type].verify(legacyBody); + if ( + messageType === 'follow' || + messageType === 'unfollow' || + messageType === 'subscribe' || + messageType === 'unsubscribe' || + messageType === 'profile' + ) { + // NOTE: those are only writers that support new format right now, in the future it won't be conditional + // once other writers are updated + const handler = writer[messageType] as Handler; + context = await handler.verify(message); + } else { + context = await writer[messageType].verify(legacyBody); + } } catch (e) { if (typeof e !== 'string') { capture(e); } - log.warn(`[ingestor] [space: ${message?.space}] verify failed ${JSON.stringify(e)}`); + log.warn( + `[ingestor] [space: ${'space' in message && message.space}] verify failed ${JSON.stringify( + e + )}` + ); return Promise.reject(e); } let pinned; let receipt; try { - const { address, sig, ...restBody } = body; + const { address, sig, ...restBody } = parsed; const ipfsBody = { address, sig, @@ -201,7 +245,7 @@ export default async function ingestor(req) { }; [pinned, receipt] = await Promise.all([ pin(ipfsBody, process.env.PINEAPPLE_URL), - issueReceipt(body.sig) + issueReceipt(parsed.sig) ]); } catch (e) { capture(e); @@ -210,16 +254,30 @@ export default async function ingestor(req) { const ipfs = pinned.cid; try { - await writer[type].action(legacyBody, ipfs, receipt, id, context); + if ( + messageType === 'follow' || + messageType === 'unfollow' || + messageType === 'subscribe' || + messageType === 'unsubscribe' || + messageType === 'profile' + ) { + // NOTE: those are only writers that support new format right now, in the future it won't be conditional + // once other writers are updated + const handler = writer[messageType] as Handler; + context = await handler.action(message, ipfs, receipt, id, context); + } else { + await writer[messageType].action(legacyBody, ipfs, receipt, id, context); + } + await storeMsg( id, ipfs, - body.address, + parsed.address, msg.version, msg.timestamp, msg.space || '', msg.type, - body.sig, + parsed.sig, receipt ); } catch (e) { @@ -230,9 +288,9 @@ export default async function ingestor(req) { } const shortId = `${id.slice(0, 7)}...`; - const spaceText = message.space ? ` on "${message.space}"` : ''; + const spaceText = 'space' in message && message.space ? ` on "${message.space}"` : ''; log.info( - `[ingestor] New "${type}"${spaceText} for "${body.address}", id: ${shortId}, IP: ${sha256( + `[ingestor] New "${type}"${spaceText} for "${parsed.address}", id: ${shortId}, IP: ${sha256( getIp(req) )}` ); diff --git a/src/schemas.ts b/src/schemas.ts new file mode 100644 index 00000000..3f5557df --- /dev/null +++ b/src/schemas.ts @@ -0,0 +1,149 @@ +import { z } from 'zod'; + +const BaseMessage = z.object({ + from: z.string(), + timestamp: z.number() +}); + +export const AliasMessage = BaseMessage.extend({ + alias: z.string() +}); + +export const DeleteProposalMessage = BaseMessage.extend({ + space: z.string(), + proposal: z.string() +}); + +export const DeleteSpaceMessage = BaseMessage.extend({ + space: z.string() +}); + +export const FlagProposalMessage = BaseMessage.extend({ + space: z.string(), + proposal: z.string() +}); + +export const FollowMessage = BaseMessage.extend({ + space: z.string() +}); +export type FollowMessage = z.infer; + +export const ProfileMessage = BaseMessage.extend({ + profile: z.string() +}); +export type ProfileMessage = z.infer; + +export const ProposalMessage = BaseMessage.extend({ + space: z.string(), + title: z.string().min(1).max(256), + body: z.string().max(20000), // TODO: handle turbo/default + discussion: z.string().optional(), + choices: z.array(z.string()).min(1).max(500), + type: z.enum([ + 'single-choice', + 'approval', + 'ranked-choice', + 'quadratic', + 'weighted', + 'custom', + 'basic' + ]), + snapshot: z.number(), + start: z.number().min(1000000000).max(2000000000), + end: z.number().min(1000000000).max(2000000000), + plugins: z.string(), + app: z.string().max(24).optional() +}); + +export const SettingsMessage = BaseMessage.extend({ + settings: z.string() +}); + +export const StatementMessage = BaseMessage.extend({ + space: z.string(), + about: z.string(), + statement: z.string().optional() +}); + +export const SubscribeMessage = BaseMessage.extend({ + space: z.string() +}); +export type SubscribeMessage = z.infer; + +export const UpdateProposal = ProposalMessage.omit({ + snapshot: true, + start: true, + end: true, + app: true +}).extend({ + proposal: z.string() +}); + +export const UnfollowMessage = BaseMessage.extend({ + space: z.string() +}); +export type UnfollowMessage = z.infer; + +export const UnsubscribeMessage = BaseMessage.extend({ + space: z.string() +}); +export type UnsubscribeMessage = z.infer; + +export const BaseVoteMessage = BaseMessage.extend({ + space: z.string(), + proposal: z.string(), + choice: z.union([ + z.number(), + z.string(), + z.boolean(), + z.object({}).passthrough(), + z.array(z.number()) + ]), + reason: z.string().max(140).optional(), + metadata: z.string().max(2000).optional(), + app: z.string().max(24).optional() +}); + +export const VoteArrayMessage = BaseVoteMessage.extend({ + choice: z.array(z.string()) +}); + +export const VoteStringMessage = BaseVoteMessage.extend({ + choice: z.string() +}); + +const MessageUnion = z.discriminatedUnion('type', [ + z.object({ type: z.literal('alias'), message: AliasMessage }), + z.object({ type: z.literal('delete-proposal'), message: DeleteProposalMessage }), + z.object({ type: z.literal('delete-space'), message: DeleteSpaceMessage }), + z.object({ type: z.literal('flag-proposal'), message: FlagProposalMessage }), + z.object({ type: z.literal('follow'), message: FollowMessage }), + z.object({ type: z.literal('profile'), message: ProfileMessage }), + z.object({ type: z.literal('proposal'), message: ProposalMessage }), + z.object({ type: z.literal('settings'), message: SettingsMessage }), + z.object({ type: z.literal('statement'), message: StatementMessage }), + z.object({ type: z.literal('subscribe'), message: SubscribeMessage }), + z.object({ type: z.literal('unfollow'), message: UnfollowMessage }), + z.object({ type: z.literal('unsubscribe'), message: UnsubscribeMessage }), + z.object({ type: z.literal('update-proposal'), message: UpdateProposal }), + z.object({ type: z.literal('vote'), message: BaseVoteMessage }), + z.object({ type: z.literal('vote-string'), message: VoteStringMessage }), + z.object({ type: z.literal('vote-array'), message: VoteArrayMessage }) +]); + +export const InitialEnvelope = z.object({ + address: z.string(), + data: z.object({ + domain: z.object({ + name: z.string(), + version: z.string() + }), + types: z.any(), + primaryType: z.string().optional() + }), + sig: z.string() +}); + +export const Envelope = InitialEnvelope.extend({ + data: InitialEnvelope.shape.data.and(MessageUnion) +}); diff --git a/src/writer/follow.ts b/src/writer/follow.ts index 260f0d9e..1f9d91b0 100644 --- a/src/writer/follow.ts +++ b/src/writer/follow.ts @@ -1,5 +1,6 @@ import { FOLLOWS_LIMIT_PER_USER } from '../helpers/limits'; import db from '../helpers/mysql'; +import { FollowMessage } from '../schemas'; export const getFollowsCount = async (follower: string): Promise => { const query = `SELECT COUNT(*) AS count FROM follows WHERE follower = ?`; @@ -9,7 +10,7 @@ export const getFollowsCount = async (follower: string): Promise => { return count; }; -export async function verify(message): Promise { +export async function verify(message: FollowMessage): Promise { const count = await getFollowsCount(message.from); if (count >= FOLLOWS_LIMIT_PER_USER) { @@ -19,7 +20,12 @@ export async function verify(message): Promise { return true; } -export async function action(message, ipfs, receipt, id): Promise { +export async function action( + message: FollowMessage, + ipfs: string, + receipt: string, + id: string +): Promise { const params = { id, ipfs, diff --git a/src/writer/profile.ts b/src/writer/profile.ts index baf3a34f..234f798e 100644 --- a/src/writer/profile.ts +++ b/src/writer/profile.ts @@ -2,9 +2,10 @@ import snapshot from '@snapshot-labs/snapshot.js'; import db from '../helpers/mysql'; import { jsonParse } from '../helpers/utils'; import log from '../helpers/log'; +import { ProfileMessage } from '../schemas'; -export async function verify(body): Promise { - const profile = jsonParse(body.profile, {}); +export async function verify(message: ProfileMessage): Promise { + const profile = jsonParse(message.profile, {}); const schemaIsValid = snapshot.utils.validateSchema(snapshot.schemas.profile, profile); if (schemaIsValid !== true) { log.warn(`[writer] Wrong profile format ${JSON.stringify(schemaIsValid)}`); @@ -14,7 +15,7 @@ export async function verify(body): Promise { return true; } -export async function action(message, ipfs): Promise { +export async function action(message: ProfileMessage, ipfs: string): Promise { const profile = jsonParse(message.profile, {}); const params = { diff --git a/src/writer/subscribe.ts b/src/writer/subscribe.ts index c018da36..956b870c 100644 --- a/src/writer/subscribe.ts +++ b/src/writer/subscribe.ts @@ -1,10 +1,16 @@ import db from '../helpers/mysql'; +import { SubscribeMessage } from '../schemas'; export async function verify(): Promise { return true; } -export async function action(message, ipfs, receipt, id): Promise { +export async function action( + message: SubscribeMessage, + ipfs: string, + receipt: string, + id: string +): Promise { const params = { id, ipfs, diff --git a/src/writer/unfollow.ts b/src/writer/unfollow.ts index 001df9a3..5ab9e21a 100644 --- a/src/writer/unfollow.ts +++ b/src/writer/unfollow.ts @@ -1,10 +1,11 @@ import db from '../helpers/mysql'; +import { UnfollowMessage } from '../schemas'; export async function verify(): Promise { return true; } -export async function action(message): Promise { +export async function action(message: UnfollowMessage): Promise { const query = 'DELETE FROM follows WHERE follower = ? AND space = ? LIMIT 1'; await db.queryAsync(query, [message.from, message.space]); } diff --git a/src/writer/unsubscribe.ts b/src/writer/unsubscribe.ts index 2bff4b70..361a85d7 100644 --- a/src/writer/unsubscribe.ts +++ b/src/writer/unsubscribe.ts @@ -1,10 +1,11 @@ import db from '../helpers/mysql'; +import { UnsubscribeMessage } from '../schemas'; export async function verify(): Promise { return true; } -export async function action(message): Promise { +export async function action(message: UnsubscribeMessage): Promise { const query = 'DELETE FROM subscriptions WHERE address = ? AND space = ? LIMIT 1'; await db.queryAsync(query, [message.from, message.space]); } diff --git a/test/integration/ingestor.test.ts b/test/integration/ingestor.test.ts index b97b0b34..f8e2e54f 100644 --- a/test/integration/ingestor.test.ts +++ b/test/integration/ingestor.test.ts @@ -173,7 +173,7 @@ describe('ingestor', () => { const invalidRequest = cloneDeep(voteRequest); invalidRequest.body.data.message.metadata = JSON.stringify({ reason: ' - '.repeat(5000) }); - await expect(ingestor(invalidRequest)).rejects.toMatch('large'); + await expect(ingestor(invalidRequest)).rejects.toMatch('wrong envelope format'); }); it('rejects when IPFS pinning fail', async () => { diff --git a/test/integration/writer/follows.test.ts b/test/integration/writer/follows.test.ts index c5f18e0b..468c2c65 100644 --- a/test/integration/writer/follows.test.ts +++ b/test/integration/writer/follows.test.ts @@ -31,13 +31,13 @@ describe('writer/follow', () => { }); it('rejects when the user has followed too much spaces', () => { - expect(verify({ from: followerId })).rejects.toEqual( + expect(verify({ from: followerId, timestamp: 0, space: '0cf5e.eth' })).rejects.toEqual( `you can join max ${FOLLOWS_LIMIT_PER_USER} spaces` ); }); it('returns true when the user has not reached the limit', () => { - expect(verify({ from: '0x1' })).resolves.toEqual(true); + expect(verify({ from: '0x1', timestamp: 0, space: '0cf5e.eth' })).resolves.toEqual(true); }); }); }); diff --git a/yarn.lock b/yarn.lock index 7c3c1204..f1d8bcc5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5688,3 +5688,8 @@ yocto-queue@^0.1.0: version "0.1.0" resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b" integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q== + +zod@^3.22.4: + version "3.22.4" + resolved "https://registry.yarnpkg.com/zod/-/zod-3.22.4.tgz#f31c3a9386f61b1f228af56faa9255e845cf3fff" + integrity sha512-iC+8Io04lddc+mVqQ9AZ7OQ2MrUKGN+oIQyq1vemgt46jwCwLfhq7/pwnBnNXXXZb8VTVLKwp9EDkx+ryxIWmg==