Skip to content

Commit abc06c6

Browse files
authored
fix: entity deployment rate-limiter (#1873)
* fix: entity deployment rate-limiter * fix: tests * test: fix and add suite for rate-limiting * chore: apply PR feedback
1 parent 14db3b8 commit abc06c6

File tree

9 files changed

+345
-23
lines changed

9 files changed

+345
-23
lines changed

content/src/components.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ export async function initComponentsWithEnv(env: Environment): Promise<AppCompon
143143
entitiesConfigTtl:
144144
env.getConfig<Map<EntityType, number>>(EnvironmentConfig.DEPLOYMENT_RATE_LIMIT_TTL) ?? new Map(),
145145
entitiesConfigMax:
146-
env.getConfig<Map<EntityType, number>>(EnvironmentConfig.DEPLOYMENT_RATE_LIMIT_MAX) ?? new Map()
146+
env.getConfig<Map<EntityType, number>>(EnvironmentConfig.DEPLOYMENT_RATE_LIMIT_MAX) ?? new Map(),
147+
entitiesConfigUnchangedTtl: new Map([[EntityType.PROFILE, ms('5m')]]) // ms, converted to seconds internally
147148
}
148149
)
149150

content/src/ports/deployRateLimiterComponent.ts

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ import { AppComponents } from '../types'
77
export type IDeployRateLimiterComponent = {
88
newDeployment(entityType: EntityType, pointers: string[], localTimestamp: number): void
99
isRateLimited(entityType: EntityType, pointers: string[]): boolean
10+
newUnchangedDeployment(entityType: EntityType, pointers: string[], localTimestamp: number): void
11+
isUnchangedDeploymentRateLimited(entityType: EntityType, pointers: string[]): boolean
1012
}
1113

1214
export type DeploymentRateLimitConfig = {
1315
defaultTtl: number
1416
defaultMax: number
1517
entitiesConfigTtl: Map<EntityType, number>
1618
entitiesConfigMax: Map<EntityType, number>
19+
entitiesConfigUnchangedTtl: Map<EntityType, number>
1720
}
1821

