Skip to content

Commit 97e52de

Browse files
authored
Merge pull request #159 from mongoosejs/vkarpov15/express-change-streams
add auto-refresh ability based on change streams to document-details
2 parents ea63f92 + 43f43b9 commit 97e52de

File tree

12 files changed

+446
-24
lines changed

12 files changed

+446
-24
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
uses: supercharge/mongodb-github-action@v1.10.0
2929
with:
3030
mongodb-version: ${{ matrix.mongodb }}
31+
mongodb-replica-set: rs0
3132

3233
- run: npm install
3334
- name: NPM Test

backend/actions/Model/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ exports.getDocumentsStream = require('./getDocumentsStream');
1313
exports.getCollectionInfo = require('./getCollectionInfo');
1414
exports.getIndexes = require('./getIndexes');
1515
exports.listModels = require('./listModels');
16+
exports.streamDocumentChanges = require('./streamDocumentChanges');
1617
exports.streamChatMessage = require('./streamChatMessage');
1718
exports.updateDocument = require('./updateDocument');
1819
exports.updateDocuments = require('./updateDocuments');
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
'use strict';
2+
3+
const Archetype = require('archetype');
4+
const authorize = require('../../authorize');
5+
6+
const StreamDocumentChangesParams = new Archetype({
7+
model: {
8+
$type: 'string',
9+
$required: true
10+
},
11+
documentId: {
12+
$type: 'string',
13+
$required: true
14+
},
15+
roles: {
16+
$type: ['string']
17+
}
18+
}).compile('StreamDocumentChangesParams');
19+
20+
module.exports = ({ db, changeStream }) => async function* streamDocumentChanges(params) {
21+
const { model, documentId, roles } = new StreamDocumentChangesParams(params);
22+
23+
await authorize('Model.streamDocumentChanges', roles);
24+
25+
const Model = db.models[model];
26+
if (Model == null) {
27+
throw new Error(`Model ${model} not found`);
28+
}
29+
30+
if (!changeStream) {
31+
throw new Error('Change streams are not enabled');
32+
}
33+
34+
const collectionName = Model.collection.name;
35+
const targetId = String(documentId);
36+
37+
const queue = [];
38+
let resolveQueue = null;
39+
let streamError = null;
40+
let streamEnded = false;
41+
42+
function enqueue(payload) {
43+
queue.push(payload);
44+
if (resolveQueue) {
45+
const resolve = resolveQueue;
46+
resolveQueue = null;
47+
resolve();
48+
}
49+
}
50+
51+
function handleChange(change) {
52+
if (!change || change.ns?.coll !== collectionName) {
53+
return;
54+
}
55+
if (!change.documentKey || change.documentKey._id == null) {
56+
return;
57+
}
58+
if (String(change.documentKey._id) !== targetId) {
59+
return;
60+
}
61+
62+
enqueue({
63+
type: 'change',
64+
operationType: change.operationType,
65+
documentKey: change.documentKey,
66+
ns: change.ns,
67+
updateDescription: change.updateDescription,
68+
clusterTime: change.clusterTime
69+
});
70+
}
71+
72+
function handleError(err) {
73+
streamError = err || new Error('Change stream error');
74+
enqueue({ type: 'error', message: streamError.message });
75+
}
76+
77+
function handleEnd() {
78+
streamEnded = true;
79+
enqueue({ type: 'end' });
80+
}
81+
82+
changeStream.on('change', handleChange);
83+
changeStream.on('error', handleError);
84+
changeStream.on('end', handleEnd);
85+
86+
try {
87+
while (true) {
88+
if (streamError) {
89+
throw streamError;
90+
}
91+
92+
if (queue.length === 0) {
93+
await new Promise(resolve => {
94+
resolveQueue = resolve;
95+
});
96+
}
97+
98+
if (streamError) {
99+
throw streamError;
100+
}
101+
102+
while (queue.length > 0) {
103+
const payload = queue.shift();
104+
if (payload?.type === 'end') {
105+
return;
106+
}
107+
yield payload;
108+
}
109+
110+
if (streamEnded) {
111+
return;
112+
}
113+
}
114+
} finally {
115+
changeStream.off('change', handleChange);
116+
changeStream.off('error', handleError);
117+
changeStream.off('end', handleEnd);
118+
if (resolveQueue) {
119+
resolveQueue();
120+
resolveQueue = null;
121+
}
122+
}
123+
};

