1- import { extendLogger , Processor , Reader , Writer } from "@rdfc/js-runner" ;
1+ import { Processor , extendLogger } from "@rdfc/js-runner" ;
22import { SDS } from "@treecg/types" ;
33import { DataFactory } from "rdf-data-factory" ;
44import { RdfStore } from "rdf-stores" ;
5- import { Parser } from "n3" ;
5+ import { Parser , Writer as N3Writer } from "n3" ;
66import { writeFile } from "fs/promises" ;
77import { CREATE , DELETE , UPDATE } from "./SPARQLQueries" ;
88import { doSPARQLRequest , getObjects , sanitizeQuads } from "./Utils" ;
9-
10- import type { Quad_Subject , Term } from "@rdfjs/types" ;
119import { Logger } from "winston" ;
1210
11+ import type { Quad , Quad_Subject , Term } from "@rdfjs/types" ;
12+ import type { Reader , Writer } from "@rdfc/js-runner" ;
13+
1314const df = new DataFactory ( ) ;
1415
1516// TODO: This should be obtained from an SDS metadata stream
@@ -33,7 +34,14 @@ export type PerformanceConfig = {
3334 failureIsFatal ?: boolean ;
3435} ;
3536
37+ export enum OperationMode {
38+ REPLICATION = "Replication" ,
39+ SYNC = "Sync"
40+ }
41+
3642export type IngestConfig = {
43+ operationMode ?: OperationMode ;
44+ memberBatchSize ?: number ;
3745 memberIsGraph ?: boolean ;
3846 memberShapes ?: string [ ] ; // TODO: This should be obtained from an SDS metadata stream
3947 changeSemantics ?: ChangeSemantics ;
@@ -45,7 +53,7 @@ export type IngestConfig = {
4553 measurePerformance ?: PerformanceConfig ;
4654} ;
4755
48- export type TransactionMember = {
56+ type TransactionMember = {
4957 memberId : string ,
5058 transactionId : string ,
5159 store : RdfStore
@@ -59,17 +67,32 @@ type SPARQLIngestArgs = {
5967
6068export class SPARQLIngest extends Processor < SPARQLIngestArgs > {
6169 protected transactionMembers : TransactionMember [ ] = [ ] ;
70+ protected memberBatch : Quad [ ] = [ ] ;
6271 protected requestsPerformance : number [ ] = [ ] ;
72+ protected batchCount = 0 ;
6373
6474 protected createTransactionQueriesLogger : Logger ;
6575 protected doSPARQLRequestLogger : Logger ;
6676
6777 async init ( this : SPARQLIngestArgs & this) : Promise < void > {
6878 this . createTransactionQueriesLogger = extendLogger ( this . logger , "createTransactionQueries" ) ;
6979 this . doSPARQLRequestLogger = extendLogger ( this . logger , "doSPARQLRequest" ) ;
80+
81+ if ( ! this . config . operationMode ) {
82+ this . config . operationMode = OperationMode . SYNC ;
83+ }
84+
85+ if ( ! this . config . memberBatchSize ) {
86+ this . config . memberBatchSize = 100 ;
87+ }
88+
89+ if ( this . config . accessToken === "" ) {
90+ this . config . accessToken = undefined ;
91+ }
7092 }
7193
7294 async transform ( this : SPARQLIngestArgs & this) : Promise < void > {
95+
7396 for await ( const rawQuads of this . memberStream . strings ( ) ) {
7497 this . logger . debug ( `Raw member data received: \n${ rawQuads } ` ) ;
7598 const quads = new Parser ( ) . parse ( rawQuads ) ;
@@ -177,7 +200,11 @@ export class SPARQLIngest extends Processor<SPARQLIngestArgs> {
177200 this . transactionMembers = [ ] ;
178201 } else {
179202 // Determine if we have a named graph (either explicitly configured or as the member itself)
180- const ng = this . getNamedGraphIfAny ( memberIRI , this . config . memberIsGraph , this . config . targetNamedGraph ) ;
203+ const ng = this . getNamedGraphIfAny (
204+ memberIRI ,
205+ this . config . memberIsGraph ,
206+ this . config . targetNamedGraph
207+ ) ;
181208 // Get the type of change
182209 // TODO: use rdf-lens to support complex paths
183210 const ctv = store . getQuads (
@@ -212,19 +239,37 @@ export class SPARQLIngest extends Processor<SPARQLIngestArgs> {
212239 this . logger . info ( `Preparing 'DELETE {} WHERE {} + INSERT DATA {}' SPARQL query for transaction member ${ memberIRI . value } ` ) ;
213240 query = UPDATE ( store , this . config . forVirtuoso , this . config . targetNamedGraph ) ;
214241 } else {
215- // Determine if we have a named graph (either explicitly configure or as the member itself)
216- const ng = this . getNamedGraphIfAny ( memberIRI , this . config . memberIsGraph , this . config . targetNamedGraph ) ;
217- // No change semantics are provided so we do a DELETE/INSERT query by default
218- this . logger . info ( `Preparing 'DELETE {} WHERE {} + INSERT DATA {}' SPARQL query for member ${ memberIRI . value } ` ) ;
219- query = UPDATE ( store , this . config . forVirtuoso , ng ) ;
242+ // Check operation mode
243+ if ( this . config . operationMode === OperationMode . REPLICATION ) {
244+ this . memberBatch . push ( ...store . getQuads ( null , null , null , null ) ) ;
245+ this . batchCount ++ ;
246+ if ( this . batchCount < this . config . memberBatchSize ! ) {
247+ continue ;
248+ }
249+ } else {
250+ // Determine if we have a named graph (either explicitly configure or as the member itself)
251+ const ng = this . getNamedGraphIfAny ( memberIRI , this . config . memberIsGraph , this . config . targetNamedGraph ) ;
252+ // No change semantics are provided so we do a DELETE/INSERT query by default
253+ this . logger . info ( `Preparing 'DELETE {} WHERE {} + INSERT DATA {}' SPARQL query for member ${ memberIRI . value } ` ) ;
254+ query = UPDATE ( store , this . config . forVirtuoso , ng ) ;
255+ }
220256 }
221257 }
222258 } else {
223259 // Non-SDS data
224260
225- // TODO: Handle change semantics(?) and transactions for non-SDS data
226- this . logger . info ( `Preparing 'DELETE {} WHERE {} + INSERT DATA {}' SPARQL query for received triples (${ store . size } )` ) ;
227- query = UPDATE ( store , this . config . forVirtuoso , this . config . targetNamedGraph ) ;
261+ // Check operation mode
262+ if ( this . config . operationMode === OperationMode . REPLICATION ) {
263+ this . memberBatch . push ( ...store . getQuads ( null , null , null , null ) ) ;
264+ this . batchCount ++ ;
265+ if ( this . batchCount < this . config . memberBatchSize ! ) {
266+ continue ;
267+ }
268+ } else {
269+ // TODO: Handle change semantics(?) and transactions for non-SDS data
270+ this . logger . info ( `Preparing 'DELETE {} WHERE {} + INSERT DATA {}' SPARQL query for received triples (${ store . size } )` ) ;
271+ query = UPDATE ( store , this . config . forVirtuoso , this . config . targetNamedGraph ) ;
272+ }
228273 }
229274
230275 // Execute the update query
@@ -255,7 +300,56 @@ export class SPARQLIngest extends Processor<SPARQLIngestArgs> {
255300 await this . sparqlWriter . string ( query . join ( "\n" ) ) ;
256301 }
257302 } else {
258- this . logger . warn ( `No query generated for member ${ memberIRI . value } ` ) ;
303+ if ( this . config . operationMode === OperationMode . REPLICATION ) {
304+ try {
305+ // Execute the ingestion of the collected member batch via the SPARQL Graph Store protocol
306+ const t0 = Date . now ( ) ;
307+ await doSPARQLRequest ( this . memberBatch , this . config , this . doSPARQLRequestLogger ) ;
308+ const reqTime = Date . now ( ) - t0 ;
309+ if ( this . config . measurePerformance ) {
310+ this . requestsPerformance . push ( reqTime ) ;
311+ }
312+ this . logger . info ( `Executed query on remote SPARQL server ${ this . config . graphStoreUrl } (took ${ reqTime } ms)` ) ;
313+ this . batchCount = 0 ;
314+ this . memberBatch = [ ] ;
315+ } catch ( error ) {
316+ if ( ! this . config . measurePerformance || this . config . measurePerformance . failureIsFatal ) {
317+ this . logger . error ( `Error executing query on remote SPARQL server ${ this . config . graphStoreUrl } : ${ error } ` ) ;
318+ throw error ;
319+ } else {
320+ if ( this . config . measurePerformance ) {
321+ this . requestsPerformance . push ( - 1 ) ; // -1 indicates a failure
322+ }
323+ }
324+ }
325+ } else {
326+ this . logger . warn ( `No query generated for member ${ memberIRI . value } ` ) ;
327+ }
328+ }
329+ }
330+
331+ // Flush remaining member batch if any
332+ if ( this . config . operationMode === OperationMode . REPLICATION && this . memberBatch . length > 0 ) {
333+ try {
334+ // Execute the ingestion of the collected member batch via the SPARQL Graph Store protocol
335+ const t0 = Date . now ( ) ;
336+ await doSPARQLRequest ( this . memberBatch , this . config , this . doSPARQLRequestLogger ) ;
337+ const reqTime = Date . now ( ) - t0 ;
338+ if ( this . config . measurePerformance ) {
339+ this . requestsPerformance . push ( reqTime ) ;
340+ }
341+ this . logger . info ( `Executed query on remote SPARQL server ${ this . config . graphStoreUrl } (took ${ reqTime } ms)` ) ;
342+ this . batchCount = 0 ;
343+ this . memberBatch = [ ] ;
344+ } catch ( error ) {
345+ if ( ! this . config . measurePerformance || this . config . measurePerformance . failureIsFatal ) {
346+ this . logger . error ( `Error executing query on remote SPARQL server ${ this . config . graphStoreUrl } : ${ error } ` ) ;
347+ throw error ;
348+ } else {
349+ if ( this . config . measurePerformance ) {
350+ this . requestsPerformance . push ( - 1 ) ; // -1 indicates a failure
351+ }
352+ }
259353 }
260354 }
261355
0 commit comments