1922
export function createDeployRateLimiter(
@@ -27,6 +30,11 @@ export function createDeployRateLimiter(
2730
rateLimitConfig
2831
)
2932

33+
const unchangedDeploymentCacheMap: Map<EntityType, NodeCache> = generateUnchangedDeploymentCacheMap(
34+
logs,
35+
rateLimitConfig
36+
)
37+
3038
function getCacheFromEntityType(entityType: EntityType): { cache: NodeCache; maxSize: number } {
3139
const cache = deploymentCacheMap.get(entityType)
3240
if (!cache) {
@@ -35,10 +43,18 @@ export function createDeployRateLimiter(
3543
return cache
3644
}
3745

46+
function getUnchangedCacheFromEntityType(entityType: EntityType): NodeCache {
47+
const cache = unchangedDeploymentCacheMap.get(entityType)
48+
if (!cache) {
49+
throw new Error(`Invalid Entity Type: ${entityType}`)
50+
}
51+
return cache
52+
}
53+
3854
return {
3955
newDeployment(entityType: EntityType, pointers: string[], localTimestamp: number): void {
4056
const cacheByEntityType = getCacheFromEntityType(entityType)
41-
for (const pointer in pointers) {
57+
for (const pointer of pointers) {
4258
cacheByEntityType.cache.set(pointer, localTimestamp)
4359
}
4460
},
@@ -51,10 +67,46 @@ export function createDeployRateLimiter(
5167
pointers.some((p) => !!cacheByEntityType.cache.get(p)) ||
5268
cacheByEntityType.cache.stats.keys > cacheByEntityType.maxSize
5369
)
70+
},
71+
72+
newUnchangedDeployment(entityType: EntityType, pointers: string[], localTimestamp: number): void {
73+
const cache = getUnchangedCacheFromEntityType(entityType)
74+
for (const pointer of pointers) {
75+
cache.set(pointer, localTimestamp)
76+
}
77+
},
78+
79+
isUnchangedDeploymentRateLimited(entityType: EntityType, pointers: string[]): boolean {
80+
const cache = getUnchangedCacheFromEntityType(entityType)
81+
return pointers.some((p) => !!cache.get(p))
5482
}
5583
}
5684
}
5785

86+
function generateUnchangedDeploymentCacheMap(
87+
logs: ILoggerComponent.ILogger,
88+
rateLimitConfig: DeploymentRateLimitConfig
89+
): Map<EntityType, NodeCache> {
90+
const unchangedCacheMap: Map<EntityType, NodeCache> = new Map()
91+
92+
for (const entityType of Object.values(EntityType)) {
93+
const ttl: number = toSeconds(rateLimitConfig.entitiesConfigUnchangedTtl.get(entityType) ?? 0)
94+
unchangedCacheMap.set(entityType, new NodeCache({ stdTTL: ttl, checkperiod: ttl }))
95+
}
96+
97+
const configEntries: string[] = []
98+
for (const [entityType, cache] of unchangedCacheMap) {
99+
if (cache.options.stdTTL && cache.options.stdTTL > 0) {
100+
configEntries.push(`${entityType}: { unchanged_ttl: ${cache.options.stdTTL} }`)
101+
}
102+
}
103+
if (configEntries.length > 0) {
104+
logs.info(`Unchanged deployment rate limit configured for:\n${configEntries.join('\n')}`)
105+
}
106+
107+
return unchangedCacheMap
108+
}
109+
58110
function generateDeploymentCacheMap(
59111
logs: ILoggerComponent.ILogger,
60112
rateLimitConfig: DeploymentRateLimitConfig
@@ -97,6 +149,13 @@ function toString(deploymentCacheMap: Map<EntityType, { cache: NodeCache; maxSiz
97149
return stringifyMap.join('\n')
98150
}
99151

152+
/**
153+
* Convert milliseconds to seconds for NodeCache stdTTL which expects seconds.
154+
*/
155+
function toSeconds(milliseconds: number): number {
156+
return Math.floor(milliseconds / 1000)
157+
}
158+
100159
function getCacheConfigPerEntityMap(
101160
entitiesConfigMax: Map<EntityType, number>,
102161
entitiesConfigTtl: Map<EntityType, number>
@@ -106,28 +165,28 @@ function getCacheConfigPerEntityMap(
106165
EntityType.PROFILE,
107166
{
108167
max: entitiesConfigMax.get(EntityType.PROFILE) ?? 300,
109-
ttl: entitiesConfigTtl.get(EntityType.PROFILE) ?? ms('1m')
168+
ttl: toSeconds(entitiesConfigTtl.get(EntityType.PROFILE) ?? ms('15s'))
110169
}
111170
],
112171
[
113172
EntityType.SCENE,
114173
{
115174
max: entitiesConfigMax.get(EntityType.SCENE) ?? 100000,
116-
ttl: entitiesConfigTtl.get(EntityType.SCENE) ?? ms('20s')
175+
ttl: toSeconds(entitiesConfigTtl.get(EntityType.SCENE) ?? ms('20s'))
117176
}
118177
],
119178
[
120179
EntityType.WEARABLE,
121180
{
122181
max: entitiesConfigMax.get(EntityType.WEARABLE) ?? 100000,
123-
ttl: entitiesConfigTtl.get(EntityType.WEARABLE) ?? ms('20s')
182+
ttl: toSeconds(entitiesConfigTtl.get(EntityType.WEARABLE) ?? ms('20s'))
124183
}
125184
],
126185
[
127186
EntityType.STORE,
128187
{
129188
max: entitiesConfigMax.get(EntityType.STORE) ?? 300,
130-
ttl: entitiesConfigTtl.get(EntityType.STORE) ?? ms('1m')
189+
ttl: toSeconds(entitiesConfigTtl.get(EntityType.STORE) ?? ms('1m'))
131190
}
132191
]
133192
])

content/src/ports/deployer.ts

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { bufferToStream } from '@dcl/catalyst-storage/dist/content-item'
22
import { AuthChain, Authenticator } from '@dcl/crypto'
33
import { Entity, EntityType, IPFSv2 } from '@dcl/schemas'
4+
import { isDeepStrictEqual } from 'util'
45
import { EnvironmentConfig } from '../Environment'
56
import {
67
AuditInfo,
@@ -24,6 +25,17 @@ export function isIPFSHash(hash: string): boolean {
2425
return IPFSv2.validate(hash)
2526
}
2627

28+
/**
29+
* Compare two entities' metadata using deep equality (order-independent).
30+
* id, timestamp, version and pointers are top-level Entity fields, not
31+
* inside metadata, so they are excluded automatically.
32+
* Since ADR-290, profiles no longer carry content files so only metadata
33+
* is compared.
34+
*/
35+
export function isEntityContentUnchanged(newEntity: Entity, activeEntity: Entity): boolean {
36+
return isDeepStrictEqual(newEntity.metadata, activeEntity.metadata)
37+
}
38+
2739
/**
2840
* This function will take some deployment files and hash them. They might come already hashed, and if that is the case we will just return them.
2941
* They could come hashed because the denylist decorator might have already hashed them for its own validations. In order to avoid re-hashing
@@ -93,12 +105,20 @@ export function createDeployer(
93105
entity: Entity,
94106
auditInfo: LocalDeploymentAuditInfo,
95107
hashes: Map<string, Uint8Array>,
96-
context: DeploymentContext
108+
context: DeploymentContext,
109+
isContentUnchanged: boolean
97110
): Promise<InvalidResult | { auditInfoComplete: AuditInfo; wasEntityDeployed: boolean }> {
98111
const deployedEntity = await getEntityById(database, entityId)
99112
const isEntityAlreadyDeployed = !!deployedEntity
100113

101-
const validationResult = await validateDeployment(entity, context, isEntityAlreadyDeployed, auditInfo, hashes)
114+
const validationResult = await validateDeployment(
115+
entity,
116+
context,
117+
isEntityAlreadyDeployed,
118+
auditInfo,
119+
hashes,
120+
isContentUnchanged
121+
)
102122

103123
if (!validationResult.ok) {
104124
logger.warn(`Validations for deployment failed`, {
@@ -202,14 +222,18 @@ export function createDeployer(
202222
context: DeploymentContext,
203223
isEntityDeployedAlready: boolean,
204224
auditInfo: LocalDeploymentAuditInfo,
205-
hashes: Map<string, Uint8Array>
225+
hashes: Map<string, Uint8Array>,
226+
isContentUnchanged: boolean
206227
): Promise<{ ok: boolean; errors?: string[] }> {
207228
// When deploying a new entity in some context which is not sync, we run some server side checks
208229
const serverValidationResult = await components.serverValidator.validate(entity, context, {
209230
areThereNewerEntities: (entity) => areThereNewerEntitiesOnPointers(entity),
210231
isEntityDeployedAlready: () => isEntityDeployedAlready,
211232
isNotFailedDeployment: (entity) => components.failedDeployments.findFailedDeployment(entity.id) === undefined,
212-
isEntityRateLimited: (entity) => components.deployRateLimiter.isRateLimited(entity.type, entity.pointers),
233+
isEntityRateLimited: (entity) =>
234+
components.deployRateLimiter.isRateLimited(entity.type, entity.pointers) ||
235+
(isContentUnchanged &&
236+
components.deployRateLimiter.isUnchangedDeploymentRateLimited(entity.type, entity.pointers)),
213237
isRequestTtlBackwards: (entity) =>
214238
components.clock.now() - entity.timestamp >
215239
components.env.getConfig<number>(EnvironmentConfig.REQUEST_TTL_BACKWARDS)
@@ -292,6 +316,19 @@ export function createDeployer(
292316

293317
const contextToDeploy: DeploymentContext = calculateIfLegacy(entity, auditInfo.authChain, context)
294318

319+
// Check if the entity content is unchanged from the currently active entity
320+
let isContentUnchanged = false
321+
if (context === DeploymentContext.LOCAL) {
322+
try {
323+
const activeEntities = await components.activeEntities.withPointers(components.database, entity.pointers)
324+
if (activeEntities.length > 0) {
325+
isContentUnchanged = isEntityContentUnchanged(entity, activeEntities[0])
326+
}
327+
} catch (error) {
328+
logger.warn(`Failed to check if entity content is unchanged, assuming changed`, { entityId })
329+
}
330+
}
331+
295332
try {
296333
logger.info(`Deploying entity`, {
297334
entityId,
@@ -304,7 +341,8 @@ export function createDeployer(
304341
entity,
305342
auditInfo,
306343
hashes,
307-
contextToDeploy
344+
contextToDeploy,
345+
isContentUnchanged
308346
)
309347

310348
if (!storeResult) {
@@ -348,6 +386,14 @@ export function createDeployer(
348386
entity.pointers,
349387
storeResult.auditInfoComplete.localTimestamp
350388
)
389+
390+
if (isContentUnchanged) {
391+
components.deployRateLimiter.newUnchangedDeployment(
392+
entity.type,
393+
entity.pointers,
394+
storeResult.auditInfoComplete.localTimestamp
395+
)
396+
}
351397
}
352398

353399
// add the entity to the bloom filter to prevent expensive operations during the sync

content/test/integration/E2ETestEnvironment.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { DAOComponent } from '../../src/ports/dao-servers-getter'
1313
import { IDatabaseComponent, createDatabaseComponent } from '../../src/ports/postgres'
1414
import { AppComponents } from '../../src/types'
1515
import { MockedDAOClient } from '../helpers/service/synchronization/clients/MockedDAOClient'
16+
import { createNoOpDeployRateLimiter } from '../mocks/deploy-rate-limiter-mock'
1617
import { TestProgram } from './TestProgram'
1718

1819
export class E2ETestEnvironment {
@@ -195,6 +196,9 @@ export class ServerBuilder {
195196
jest.spyOn(components.daoClient, 'getAllServers').mockImplementation(() => this.dao.getAllServers())
196197
}
197198

199+
// Override methods on the existing rate limiter object (not replace it)
200+
// because the deployer captures its own reference to the original object
201+
Object.assign(components.deployRateLimiter, createNoOpDeployRateLimiter())
198202
servers[i] = new TestProgram(components)
199203
this.testEnvCalls.registerServer(servers[i])
200204
}

0 commit comments

Comments
 (0)