backend/authorize.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const actionsToRequiredRoles = {
2323
'Model.getDocumentsStream': ['owner', 'admin', 'member', 'readonly'],
2424
'Model.getIndexes': ['owner', 'admin', 'member', 'readonly'],
2525
'Model.listModels': ['owner', 'admin', 'member', 'readonly'],
26+
'Model.streamDocumentChanges': ['owner', 'admin', 'member', 'readonly'],
2627
'Model.streamChatMessage': ['owner', 'admin', 'member', 'readonly'],
2728
'Model.updateDocuments': ['owner', 'admin', 'member']
2829
};

backend/index.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ module.exports = function backend(db, studioConnection, options) {
1616
const ChatMessage = studioConnection.model('__Studio_ChatMessage', chatMessageSchema, 'studio__chatMessages');
1717
const ChatThread = studioConnection.model('__Studio_ChatThread', chatThreadSchema, 'studio__chatThreads');
1818

19-
const actions = applySpec(Actions, { db, studioConnection, options });
19+
let changeStream = null;
20+
if (options?.changeStream) {
21+
changeStream = db.watch();
22+
}
23+
24+
const actions = applySpec(Actions, { db, studioConnection, options, changeStream });
25+
actions.services = { changeStream };
2026
return actions;
2127
};

eslint.config.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ module.exports = defineConfig([
3939
process: true,
4040
setTimeout: true,
4141
navigator: true,
42-
TextDecoder: true
42+
TextDecoder: true,
43+
AbortController: true,
44+
clearTimeout: true
4345
},
4446
sourceType: 'commonjs'
4547
},

express.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ const { toRoute, objectRouter } = require('extrovert');
77

