Skip to content

Commit ac39c9a

Browse files
committed
Refactor encryption to use workers
1 parent 3209476 commit ac39c9a

30 files changed

+856
-158
lines changed

packages/sdk/jest.config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const config: Config.InitialOptions = {
1313
moduleNameMapper: {
1414
"^@/createSignatureValidationWorker$": "<rootDir>/src/_jest/createSignatureValidationWorker.ts",
1515
"^@/createSigningWorker$": "<rootDir>/src/_jest/createSigningWorker.ts",
16+
"^@/createEncryptionWorker$": "<rootDir>/src/_jest/createEncryptionWorker.ts",
1617
"^@/(.*)$": "<rootDir>/src/_nodejs/$1",
1718
},
1819
transform: {

packages/sdk/rollup.config.mts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const browserAliases: Alias[] = [
3636
const WORKERS: Record<string, string> = {
3737
'SignatureValidationWorker': 'signature/SignatureValidationWorker',
3838
'SigningWorker': 'signature/SigningWorker',
39+
'EncryptionWorker': 'encryption/EncryptionWorker',
3940
}
4041

4142
export default defineConfig([
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/**
2+
* Browser-specific encryption worker factory.
3+
*/
4+
import Worker from 'web-worker'
5+
6+
export function createEncryptionWorker(): InstanceType<typeof Worker> {
7+
return new Worker(
8+
new URL('./workers/EncryptionWorker.browser.mjs', import.meta.url),
9+
{ type: 'module' }
10+
)
11+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/**
2+
* Jest-specific encryption worker factory.
3+
* Points to the built worker in dist/ for testing.
4+
*/
5+
import Worker from 'web-worker'
6+
7+
export function createEncryptionWorker(): InstanceType<typeof Worker> {
8+
return new Worker(
9+
new URL('../../dist/workers/EncryptionWorker.node.mjs', import.meta.url),
10+
{ type: 'module' }
11+
)
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/**
2+
* Karma-specific encryption worker factory.
3+
* Points to the built worker in dist/ for browser testing.
4+
*/
5+
import Worker from 'web-worker'
6+
7+
export function createEncryptionWorker(): InstanceType<typeof Worker> {
8+
return new Worker(
9+
new URL('../../dist/workers/EncryptionWorker.browser.mjs', import.meta.url),
10+
{ type: 'module' }
11+
)
12+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/**
2+
* Node.js-specific encryption worker factory.
3+
*/
4+
import Worker from 'web-worker'
5+
6+
export function createEncryptionWorker(): InstanceType<typeof Worker> {
7+
return new Worker(
8+
new URL('./workers/EncryptionWorker.node.mjs', import.meta.url),
9+
{ type: 'module' }
10+
)
11+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/**
2+
* Singleton encryption service using Web Worker.
3+
* This offloads CPU-intensive AES encryption operations to a separate thread.
4+
* Works in both browser and Node.js environments via platform-specific config.
5+
*
6+
* The worker is lazily initialized on first use and shared across all consumers.
7+
*/
8+
import { wrap, releaseProxy, transfer, type Remote } from 'comlink'
9+
import { Lifecycle, scoped } from 'tsyringe'
10+
import { EncryptedGroupKey } from '@streamr/trackerless-network'
11+
import { createEncryptionWorker } from '@/createEncryptionWorker'
12+
import type { EncryptionWorkerApi } from './EncryptionWorker'
13+
import { DestroySignal } from '../DestroySignal'
14+
import { StreamrClientError } from '../StreamrClientError'
15+
import { GroupKey } from './GroupKey'
16+
17+
@scoped(Lifecycle.ContainerScoped)
18+
export class EncryptionService {
19+
private worker: ReturnType<typeof createEncryptionWorker> | undefined
20+
private workerApi: Remote<EncryptionWorkerApi> | undefined
21+
22+
constructor(destroySignal: DestroySignal) {
23+
destroySignal.onDestroy.listen(() => this.destroy())
24+
}
25+
26+
private getWorkerApi(): Remote<EncryptionWorkerApi> {
27+
if (this.workerApi === undefined) {
28+
this.worker = createEncryptionWorker()
29+
this.workerApi = wrap<EncryptionWorkerApi>(this.worker)
30+
}
31+
return this.workerApi
32+
}
33+
34+
/**
35+
* Encrypt data using AES-256-CTR.
36+
* Note: The input data buffer is transferred to the worker and becomes unusable after this call.
37+
*/
38+
async encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Promise<Uint8Array> {
39+
const result = await this.getWorkerApi().encrypt(
40+
transfer({ data, cipherKey }, [data.buffer])
41+
)
42+
if (result.type === 'error') {
43+
throw new Error(`AES encryption failed: ${result.message}`)
44+
}
45+
return result.data
46+
}
47+
48+
/**
49+
* Decrypt AES-256-CTR encrypted data.
50+
* Note: The input cipher buffer is transferred to the worker and becomes unusable after this call.
51+
*/
52+
async decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Promise<Uint8Array> {
53+
const result = await this.getWorkerApi().decrypt(
54+
transfer({ cipher, cipherKey }, [cipher.buffer])
55+
)
56+
if (result.type === 'error') {
57+
throw new Error(`AES decryption failed: ${result.message}`)
58+
}
59+
return result.data
60+
}
61+
62+
/**
63+
* Encrypt the next group key using the current group key.
64+
*/
65+
async encryptNextGroupKey(currentKey: GroupKey, nextKey: GroupKey): Promise<EncryptedGroupKey> {
66+
const result = await this.getWorkerApi().encryptGroupKey({
67+
nextGroupKeyId: nextKey.id,
68+
nextGroupKeyData: nextKey.data,
69+
currentGroupKeyData: currentKey.data
70+
})
71+
if (result.type === 'error') {
72+
throw new Error(`Group key encryption failed: ${result.message}`)
73+
}
74+
return {
75+
id: result.id,
76+
data: result.data
77+
}
78+
}
79+
80+
/**
81+
* Decrypt an encrypted group key using the current group key.
82+
*/
83+
async decryptNextGroupKey(currentKey: GroupKey, encryptedKey: EncryptedGroupKey): Promise<GroupKey> {
84+
const result = await this.getWorkerApi().decryptGroupKey({
85+
encryptedGroupKeyId: encryptedKey.id,
86+
encryptedGroupKeyData: encryptedKey.data,
87+
currentGroupKeyData: currentKey.data
88+
})
89+
if (result.type === 'error') {
90+
throw new Error(`Group key decryption failed: ${result.message}`)
91+
}
92+
return new GroupKey(result.id, Buffer.from(result.data))
93+
}
94+
95+
/**
96+
* Decrypt a stream message's content and optionally the new group key.
97+
* This combines both operations for efficiency when processing messages.
98+
* Note: The input content buffer is transferred to the worker and becomes unusable after this call.
99+
*/
100+
async decryptStreamMessage(
101+
content: Uint8Array,
102+
groupKey: GroupKey,
103+
encryptedNewGroupKey?: EncryptedGroupKey
104+
): Promise<[Uint8Array, GroupKey?]> {
105+
const request = {
106+
content,
107+
groupKeyData: groupKey.data,
108+
newGroupKey: encryptedNewGroupKey ? {
109+
id: encryptedNewGroupKey.id,
110+
data: encryptedNewGroupKey.data
111+
} : undefined
112+
}
113+
const result = await this.getWorkerApi().decryptStreamMessage(
114+
transfer(request, [content.buffer])
115+
)
116+
if (result.type === 'error') {
117+
throw new StreamrClientError(`AES decryption failed: ${result.message}`, 'DECRYPT_ERROR')
118+
}
119+
120+
let newGroupKey: GroupKey | undefined
121+
if (result.newGroupKey) {
122+
newGroupKey = new GroupKey(result.newGroupKey.id, Buffer.from(result.newGroupKey.data))
123+
}
124+
125+
return [result.content, newGroupKey]
126+
}
127+
128+
destroy(): void {
129+
if (this.workerApi !== undefined) {
130+
this.workerApi[releaseProxy]()
131+
this.workerApi = undefined
132+
}
133+
if (this.worker !== undefined) {
134+
this.worker.terminate()
135+
this.worker = undefined
136+
}
137+
}
138+
}

packages/sdk/src/encryption/EncryptionUtil.ts

Lines changed: 10 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
import { ml_kem1024 } from '@noble/post-quantum/ml-kem'
22
import { randomBytes } from '@noble/post-quantum/utils'
3-
import { StreamMessageAESEncrypted } from '../protocol/StreamMessage'
4-
import { StreamrClientError } from '../StreamrClientError'
5-
import { GroupKey } from './GroupKey'
63
import { AsymmetricEncryptionType } from '@streamr/trackerless-network'
7-
import { binaryToUtf8, createCipheriv, createDecipheriv, getSubtle, privateDecrypt, publicEncrypt } from '@streamr/utils'
8-
9-
export const INITIALIZATION_VECTOR_LENGTH = 16
4+
import { binaryToUtf8, getSubtle, privateDecrypt, publicEncrypt } from '@streamr/utils'
5+
import { decryptWithAES, encryptWithAES } from './aesUtils'
106

117
const INFO = Buffer.from('streamr-key-exchange')
128
const KEM_CIPHER_LENGTH_BYTES = 1568
139
const KDF_SALT_LENGTH_BYTES = 64
1410

11+
/**
12+
* Asymmetric encryption utility class for RSA and ML-KEM (post-quantum) key exchange.
13+
*
14+
* For AES symmetric encryption of stream messages, use EncryptionService instead.
15+
* This class only handles asymmetric encryption for key exchange operations.
16+
*/
1517
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
1618
export class EncryptionUtil {
1719
/**
@@ -116,7 +118,7 @@ export class EncryptionUtil {
116118
const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt)
117119

118120
// Encrypt plaintext with the AES wrapping key
119-
const aesEncryptedPlaintext = this.encryptWithAES(plaintextBuffer, Buffer.from(wrappingAESKey))
121+
const aesEncryptedPlaintext = encryptWithAES(plaintextBuffer, Buffer.from(wrappingAESKey))
120122

121123
// Concatenate the deliverables into a binary package
122124
return Buffer.concat([kemCipher, kdfSalt, aesEncryptedPlaintext])
@@ -138,44 +140,6 @@ export class EncryptionUtil {
138140
const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt)
139141

140142
// Decrypt the aesEncryptedPlaintext
141-
return this.decryptWithAES(aesEncryptedPlaintext, Buffer.from(wrappingAESKey))
142-
}
143-
144-
/*
145-
* Returns a hex string without the '0x' prefix.
146-
*/
147-
static encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Uint8Array {
148-
const iv = randomBytes(INITIALIZATION_VECTOR_LENGTH) // always need a fresh IV when using CTR mode
149-
const cipher = createCipheriv('aes-256-ctr', cipherKey, iv)
150-
return Buffer.concat([iv, cipher.update(data), cipher.final()])
151-
}
152-
153-
/*
154-
* 'ciphertext' must be a hex string (without '0x' prefix), 'groupKey' must be a GroupKey. Returns a Buffer.
155-
*/
156-
static decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Buffer {
157-
const iv = cipher.slice(0, INITIALIZATION_VECTOR_LENGTH)
158-
const decipher = createDecipheriv('aes-256-ctr', cipherKey, iv)
159-
return Buffer.concat([decipher.update(cipher.slice(INITIALIZATION_VECTOR_LENGTH)), decipher.final()])
160-
}
161-
162-
static decryptStreamMessage(streamMessage: StreamMessageAESEncrypted, groupKey: GroupKey): [Uint8Array, GroupKey?] | never {
163-
let content: Uint8Array
164-
try {
165-
content = this.decryptWithAES(streamMessage.content, groupKey.data)
166-
} catch {
167-
throw new StreamrClientError('AES decryption failed', 'DECRYPT_ERROR', streamMessage)
168-
}
169-
170-
let newGroupKey: GroupKey | undefined = undefined
171-
if (streamMessage.newGroupKey) {
172-
try {
173-
newGroupKey = groupKey.decryptNextGroupKey(streamMessage.newGroupKey)
174-
} catch {
175-
throw new StreamrClientError('Could not decrypt new encryption key', 'DECRYPT_ERROR', streamMessage)
176-
}
177-
}
178-
179-
return [content, newGroupKey]
143+
return decryptWithAES(aesEncryptedPlaintext, Buffer.from(wrappingAESKey))
180144
}
181145
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* Web Worker for AES encryption operations.
3+
* Offloads CPU-intensive cryptographic operations to a separate thread.
4+
*/
5+
import { expose, transfer } from 'comlink'
6+
import { decryptWithAES, encryptWithAES } from './aesUtils'
7+
import {
8+
encryptNextGroupKey,
9+
decryptNextGroupKey,
10+
decryptStreamMessageContent,
11+
AESEncryptRequest,
12+
AESDecryptRequest,
13+
EncryptGroupKeyRequest,
14+
DecryptGroupKeyRequest,
15+
DecryptStreamMessageRequest,
16+
AESEncryptResult,
17+
AESDecryptResult,
18+
EncryptGroupKeyResult,
19+
DecryptGroupKeyResult,
20+
DecryptStreamMessageResult
21+
} from './encryptionUtils'
22+
23+
const workerApi = {
24+
encrypt: async (request: AESEncryptRequest): Promise<AESEncryptResult> => {
25+
try {
26+
const result = encryptWithAES(request.data, request.cipherKey)
27+
return transfer({ type: 'success', data: result }, [result.buffer])
28+
} catch (err) {
29+
return { type: 'error', message: String(err) }
30+
}
31+
},
32+
33+
decrypt: async (request: AESDecryptRequest): Promise<AESDecryptResult> => {
34+
try {
35+
const result = decryptWithAES(request.cipher, request.cipherKey)
36+
return transfer({ type: 'success', data: result }, [result.buffer])
37+
} catch (err) {
38+
return { type: 'error', message: String(err) }
39+
}
40+
},
41+
42+
encryptGroupKey: async (request: EncryptGroupKeyRequest): Promise<EncryptGroupKeyResult> => {
43+
try {
44+
const result = encryptNextGroupKey(
45+
request.nextGroupKeyId,
46+
request.nextGroupKeyData,
47+
request.currentGroupKeyData
48+
)
49+
return transfer(
50+
{ type: 'success', id: result.id, data: result.data },
51+
[result.data.buffer]
52+
)
53+
} catch (err) {
54+
return { type: 'error', message: String(err) }
55+
}
56+
},
57+
58+
decryptGroupKey: async (request: DecryptGroupKeyRequest): Promise<DecryptGroupKeyResult> => {
59+
try {
60+
const result = decryptNextGroupKey(
61+
request.encryptedGroupKeyId,
62+
request.encryptedGroupKeyData,
63+
request.currentGroupKeyData
64+
)
65+
return transfer(
66+
{ type: 'success', id: result.id, data: result.data },
67+
[result.data.buffer]
68+
)
69+
} catch (err) {
70+
return { type: 'error', message: String(err) }
71+
}
72+
},
73+
74+
decryptStreamMessage: async (request: DecryptStreamMessageRequest): Promise<DecryptStreamMessageResult> => {
75+
try {
76+
const result = decryptStreamMessageContent(
77+
request.content,
78+
request.groupKeyData,
79+
request.newGroupKey
80+
)
81+
const transferables: ArrayBuffer[] = [result.content.buffer as ArrayBuffer]
82+
if (result.newGroupKey) {
83+
transferables.push(result.newGroupKey.data.buffer as ArrayBuffer)
84+
}
85+
return transfer(
86+
{ type: 'success', content: result.content, newGroupKey: result.newGroupKey },
87+
transferables
88+
)
89+
} catch (err) {
90+
return { type: 'error', message: String(err) }
91+
}
92+
}
93+
}
94+
95+
export type EncryptionWorkerApi = typeof workerApi
96+
97+
expose(workerApi)

0 commit comments

Comments
 (0)