Skip to content

Commit ad1c8bd

Browse files
committed
fixed #162 - optimized the mlcp transform six ways to Sunday
fixed #301
1 parent 1123ae9 commit ad1c8bd

File tree

24 files changed

+500
-165
lines changed

24 files changed

+500
-165
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/JobStatusListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22

33
public interface JobStatusListener {
44
public void onStatusChange(long jobId, int percentComplete, String message);
5+
public void onJobFinished();
56
}

marklogic-data-hub/src/main/java/com/marklogic/hub/commands/LoadUserModulesCommand.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.marklogic.client.modulesloader.impl.PropertiesModuleManager;
2323
import com.marklogic.client.modulesloader.impl.XccAssetLoader;
2424
import com.marklogic.hub.HubConfig;
25+
import com.marklogic.hub.flow.FlowCacheInvalidator;
2526
import com.marklogic.hub.util.HubFileFilter;
2627

2728
public class LoadUserModulesCommand extends AbstractCommand {
@@ -97,6 +98,10 @@ else if (isHarmonizeDir) {
9798
}
9899
}
99100
});
101+
102+
// invalidate the server's flow cache
103+
FlowCacheInvalidator invalidator = new FlowCacheInvalidator(stagingClient);
104+
invalidator.invalidateCache();
100105
}
101106
}
102107
catch (IOException e) {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.marklogic.hub.flow;
2+
3+
import com.marklogic.client.DatabaseClient;
4+
import com.marklogic.client.extensions.ResourceManager;
5+
import com.marklogic.client.io.StringHandle;
6+
import com.marklogic.client.util.RequestParameters;
7+
8+
/**
9+
* Invalidates the server cache for flows. The server caches flow definitions
10+
* for optimal performance. This just makes sure that the cache gets invalidated.
11+
*/
12+
public class FlowCacheInvalidator extends ResourceManager {
13+
14+
static final public String NAME = "flow";
15+
16+
private DatabaseClient client;
17+
18+
public FlowCacheInvalidator(DatabaseClient client) {
19+
super();
20+
this.client = client;
21+
this.client.init(NAME, this);
22+
}
23+
24+
public void invalidateCache() {
25+
RequestParameters params = new RequestParameters();
26+
this.getServices().delete(params, new StringHandle());
27+
}
28+
}

marklogic-data-hub/src/main/java/com/marklogic/spring/batch/hub/RunHarmonizeFlowConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public void beforeJob(JobExecution jobExecution) {
4646
public void afterJob(JobExecution jobExecution) {
4747
if (statusListener != null) {
4848
statusListener.onStatusChange(jobId, 100, "");
49+
statusListener.onJobFinished();
4950
}
5051
}
5152
}).build();

marklogic-data-hub/src/main/resources/ml-modules/root/com.marklogic.hub/lib/debug-lib.xqy

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ xquery version "1.0-ml";
1717

1818
module namespace debug = "http://marklogic.com/data-hub/debug";
1919

20+
import module namespace hul = "http://marklogic.com/data-hub/hub-utils-lib"
21+
at "/com.marklogic.hub/lib/hub-utils-lib.xqy";
22+
2023
declare option xdmp:mapping "false";
2124

2225
declare function debug:enable($enabled as xs:boolean)
@@ -28,9 +31,11 @@ declare function debug:enable($enabled as xs:boolean)
2831
xdmp:document-insert(
2932
"/com.marklogic.hub/settings/__debug_enabled__.xml",
3033
element debug:is-debugging-enabled { if ($enabled) then 1 else 0 })
31-
',
32-
map:new((map:entry("enabled", $enabled))),
33-
map:new(map:entry("database", xdmp:modules-database())))
34+
',
35+
map:new((map:entry("enabled", $enabled))),
36+
map:new(map:entry("database", xdmp:modules-database()))
37+
),
38+
hul:invalidate-field-cache("debugging-enabled")
3439
};
3540

