Commit 66c8bd4

Fixes #4244: Make apoc.dv.* procedures work in clusters (#4281)
* Fixes #4244: Make apoc.dv.* procedures work in clusters
* added procs to extended*.txt
* fix tests
* cleanup
1 parent d453d5e commit 66c8bd4

16 files changed, +898 -359 lines changed

docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc

Lines changed: 3 additions & 1 deletion
@@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it!
 = apoc.dv.catalog.add
 :description: This section contains reference documentation for the apoc.dv.catalog.add procedure.
 
-label:procedure[] label:apoc-extended[]
+label:procedure[] label:apoc-extended[] label:deprecated[]
 
 [.emphasis]
 Add a virtualized resource configuration
@@ -17,6 +17,8 @@ Add a virtualized resource configuration
 apoc.dv.catalog.add(name :: STRING?, config = {} :: MAP?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
 ----
 
+include::partial$/dv/deprecated.adoc[]
+
 [WARNING]
 ====
 This procedure is not intended to be used in a cluster environment, and may act unpredictably.

docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc

Lines changed: 3 additions & 1 deletion
@@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it!
 = apoc.dv.catalog.list
 :description: This section contains reference documentation for the apoc.dv.catalog.list procedure.
 
-label:procedure[] label:apoc-extended[]
+label:procedure[] label:apoc-extended[] label:deprecated[]
 
 [.emphasis]
 List all virtualized resource configuration
@@ -17,6 +17,8 @@ List all virtualized resource configuration
 apoc.dv.catalog.list() :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
 ----
 
+include::partial$/dv/deprecated.adoc[]
+
 == Output parameters
 [.procedures, opts=header]
 |===

docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc

Lines changed: 3 additions & 1 deletion
@@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it!
 = apoc.dv.catalog.remove
 :description: This section contains reference documentation for the apoc.dv.catalog.remove procedure.
 
-label:procedure[] label:apoc-extended[]
+label:procedure[] label:apoc-extended[] label:deprecated[]
 
 [.emphasis]
 Remove a virtualized resource config by name
@@ -17,6 +17,8 @@ Remove a virtualized resource config by name
 apoc.dv.catalog.remove(name :: STRING?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
 ----
 
+include::partial$/dv/deprecated.adoc[]
+
 [WARNING]
 ====
 This procedure is not intended to be used in a cluster environment, and may act unpredictably.

docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc

Lines changed: 10 additions & 7 deletions
@@ -2,6 +2,8 @@
 = Virtual Resource
 :description: This chapter describes how to handle external data sources as virtual resource without persisting them in the database
 
+include::partial$systemdbonly.note.adoc[]
+
 [NOTE]
 ====
 There are situations where we would like to enrich/complement the results of a cypher query in a Neo4j graph with additional
@@ -40,10 +42,11 @@ image::apoc.dv.imported-graph-from-RDB.png[scaledwidth="100%"]
 == Managing a Virtualized Resource via JDBC
 
 === Creating a Virtualized Resource (JDBC)
-Before we can query a Virtualized Resource, we need to define it. We do this using the `apoc.dv.catalog.add` procedure.
-The procedure takes two parameters:
+Before we can query a Virtualized Resource, we need to define it. We do this using the `apoc.dv.catalog.install` procedure.
+The procedure takes three parameters:
 
 * a name that uniquely identifies the virtualized resource and can be used to query that resource
+* the database name where we want to use the resource (default is `'neo4j'`)
 * a set of parameters indicating the type of the resource (type), the access point (url), the parameterised query
 that will be run on the access point (query) and the labels that will be applied to the generated virtual nodes (labels).
 
@@ -56,7 +59,7 @@ Here is the cypher that creates such virtualized resource:
 
 [source,cypher]
 ----
-CALL apoc.dv.catalog.add("fr-towns-by-dept", {
+CALL apoc.dv.catalog.install("fr-towns-by-dept", "neo4j", {
   type: "JDBC",
   url: "jdbc:postgresql://localhost/communes?user=jb&password=jb",
   labels: ["Town","PopulatedPlace"],
@@ -124,19 +127,19 @@ RETURN path
 ----
 
 === Listing the Virtualized Resource Catalog
-The apoc.dv.catalog.list procedure returns a list with all the existing Virtualized resources and their descriptions. It takes no parameters.
+The apoc.dv.catalog.list procedure returns a list with all the existing Virtualized resources and their descriptions. It accepts one parameter: i.e. the database name where we want to use the resource (default is 'neo4j').
 
 [source,cypher]
 ----
-CALL apoc.dv.catalog.list()
+CALL apoc.dv.catalog.show()
 ----
 
 === Removing Virtualized Resources from the Catalog
 When a Virtualized Resource is no longer needed it can be removed from the catalog by using the apoc.dv.catalog.remove procedure passing as parameter the unique name of the VR.
 
 [source,cypher]
 ----
-CALL apoc.dv.catalog.remove("vr-name")
+CALL apoc.dv.catalog.drop("vr-name", <dbName>)
 ----
 
 === Export metadata
@@ -165,7 +168,7 @@ Here is the cypher that creates such virtualized resource:
 
 [source,cypher]
 ----
-CALL apoc.dv.catalog.add("prod-details-by-id", {
+CALL apoc.dv.catalog.install("prod-details-by-id", "neo4j", {
   type: "CSV",
   url: "http://data.neo4j.com/northwind/products.csv",
   labels: ["ProductDetails"],
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+[WARNING]
+====
+Please note that this procedure is deprecated.
+
+Use the following ones instead, which allow for better support in a cluster:
+
+[opts="header"]
+|===
+| deprecated procedure | new procedure
+| `apoc.dv.catalog.add(<name>, $config)` | `apoc.dv.catalog.install('<name>', '<dbName>', $config)`
+| `apoc.dv.catalog.remove('<name>')` | `apoc.dv.catalog.drop('<name>', '<dbName>')`
+| `apoc.dv.catalog.list()` | `apoc.dv.catalog.show('<dbName>')`
+|===
+
+where `<dbName>` is the database where we want to execute the procedure
+
+xref::virtual-resource/index.adoc[See here for more info].
+
+====
Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
+package apoc.dv;
+
+import apoc.util.Neo4jContainerExtension;
+import apoc.util.TestContainerUtil;
+import apoc.util.TestcontainersCausalCluster;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.types.Node;
+import org.neo4j.driver.types.Path;
+import org.neo4j.driver.types.Relationship;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static apoc.dv.DataVirtualizationCatalogTestUtil.*;
+import static apoc.util.ExtendedTestContainerUtil.dbIsWriter;
+import static apoc.util.ExtendedTestContainerUtil.getBoltAddress;
+import static apoc.util.ExtendedTestContainerUtil.getDriverIfNotReplica;
+import static apoc.util.MapUtil.map;
+import static apoc.util.SystemDbUtil.PROCEDURE_NOT_ROUTED_ERROR;
+import static apoc.util.TestContainerUtil.importFolder;
+import static apoc.util.TestContainerUtil.testCall;
+import static apoc.util.TestContainerUtil.testCallEmpty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.neo4j.configuration.GraphDatabaseSettings.SYSTEM_DATABASE_NAME;
+
+
+public class DataVirtualizationCatalogClusterRoutingTest {
+    private static final int NUM_CORES = 3;
+    private static TestcontainersCausalCluster cluster;
+    private static Session clusterSession;
+    private static List<Neo4jContainerExtension> members;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        cluster = TestContainerUtil
+                .createEnterpriseCluster(List.of(TestContainerUtil.ApocPackage.EXTENDED, TestContainerUtil.ApocPackage.CORE), NUM_CORES, 0,
+                        Collections.emptyMap(),
+                        Map.of("NEO4J_dbms_routing_enabled", "true")
+                );
+        clusterSession = cluster.getSession();
+        members = cluster.getClusterMembers();
+        FileUtils.copyFileToDirectory(new File(new URI(FILE_URL).toURL().getPath()), importFolder);
+        assertEquals(NUM_CORES, members.size());
+    }
+
+    @AfterClass
+    public static void bringDownCluster() {
+        cluster.close();
+    }
+
+    @Test
+    public void testVirtualizeCSV() {
+        dvInSysLeaderMemberCommon(PROCEDURE_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME,
+                (session) -> testCall(session, APOC_DV_INSTALL_QUERY,
+                        APOC_DV_INSTALL_PARAMS,
+                        (row) -> assertCatalogContent(row, CSV_TEST_FILE)), APOC_DV_INSTALL_PARAMS
+        );
+
+        clusterSession.executeRead(tx -> {
+                    final Result result = tx.run(APOC_DV_QUERY,
+                            Map.of(NAME_KEY, CSV_NAME_VALUE,
+                                    APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS,
+                                    CONFIG_KEY, CONFIG_VALUE)
+                    );
+
+                    Node node = result.single().get(NODE_KEY).asNode();
+                    assertEquals(NAME_VALUE, node.get(NAME_KEY).asString());
+                    assertEquals(AGE_VALUE, node.get(AGE_KEY).asString());
+                    assertEquals(List.of(LABELS_VALUE), node.labels());
+
+                    return result.consume();
+                }
+        );
+
+        clusterSession.executeWrite(tx -> tx.run(CREATE_HOOK_QUERY, CREATE_HOOK_PARAMS).consume());
+
+        clusterSession.executeRead(tx -> {
+                    final Result result = tx.run(APOC_DV_QUERY_AND_LINK_QUERY,
+                            map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, CONFIG_VALUE)
+                    );
+
+                    Path path = result.single().get("path").asPath();
+                    Node node = path.end();
+                    assertEquals(NAME_VALUE, node.get(NAME_KEY).asString());
+                    assertEquals(AGE_VALUE, node.get(AGE_KEY).asString());
+                    assertEquals(List.of(LABELS_VALUE), node.labels());
+
+                    Node hook = path.start();
+                    assertEquals(HOOK_NODE_NAME_VALUE, hook.get(NAME_KEY).asString());
+                    assertEquals(List.of("Hook"), hook.labels());
+
+                    Relationship relationship = path.relationships().iterator().next();
+                    assertEquals(hook.elementId(), relationship.startNodeElementId());
+                    assertEquals(node.elementId(), relationship.endNodeElementId());
+                    assertEquals(RELTYPE_VALUE, relationship.type());
+
+                    return result.consume();
+                }
+        );
+
+        dvInSysLeaderMemberCommon(PROCEDURE_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME,
+                (session) -> testCallEmpty(session, APOC_DV_DROP_QUERY,
+                        APOC_DV_DROP_PARAMS), APOC_DV_DROP_PARAMS
+        );
+
+    }
+
+    private static void dvInSysLeaderMemberCommon(String uuidNotRoutedError, String dbName, Consumer<Session> testDv, Map<String, Object> params) {
+        dvInSysLeaderMemberCommon(uuidNotRoutedError, dbName, testDv, false, params);
+    }
+
+    private static void dvInSysLeaderMemberCommon(String uuidNotRoutedError, String dbName, Consumer<Session> testDv, boolean readOnlyOperation, Map<String, Object> params) {
+        final List<Neo4jContainerExtension> members = cluster.getClusterMembers();
+        assertEquals(NUM_CORES, members.size());
+        boolean writeExecuted = false;
+        for (Neo4jContainerExtension container: members) {
+            // we skip READ_REPLICA members with write operations
+            // instead, we consider all members with a read only operations
+            final Driver driver = readOnlyOperation
+                    ? container.getDriver()
+                    : getDriverIfNotReplica(container);
+            if (driver == null) {
+                continue;
+            }
+            Session session = driver.session(SessionConfig.forDatabase(dbName));
+            boolean isWriter = dbIsWriter(dbName, session, getBoltAddress(container));
+            if (isWriter) {
+                testDv.accept(session);
+                writeExecuted = true;
+            } else {
+                try {
+                    testDv.accept(session);
+                    fail("Should fail because of non leader Data Virtualization addition");
+                } catch (Exception e) {
+                    String errorMsg = e.getMessage();
+                    assertTrue("The actual message is: " + errorMsg, errorMsg.contains(uuidNotRoutedError));
+                }
+            }
+        }
+        assertTrue(writeExecuted);
+    }
+}
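
The dvInSysLeaderMemberCommon helper in this test runs the same operation against every core member. It expects success only on the member that is the writer for the system database, and a "not routed" error on every other member. Below is a minimal, self-contained sketch of that check with no Neo4j or APOC dependency. The names WriterOnlyCheckSketch, Member, install and assertWriterOnly are hypothetical, and the NOT_ROUTED text is a placeholder, not the real value of PROCEDURE_NOT_ROUTED_ERROR.

```java
import java.util.List;
import java.util.function.Consumer;

public class WriterOnlyCheckSketch {

    /** Hypothetical stand-in for a cluster member; not a Neo4j or APOC type. */
    public static final class Member {
        final String name;
        final boolean writer;

        public Member(String name, boolean writer) {
            this.name = name;
            this.writer = writer;
        }
    }

    // Placeholder text; the real PROCEDURE_NOT_ROUTED_ERROR message is not shown in the commit.
    public static final String NOT_ROUTED = "not routed";

    /** Simulates a system-database procedure that only the writer member accepts. */
    public static void install(Member member) {
        if (!member.writer) {
            throw new IllegalStateException(NOT_ROUTED + ": " + member.name);
        }
    }

    /**
     * Runs the operation on every member. The writer must succeed, every other
     * member must fail with the expected message. Returns the number of successes.
     */
    public static int assertWriterOnly(List<Member> members, Consumer<Member> operation) {
        int successes = 0;
        for (Member member : members) {
            if (member.writer) {
                operation.accept(member);
                successes++;
            } else {
                boolean rejected = false;
                try {
                    operation.accept(member);
                } catch (IllegalStateException e) {
                    rejected = e.getMessage().contains(NOT_ROUTED);
                }
                if (!rejected) {
                    throw new AssertionError("non-writer accepted the operation: " + member.name);
                }
            }
        }
        if (successes == 0) {
            throw new AssertionError("no writer executed the operation");
        }
        return successes;
    }

    public static void main(String[] args) {
        List<Member> members = List.of(
                new Member("core1", false),
                new Member("core2", true),
                new Member("core3", false));
        System.out.println(assertWriterOnly(members, WriterOnlyCheckSketch::install));
    }
}
```

As in the helper, the final check that at least one member executed the write guards against a run in which every member was skipped or rejected the call.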
