Skip to content

Commit 2490750

Browse files
authored
chore(2631): Refactor storage service to separate channel and message logic (#2713)
* Create channels service and small fix for loading chain on app start * Separate logic further into channelstore and messagesservice * Add temp placeholders for encryption/decryption * Minor clean up * Expand messages service, tweak getting messages, fix/add tests * Fix subscribing not getting old messages on join * Fix peer list not updating * Add compounderror * Update auth * Update CHANGELOG.md
1 parent 25f21a8 commit 2490750

File tree

18 files changed

+1409
-630
lines changed

18 files changed

+1409
-630
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
### Chores
1111

1212
* Add `trace` level logs to `@quiet/logger` ([#2716](https://github.com/TryQuiet/quiet/issues/2716))
13+
* Refactor the `StorageService` and create `ChannelService`, `MessageService` and `ChannelStore` for handling channel-related persistence ([#2631](https://github.com/TryQuiet/quiet/issues/2631))
1314

1415
## [3.0.0]
1516

packages/backend/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@
8282
"mock-fs": "^5.1.2",
8383
"pvutils": "^1.1.3",
8484
"tmp": "^0.2.1",
85-
"pvutils": "^1.1.3",
8685
"ts-jest": "^29.0.3",
8786
"ts-loader": "9.4.2",
8887
"ts-node": "10.9.1",

packages/backend/src/nest/common/types.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { type EventsType } from '@orbitdb/core'
2-
import { type ChannelMessage, type PublicChannel } from '@quiet/types'
1+
import { type PublicChannel } from '@quiet/types'
2+
import { ChannelStore } from '../storage/channels/channel.store'
33

44
export interface PublicChannelsRepo {
5-
db: EventsType<ChannelMessage>
5+
store: ChannelStore
66
eventsAttached: boolean
77
}
88

packages/backend/src/nest/common/utils.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import fs from 'fs'
2+
import fsAsync from 'fs/promises'
23
import getPort from 'get-port'
34
import path from 'path'
45
import { Server } from 'socket.io'
@@ -298,19 +299,16 @@ export async function createPeerId(): Promise<CreatedLibp2pPeerId> {
298299
}
299300
}
300301

301-
export const createArbitraryFile = (filePath: string, sizeBytes: number) => {
302-
const stream = fs.createWriteStream(filePath)
302+
export const createArbitraryFile = async (filePath: string, sizeBytes: number) => {
303303
const maxChunkSize = 1048576 // 1MB
304304

305305
let remainingSize = sizeBytes
306306

307307
while (remainingSize > 0) {
308308
const chunkSize = Math.min(maxChunkSize, remainingSize)
309-
stream.write(crypto.randomBytes(chunkSize))
309+
await fsAsync.appendFile(filePath, crypto.randomBytes(chunkSize))
310310
remainingSize -= chunkSize
311311
}
312-
313-
stream.end()
314312
}
315313

316314
export async function* asyncGeneratorFromIterator<T>(asyncIterator: AsyncIterable<T>): AsyncGenerator<T> {

packages/backend/src/nest/connections-manager/connections-manager.service.ts

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -896,7 +896,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
896896
this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, {
897897
certificates: await this.storageService?.loadAllCertificates(),
898898
})
899-
await this.storageService?.loadAllChannels()
899+
await this.storageService?.channels.loadAllChannels()
900900
}
901901
})
902902
this.socketService.on(
@@ -994,7 +994,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
994994
this.socketService.on(
995995
SocketActionTypes.CREATE_CHANNEL,
996996
async (args: CreateChannelPayload, callback: (response?: CreateChannelResponse) => void) => {
997-
callback(await this.storageService?.subscribeToChannel(args.channel))
997+
callback(await this.storageService?.channels.subscribeToChannel(args.channel))
998998
}
999999
)
10001000
this.socketService.on(
@@ -1003,39 +1003,39 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
10031003
payload: { channelId: string; ownerPeerId: string },
10041004
callback: (response: DeleteChannelResponse) => void
10051005
) => {
1006-
callback(await this.storageService?.deleteChannel(payload))
1006+
callback(await this.storageService?.channels.deleteChannel(payload))
10071007
}
10081008
)
10091009
this.socketService.on(
10101010
SocketActionTypes.DELETE_FILES_FROM_CHANNEL,
10111011
async (payload: DeleteFilesFromChannelSocketPayload) => {
10121012
this.logger.info(`socketService - ${SocketActionTypes.DELETE_FILES_FROM_CHANNEL}`)
1013-
await this.storageService?.deleteFilesFromChannel(payload)
1013+
await this.storageService?.channels.deleteFilesFromChannel(payload)
10141014
// await this.deleteFilesFromTemporaryDir() //crashes on mobile, will be fixes in next versions
10151015
}
10161016
)
10171017
this.socketService.on(SocketActionTypes.SEND_MESSAGE, async (args: SendMessagePayload) => {
1018-
await this.storageService?.sendMessage(args.message)
1018+
await this.storageService?.channels.sendMessage(args.message)
10191019
})
10201020
this.socketService.on(
10211021
SocketActionTypes.GET_MESSAGES,
10221022
async (payload: GetMessagesPayload, callback: (response?: MessagesLoadedPayload) => void) => {
1023-
callback(await this.storageService?.getMessages(payload.channelId, payload.ids))
1023+
callback(await this.storageService?.channels.getMessages(payload.channelId, payload.ids))
10241024
}
10251025
)
10261026

10271027
// Files
10281028
this.socketService.on(SocketActionTypes.DOWNLOAD_FILE, async (metadata: FileMetadata) => {
1029-
await this.storageService?.downloadFile(metadata)
1029+
await this.storageService?.channels.downloadFile(metadata)
10301030
})
10311031
this.socketService.on(SocketActionTypes.UPLOAD_FILE, async (metadata: FileMetadata) => {
1032-
await this.storageService?.uploadFile(metadata)
1032+
await this.storageService?.channels.uploadFile(metadata)
10331033
})
10341034
this.socketService.on(SocketActionTypes.FILE_UPLOADED, async (args: FileMetadata) => {
1035-
await this.storageService?.uploadFile(args)
1035+
await this.storageService?.channels.uploadFile(args)
10361036
})
10371037
this.socketService.on(SocketActionTypes.CANCEL_DOWNLOAD, mid => {
1038-
this.storageService?.cancelDownload(mid)
1038+
this.storageService?.channels.cancelDownload(mid)
10391039
})
10401040

10411041
// System
@@ -1051,46 +1051,48 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
10511051

10521052
private attachStorageListeners() {
10531053
if (!this.storageService) return
1054-
this.storageService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => {
1055-
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data)
1056-
})
1057-
this.storageService.on(StorageEvents.CERTIFICATES_STORED, (payload: SendCertificatesResponse) => {
1058-
this.logger.info(`Storage - ${StorageEvents.CERTIFICATES_STORED}`)
1059-
this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, payload)
1060-
})
1061-
this.storageService.on(StorageEvents.CHANNELS_STORED, (payload: ChannelsReplicatedPayload) => {
1054+
// Channel and Message Events
1055+
this.storageService.channels.on(StorageEvents.CHANNELS_STORED, (payload: ChannelsReplicatedPayload) => {
10621056
this.serverIoProvider.io.emit(SocketActionTypes.CHANNELS_STORED, payload)
10631057
})
1064-
this.storageService.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => {
1058+
this.storageService.channels.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => {
10651059
this.serverIoProvider.io.emit(SocketActionTypes.MESSAGES_STORED, payload)
10661060
})
1067-
this.storageService.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => {
1061+
this.storageService.channels.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => {
10681062
if (payload.ids.length === 0) {
10691063
return
10701064
}
10711065
this.serverIoProvider.io.emit(SocketActionTypes.MESSAGE_IDS_STORED, payload)
10721066
})
1073-
this.storageService.on(StorageEvents.CHANNEL_SUBSCRIBED, (payload: ChannelSubscribedPayload) => {
1067+
this.storageService.channels.on(StorageEvents.CHANNEL_SUBSCRIBED, (payload: ChannelSubscribedPayload) => {
10741068
this.serverIoProvider.io.emit(SocketActionTypes.CHANNEL_SUBSCRIBED, payload)
10751069
})
1076-
this.storageService.on(StorageEvents.REMOVE_DOWNLOAD_STATUS, (payload: RemoveDownloadStatus) => {
1070+
this.storageService.channels.on(StorageEvents.REMOVE_DOWNLOAD_STATUS, (payload: RemoveDownloadStatus) => {
10771071
this.serverIoProvider.io.emit(SocketActionTypes.REMOVE_DOWNLOAD_STATUS, payload)
10781072
})
1079-
this.storageService.on(StorageEvents.FILE_UPLOADED, (payload: UploadFilePayload) => {
1073+
this.storageService.channels.on(StorageEvents.FILE_UPLOADED, (payload: UploadFilePayload) => {
10801074
this.serverIoProvider.io.emit(SocketActionTypes.FILE_UPLOADED, payload)
10811075
})
1082-
this.storageService.on(StorageEvents.DOWNLOAD_PROGRESS, (payload: DownloadStatus) => {
1076+
this.storageService.channels.on(StorageEvents.DOWNLOAD_PROGRESS, (payload: DownloadStatus) => {
10831077
this.serverIoProvider.io.emit(SocketActionTypes.DOWNLOAD_PROGRESS, payload)
10841078
})
1085-
this.storageService.on(StorageEvents.MESSAGE_MEDIA_UPDATED, (payload: FileMetadata) => {
1079+
this.storageService.channels.on(StorageEvents.MESSAGE_MEDIA_UPDATED, (payload: FileMetadata) => {
10861080
this.serverIoProvider.io.emit(SocketActionTypes.MESSAGE_MEDIA_UPDATED, payload)
10871081
})
1082+
this.storageService.channels.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => {
1083+
this.serverIoProvider.io.emit(SocketActionTypes.PUSH_NOTIFICATION, payload)
1084+
})
1085+
// Other Events
1086+
this.storageService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => {
1087+
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data)
1088+
})
1089+
this.storageService.on(StorageEvents.CERTIFICATES_STORED, (payload: SendCertificatesResponse) => {
1090+
this.logger.info(`Storage - ${StorageEvents.CERTIFICATES_STORED}`)
1091+
this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, payload)
1092+
})
10881093
this.storageService.on(StorageEvents.COMMUNITY_UPDATED, (payload: Community) => {
10891094
this.serverIoProvider.io.emit(SocketActionTypes.COMMUNITY_UPDATED, payload)
10901095
})
1091-
this.storageService.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => {
1092-
this.serverIoProvider.io.emit(SocketActionTypes.PUSH_NOTIFICATION, payload)
1093-
})
10941096
this.storageService.on(StorageEvents.CSRS_STORED, async (payload: { csrs: string[] }) => {
10951097
this.logger.info(`Storage - ${StorageEvents.CSRS_STORED}`)
10961098
const users = await getUsersFromCsrs(payload.csrs)

packages/backend/src/nest/ipfs-file-manager/big-files.long.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ describe('IpfsFileManagerService', () => {
3333
tmpDir = createTmpDir()
3434
filePath = new URL('./testUtils/large-file.txt', import.meta.url).pathname
3535
// Generate 2.1GB file
36-
createArbitraryFile(filePath, BIG_FILE_SIZE)
36+
await createArbitraryFile(filePath, BIG_FILE_SIZE)
3737
module = await Test.createTestingModule({
3838
imports: [TestModule, IpfsFileManagerModule, IpfsModule, SocketModule, Libp2pModule],
3939
}).compile()

packages/backend/src/nest/storage/base.store.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ abstract class StoreBase<V, S extends KeyValueType<V> | EventsType<V>> extends E
2424
logger.info('Closed', this.getAddress())
2525
}
2626

27-
abstract init(): Promise<void>
27+
abstract init(...args: any[]): Promise<void> | Promise<StoreBase<V, S>>
2828
abstract clean(): void
2929
}
3030

0 commit comments

Comments
 (0)