3641
(:~
@@ -40,16 +45,19 @@ declare function debug:enable($enabled as xs:boolean)
4045
:)
4146
declare function debug:on() as xs:boolean
4247
{
43-
xdmp:eval('
44-
declare namespace debug = "http://marklogic.com/data-hub/debug";
45-
fn:exists(
46-
cts:search(
47-
fn:doc("/com.marklogic.hub/settings/__debug_enabled__.xml"),
48-
cts:element-value-query(xs:QName("debug:is-debugging-enabled"), "1", ("exact")),
49-
("unfiltered", "score-zero", "unchecked", "unfaceted")
48+
hul:from-field-cache("debugging-enabled", function() {
49+
xdmp:eval('
50+
declare namespace debug = "http://marklogic.com/data-hub/debug";
51+
fn:exists(
52+
cts:search(
53+
fn:doc("/com.marklogic.hub/settings/__debug_enabled__.xml"),
54+
cts:element-value-query(xs:QName("debug:is-debugging-enabled"), "1", ("exact")),
55+
("unfiltered", "score-zero", "unchecked", "unfaceted")
56+
)
5057
)
51-
)
52-
',(), map:new(map:entry("database", xdmp:modules-database())))
58+
',(), map:new(map:entry("database", xdmp:modules-database())))
59+
},
60+
xs:dayTimeDuration("PT1M"))
5361
};
5462

5563
(:~
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
xquery version "1.0-ml";
2+
3+
module namespace err = "http://marklogic.com/data-hub/err";
4+
5+
import module namespace functx = "http://www.functx.com"
6+
at "/MarkLogic/functx/functx-1.0-nodoc-2007-01.xqy";
7+
8+
declare option xdmp:mapping "false";
9+
10+
declare %private function err:to-camel($str)
11+
{
12+
let $toks := fn:tokenize($str, "-")
13+
return
14+
fn:string-join((
15+
$toks[1],
16+
fn:subsequence($toks, 2) ! functx:capitalize-first(.)
17+
), "")
18+
};
19+
20+
declare function err:error-to-json($error as element(error:error))
21+
{
22+
let $o := json:object()
23+
let $_ :=
24+
for $e in $error/*[fn:not(self::error:stack or self::error:data)]
25+
return
26+
map:put($o, err:to-camel(fn:local-name($e)), fn:data($e))
27+
let $_ :=
28+
let $data := json:array()
29+
let $_ :=
30+
for $datum in $error/error:data/error:datum
31+
return
32+
json:array-push($data, fn:string($datum))
33+
return
34+
map:put($o, "data", $data)
35+
let $_ :=
36+
let $stack := json:array()
37+
let $_ :=
38+
for $frame in $error/error:stack/error:frame
39+
let $f := json:object()
40+
let $_ :=
41+
for $e in $frame/*[fn:not(self::error:variables)]
42+
return
43+
map:put($f, err:to-camel(fn:local-name($e)), fn:data($e))
44+
let $_ :=
45+
let $variables := json:object()
46+
let $_ :=
47+
for $variable in $frame/error:variables/error:variable
48+
return
49+
map:put($variables, $variable/error:name/fn:string(), $variable/error:value/fn:data(.))
50+
return
51+
map:put($f, "variables", $variables)
52+
return
53+
json:array-push($stack, $f)
54+
return
55+
map:put($o, "stacks", $stack)
56+
return
57+
$o
58+
};

marklogic-data-hub/src/main/resources/ml-modules/root/com.marklogic.hub/lib/flow-lib.xqy

Lines changed: 93 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -220,17 +220,30 @@ declare function flow:get-flow(
220220
$flow-name as xs:string,
221221
$flow-type as xs:string?) as element(hub:flow)?
222222
{
223-
let $uris :=
224-
hul:run-in-modules(function() {
225-
let $type :=
226-
if ($flow-type) then $flow-type
227-
else "*"
223+
let $cache-name := fn:string-join(("flow:", $entity-name, $flow-name, $flow-type), "-")
224+
return
225+
hul:from-field-cache($cache-name, function() {
226+
let $uris :=
227+
hul:run-in-modules(function() {
228+
let $type :=
229+
if ($flow-type) then $flow-type
230+
else "*"
231+
return
232+
cts:uri-match($ENTITIES-DIR || $entity-name || "/" || $type || "/" || $flow-name || "/*")
233+
})
234+
where fn:count($uris) > 0
228235
return
229-
cts:uri-match($ENTITIES-DIR || $entity-name || "/" || $type || "/" || $flow-name || "/*")
230-
})
231-
where fn:count($uris) > 0
236+
flow:get-flow($entity-name, $flow-name, $flow-type, $uris)
237+
},
238+
xs:dayTimeDuration("PT10M"))
239+
};
240+
241+
declare function flow:invalidate-flow-caches()
242+
{
243+
for $name in xdmp:get-server-field-names()
244+
where fn:starts-with($name, "flow:")
232245
return
233-
flow:get-flow($entity-name, $flow-name, $flow-type, $uris)
246+
xdmp:set-server-field($name, ())
234247
};
235248

236249
(:~
@@ -591,67 +604,65 @@ declare function flow:run-plugin(
591604
"create-" || $destination
592605
let $func := xdmp:function(fn:QName($ns, $func-name), $module-uri)
593606
let $trace-input :=
594-
if (trace:enabled()) then
595-
if ($data-format = 'application/xml') then
596-
switch($destination)
597-
case "content" return
598-
if ($flow-type = "input") then
599-
let $kind := try {
600-
xdmp:node-kind($content)
601-
}
602-
catch($ex) {}
603-
return
604-
if ($kind eq "binary") then
605-
element rawContent { "binary document" }
606-
else
607-
element rawContent { $content }
608-
else
609-
()
610-
case "headers" return
611-
element content { $content }
612-
case "triples" return
613-
(
614-
element content { $content },
615-
element headers { $headers }
616-
)
617-
default return ()
618-
else
619-
let $o := json:object()
620-
let $_ :=
621-
let $content :=
622-
(
623-
if ($content instance of document-node()) then
624-
$content/node()
607+
if ($data-format = 'application/xml') then
608+
switch($destination)
609+
case "content" return
610+
if ($flow-type = "input") then
611+
let $kind := try {
612+
xdmp:node-kind($content)
613+
}
614+
catch($ex) {}
615+
return
616+
if ($kind eq "binary") then
617+
element rawContent { "binary document" }
625618
else
626-
$content,
627-
null-node{}
628-
)[1]
629-
return
630-
switch($destination)
631-
case "content" return
632-
if ($flow-type = "input") then
633-
let $kind := try {
634-
xdmp:node-kind($content)
635-
}
636-
catch($ex) {}
637-
return
638-
if ($kind eq "binary") then
639-
map:put($o, "rawContent", "binary document")
640-
else
641-
map:put($o, "rawContent", $content)
642-
else
643-
()
644-
case "headers" return
645-
map:put($o, "content", $content)
646-
case "triples" return
647-
(
648-
map:put($o, "content", $content),
649-
map:put($o, "headers", ($headers, null-node{})[1])
650-
)
651-
default return ()
619+
element rawContent { $content }
620+
else
621+
()
622+
case "headers" return
623+
element content { $content }
624+
case "triples" return
625+
(
626+
element content { $content },
627+
element headers { $headers }
628+
)
629+
default return ()
630+
else
631+
let $o := json:object()
632+
let $_ :=
633+
let $content :=
634+
(
635+
if ($content instance of document-node()) then
636+
$content/node()
637+
else
638+
$content,
639+
null-node{}
640+
)[1]
652641
return
653-
$o
654-
else ()
642+
switch($destination)
643+
case "content" return
644+
if ($flow-type = "input") then
645+
let $kind := try {
646+
xdmp:node-kind($content)
647+
}
648+
catch($ex) {}
649+
return
650+
if ($kind eq "binary") then
651+
map:put($o, "rawContent", "binary document")
652+
else
653+
map:put($o, "rawContent", $content)
654+
else
655+
()
656+
case "headers" return
657+
map:put($o, "content", $content)
658+
case "triples" return
659+
(
660+
map:put($o, "content", $content),
661+
map:put($o, "headers", ($headers, null-node{})[1])
662+
)
663+
default return ()
664+
return
665+
$o
655666
let $before := xdmp:elapsed-time()
656667
let $resp :=
657668
try {
@@ -720,20 +731,18 @@ declare function flow:run-plugin(
720731
else
721732
$resp
722733
let $_ :=
723-
if (trace:enabled()) then
724-
trace:plugin-trace(
725-
$identifier,
726-
$module-uri,
727-
$destination,
728-
$flow-type,
729-
$trace-input,
730-
if ($data-format = 'application/xml') then
731-
$resp
732-
else
733-
($resp, null-node{})[1],
734-
$duration
735-
)
736-
else ()
734+
trace:plugin-trace(
735+
$identifier,
736+
$module-uri,
737+
$destination,
738+
$flow-type,
739+
$trace-input,
740+
if ($data-format = 'application/xml') then
741+
$resp
742+
else
743+
($resp, null-node{})[1],
744+
$duration
745+
)
737746
return
738747
$resp
739748
};
@@ -853,6 +862,7 @@ declare function flow:validate-entities()
853862
let $_ :=
854863
for $entity in flow:get-entities()/hub:entity
855864
for $flow in $entity/hub:flows/hub:flow
865+
let $_ := xdmp:log(("flow!", $flow))
856866
let $data-format := $flow/hub:data-format
857867
(: validate collector :)
858868
let $_ :=
@@ -888,7 +898,7 @@ declare function flow:validate-entities()
888898
}
889899
(: validate plugins :)
890900
let $_ :=
891-
for $plugin in $flow/hub:plugins/hub:plugin
901+
for $plugin in $flow/hub:plugins/hub:plugin[fn:exists(@module)]
892902
let $destination := $plugin/@dest
893903
let $module-uri := $plugin/@module
894904
let $filename as xs:string := hul:get-file-from-uri($module-uri)

0 commit comments

Comments
 (0)