20
20
import java .util .concurrent .ExecutionException ;
21
21
import java .util .concurrent .TimeUnit ;
22
22
23
+ import com .amazonaws .services .dynamodbv2 .model .CreateTableRequest ;
24
+ import com .amazonaws .services .dynamodbv2 .model .DeleteItemRequest ;
25
+ import com .amazonaws .services .dynamodbv2 .model .GetItemRequest ;
26
+ import com .amazonaws .services .dynamodbv2 .model .ProvisionedThroughput ;
27
+ import com .amazonaws .services .dynamodbv2 .model .QueryRequest ;
28
+ import com .amazonaws .services .dynamodbv2 .model .ReturnConsumedCapacity ;
29
+ import com .amazonaws .services .dynamodbv2 .model .ScanRequest ;
30
+ import com .amazonaws .services .dynamodbv2 .model .UpdateItemRequest ;
31
+ import lombok .Getter ;
23
32
import org .apache .commons .codec .binary .Hex ;
33
+ import org .apache .commons .lang .builder .EqualsBuilder ;
24
34
import org .apache .commons .lang3 .tuple .Pair ;
25
35
import org .janusgraph .diskstorage .BackendException ;
26
36
import org .janusgraph .diskstorage .Entry ;
31
41
import org .janusgraph .diskstorage .keycolumnvalue .StoreTransaction ;
32
42
import org .janusgraph .diskstorage .locking .TemporaryLockingException ;
33
43
34
- import com .amazonaws .services .dynamodbv2 .model .CreateTableRequest ;
35
- import com .amazonaws .services .dynamodbv2 .model .GetItemRequest ;
36
- import com .amazonaws .services .dynamodbv2 .model .ProvisionedThroughput ;
37
- import com .amazonaws .services .dynamodbv2 .model .QueryRequest ;
38
- import com .amazonaws .services .dynamodbv2 .model .ScanRequest ;
39
- import com .amazonaws .services .dynamodbv2 .model .UpdateItemRequest ;
40
44
import com .google .common .cache .Cache ;
41
45
import com .google .common .cache .CacheBuilder ;
42
46
import com .google .common .cache .RemovalListener ;
57
61
@ Slf4j
58
62
public abstract class AbstractDynamoDbStore implements AwsStore {
59
63
protected final Client client ;
60
- protected final String tableName ;
64
+ @ Getter
65
+ private final String tableName ;
61
66
private final DynamoDBStoreManager manager ;
62
- private final String storeName ;
67
+ @ Getter
68
+ private final String name ;
63
69
private final boolean forceConsistentRead ;
64
70
private final Cache <Pair <StaticBuffer , StaticBuffer >, DynamoDbStoreTransaction > keyColumnLocalLocks ;
65
71
66
- private static final class ReportingRemovalListener implements RemovalListener <Pair <StaticBuffer , StaticBuffer >, DynamoDbStoreTransaction > {
67
- private static final ReportingRemovalListener INSTANCE = new ReportingRemovalListener ();
68
- private static ReportingRemovalListener theInstance () {
69
- return INSTANCE ;
70
- }
71
- private ReportingRemovalListener () { }
72
+ private enum ReportingRemovalListener implements RemovalListener <Pair <StaticBuffer , StaticBuffer >, DynamoDbStoreTransaction > {
73
+ INSTANCE ;
72
74
73
75
@ Override
74
76
public void onRemoval (final RemovalNotification <Pair <StaticBuffer , StaticBuffer >, DynamoDbStoreTransaction > notice ) {
75
77
log .trace ("Expiring {} in tx {} because of {}" , notice .getKey ().toString (), notice .getValue ().toString (), notice .getCause ());
76
78
}
77
79
}
78
80
79
- protected CreateTableRequest createTableRequest () {
80
- return new CreateTableRequest ()
81
- .withTableName (tableName )
82
- .withProvisionedThroughput (new ProvisionedThroughput (client .readCapacity (tableName ),
83
- client .writeCapacity (tableName )));
84
- }
85
-
86
81
protected void mutateOneKey (final StaticBuffer key , final KCVMutation mutation , final StoreTransaction txh ) throws BackendException {
87
- manager .mutateMany (Collections .singletonMap (storeName , Collections .singletonMap (key , mutation )), txh );
88
- }
89
-
90
- @ Override
91
- public String getName () {
92
- return storeName ;
82
+ manager .mutateMany (Collections .singletonMap (name , Collections .singletonMap (key , mutation )), txh );
93
83
}
94
84
95
85
protected UpdateItemRequest createUpdateItemRequest () {
96
- return new UpdateItemRequest ().withTableName (tableName );
86
+ return new UpdateItemRequest ()
87
+ .withTableName (tableName )
88
+ .withReturnConsumedCapacity (ReturnConsumedCapacity .TOTAL );
97
89
}
98
90
99
91
protected GetItemRequest createGetItemRequest () {
100
- return new GetItemRequest ().withTableName (tableName ).withConsistentRead (forceConsistentRead );
92
+ return new GetItemRequest ()
93
+ .withTableName (tableName )
94
+ .withConsistentRead (forceConsistentRead )
95
+ .withReturnConsumedCapacity (ReturnConsumedCapacity .TOTAL );
96
+ }
97
+
98
+ protected DeleteItemRequest createDeleteItemRequest () {
99
+ return new DeleteItemRequest ()
100
+ .withTableName (tableName )
101
+ .withReturnConsumedCapacity (ReturnConsumedCapacity .TOTAL );
101
102
}
102
103
103
104
protected QueryRequest createQueryRequest () {
104
- return new QueryRequest ().withTableName (tableName ).withConsistentRead (forceConsistentRead );
105
+ return new QueryRequest ()
106
+ .withTableName (tableName )
107
+ .withConsistentRead (forceConsistentRead )
108
+ .withReturnConsumedCapacity (ReturnConsumedCapacity .TOTAL );
105
109
}
106
110
protected ScanRequest createScanRequest () {
107
- return new ScanRequest ().withTableName (tableName ).withConsistentRead (forceConsistentRead );
111
+ return new ScanRequest ().withTableName (tableName )
112
+ .withConsistentRead (forceConsistentRead )
113
+ .withLimit (client .scanLimit (tableName ))
114
+ .withReturnConsumedCapacity (ReturnConsumedCapacity .TOTAL );
108
115
}
109
116
AbstractDynamoDbStore (final DynamoDBStoreManager manager , final String prefix , final String storeName ) {
110
117
this .manager = manager ;
111
118
this .client = this .manager .getClient ();
112
- this .storeName = storeName ;
119
+ this .name = storeName ;
113
120
this .tableName = prefix + "_" + storeName ;
114
121
this .forceConsistentRead = client .isForceConsistentRead ();
115
122
116
123
final CacheBuilder <Pair <StaticBuffer , StaticBuffer >, DynamoDbStoreTransaction > builder = CacheBuilder .newBuilder ().concurrencyLevel (client .getDelegate ().getMaxConcurrentUsers ())
117
124
.expireAfterWrite (manager .getLockExpiresDuration ().toMillis (), TimeUnit .MILLISECONDS )
118
- .removalListener (ReportingRemovalListener .theInstance () );
125
+ .removalListener (ReportingRemovalListener .INSTANCE );
119
126
this .keyColumnLocalLocks = builder .build ();
120
127
}
121
128
122
129
/**
123
130
* Creates the schemata for the DynamoDB table or tables each store requires.
131
+ * Implementations should override and reuse this logic
124
132
* @return a create table request appropriate for the schema of the selected implementation.
125
133
*/
126
- public abstract CreateTableRequest getTableSchema ();
134
+ public CreateTableRequest getTableSchema () {
135
+ return new CreateTableRequest ()
136
+ .withTableName (tableName )
137
+ .withProvisionedThroughput (new ProvisionedThroughput (client .readCapacity (tableName ),
138
+ client .writeCapacity (tableName )));
139
+ }
127
140
128
141
@ Override
129
142
public final void ensureStore () throws BackendException {
@@ -133,7 +146,7 @@ public final void ensureStore() throws BackendException {
133
146
134
147
@ Override
135
148
public final void deleteStore () throws BackendException {
136
- log .debug ("Entering deleteStore name:{}" , storeName );
149
+ log .debug ("Entering deleteStore name:{}" , name );
137
150
client .getDelegate ().deleteTable (getTableSchema ().getTableName ());
138
151
//block until the tables are actually deleted
139
152
client .getDelegate ().ensureTableDeleted (getTableSchema ().getTableName ());
@@ -179,11 +192,6 @@ public void close() throws BackendException {
179
192
log .debug ("Closing table:{}" , tableName );
180
193
}
181
194
182
- @ Override
183
- public String getTableName () {
184
- return tableName ;
185
- }
186
-
187
195
protected String encodeKeyForLog (final StaticBuffer key ) {
188
196
if (null == key ) {
189
197
return "" ;
@@ -209,6 +217,31 @@ String encodeForLog(final List<?> columns) {
209
217
return result .append ("]" ).toString ();
210
218
}
211
219
220
+ @ Override
221
+ public int hashCode () {
222
+ return tableName .hashCode ();
223
+ }
224
+
225
+ @ Override
226
+ public boolean equals (final Object obj ) {
227
+ if (obj == null ) {
228
+ return false ;
229
+ }
230
+ if (obj == this ) {
231
+ return true ;
232
+ }
233
+ if (obj .getClass () != getClass ()) {
234
+ return false ;
235
+ }
236
+ final AbstractDynamoDbStore rhs = (AbstractDynamoDbStore ) obj ;
237
+ return new EqualsBuilder ().append (tableName , rhs .tableName ).isEquals ();
238
+ }
239
+
240
+ @ Override
241
+ public String toString () {
242
+ return this .getClass ().getName () + ":" + getTableName ();
243
+ }
244
+
212
245
protected String encodeForLog (final SliceQuery query ) {
213
246
return "slice[rk:" + encodeKeyForLog (query .getSliceStart ()) + " -> " + encodeKeyForLog (query .getSliceEnd ()) + " limit:" + query .getLimit () + "]" ;
214
247
}
0 commit comments