88
module.exports = async function mongooseStudioExpressApp(apiUrl, conn, options) {
99
const router = express.Router();
10+
options = options ? { changeStream: true, ...options } : { changeStream: true };
1011

11-
const mothershipUrl = options?._mothershipUrl || 'https://mongoose-js.netlify.app/.netlify/functions';
12+
const mothershipUrl = options._mothershipUrl || 'https://mongoose-js.netlify.app/.netlify/functions';
1213
let workspace = null;
1314
if (options?.apiKey) {
1415
({ workspace } = await fetch(`${mothershipUrl}/getWorkspace`, {
@@ -31,7 +32,7 @@ module.exports = async function mongooseStudioExpressApp(apiUrl, conn, options)
3132
}
3233

3334
apiUrl = apiUrl || 'api';
34-
const backend = Backend(conn, options?.studioConnection, options);
35+
const backend = Backend(conn, options.studioConnection, options);
3536

3637
router.use(
3738
'/api',

frontend/src/api.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ if (window.MONGOOSE_STUDIO_CONFIG.isLambda) {
132132
yield { document: doc };
133133
}
134134
},
135+
streamDocumentChanges: async function* streamDocumentChanges(params, options = {}) {
136+
const pollIntervalMs = 5000;
137+
while (!options.signal?.aborted) {
138+
await new Promise(resolve => setTimeout(resolve, pollIntervalMs));
139+
if (options.signal?.aborted) {
140+
return;
141+
}
142+
yield { type: 'poll', model: params.model, documentId: params.documentId };
143+
}
144+
},
135145
getCollectionInfo: function getCollectionInfo(params) {
136146
return client.post('', { action: 'Model.getCollectionInfo', ...params }).then(res => res.data);
137147
},
@@ -352,6 +362,56 @@ if (window.MONGOOSE_STUDIO_CONFIG.isLambda) {
352362
}
353363
}
354364
},
365+
streamDocumentChanges: async function* streamDocumentChanges(params, options = {}) {
366+
const accessToken = window.localStorage.getItem('_mongooseStudioAccessToken') || null;
367+
const url = window.MONGOOSE_STUDIO_CONFIG.baseURL + '/Model/streamDocumentChanges?' + new URLSearchParams(params).toString();
368+
369+
const response = await fetch(url, {
370+
method: 'GET',
371+
headers: {
372+
Authorization: `${accessToken}`,
373+
Accept: 'text/event-stream'
374+
},
375+
signal: options.signal
376+
});
377+
378+
if (!response.ok) {
379+
throw new Error(`HTTP error! Status: ${response.status}`);
380+
}
381+
382+
const reader = response.body.getReader();
383+
const decoder = new TextDecoder('utf-8');
384+
let buffer = '';
385+
386+
while (true) {
387+
const { done, value } = await reader.read();
388+
if (done) break;
389+
buffer += decoder.decode(value, { stream: true });
390+
391+
let eventEnd;
392+
while ((eventEnd = buffer.indexOf('\n\n')) !== -1) {
393+
const eventStr = buffer.slice(0, eventEnd);
394+
buffer = buffer.slice(eventEnd + 2);
395+
396+
// Parse SSE event
397+
const lines = eventStr.split('\n');
398+
let data = '';
399+
for (const line of lines) {
400+
if (line.startsWith('data:')) {
401+
data += line.slice(5).trim();
402+
}
403+
}
404+
if (data) {
405+
try {
406+
yield JSON.parse(data);
407+
} catch (err) {
408+
// If not JSON, yield as string
409+
yield data;
410+
}
411+
}
412+
}
413+
}
414+
},
355415
getCollectionInfo: function getCollectionInfo(params) {
356416
return client.post('/Model/getCollectionInfo', params).then(res => res.data);
357417
},

frontend/src/document/document.html

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,18 @@
2929
</button>
3030
</div>
3131

32+
<div class="hidden md:flex items-center gap-3 text-sm text-slate-600">
33+
<div class="text-right leading-tight">
34+
<div class="text-xs uppercase tracking-wide text-slate-400 flex items-center gap-2 justify-end">
35+
<span>Loaded at</span>
36+
<span class="inline-flex items-center gap-1 text-[10px] font-semibold" :class="autoRefreshEnabled ? 'text-forest-green-600' : 'text-slate-400'">
37+
<span class="inline-block h-1.5 w-1.5 rounded-full" :class="autoRefreshEnabled ? 'bg-forest-green-500' : 'bg-slate-300'"></span>
38+
</span>
39+
</div>
40+
<div class="font-medium text-slate-700">{{lastUpdatedLabel}}</div>
41+
</div>
42+
</div>
43+
3244
<div class="gap-2 hidden md:flex items-center">
3345
<button
3446
v-if="!editting"
@@ -77,6 +89,23 @@
7789
class="origin-top-right absolute right-0 mt-2 w-48 rounded-md shadow-lg bg-white ring-1 ring-black ring-opacity-5 z-50"
7890
>
7991
<div class="py-1 flex flex-col">
92+
<button
93+
@click="refreshDocument({ force: true, source: 'manual' }); desktopMenuOpen = false"
94+
:disabled="isRefreshing"
95+
:class="['flex items-center px-4 py-2 text-sm text-gray-700', isRefreshing ? 'cursor-not-allowed opacity-50' : 'hover:bg-slate-100']"
96+
type="button"
97+
>
98+
{{isRefreshing ? 'Refreshing...' : 'Refresh'}}
99+
</button>
100+
<button
101+
@click="toggleAutoRefresh(); desktopMenuOpen = false"
102+
:disabled="isLambda"
103+
type="button"
104+
:class="['flex items-center px-4 py-2 text-sm', isLambda ? 'text-gray-400 cursor-not-allowed' : 'text-gray-700 hover:bg-slate-100']"
105+
:title="isLambda ? 'Auto-refresh only available on Express deployments' : ''"
106+
>
107+
{{autoRefreshEnabled ? 'Disable Auto-Refresh' : 'Enable Auto-Refresh'}}
108+
</button>
80109
<button
81110
@click="addField(); desktopMenuOpen = false"
82111
type="button"
@@ -150,6 +179,21 @@
150179
>
151180
Save
152181
</button>
182+
<button
183+
@click="refreshDocument({ force: true, source: 'manual' }); mobileMenuOpen = false"
184+
:disabled="isRefreshing"
185+
:class="['flex items-center px-4 py-2 text-sm text-gray-700', isRefreshing ? 'cursor-not-allowed opacity-50' : 'hover:bg-slate-100']"
186+
type="button"
187+
>
188+
{{isRefreshing ? 'Refreshing...' : 'Refresh'}}
189+
</button>
190+
<button
191+
@click="toggleAutoRefresh(); mobileMenuOpen = false"
192+
type="button"
193+
class="flex items-center px-4 py-2 text-sm text-gray-700 hover:bg-slate-100"
194+
>
195+
Auto-Refresh {{autoRefreshEnabled ? 'ON' : 'OFF'}}
196+
</button>
153197
<button
154198
@click="addField(); mobileMenuOpen = false"
155199
type="button"

0 commit comments

Comments
 (0)