Le module xcraft-core-cryo est une couche de persistance sophistiquée pour l'écosystème Xcraft, basée sur SQLite. Il implémente un système d'event sourcing qui permet de sauvegarder, récupérer et gérer l'historique des mutations d'état des acteurs Goblin et Elf. Ce module est fondamental pour la persistance des données dans les applications Xcraft, offrant des fonctionnalités avancées comme la recherche plein texte (FTS), la recherche vectorielle (VEC), la synchronisation distribuée et le nettoyage automatique des données obsolètes.
- Structure du module
- Fonctionnement global
- Exemples d'utilisation
- Interactions avec d'autres modules
- Configuration avancée
- Détails des sources
Le module xcraft-core-cryo est organisé autour de plusieurs composants clés :
cryo.js(racine) : Point d'entrée qui expose les commandes Xcraft et initialise les endpoints configuréslib/cryo.js: Classe principaleCryohéritant deSQLite, implémentant toutes les fonctionnalités de persistancelib/index.js: Instance singleton deCryoconfigurée viaxcraft-core-etclib/soulSweeper.js: Utilitaire de nettoyage et d'optimisation des bases de donnéeslib/streamSQL.js: ClassesReadableSQLetWritableSQLpour le streaming de données SQLitelib/streamPort.js: ClassesMessagePortReadableetMessagePortWritablepour le streaming inter-threads via MessageChannellib/sqlite-vec/loader.js: Chargement de l'extension SQLite pour la recherche vectoriellelib/endpoints/googleQueue.js: Endpoint optionnel pour publier les actions dans Google Cloud Pub/Sublib/workers/insert.js: Worker thread dédié au peuplement FTS/VEC et au traitement des embeddingslib/workers/retrieve.js: Worker thread dédié à la récupération streamée des actions persist
Cryo fonctionne selon le principe d'event sourcing :
- Persistance : Les actions (événements) sont "gelées" (
freeze) dans la base de données SQLite - Reconstruction : Chaque action contient les informations nécessaires pour reconstruire l'état d'un acteur
- Récupération : Les actions peuvent être "dégelées" (
thaw) pour reconstruire l'état à un moment précis - Historique : Le système maintient un historique complet des changements
Les actions sont stockées dans une table actions avec les colonnes suivantes :
rowid: Clé primaire auto-incrémentée (avec index explicite pour les performances de comptage)timestamp: Horodatage de l'action au format ISOgoblin: Identifiant de l'acteur concerné (ex :myEntity-myEntity@1)action: Contenu JSON de l'actionversion: Version de l'application au moment de la persistancetype: Type d'action (create,persist, etc.)commitId: Identifiant de commit pour la synchronisation distribuée (NULL si non synchronisé)
Recherche plein texte (FTS5) — Quand enableFTS est activé, une table lastPersistedActions maintient la dernière action persist par goblin (hors statut trashed). Une table virtuelle fts_idx indexe le champ meta.index de chaque état. Des triggers SQLite maintiennent automatiquement cet index et peuvent déclencher des notifications via des topics configurables.
Recherche vectorielle (VEC) — Quand enableVEC est activé (nécessite enableFTS), une table virtuelle embeddings (via sqlite-vec) stocke les embeddings de chaque document partitionnés par locale. Le traitement des embeddings est délégué à un worker thread Piscina pour éviter de bloquer le thread principal. Un index embeddingsIndex associe chaque document à la ligne de son action persist la plus récente.
Transactions et verrous — Les transactions sont gérées avec un mutex par base de données (_syncLock) pour garantir la cohérence des accès concurrents. Les notifications déclenchées par les triggers FTS sont mises en file d'attente pendant une transaction et envoyées après le commit.
Middleware de transformation — Un mécanisme de middleware chainé permet de transformer les lignes lors de la récupération (thaw). Un middleware peut diviser une action en plusieurs actions ou la supprimer. Cela est utile pour les migrations de modèles de données.
Synchronisation distribuée — Le système utilise des commitId (UUID) pour tracer l'état de synchronisation de chaque action. Le flux de synchronisation comprend : récupération des actions en attente (getDataForSync), marquage temporaire avec un commitId zéro (prepareDataForSync), puis mise à jour avec le commitId serveur définitif (updateActionsAfterSync).
Bootstrap — Pour initialiser une base de données depuis un flux distant, bootstrapActions crée une base temporaire préfixée par un point (.db), la peuple via un stream, puis la renomme. Les actions locales en attente de synchronisation sont préservées lors de ce processus.
Nettoyage (SoulSweeper) — Chaque base de données dispose d'un SoulSweeper dédié qui peut nettoyer les actions obsolètes selon deux stratégies : par nombre maximum de persists par goblin ou par date limite. La stratégie combinée sweepForDays applique les deux en séquence.
Table temporelle — Optionnellement activée via enableTimetable, une table timetable précalculée de 64 000 jours (depuis l'an 2000) permet des analyses chronologiques avancées.
Action persist insérée
→ Trigger SQL onInsert/onUpdate sur lastPersistedActions
→ Notification sur topic <worker-vec-embed>
→ Souscription resp.events → Piscina.run({name: 'embed'})
→ Worker thread: delete + insert dans embeddings + embeddingsIndex
// Dans une méthode d'un acteur Elf
async cryoStuff() {
const cryo = this.quest.getAPI('cryo');
const result = await cryo.isEmpty({ db: 'myDatabase' });
console.log(result); // {exists: true, empty: false}
// Ouvrir/initialiser explicitement une base
const ok = await cryo.init({ db: 'myDatabase' });
}async freezeSomething() {
const cryo = this.quest.getAPI('cryo');
await cryo.freeze({
db: 'myDatabase',
action: {
type: 'persist',
payload: {
state: {
id: 'myEntity@1',
meta: {
status: 'published',
index: 'contenu indexable pour FTS',
locale: 'fr',
vectors: {
chunk1: {
chunk: 'Premier morceau de texte',
embedding: 'deadbeef...' // hexadécimal
}
}
}
}
}
},
rules: {
goblin: 'myEntity-myEntity@1',
mode: 'last' // Conserver uniquement la dernière action
}
});
}async thawSomething() {
const cryo = this.quest.getAPI('cryo');
// Les résultats arrivent via des événements 'cryo.thawed.myDatabase'
const count = await cryo.thaw({
db: 'myDatabase',
timestamp: '2023-05-01T12:00:00.000Z'
});
console.log(`${count} actions récupérées`);
// Récupération partielle par type
const partial = await cryo.thaw({
db: 'myDatabase',
timestamp: '2023-05-01T12:00:00.000Z',
type: 'myEntity',
length: 100,
offset: 0
});
}async withTransaction() {
const cryo = this.quest.getAPI('cryo');
await cryo.immediate({ db: 'myDatabase' });
try {
await cryo.freeze({
db: 'myDatabase',
action: { type: 'persist', payload: {/* ... */} },
rules: { goblin: 'myEntity-myEntity@1', mode: 'last' }
});
await cryo.commit({ db: 'myDatabase' });
} catch (error) {
await cryo.rollback({ db: 'myDatabase' });
throw error;
}
}async bootstrapDatabase(streamId, routingKey, count) {
const cryo = this.quest.getAPI('cryo');
await cryo.bootstrapActions({
db: 'myDatabase',
streamId,
routingKey,
rename: true, // Renommer l'ancienne base avant remplacement
count // Nombre total d'actions pour la progression
});
}async cleanupDatabase() {
const cryo = this.quest.getAPI('cryo');
// Stratégie combinée : max 10 persists récents + 1 persist si > 30 jours
const changes = await cryo.sweep({ dbs: ['myDatabase'] });
console.log(changes); // { myDatabase: 1234 }
// Ou garder seulement les 5 derniers persists par goblin
await cryo.sweepByMaxCount({ dbs: ['myDatabase'], max: 5 });
}async syncData() {
const cryo = this.quest.getAPI('cryo');
const { stagedActions, commitIds } = await cryo.getDataForSync({ db: 'myDatabase' });
// Marquer les actions avec le commitId zéro (en transit)
await cryo.prepareDataForSync({
db: 'myDatabase',
rows: stagedActions.map(a => a.rowid),
zero: true
});
// Après confirmation du serveur, appliquer le vrai commitId
await cryo.updateActionsAfterSync({
db: 'myDatabase',
serverCommitId: 'abc123-def456-...',
rows: stagedActions.map(a => a.rowid)
});
}async syncPersists() {
const cryo = this.quest.getAPI('cryo');
// Récupérer tous les persists via stream
const { xcraftStream, routingKey, count } = await cryo.getAllPersist({ db: 'myDatabase' });
// Récupérer les persists dans une plage de commitIds
const rangeResult = await cryo.getPersistFromRange({
db: 'myDatabase',
fromCommitId: 'uuid-from',
toCommitId: 'uuid-to',
toInclusive: true
});
}- xcraft-core-book : Fournit la classe
SQLitedontCryohérite ; gère l'ouverture, les requêtes préparées et les migrations - xcraft-core-utils : Utilisé pour les mutex (
locks.getMutex) et utilitaires JS - xcraft-core-fs : Gestion des fichiers et répertoires (copie, suppression de bases SQLite)
- xcraft-core-transport : Streaming des données via
Streamerlors du bootstrap - xcraft-core-etc : Chargement de la configuration du module
- xcraft-core-goblin : Les acteurs Goblin et Elf utilisent Cryo pour persister leur état via les commandes exposées sur le bus
- xcraft-core-host : Fournit
appVersion,resourcesPathetgetRoutingKey() - @google-cloud/pubsub : Dépendance optionnelle (peer) utilisée par l'endpoint
googleQueue
| Option | Description | Type | Valeur par défaut |
|---|---|---|---|
journal |
Mode journal SQLite (journal ou WAL) |
String | "WAL" |
endpoints |
Liste des endpoints à activer | Array | [] |
enableFTS |
Activer la recherche plein texte (FTS5) | Boolean | false |
enableVEC |
Activer la recherche vectorielle (nécessite enableFTS) |
Boolean | false |
fts.list |
Bases de données où activer FTS (toutes si vide) | Array | [] |
vec.list |
Bases de données où activer VEC (toutes si vide) | Array | [] |
vec.dimensions |
Nombre de dimensions pour les embeddings | Number | 4096 |
vec.vecFunc |
Fonction de conversion vectorielle (vec_f32 ou vec_int8) |
String | "vec_f32" |
vec.defaultLocale |
Locale par défaut pour le partitionnement des vecteurs | String | "fr" |
migrations.cleanings |
Règles de nettoyage par nom de base (types de goblins à supprimer) | Object | null |
enableTimetable |
Activer la table de temps précalculée | Boolean | false |
googleQueue.topic |
Topic Google Pub/Sub pour publier les messages | String | "" |
googleQueue.authFile |
Chemin relatif vers le fichier d'authentification Google Cloud | String | "" |
googleQueue.orderingPrefix |
Partie fixe de la clé d'ordonnancement des messages | String | "" |
| Variable | Description | Exemple | Valeur par défaut |
|---|---|---|---|
GOOGLE_APPLICATION_CREDENTIALS |
Chemin vers le fichier d'authentification Google Cloud ; défini automatiquement par l'endpoint googleQueue au démarrage |
/app/resources/gcp-credentials.json |
— |
Ce fichier agit comme façade entre le bus Xcraft et la classe Cryo. Au démarrage, il charge dynamiquement les endpoints activés dans la configuration (cryoConfig.endpoints), puis expose automatiquement toutes les méthodes publiques de l'instance Cryo comme commandes parallèles sur le bus via xcraftCommands.
Pour chaque commande, le fichier gère le cycle complet : appel de la méthode Cryo, propagation aux endpoints actifs (si la méthode existe sur l'endpoint), puis envoi de l'événement cryo.<n>.<msgId>.finished avec le résultat ou cryo.<n>.<msgId>.error en cas d'exception. Un traitement spécial est appliqué à freeze : seul {action} est retourné pour les actions de type persist (les autres résultats sont ignorés).
La méthode dispose exportée ferme proprement l'instance Cryo (workers Piscina, souscriptions d'embeddings, optimisation SQLite).
La classe Cryo hérite de SQLite (xcraft-core-book) et constitue le cœur du module.
Le constructeur détermine le répertoire de stockage (xcraft.xcraftRoot/var/cryo par défaut), charge la configuration et prépare toutes les requêtes SQL préparées. Selon la configuration, il génère les DDL pour les tables optionnelles (timetable, lastPersistedActions, fts_idx, embeddings, embeddingsIndex) et leurs triggers associés. Les indices sont créés sur goblin, timestamp, type, commitId et rowid (index couvrant explicite pour optimiser les requêtes de comptage).
La version de schéma (PRAGMA user_version) est gérée via un mécanisme de migration incrémentale jusqu'à la version 11.
Deux pools de workers sont gérés :
#piscina[db]: Un pool par base de données, avec un seul thread (minThreads: 1, maxThreads: 1), utilisé pourembedetpopulate. Il se ferme automatiquement après 1 seconde d'inactivité.#piscinaSync: Pool partagé pour les opérations de récupération streamée (getAllPersist,getPersistFromRange), avec 0 à N threads et 30 secondes de timeout d'inactivité.
-
freeze(resp, msg)— Persiste une action. Supporte les modesall(conservation de tout l'historique) etlast(suppression des anciennes actions d'un goblin). Avant insertion, vérifie si la dernière action persist est identique pour éviter les doublons (via comparaison du champstate). Si identique, les actions intermédiaires inutiles sont annulées (revert). Les actionsraw(provenant d'un serveur de sync) sont ignorées si une transaction Cryo est ouverte pour ce goblin. -
thaw(resp, msg)— Récupère la dernière action par goblin jusqu'à un timestamp. Supporte la pagination viatype,lengthetoffset. Applique la chaîne de middlewares sur chaque ligne. Les résultats sont envoyés par événementscryo.thawed.<db>, groupés par action source (une action source peut produire 0 à N lignes après middleware). -
frozen(resp, msg)— Retourne le nombre d'actions et le timestamp de la dernière action, avec filtrage optionnel par type. -
isEmpty(resp, msg)— Retourne{exists, empty}selon l'existence et le contenu de la base. -
init(resp, msg)— Ouvre une base de données et déclenche le rafraîchissement des embeddings en arrière-plan si VEC est activé. -
immediate(resp, msg)/exclusive(resp, msg)/begin(resp, msg)— Démarrent une transaction avec acquisition du mutex associé à la base. -
commit(resp, msg)— Valide la transaction et envoie les notifications trigger FTS en attente, puis libère le mutex. -
rollback(resp, msg)— Annule la transaction et libère le mutex. -
restore(resp, msg)— Restaure une base à un timestamp en copiant la source vers la destination, puis en supprimant les actions postérieures au timestamp. Si source et destination sont identiques, l'original est d'abord archivé sous un nom horodaté (branche). -
branch(resp, msg)— Archive la base actuelle en la renommant avec un timestamp (viafs-extra.rename). -
dump(resp, msg)— Exporte les dernières actions par goblin jusqu'à un timestamp vers une base de données distincte viaATTACH DATABASE. -
registerLastActionTriggers(resp, msg)— Enregistre des topics d'événements déclenchés par les triggers FTS lors des insertions, mises à jour ou suppressions danslastPersistedActions. NécessiteenableFTS. -
unregisterLastActionTriggers(resp, msg)— Retire des topics précédemment enregistrés. -
actions(resp, msg)— Itère sur les actions dans une plage de timestamps et envoie chaque ligne viacryo.actions.<db>. -
getEntityTypeCount(resp, msg)— Retourne un tableau d'objets{type, count}en extrayant le préfixe de type de chaque goblin. -
branches(resp)— Liste les fichiers.dbdu répertoire Cryo et détecte les branches (fichiers<n>_<timestamp>.db). Retourne{[db]: {branches: [timestamp, ...]}}. -
isAlreadyCreated(resp, msg)— Détermine si un goblin a déjà été créé : retournetrues'il a au moins unpersist, ou s'il a plus d'une actioncreate(la deuxièmecreatesignifie que le goblin avait déjà existé). -
hasGoblin(resp, msg)— Vérifie la présence d'au moins une action pour un goblin donné. -
loadMiddleware(resp, msg)— Charge dynamiquement un fichier middleware et l'ajoute à la chaîne de transformation. Le middleware est caché viarequire.cachepour éviter les rechargements. -
getDataForSync(resp, msg)— Retourne les actions en attente (allStagedActions: actions non-persist sans commitId situées avant le dernier persist sans commitId) ainsi qu'une sélection de commitIds récents (dernier, 10ème, 100ème, 200ème, 1000ème) pour la négociation de synchronisation. -
prepareDataForSync(resp, msg)— Marque les lignes spécifiées avec le commitId zéro (00000000-0000-0000-0000-000000000000) pour indiquer qu'elles sont en cours de synchronisation. -
updateActionsAfterSync(resp, msg)— Remplace le commitId zéro par le commitId serveur définitif sur les lignes confirmées. -
countPersistsFrom(resp, msg)/countNewPersistsFrom(resp, msg)— Comptent les actions persist (total ou depuis un commitId donné). -
hasCommitId(resp, msg)/getLastCommitId(resp, msg)/getSomeCommitIds(resp, msg)— Requêtes utilitaires pour la négociation de synchronisation. -
getPersistFromRange(resp, msg)— Récupère les derniers persists par goblin dans une plage de commitIds, via un worker Piscina et unMessageChannel. Retourne unxcraftStreamconsommable par le transport Xcraft. -
getAllPersist(resp, msg)— Récupère tous les derniers persists par goblin via streaming. Retourne{xcraftStream, routingKey, count}. -
bootstrapActions(resp, msg, next)— Générateur watt. Crée une base temporaire.db, la peuple depuis un flux de données (viaWritableSQLetStreamer), réinjecte les actions locales en attente, peuple FTS/VEC via worker, puis remplace la base principale. En cas d'erreur, nettoie la base temporaire. -
getZeroActions(resp, msg)— Retourne les actions non-persist marquées avec le commitId zéro. -
getActionsByIds(resp, msg)— Retourne le dernier persist par goblin pour une liste d'identifiants. -
hasActions(resp, msg)— Vérifie que tous les goblins spécifiés ont au moins un persist. -
sweep(resp, msg)— LancesweepForDays(30, 10)sur les bases spécifiées (ou toutes). Retourne un objet{[db]: changes}. -
sweepByMaxCount(resp, msg)— LancesweepByCount(max)sur les bases spécifiées. -
refreshEmbeddings(resp, msg)— Retraite les embeddings obsolètes sur les bases VEC activées via le workerrefreshEmbeddings. -
usable()— Délègue àSQLite.usable(). -
getLocation()— Retourne le chemin du répertoire de stockage Cryo. -
dispose()— Ferme les pools Piscina, désabonne les listeners d'embeddings, optimise toutes les bases (PRAGMA optimize) et appellesuper.dispose().
SoulSweeper encapsule la logique de nettoyage des actions SQLite avec des requêtes CTE optimisées. Il est instancié une fois par base de données ouverte.
Le paramètre withCommits (défaut true) contrôle si le nettoyage se limite aux actions ayant un commitId non-null, ce qui est important pour ne pas supprimer des données non synchronisées.
-
sweepByCount(count=4, dryrun=true)— Garde lescountderniers persists par goblin (entre 1 et 100), supprime tous ceux en dessous du seuil ainsi que leurs actions intermédiaires. Lance unANALYZEavant et unVACUUMsi plus de 100 000 lignes sont supprimées. -
sweepByDatetime(datetime=now, dryrun=true)— Supprime les actions dont le timestamp est antérieur àdatetime, en gardant au minimum les 2 derniers persists par goblin. -
sweepForDays(days=30, max=10, dryrun=true)— Stratégie combinée : d'abordsweepByCount(max)pour limiter le nombre total, puissweepByDatetime(now - days)pour éliminer les anciennes données. Retourne le cumul des suppressions.
Le mode dryrun=true (défaut) calcule le nombre de lignes qui seraient supprimées sans les effacer, permettant une prévisualisation.
ReadableSQL — Stream Node.js lisible alimenté par un itérateur SQLite. Lit les lignes par lots de 128 et les sérialise en JSON. Supporte une fonction wait pour les opérations SQLite asynchrones.
WritableSQL — Stream Node.js inscriptible pour l'insertion en masse. Démarre automatiquement une transaction à la construction, effectue des commits intermédiaires tous les step lots (défaut 1024), et accepte un callback de progression progressCb(pos).
MessagePortWritable — Stream inscriptible qui envoie les chunks via un MessagePort. Implémente un protocole de back-pressure : le lecteur envoie un message vide pour signaler sa demande, et le writer attend cette demande avant d'envoyer. Tente le transfert zero-copy (Transferable) et bascule en copie en cas d'échec. Un watchdog détecte les lecteurs morts (60 secondes sans demande).
MessagePortReadable — Stream lisible qui reçoit les chunks depuis un MessagePort et les pousse dans le flux Node.js. Envoie une demande initiale au writer, puis une nouvelle demande à chaque _read(). Gère la terminaison propre du port.
Exporte l'unique instance de Cryo utilisée par tous les handlers de commandes, configurée avec les paramètres chargés via xcraft-core-etc.
Endpoint optionnel activé en ajoutant "googleQueue" à la liste endpoints de la configuration. Il intercepte les résultats de la commande freeze et les publie dans un topic Google Pub/Sub avec ordonnancement des messages.
freeze(resp, msg, results)— Publie l'action dans Pub/Sub avec les attributsorigin,publish_timestamp,timestamp,goblinetversion. Utilise la clé d'ordonnancement configurée et activeenableMessageOrderingsur le topic.
Module utilitaire qui charge l'extension native sqlite-vec (vec0) dans une instance de base de données SQLite. Supporte les plateformes Linux, macOS et Windows en architecture x86_64 et aarch64. Gère le chemin app.asar.unpacked pour les applications Electron packagées.
load(db)— Charge l'extension dans l'instance SQLite fournie viadb.loadExtension(path).getLoadablePath()— Résout le chemin de la bibliothèque native selon la plateforme et l'architecture courantes.
Worker thread Piscina exposant trois tâches :
populate({db, location, enableFTS, enableVEC, defaultLocale, indices, vecFunc}) — Exécuté après un bootstrap ou une migration. Peuple fts_idx depuis lastPersistedActions si FTS est activé, et peuple embeddings + embeddingsIndex depuis les dernières actions persist si VEC est activé. Applique ensuite les indices personnalisés et exécute ANALYZE.
embed({db, location, goblin, defaultLocale, vecFunc}) — Exécuté à chaque modification d'une entité indexedContent (déclenchée par les triggers FTS). Supprime les embeddings existants pour le documentId et les réinsère depuis la dernière action persist du goblin. Opère dans une transaction IMMEDIATE.
refreshEmbeddings({db, location, defaultLocale, vecFunc}) — Détecte les embeddings obsolètes (ceux dont le documentRowid dans embeddingsIndex ne correspond plus au max(rowid) de l'action persist courante), les supprime et les réinsère. Utile après une synchronisation ou un import de données.
La table embeddings utilise la structure suivante :
CREATE VIRTUAL TABLE embeddings USING vec0(
locale TEXT partition key,
scope TEXT,
documentId TEXT,
+chunkId TEXT,
+chunk TEXT,
embedding FLOAT[dimensions] distance_metric=cosine
);Le partitionnement par locale permet des recherches vectorielles ciblées par langue. La métrique de distance cosinus est utilisée pour la similarité sémantique.
Worker thread Piscina exposant deux tâches pour la récupération en lecture seule des actions persist via MessagePortWritable et ReadableSQL :
getAllPersist({port, location, db}) — Récupère le dernier persist par goblin (ayant un commitId non-null) trié par rowid, et l'envoie ligne par ligne via le port.
getPersistFromRange({port, location, db, fromCommitId, toCommitId, toInclusive}) — Récupère les derniers persists dans une plage de commitIds via des CTE SQL. Si fromCommitId est absent, récupère tous les persists jusqu'à toCommitId. Si toInclusive est vrai, inclut les actions du toCommitId dans le résultat.
test/soulSweeper.spec.js — Suite de tests pour SoulSweeper avec Mocha/Chai. Contient des tests unitaires sur base SQLite en mémoire (toujours actifs) et des tests d'intégration sur une vraie base cms.db (désactivés par défaut avec describe.skip, nécessitent une extraction manuelle du fichier compressé).
Ce module est distribué sous licence MIT.
Ce contenu a été généré par IA