
Commit cb99b82

UBERF-11664 Use LRU cache in Datalake (#9658)
Signed-off-by: Alexander Onnikov <[email protected]>
1 parent ea81ec0 commit cb99b82

8 files changed: +213 -16 lines changed

common/config/rush/pnpm-lock.yaml

Lines changed: 11 additions & 1 deletion
Some generated files are not rendered by default.

services/datalake/pod-datalake/package.json

Lines changed: 2 additions & 1 deletion
@@ -77,6 +77,7 @@
     "@aws-sdk/lib-storage": "^3.738.0",
     "@smithy/node-http-handler": "^4.0.2",
     "morgan": "^1.10.0",
-    "on-headers": "^1.0.2"
+    "on-headers": "^1.0.2",
+    "lru-cache": "^11.1.0"
   }
 }
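This adds lru-cache v11 as the backing store for the new blob cache. A minimal behavioral sketch of the one option the commit uses, max, which bounds the cache by entry count (illustrative only, not code from this commit):

import { LRUCache } from 'lru-cache'

const lru = new LRUCache<string, Buffer>({ max: 2 })
lru.set('a', Buffer.from('1'))
lru.set('b', Buffer.from('2'))
lru.set('c', Buffer.from('3')) // evicts 'a', the least recently used entry
console.log(lru.get('a')) // undefined
console.log(lru.get('c')?.toString()) // '3'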

services/datalake/pod-datalake/src/config.ts

Lines changed: 13 additions & 1 deletion
@@ -22,6 +22,12 @@ export interface BucketConfig {
   region?: string
 }

+export interface CacheConfig {
+  enabled: boolean
+  blobSize: number
+  blobCount: number
+}
+
 export interface Config {
   Port: number
   Secret: string
@@ -30,6 +36,7 @@ export interface Config {
   Buckets: BucketConfig[]
   CleanupInterval: number
   Readonly: boolean
+  Cache: CacheConfig
 }

 const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@@ -79,7 +86,12 @@ const config: Config = (() => {
     AccountsUrl: process.env.ACCOUNTS_URL,
     DbUrl: process.env.DB_URL,
     Buckets: parseBucketsConfig(process.env.BUCKETS),
-    Readonly: process.env.READONLY === 'true'
+    Readonly: process.env.READONLY === 'true',
+    Cache: {
+      enabled: process.env.CACHE_ENABLED !== 'false',
+      blobSize: (parseNumber(process.env.CACHE_BLOB_SIZE) ?? 64) * 1024, // Default 64KB
+      blobCount: parseNumber(process.env.CACHE_BLOB_COUNT) ?? 1000
+    }
   }

   const missingEnv = (Object.keys(params) as Array<keyof Config>).filter((key) => params[key] === undefined)
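The cache is configured entirely through environment variables. A hedged sketch of how the three new variables resolve, with parseNumber inlined for self-containment and hypothetical example values (CACHE_ENABLED=true, CACHE_BLOB_SIZE=128, CACHE_BLOB_COUNT=500):

const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)

const cacheConfig = {
  // Anything except the literal string 'false' keeps the cache on.
  enabled: process.env.CACHE_ENABLED !== 'false', // true
  // CACHE_BLOB_SIZE is given in kilobytes; 128 resolves to 131072 bytes.
  blobSize: (parseNumber(process.env.CACHE_BLOB_SIZE) ?? 64) * 1024,
  // Maximum number of cached blobs.
  blobCount: parseNumber(process.env.CACHE_BLOB_COUNT) ?? 1000 // 500
}

With no variables set, the cache defaults to enabled, 64 KB per blob, and 1000 entries.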
services/datalake/pod-datalake/src/datalake/cache.ts

Lines changed: 85 additions & 0 deletions

@@ -0,0 +1,85 @@
+//
+// Copyright © 2025 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { LRUCache } from 'lru-cache'
+import { Readable } from 'stream'
+
+import { type CacheConfig } from '../config'
+import { type BlobBody } from './types'
+
+export type CacheEntry = Omit<BlobBody, 'body'> & {
+  body: Buffer
+}
+
+export interface Cache {
+  get: (key: string) => CacheEntry | undefined
+  set: (key: string, value: CacheEntry) => void
+  delete: (key: string) => void
+}
+
+class CacheImpl implements Cache {
+  private readonly cache: LRUCache<string, CacheEntry>
+
+  constructor (options: CacheConfig) {
+    this.cache = new LRUCache({
+      max: options.blobCount
+    })
+  }
+
+  get (key: string): CacheEntry | undefined {
+    return this.cache.get(key)
+  }
+
+  set (key: string, value: CacheEntry): void {
+    this.cache.set(key, value)
+  }
+
+  delete (key: string): void {
+    this.cache.delete(key)
+  }
+}
+
+class NoopCache implements Cache {
+  get (): undefined {
+    return undefined
+  }
+
+  set (): void {
+    // noop
+  }
+
+  delete (): void {
+    // noop
+  }
+}
+
+export async function streamToBuffer (data: Buffer | Readable): Promise<Buffer> {
+  if (Buffer.isBuffer(data)) {
+    return data
+  }
+
+  const chunks: Buffer[] = []
+  for await (const chunk of data) {
+    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
+  }
+  return Buffer.concat(chunks)
+}
+
+export function createCache (options: CacheConfig): Cache {
+  if (options.enabled) {
+    return new CacheImpl(options)
+  }
+  return new NoopCache()
+}
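The module exposes a small facade: createCache returns either the LRU-backed implementation or a no-op, so callers never branch on the enabled flag themselves. A hypothetical usage sketch, not from the commit; the entry fields mirror those set in datalake.ts, and the exact BlobBody shape lives in ./types:

import { Readable } from 'stream'
import { type CacheEntry, createCache, streamToBuffer } from './cache'

async function demo (): Promise<void> {
  const cache = createCache({ enabled: true, blobSize: 64 * 1024, blobCount: 1000 })

  // Buffer a small stream once so later reads can be served from memory.
  const body = await streamToBuffer(Readable.from(Buffer.from('hello')))

  const hash = 'deadbeef' // hypothetical blob hash used as the cache key
  // Cast because this sketch fills in only the fields visible in the diff.
  cache.set(hash, { name: 'hello.txt', etag: hash, size: body.length, body } as CacheEntry)

  const hit = cache.get(hash) // undefined when disabled (NoopCache) or evicted
  console.log(hit?.body.toString()) // 'hello'
}

void demo()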

services/datalake/pod-datalake/src/datalake/datalake.ts

Lines changed: 50 additions & 10 deletions
@@ -17,21 +17,28 @@ import { type MeasureContext, type Tx, WorkspaceUuid } from '@hcengineering/core'
 import { PlatformQueueProducer } from '@hcengineering/server-core'
 import { Readable } from 'stream'

+import { type Cache, type CacheEntry, createCache, streamToBuffer } from './cache'
 import { type BlobDB, WorkspaceStatsResult } from './db'
 import { digestToUUID, stringToUUID } from './encodings'
 import { type BlobHead, type BlobBody, type BlobList, type BlobStorage, type Datalake, type Location } from './types'
 import { type S3Bucket } from '../s3'
 import { blobEvents } from './queue'
+import { CacheConfig } from '../config'

 export class DatalakeImpl implements Datalake {
+  private readonly cache: Cache
+
   constructor (
     private readonly db: BlobDB,
     private readonly buckets: Array<{ location: Location, bucket: S3Bucket }>,
     private readonly producer: PlatformQueueProducer<Tx>,
     private readonly options: {
       cacheControl: string
+      cache: CacheConfig
     }
-  ) {}
+  ) {
+    this.cache = createCache(options.cache)
+  }

   async list (
     ctx: MeasureContext,
@@ -82,6 +89,14 @@ export class DatalakeImpl implements Datalake {
       return null
     }

+    const cached = this.cache.get(blob.hash)
+    if (cached !== undefined) {
+      return {
+        ...cached,
+        body: Readable.from(cached.body)
+      }
+    }
+
     const { bucket } = await this.selectStorage(ctx, workspace, blob.location)

     const range = options.range
@@ -90,7 +105,7 @@ export class DatalakeImpl implements Datalake {
       return null
     }

-    return {
+    const result = {
       name: blob.name,
       etag: blob.hash,
       size: blob.size,
@@ -102,6 +117,12 @@ export class DatalakeImpl implements Datalake {
       lastModified: object.lastModified,
       cacheControl: object.cacheControl
     }
+
+    if (this.options.cache.enabled && object.size <= this.options.cache.blobSize) {
+      this.cache.set(blob.hash, { ...result, body: await streamToBuffer(object.body) })
+    }
+
+    return result
   }

   async delete (ctx: MeasureContext, workspace: WorkspaceUuid, name: string | string[]): Promise<void> {
@@ -136,10 +157,11 @@ export class DatalakeImpl implements Datalake {

     const hash = digestToUUID(sha256)
     const filename = hash
+    const etag = hash

     // Check if we have the same blob already
     if (blob?.hash === hash && blob?.type === contentType) {
-      return { name, size, contentType, lastModified, etag: hash }
+      return { name, size, contentType, lastModified, etag }
     }

     const data = await this.db.getData(ctx, { hash, location })
@@ -151,35 +173,53 @@ export class DatalakeImpl implements Datalake {
      try {
        const event =
          blob != null
-            ? blobEvents.updated(name, { contentType, lastModified, size, etag: hash })
-            : blobEvents.created(name, { contentType, lastModified, size, etag: hash })
+            ? blobEvents.updated(name, { contentType, lastModified, size, etag })
+            : blobEvents.created(name, { contentType, lastModified, size, etag })
        await this.producer.send(workspace, [event])
      } catch (err) {
        ctx.error('failed to send blob created event', { workspace, name, err })
      }

-      return { name, size, contentType, lastModified, etag: hash }
+      return { name, size, contentType, lastModified, etag }
    } else {
      const putOptions = {
        contentLength: size,
        contentType,
        cacheControl,
        lastModified
      }
-      await bucket.put(ctx, filename, body, putOptions)
+
+      let data: Readable | Buffer = body
+
+      if (this.options.cache.enabled && size <= this.options.cache.blobSize) {
+        data = await streamToBuffer(body)
+        const entry: CacheEntry = {
+          body: data,
+          bodyLength: data.length,
+          bodyEtag: etag,
+          size,
+          name,
+          etag,
+          ...putOptions
+        }
+        this.cache.set(hash, entry)
+      }
+
+      await bucket.put(ctx, filename, data, putOptions)
      await this.db.createBlobData(ctx, { workspace, name, hash, location, filename, size, type: contentType })

      try {
        const event =
          blob != null
-            ? blobEvents.updated(name, { contentType, lastModified, size, etag: hash })
-            : blobEvents.created(name, { contentType, lastModified, size, etag: hash })
+            ? blobEvents.updated(name, { contentType, lastModified, size, etag })
+            : blobEvents.created(name, { contentType, lastModified, size, etag })
        await this.producer.send(workspace, [event])
      } catch (err) {
+        this.cache.delete(hash)
        ctx.error('failed to send blob created event', { workspace, name, err })
      }

-      return { name, size, contentType, lastModified, etag: hash }
+      return { name, size, contentType, lastModified, etag }
    }
  }
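Note how cache hits are returned: the entry stores the body as a Buffer, and every hit wraps it in a fresh stream via Readable.from, since Node streams are single-use while the cached Buffer must serve many requests. A self-contained illustration of that pattern, assuming nothing beyond the Node standard library:

import { Readable } from 'stream'

const cachedBody = Buffer.from('blob bytes') // stands in for a CacheEntry body

async function serve (): Promise<string> {
  // Each caller gets its own one-shot stream over the shared Buffer.
  const stream = Readable.from(cachedBody)
  const chunks: Buffer[] = []
  for await (const chunk of stream) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
  }
  return Buffer.concat(chunks).toString()
}

// Two independent reads of the same cached Buffer both succeed.
void serve().then(console.log)
void serve().then(console.log)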

services/datalake/pod-datalake/src/datalake/index.ts

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
 // limitations under the License.
 //

+export * from './cache'
 export * from './datalake'
 export * from './types'
 export * from './utils'
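With the added re-export, the cache helpers become part of the datalake barrel module. A trivial consumer sketch (hypothetical paths):

// Both imports now resolve to the same symbols:
import { createCache } from './datalake'
// import { createCache } from './datalake/cache'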
