14
14
*/
15
15
package com .amazon .janusgraph ;
16
16
17
+ import static org .apache .tinkerpop .gremlin .process .traversal .dsl .graph .__ .inV ;
17
18
import static org .junit .Assert .fail ;
18
19
20
+ import java .io .BufferedReader ;
21
+ import java .io .IOException ;
22
+ import java .io .InputStreamReader ;
23
+ import java .net .URL ;
24
+ import java .util .List ;
25
+ import java .util .concurrent .ConcurrentHashMap ;
26
+ import java .util .concurrent .ConcurrentMap ;
19
27
import java .util .concurrent .ExecutionException ;
20
28
import java .util .concurrent .TimeUnit ;
29
+ import java .util .function .BiConsumer ;
30
+ import java .util .stream .Collectors ;
21
31
import java .util .stream .IntStream ;
22
32
23
33
import org .apache .commons .configuration .Configuration ;
28
38
import org .janusgraph .core .Cardinality ;
29
39
import org .janusgraph .core .JanusGraph ;
30
40
import org .janusgraph .core .JanusGraphFactory ;
41
+ import org .janusgraph .core .JanusGraphTransaction ;
42
+ import org .janusgraph .core .Multiplicity ;
31
43
import org .janusgraph .core .PropertyKey ;
32
44
import org .janusgraph .core .VertexLabel ;
33
45
import org .janusgraph .core .schema .ConsistencyModifier ;
34
46
import org .janusgraph .core .schema .JanusGraphIndex ;
47
+ import org .janusgraph .core .schema .JanusGraphManagement ;
35
48
import org .janusgraph .core .schema .SchemaAction ;
36
49
import org .janusgraph .core .schema .SchemaStatus ;
37
50
import org .janusgraph .diskstorage .BackendException ;
41
54
import org .janusgraph .graphdb .database .management .ManagementSystem ;
42
55
import org .janusgraph .graphdb .transaction .StandardJanusGraphTx ;
43
56
import org .junit .Test ;
57
+ import org .junit .experimental .categories .Category ;
44
58
45
59
import com .amazon .janusgraph .diskstorage .dynamodb .BackendDataModel ;
46
60
import com .amazon .janusgraph .diskstorage .dynamodb .DynamoDBStoreTransaction ;
61
+ import com .amazon .janusgraph .testcategory .IsolateRemainingTestsCategory ;
62
+ import com .google .common .base .Preconditions ;
47
63
import com .google .common .base .Stopwatch ;
48
64
65
+ import lombok .Getter ;
66
+ import lombok .ToString ;
67
+ import lombok .extern .slf4j .Slf4j ;
68
+
49
69
/**
50
70
*
51
71
* @author Alexander Patrikalakis
72
+ * @author John Stephenson
73
+ * @author Addison Slabaugh
52
74
*
53
75
*/
76
+ @ Slf4j
77
+ @ Category ({IsolateRemainingTestsCategory .class })
54
78
public class ScenarioTests {
55
79
56
- public static final String LABEL = "myLabel" ;
57
- public static final String BY_DATABASE_METADATA_VERSION = "byDatabaseMetadataVersion" ;
58
- public static final String DATABASE_METADATA_LABEL = "databaseMetadata" ;
59
- public static final String VERSION_PROPERTY = "version" ;
80
+ private static final String LABEL = "myLabel" ;
81
+ private static final String BY_DATABASE_METADATA_VERSION = "byDatabaseMetadataVersion" ;
82
+ private static final String DATABASE_METADATA_LABEL = "databaseMetadata" ;
83
+ private static final String VERSION_PROPERTY = "version" ;
60
84
private static final boolean USE_STORAGE_NATIVE_LOCKING = true ;
61
85
private static final boolean USE_JANUSGRAPH_LOCKING = false ;
62
86
private static final boolean USE_GRAPHINDEX_LOCKING = true ;
63
87
private static final boolean USE_EDGESTORE_LOCKING = true ;
64
- public static final String VERSION_ONE = "0.0.1" ;
65
- public static final String VERSION_TWO = "0.0.2" ;
66
- public static final String VERSION_THREE = "0.0.3" ;
88
+ private static final String VERSION_ONE = "0.0.1" ;
89
+ private static final String VERSION_TWO = "0.0.2" ;
90
+ private static final String VERSION_THREE = "0.0.3" ;
67
91
68
92
/**
69
93
* This test is to demonstrate performance in response to a report of elevated latency for committing 30 vertices.
70
94
* http://stackoverflow.com/questions/42899388/titan-dynamodb-local-incredibly-slow-8s-commit-for-30-vertices
71
- * @throws BackendException
95
+ * @throws BackendException in case cleanUpTables fails
72
96
*/
73
97
@ Test
74
98
public void performanceTest () throws BackendException {
@@ -92,12 +116,137 @@ public void lockingTest() throws BackendException, InterruptedException {
92
116
93
117
/**
94
118
* https://stackoverflow.com/questions/44535054/generalizing-dynamodb-janusgraph-factory-lock-and-schema-problems
95
- * @throws Exception
96
119
*/
120
+ public enum Relationship {
121
+ instanceOf , hotelBrandType ;
122
+ }
123
+ @ Getter
124
+ @ ToString
125
+ public class Triple {
126
+ private final String leftPropertyValue ;
127
+ private final String leftPropertyName ;
128
+ private final Relationship relationship ;
129
+ private final String rightPropertyValue ;
130
+ private final String rightPropertyName ;
131
+
132
+ Triple (final String [] line ) {
133
+ Preconditions .checkArgument (line .length == 3 );
134
+ final String [] left = line [0 ].split (":" );
135
+ final String [] right = line [2 ].split (":" );
136
+ this .leftPropertyName = left [0 ];
137
+ this .leftPropertyValue = left [1 ];
138
+ this .relationship = Relationship .valueOf (line [1 ]);
139
+ this .rightPropertyName = right [0 ];
140
+ this .rightPropertyValue = right [1 ];
141
+ }
142
+ }
143
+ private static void createHotelSchema (JanusGraph graph ) {
144
+ //another issue, you should only try to create the schema once.
145
+ //you use uniqueness constraints, so you need to define the schema up front with the unique() call,
146
+ //but if you did not use uniqueness constraints, you could just let JanusGraph create the schema for you.
147
+ final JanusGraphManagement mgmt = graph .openManagement ();
148
+ final PropertyKey brandtypePropertyKey = mgmt .makePropertyKey ("brandtype" ).dataType (String .class ).make ();
149
+ mgmt .buildIndex ("brandtypeIndex" , Vertex .class ).addKey (brandtypePropertyKey ).unique ().buildCompositeIndex ();
150
+ final PropertyKey namePropertyKey = mgmt .makePropertyKey ("name" ).dataType (String .class ).make ();
151
+ mgmt .buildIndex ("nameIndex" , Vertex .class ).addKey (namePropertyKey ).unique ().buildCompositeIndex ();
152
+ mgmt .makeEdgeLabel (Relationship .hotelBrandType .name ()).multiplicity (Multiplicity .MANY2ONE ).make ();
153
+ mgmt .makeEdgeLabel (Relationship .instanceOf .name ()).multiplicity (Multiplicity .MANY2ONE ).make ();
154
+ mgmt .commit ();
155
+ }
156
+
157
+ private void tripleIngestBase (BiConsumer <StandardJanusGraph , List <Triple >> writer ) throws BackendException {
158
+ final Stopwatch watch = Stopwatch .createStarted ();
159
+ final StandardJanusGraph graph = (StandardJanusGraph ) JanusGraphFactory .open (TestGraphUtil .instance .createTestGraphConfig (BackendDataModel .MULTI ));
160
+ log .info ("Created graph in " + watch .elapsed (TimeUnit .MILLISECONDS ) + " ms" );
161
+ watch .reset ();
162
+ watch .start ();
163
+ createHotelSchema (graph );
164
+ log .info ("Created schema in " + watch .elapsed (TimeUnit .MILLISECONDS ) + " ms" );
165
+
166
+ watch .reset ();
167
+ watch .start ();
168
+ final URL url = ScenarioTests .class .getClassLoader ().getResource ("META-INF/HotelTriples.txt" );
169
+ final List <Triple > lines ;
170
+ try (final BufferedReader bf = new BufferedReader (new InputStreamReader (url .openStream ()))) {
171
+ lines = bf .lines ()
172
+ .map (line -> line .split ("\t " ))
173
+ .map (Triple ::new )
174
+ .collect (Collectors .toList ());
175
+ } catch (IOException e ) {
176
+ throw new IllegalStateException ("Error processing triple file" , e );
177
+ }
178
+ log .info ("Read file into Triple objects in " + watch .elapsed (TimeUnit .MILLISECONDS ) + " ms" );
179
+ watch .reset ();
180
+ watch .start ();
181
+ writer .accept (graph , lines );
182
+ log .info ("Added objects in " + watch .elapsed (TimeUnit .MILLISECONDS ) + " ms" );
183
+ TestGraphUtil .instance .cleanUpTables ();
184
+ }
185
+
186
+ @ Test
187
+ public void processTripleWithTraversals () throws BackendException {
188
+ tripleIngestBase ((StandardJanusGraph graph , List <Triple > lines ) -> {
189
+ final GraphTraversalSource g = graph .traversal ();
190
+ lines .parallelStream ().forEach (triple -> {
191
+ final Vertex left = getVertexIfDoesntExist (g , triple .getLeftPropertyName (),
192
+ triple .getLeftPropertyValue ());
193
+ final Vertex right = getVertexIfDoesntExist (g , triple .getRightPropertyName (),
194
+ triple .getRightPropertyValue ());
195
+ //your original method was creating vertices in the processRelationship method.
196
+ //this caused the uniqueness constraint violation (one of a few issues in your
197
+ //original code) because you have a unique index on the rightPropertyName
198
+ g .V ()
199
+ .is (left )
200
+ .outE (triple .getRelationship ().name ())
201
+ .filter (inV ().is (right ))
202
+ .tryNext ()
203
+ .orElseGet (() -> left .addEdge (triple .getRelationship ().name (), right ));
204
+ });
205
+ final Stopwatch watch = Stopwatch .createStarted ();
206
+ g .tx ().commit ();
207
+ log .info ("Committed in " + watch .elapsed (TimeUnit .MILLISECONDS ) + " ms" );
208
+ watch .stop ();
209
+ });
210
+ }
211
+ private static Vertex getVertexIfDoesntExist (GraphTraversalSource g ,
212
+ String propertyName , String propertyValue ) {
213
+ return g .V ()
214
+ .has (propertyName , propertyValue )
215
+ .tryNext ()
216
+ .orElseGet (() -> g .addV ().property (propertyName , propertyValue ).next ());
217
+ }
218
+
97
219
@ Test
98
- public void tripleIngestTest (StandardJanusGraph graph ) throws Exception {
99
- TravelGraphFactory graphFactory = new TravelGraphFactory ();
100
- graphFactory .loadGraphFactory (graph , false );
220
+ public void processTripleWithMaps () throws BackendException {
221
+ final ConcurrentMap <String , Vertex > brandTypeMap = new ConcurrentHashMap <>();
222
+ final ConcurrentMap <String , Vertex > companyMap = new ConcurrentHashMap <>();
223
+ final ConcurrentMap <String , Vertex > hotelBrandMap = new ConcurrentHashMap <>();
224
+ tripleIngestBase ((StandardJanusGraph graph , List <Triple > triples ) -> {
225
+ final JanusGraphTransaction threadedGraph = graph .newTransaction ();
226
+ triples .parallelStream ().forEach (triple -> {
227
+ final Vertex outV = hotelBrandMap .computeIfAbsent (triple .getLeftPropertyValue (),
228
+ value -> threadedGraph .addVertex (triple .getLeftPropertyName (), value ));
229
+ //your original method was creating vertices in the processRelationship method.
230
+ //this caused the uniqueness constraint violation (one of a few issues in your
231
+ //original code) because you have a unique index on the rightPropertyName
232
+ switch (triple .getRelationship ()) {
233
+ case hotelBrandType :
234
+ outV .addEdge ("hotelBrandType" ,brandTypeMap .computeIfAbsent (triple .getRightPropertyValue (),
235
+ value -> threadedGraph .addVertex (triple .getRightPropertyName (), value )));
236
+ break ;
237
+ case instanceOf :
238
+ outV .addEdge ("instanceOf" , companyMap .computeIfAbsent (triple .getRightPropertyValue (),
239
+ value -> threadedGraph .addVertex (triple .getRightPropertyName (), value )));
240
+ break ;
241
+ default :
242
+ throw new IllegalArgumentException ("unexpected relationship type" );
243
+ }
244
+ });
245
+ final Stopwatch watch = Stopwatch .createStarted ();
246
+ threadedGraph .commit ();
247
+ log .info ("Committed in " + watch .elapsed (TimeUnit .MILLISECONDS ) + " ms" );
248
+ watch .stop ();
249
+ });
101
250
}
102
251
103
252
@ Test
@@ -117,7 +266,7 @@ private void createSchemaAndDemoLockExpiry(boolean useNativeLocking, boolean use
117
266
}
118
267
}
119
268
120
- private static final DynamoDBStoreTransaction getStoreTransaction (ManagementSystem mgmt ) {
269
+ private static DynamoDBStoreTransaction getStoreTransaction (ManagementSystem mgmt ) {
121
270
return DynamoDBStoreTransaction .getTx (((CacheTransaction ) mgmt .getWrappedTx ().getTxHandle ().getStoreTransaction ()).getWrappedTransaction ());
122
271
}
123
272
@@ -193,7 +342,7 @@ private JanusGraph createGraphWithSchema(final boolean useNativeLocking, final b
193
342
final JanusGraphIndex indexAfterFirstCommit = mgmt .getGraphIndex (BY_DATABASE_METADATA_VERSION );
194
343
final PropertyKey propertyKeySecond = mgmt .getPropertyKey (VERSION_PROPERTY );
195
344
if (indexAfterFirstCommit .getIndexStatus (propertyKeySecond ) == SchemaStatus .INSTALLED ) {
196
- (( ManagementSystem ) mgmt ) .awaitGraphIndexStatus (graph , BY_DATABASE_METADATA_VERSION ).status (SchemaStatus .REGISTERED ).timeout (10 , java .time .temporal .ChronoUnit .MINUTES ).call ();
345
+ ManagementSystem .awaitGraphIndexStatus (graph , BY_DATABASE_METADATA_VERSION ).status (SchemaStatus .REGISTERED ).timeout (10 , java .time .temporal .ChronoUnit .MINUTES ).call ();
197
346
}
198
347
mgmt .commit ();
199
348
@@ -216,15 +365,15 @@ private JanusGraph createGraphWithSchema(final boolean useNativeLocking, final b
216
365
if (useNativeLocking ) {
217
366
System .out .println ("mgmt tx five " + getStoreTransaction (mgmt ).getId ());
218
367
}
219
- (( ManagementSystem ) mgmt ) .awaitGraphIndexStatus (graph , BY_DATABASE_METADATA_VERSION ).status (SchemaStatus .ENABLED ).timeout (10 , java .time .temporal .ChronoUnit .MINUTES )
368
+ ManagementSystem .awaitGraphIndexStatus (graph , BY_DATABASE_METADATA_VERSION ).status (SchemaStatus .ENABLED ).timeout (10 , java .time .temporal .ChronoUnit .MINUTES )
220
369
.call ();
221
370
}
222
371
mgmt .commit ();
223
372
//END code from second code listing (simplified index setup)
224
373
return graph ;
225
374
}
226
375
227
- private static final DynamoDBStoreTransaction getTxFromGraph (StandardJanusGraph graph ) {
376
+ private static DynamoDBStoreTransaction getTxFromGraph (StandardJanusGraph graph ) {
228
377
return DynamoDBStoreTransaction .getTx (((CacheTransaction ) ((StandardJanusGraphTx ) graph .getCurrentThreadTx ()).getTxHandle ().getStoreTransaction ()).getWrappedTransaction ());
229
378
}
230
379
0 commit comments