Skip to content

Commit 0d6f2d5

Browse files
committed
fix: DataStores now use one message per a place instead of many, which cuts down on subscription costs
1 parent c31ce90 commit 0d6f2d5

File tree

7 files changed

+359
-39
lines changed

7 files changed

+359
-39
lines changed

pnpm-lock.yaml

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/datastore/src/Server/DataStore.lua

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ local PromiseMaidUtils = require("PromiseMaidUtils")
8181
local PromiseRetryUtils = require("PromiseRetryUtils")
8282
local PromiseUtils = require("PromiseUtils")
8383
local Rx = require("Rx")
84+
local ServiceBag = require("ServiceBag")
8485
local Signal = require("Signal")
8586
local Symbol = require("Symbol")
8687
local ValueObject = require("ValueObject")
@@ -179,16 +180,41 @@ function DataStore.SetSessionLockingEnabled(self: DataStore, sessionLockingEnabl
179180
self._sessionLockingEnabledHelper = DataStoreLockHelper.new(self)
180181
self._maid._sessionLockingEnabledHelper = self._sessionLockingEnabledHelper
181182
end
183+
else
184+
self._sessionLockingEnabledHelper = nil
185+
self._maid._sessionLockingEnabledHelper = nil
186+
end
187+
end
188+
189+
--[=[
190+
Sets session messaging enabled.
182191
183-
if not self._sessionMessagingEnabledHelper then
184-
self._sessionMessagingEnabledHelper = DataStoreMessageHelper.new(self)
192+
Currently this only works in conjunction with session locking, and allows for a session lock
193+
to gracefully close when requested by another session.
194+
195+
@param isEnabled boolean
196+
@param serviceBag ServiceBag -- Required when enabling
197+
]=]
198+
function DataStore.SetSessionMessagingEnabled(
199+
self: DataStore,
200+
isEnabled: boolean,
201+
serviceBag: ServiceBag.ServiceBag
202+
): ()
203+
if isEnabled then
204+
assert(serviceBag, "Must provide serviceBag when enabling session messaging")
205+
206+
if
207+
self._sessionMessagingEnabledHelper == nil
208+
or self._sessionMessagingEnabledHelper:GetServiceBag() ~= serviceBag
209+
then
210+
self._maid._sessionMessagingEnabledHelper = nil
211+
self._sessionMessagingEnabledHelper = nil
212+
213+
self._sessionMessagingEnabledHelper = DataStoreMessageHelper.new(serviceBag, self)
185214
self._maid._sessionMessagingEnabledHelper = self._sessionMessagingEnabledHelper
186215
end
187216
else
188-
self._sessionLockingEnabledHelper = nil
189217
self._sessionMessagingEnabledHelper = nil
190-
191-
self._maid._sessionLockingEnabledHelper = nil
192218
self._maid._sessionMessagingEnabledHelper = nil
193219
end
194220
end
@@ -618,7 +644,11 @@ function DataStore._promiseGetAsyncNoCache(self: DataStore): Promise.Promise<()>
618644
if self._sessionMessagingEnabledHelper and tryMessagingServiceSessionClose then
619645
-- Gracefully kick to avoid losing memory
620646
self._sessionMessagingEnabledHelper
621-
:PromiseCloseSessionGraceful(lockResult.blockingSession.SessionId)
647+
:PromiseCloseSessionGraceful(
648+
lockResult.blockingSession.PlaceId,
649+
lockResult.blockingSession.JobId,
650+
lockResult.blockingSession.SessionId
651+
)
622652
:Then(function()
623653
-- Give enough time for Roblox to replicate changes
624654
-- We probably could bump back to the loop but this has slightly better error messages

src/datastore/src/Server/DataStoreMessageHelper.lua

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
--!strict
2-
32
--[=[
43
@class DataStoreMessageHelper
54
]=]
@@ -8,8 +7,10 @@ local require = require(script.Parent.loader).load(script)
87

98
local BaseObject = require("BaseObject")
109
local MessagingServiceUtils = require("MessagingServiceUtils")
10+
local PlaceMessagingService = require("PlaceMessagingService")
1111
local Promise = require("Promise")
1212
local PromiseMaidUtils = require("PromiseMaidUtils")
13+
local ServiceBag = require("ServiceBag")
1314

1415
local DEBUG_LOG = false
1516

@@ -21,6 +22,8 @@ export type DataStoreMessageHelper =
2122
typeof(setmetatable(
2223
{} :: {
2324
_dataStore: any,
25+
_serviceBag: ServiceBag.ServiceBag,
26+
_placeMessagingService: PlaceMessagingService.PlaceMessagingService,
2427
_sessionClosedNotifications: { [string]: Promise.Promise<()> },
2528
},
2629
{} :: typeof({ __index = DataStoreMessageHelper })
@@ -38,30 +41,39 @@ export type CloseSessionComplete = {
3841

3942
export type DataStoreMessage = CloseSessionRequest | CloseSessionComplete
4043

41-
function DataStoreMessageHelper.new(dataStore: any): DataStoreMessageHelper
44+
function DataStoreMessageHelper.new(serviceBag: ServiceBag.ServiceBag, dataStore: any): DataStoreMessageHelper
4245
local self: DataStoreMessageHelper = setmetatable(BaseObject.new() :: any, DataStoreMessageHelper)
4346

47+
self._serviceBag = assert(serviceBag, "No serviceBag")
48+
self._placeMessagingService = self._serviceBag:GetService(PlaceMessagingService :: any)
49+
4450
self._dataStore = assert(dataStore, "No dataStore")
4551
self._sessionClosedNotifications = {}
4652

47-
-- Explicitly don't give to maid so wecan disconnect the subscription before destroying
48-
MessagingServiceUtils.promiseSubscribe(self:_getActiveStoreTopic(self._dataStore:GetSessionId()), function(data)
49-
self:_handleSubscription(data)
50-
end):Then(function(subscription)
51-
if not self.Destroy then
52-
subscription:Disconnect()
53-
end
54-
55-
self._maid:GiveTask(function()
56-
subscription:Disconnect()
57-
end)
58-
end)
53+
self._maid:GiveTask(
54+
self._placeMessagingService
55+
:ObserveMessages(self:_getActiveStoreTopic(self._dataStore:GetSessionId()))
56+
:Subscribe(function(data, metadata)
57+
self:_handleSubscription(data, metadata)
58+
end)
59+
)
5960

6061
return self
6162
end
6263

64+
--[=[
65+
Returns the service bag used by this helper. (Used for comparison)
66+
67+
@return ServiceBag
68+
]=]
69+
function DataStoreMessageHelper.GetServiceBag(self: DataStoreMessageHelper): ServiceBag.ServiceBag
70+
return self._serviceBag
71+
end
72+
6373
function DataStoreMessageHelper.PromiseCloseSessionGraceful(
6474
self: DataStoreMessageHelper,
75+
placeId: number,
76+
jobId: string,
6577
sessionId: string
6678
): Promise.Promise<()>
6779
local promise: any = self._sessionClosedNotifications[sessionId]
@@ -84,7 +96,7 @@ function DataStoreMessageHelper.PromiseCloseSessionGraceful(
8496
end))
8597
end)
8698

87-
return self:PromiseMessage(sessionId, {
99+
return self:PromiseMessage(placeId, jobId, sessionId, {
88100
type = "close-session",
89101
requesterSessionId = self._dataStore:GetSessionId(),
90102
}):Then(function()
@@ -94,6 +106,8 @@ end
94106

95107
function DataStoreMessageHelper.PromiseMessage(
96108
self: DataStoreMessageHelper,
109+
placeId: number,
110+
jobId: string,
97111
sessionId: string,
98112
message: DataStoreMessage
99113
): Promise.Promise<()>
@@ -103,14 +117,15 @@ function DataStoreMessageHelper.PromiseMessage(
103117
print("[DataStoreMessageHelper] - Sending message:", MessagingServiceUtils.toHumanReadable(message))
104118
end
105119

106-
return self._maid:GivePromise(MessagingServiceUtils.promisePublish(self:_getActiveStoreTopic(sessionId), message))
120+
local topic = self:_getActiveStoreTopic(sessionId)
121+
return self._maid:GivePromise(self._placeMessagingService:SendMessage(placeId, jobId, topic, message))
107122
end
108123

109124
function DataStoreMessageHelper._handleSubscription(
110125
self: DataStoreMessageHelper,
111-
subscriptionData: MessagingServiceUtils.SubscriptionData
112-
)
113-
local data = subscriptionData.Data
126+
data: any,
127+
metadata: PlaceMessagingService.PlacePacketMetadata
128+
): ()
114129
if type(data) ~= "table" or type(data.type) ~= "string" then
115130
warn(`[DataStoreMessageHelper] - Received malformed message: {MessagingServiceUtils.toHumanReadable(data)}`)
116131
return
@@ -129,13 +144,10 @@ function DataStoreMessageHelper._handleSubscription(
129144

130145
closeSessionPromise:Then(function()
131146
-- We could have GCed by now, but try to send off a notification to the requester
132-
MessagingServiceUtils.promisePublish(
133-
topic,
134-
{
135-
type = "close-session-complete",
136-
senderId = senderId,
137-
} :: CloseSessionComplete
138-
)
147+
self._placeMessagingService:SendMessageToAddress(metadata.from, topic, {
148+
type = "close-session-complete",
149+
senderId = senderId,
150+
})
139151
end)
140152
end
141153
elseif data.type == "close-session-complete" then

src/datastore/src/Server/PlayerDataStoreManager.lua

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ local Maid = require("Maid")
6060
local PendingPromiseTracker = require("PendingPromiseTracker")
6161
local Promise = require("Promise")
6262
local PromiseUtils = require("PromiseUtils")
63+
local ServiceBag = require("ServiceBag")
6364

6465
local PlayerDataStoreManager = setmetatable({}, BaseObject)
6566
PlayerDataStoreManager.ClassName = "PlayerDataStoreManager"
@@ -73,6 +74,7 @@ export type PlayerDataStoreManager =
7374
typeof(setmetatable(
7475
{} :: {
7576
_robloxDataStore: any,
77+
_serviceBag: ServiceBag.ServiceBag,
7678
_keyGenerator: KeyGenerator,
7779
_datastores: { [PlayerUserId]: DataStore.DataStore },
7880
_removing: { [PlayerUserId]: boolean },
@@ -93,6 +95,7 @@ export type PlayerDataStoreManager =
9395
@return PlayerDataStoreManager
9496
]=]
9597
function PlayerDataStoreManager.new(
98+
serviceBag: ServiceBag.ServiceBag,
9699
robloxDataStore: DataStore,
97100
keyGenerator: KeyGenerator,
98101
skipBindingToClose: boolean?
@@ -101,8 +104,9 @@ function PlayerDataStoreManager.new(
101104

102105
assert(type(skipBindingToClose) == "boolean" or skipBindingToClose == nil, "Bad skipBindingToClose")
103106

104-
self._robloxDataStore = robloxDataStore or error("No robloxDataStore")
105-
self._keyGenerator = keyGenerator or error("No keyGenerator")
107+
self._robloxDataStore = assert(robloxDataStore, "No robloxDataStore")
108+
self._keyGenerator = assert(keyGenerator, "No keyGenerator")
109+
self._serviceBag = assert(serviceBag, "No serviceBag")
106110

107111
self._maid._savingConns = Maid.new()
108112

@@ -220,9 +224,10 @@ function PlayerDataStoreManager._createDataStore(
220224

221225
local maid = Maid.new()
222226

223-
-- TODO: Destroy DataStore after cleanup
227+
-- DataStore is cleaned up very carefully in _removePlayerDataStore
224228
local datastore = DataStore.new(self._robloxDataStore, self:_getKey(userId))
225229
datastore:SetSessionLockingEnabled(true)
230+
datastore:SetSessionMessagingEnabled(true, self._serviceBag)
226231
datastore:SetUserIdList({ userId })
227232

228233
maid:GivePromise(datastore:PromiseSessionLockingFailed()):Then(function()
@@ -260,12 +265,16 @@ function PlayerDataStoreManager._createDataStore(
260265
return datastore
261266
end
262267

263-
function PlayerDataStoreManager._removePlayerDataStore(self: PlayerDataStoreManager, userId: PlayerUserId)
268+
function PlayerDataStoreManager._removePlayerDataStore(self: PlayerDataStoreManager, userId: PlayerUserId): ()
264269
local datastore = self._datastores[userId]
265270
if not datastore then
266271
return
267272
end
268273

274+
if self._removing[userId] then
275+
return
276+
end
277+
269278
self._removing[userId] = true
270279

271280
local removingPromises: { Promise.Promise<any?> } = {}

src/datastore/src/Server/PlayerDataStoreService.lua

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ function PlayerDataStoreService.Init(self: PlayerDataStoreService, serviceBag: S
4141

4242
-- External
4343
self._bindToCloseService = self._serviceBag:GetService(require("BindToCloseService"))
44+
self._serviceBag:GetService(require("PlaceMessagingService"))
4445

46+
-- State
4547
self._promiseStarted = self._maid:Add(Promise.new())
46-
4748
self._dataStoreName = "PlayerData"
4849
self._dataStoreScope = "SaveData"
4950
end
@@ -140,7 +141,7 @@ function PlayerDataStoreService.PromiseManager(
140141
return DataStorePromises.promiseDataStore(self._dataStoreName, self._dataStoreScope)
141142
end)
142143
:Then(function(dataStore)
143-
local manager = self._maid:Add(PlayerDataStoreManager.new(dataStore, function(player)
144+
local manager = self._maid:Add(PlayerDataStoreManager.new(self._serviceBag, dataStore, function(player)
144145
if type(player) == "number" then
145146
return tostring(player)
146147
else

src/messagingserviceutils/package.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@
3030
],
3131
"dependencies": {
3232
"@quenty/loader": "workspace:*",
33-
"@quenty/promise": "workspace:*"
33+
"@quenty/maid": "workspace:*",
34+
"@quenty/promise": "workspace:*",
35+
"@quenty/rx": "workspace:*",
36+
"@quenty/servicebag": "workspace:*",
37+
"@quenty/statestack": "workspace:*"
3438
},
3539
"devDependencies": {
3640
"@quenty/loader": "workspace:*"

0 commit comments

Comments
 (0)