"scriptLines": [ "parameters{", " Offset as integer (0),", " Limit as integer (200),", " ApiVersion as integer (2),", " StartTime as string ('0001-01-01T00:00:00Z'),", " EndTime as string ('9999-12-31T23:59:59Z'),", " ContainerName as string ('dicom'),", " InstanceTablePath as string ('instance'),", " SeriesTablePath as string ('series'),", " StudyTablePath as string ('study'),", " RetentionHours as integer (720)", "}", "source(output(", " body as (action as string, metadata as (undefined as string), partitionName as string, sequence as short, seriesInstanceUid as string, sopInstanceUid as string, state as string, studyInstanceUid as string, timestamp as string),", " headers as [string,string]", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " format: 'rest',", " timeout: 30,", " requestInterval: 0,", " entity: (concat('/v', toString($ApiVersion), '/changefeed')),", " queryParameters: ['includeMetadata' -> 'true', 'offset' -> ($Offset), 'limit' -> ($Limit), 'startTime' -> ($StartTime), 'endTime' -> ($EndTime)],", " httpMethod: 'GET',", " responseFormat: ['type' -> 'json', 'documentForm' -> 'arrayOfDocuments']) ~> changeFeed", "source(output(", " partitionName as string,", " studyInstanceUid as string,", " seriesInstanceUid as string,", " sopInstanceUid as string,", " lastModifiedTimestamp as timestamp,", " studyDate as date,", " studyDescription as string,", " issuerOfPatientId as string,", " patientId as string,", " patientName as string,", " modality as string,", " sopClassUid as string,", " metadata as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath)) ~> existingInstances", "source(output(", " partitionName as string,", " studyInstanceUid as string,", " seriesInstanceUid as string,", " lastModifiedTimestamp as timestamp,", " studyDate as date,", " studyDescription as string,", " issuerOfPatientId as string,", " patientId as string,", " patientName as string,", " modality as string,", " instanceCount as long", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath)) ~> existingSeries", "flattened derive(timestamp = toTimestamp(substring(timestamp, 1, 23), 'yyyy-MM-dd\\'T\\'HH:mm:ss.SSS', 'UTC'),", " studyDate = toDate(byPath('metadata.{00080020}.Value[1]'), 'yyyyMMdd', 'UTC'),", " studyDescription = toString(byPath('metadata.{00081030}.Value[1]')),", " issuerOfPatientId = toString(byPath('metadata.{00100021}.Value[1]')),", " patientId = toString(byPath('metadata.{00100020}.Value[1]')),", " patientName = toString(byPath('metadata.{00100010}.Value[1].Alphabetic')),", " modality = toString(byPath('metadata.{00080060}.Value[1]')),", " sopClassUid = toString(byPath('metadata.{00080016}.Value[1]')),", " metadata = toString(byPath('metadata'))) ~> extracted", "changeFeed select(mapColumn(", " action = body.action,", " timestamp = body.timestamp,", " partitionName = body.partitionName,", " studyInstanceUid = body.studyInstanceUid,", " seriesInstanceUid = body.seriesInstanceUid,", " sopInstanceUid = body.sopInstanceUid,", " metadata = body.metadata", " ),", " skipDuplicateMapInputs: false,", " skipDuplicateMapOutputs: false) ~> flattened", "upToDate alterRow(upsertIf(or(equals(action,'Create'),equals(action,'Update'))),", " deleteIf(equals(action,'Delete'))) ~> instanceSinkUpdates", "extracted aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " sopInstanceUid),", " action = last(action),", " lastModifiedTimestamp = last(timestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " modality = last(modality),", " sopClassUid = last(sopClassUid),", " metadata = last(metadata)) ~> aggregatedChanges", "existingInstances aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " modality = last(modality),", " instanceCount = count()) ~> allSeries", "existingSeries aggregate(groupBy(partitionName,", " studyInstanceUid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " instanceCount = sum(instanceCount),", " seriesCount = count()) ~> allStudies", "modifiedSeries alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> seriesSinkUpdates", "modifiedStudies alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> studySinkUpdate", "upToDate aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid),", " instanceDifference = sum(iif(equals(action, 'Create'), 1, iif(equals(action, 'Delete'), -1, 0)))) ~> seriesChanges", "seriesChanges aggregate(groupBy(partitionName,", " studyInstanceUid),", " instanceDifference = sum(instanceDifference)) ~> studyChanges", "annotatedSeries filter(hasChange) ~> modifiedSeries", "allSeries derive(hasChange = not(isNull(seriesCache#lookup(partitionName, studyInstanceUid, seriesInstanceUid)))) ~> annotatedSeries", "allStudies derive(hasChange = not(isNull(studyCache#lookup(partitionName, studyInstanceUid)))) ~> annotatedStudies", "annotatedStudies filter(hasChange) ~> modifiedStudies", "aggregatedChanges filter(or(not(isNull(metadata)), equals(action, 'Delete'))) ~> upToDate", "instanceSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable:true,", " insertable:false,", " updateable:false,", " upsertable:true,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid','sopInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 1,", " mapColumn(", " partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " sopInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " modality,", " sopClassUid,", " metadata", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> instanceTable", "seriesSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable:true,", " insertable:false,", " updateable:false,", " upsertable:true,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 2,", " mapColumn(", " partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " modality,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> seriesTable", "studySinkUpdate sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($StudyTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable:true,", " insertable:false,", " updateable:false,", " upsertable:true,", " keys:['partitionName','studyInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 3,", " mapColumn(", " partitionName,", " studyInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " seriesCount,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> studyTable", "seriesChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> seriesCache", "studyChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> studyCache" ]
0 commit comments