2
2
3
3
import static com .datastax .driver .core .Assertions .assertThat ;
4
4
import static com .datastax .driver .core .Metadata .handleId ;
5
+ import static org .awaitility .Awaitility .await ;
5
6
import static org .mockito .Matchers .anyObject ;
6
7
import static org .mockito .Mockito .after ;
7
8
import static org .mockito .Mockito .mock ;
@@ -43,7 +44,7 @@ public class TabletMapListenerTest extends CCMTestsSupport {
43
44
private static final String CREATE_KEYSPACE = CREATE_TABLETS_KEYSPACE_QUERY ;
44
45
private static final String ALTER_KEYSPACE =
45
46
"ALTER KEYSPACE " + KEYSPACE_NAME + " WITH durable_writes = false" ;
46
- private static final String DROP_KEYSPACE = "DROP KEYSPACE " + KEYSPACE_NAME ;
47
+ private static final String DROP_KEYSPACE = "DROP KEYSPACE IF EXISTS " + KEYSPACE_NAME ;
47
48
48
49
private static final String CREATE_TABLE =
49
50
"CREATE TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + "(i int primary key)" ;
@@ -56,6 +57,8 @@ public class TabletMapListenerTest extends CCMTestsSupport {
56
57
private static final String ALTER_TABLE =
57
58
"ALTER TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + " ADD j int" ;
58
59
private static final String DROP_TABLE = "DROP TABLE " + KEYSPACE_NAME + "." + TABLE_NAME ;
60
+ private static final TabletMap .KeyspaceTableNamePair TABLET_MAP_KEY =
61
+ new TabletMap .KeyspaceTableNamePair (handleId (KEYSPACE_NAME ), handleId (TABLE_NAME ));
59
62
60
63
/** The maximum time that the test will wait to check that listeners have been notified. */
61
64
private static final long NOTIF_TIMEOUT_MS = TimeUnit .MINUTES .toMillis (1 );
@@ -94,16 +97,15 @@ public void should_remove_tablets_on_table_alterations() throws InterruptedExcep
94
97
tabletMap = cluster .getMetadata ().getTabletMap ();
95
98
96
99
session .execute (CREATE_TABLE );
97
- assertThat ( tabletMap . getMapping () )
98
- .doesNotContainKey (
99
- new TabletMap . KeyspaceTableNamePair ( handleId ( KEYSPACE_NAME ), handleId ( TABLE_NAME ) ));
100
+ await ( )
101
+ .atMost ( SHORT_TIMEOUT_MS , TimeUnit . MILLISECONDS )
102
+ . until (() -> ! tabletMap . getMapping (). containsKey ( TABLET_MAP_KEY ));
100
103
101
104
session .execute (String .format (INSERT_QUERY_TEMPLATE , "42" ));
102
- session .execute (session .prepare (SELECT_PK_WHERE ).bind (42 ));
103
- session .execute (session .prepare (SELECT_PK_WHERE ).bind (42 ));
104
- assertThat (tabletMap .getMapping ())
105
- .containsKey (
106
- new TabletMap .KeyspaceTableNamePair (handleId (KEYSPACE_NAME ), handleId (TABLE_NAME )));
105
+ executeOnAllHosts (session .prepare (SELECT_PK_WHERE ).bind (42 ), session );
106
+ await ()
107
+ .atMost (SHORT_TIMEOUT_MS , TimeUnit .MILLISECONDS )
108
+ .until (() -> tabletMap .getMapping ().containsKey (TABLET_MAP_KEY ));
107
109
108
110
session .execute (ALTER_TABLE );
109
111
for (SchemaChangeListener listener : listeners ) {
@@ -114,16 +116,15 @@ public void should_remove_tablets_on_table_alterations() throws InterruptedExcep
114
116
assertThat (previous .getValue ().getKeyspace ()).hasName (handleId (KEYSPACE_NAME ));
115
117
assertThat (previous .getValue ()).hasName (handleId (TABLE_NAME ));
116
118
}
117
- assertThat ( tabletMap . getMapping () )
118
- .doesNotContainKey (
119
- new TabletMap . KeyspaceTableNamePair ( handleId ( KEYSPACE_NAME ), handleId ( TABLE_NAME ) ));
119
+ await ( )
120
+ .atMost ( SHORT_TIMEOUT_MS , TimeUnit . MILLISECONDS )
121
+ . until (() -> ! tabletMap . getMapping (). containsKey ( TABLET_MAP_KEY ));
120
122
121
123
session .execute (String .format (INSERT_ALTERED_TEMPLATE , "42" , "42" ));
122
- session .execute (session .prepare (SELECT_PK_WHERE ).bind (42 ));
123
- session .execute (session .prepare (SELECT_PK_WHERE ).bind (42 ));
124
- assertThat (tabletMap .getMapping ())
125
- .containsKey (
126
- new TabletMap .KeyspaceTableNamePair (handleId (KEYSPACE_NAME ), handleId (TABLE_NAME )));
124
+ executeOnAllHosts (session .prepare (SELECT_PK_WHERE ).bind (42 ), session );
125
+ await ()
126
+ .atMost (SHORT_TIMEOUT_MS , TimeUnit .MILLISECONDS )
127
+ .until (() -> tabletMap .getMapping ().containsKey (TABLET_MAP_KEY ));
127
128
128
129
session .execute (DROP_TABLE );
129
130
ArgumentCaptor <TableMetadata > removed = null ;
@@ -133,12 +134,10 @@ public void should_remove_tablets_on_table_alterations() throws InterruptedExcep
133
134
assertThat (removed .getValue ().getKeyspace ()).hasName (handleId (KEYSPACE_NAME ));
134
135
assertThat (removed .getValue ()).hasName (handleId (TABLE_NAME ));
135
136
}
136
- assert removed != null ;
137
- assertThat (tabletMap .getMapping ())
138
- .doesNotContainKey (
139
- new TabletMap .KeyspaceTableNamePair (handleId (KEYSPACE_NAME ), handleId (TABLE_NAME )));
140
-
141
- session .execute (DROP_KEYSPACE );
137
+ assertThat (removed ).isNotNull ();
138
+ await ()
139
+ .atMost (SHORT_TIMEOUT_MS , TimeUnit .MILLISECONDS )
140
+ .until (() -> !tabletMap .getMapping ().containsKey (TABLET_MAP_KEY ));
142
141
}
143
142
144
143
@ Test (groups = "short" )
@@ -157,13 +156,10 @@ public void should_remove_tablets_on_keyspace_alterations() {
157
156
158
157
session .execute (CREATE_TABLE );
159
158
session .execute (String .format (INSERT_QUERY_TEMPLATE , "42" ));
160
- session .execute (session .prepare (SELECT_PK_WHERE ).bind (42 ));
161
- session .execute (session .prepare (SELECT_PK_WHERE ).bind (42 ));
162
- assertThat (tabletMap .getMapping ())
163
- .containsKey (
164
- new TabletMap .KeyspaceTableNamePair (handleId (KEYSPACE_NAME ), handleId (TABLE_NAME )));
165
-
166
- assertThat (cluster .getMetadata ().getKeyspace (KEYSPACE_NAME ).isDurableWrites ()).isTrue ();
159
+ executeOnAllHosts (session .prepare (SELECT_PK_WHERE ).bind (42 ), session );
160
+ await ()
161
+ .atMost (SHORT_TIMEOUT_MS , TimeUnit .MILLISECONDS )
162
+ .until (() -> tabletMap .getMapping ().containsKey (TABLET_MAP_KEY ));
167
163
168
164
session .execute (ALTER_KEYSPACE );
169
165
assertThat (cluster .getMetadata ().getKeyspace (KEYSPACE_NAME )).isNotDurableWrites ();
@@ -178,31 +174,38 @@ public void should_remove_tablets_on_keyspace_alterations() {
178
174
verify (listener , after ((int ) SHORT_TIMEOUT_MS ).never ())
179
175
.onTableChanged (anyObject (), anyObject ());
180
176
}
181
- assertThat (tabletMap .getMapping ())
182
- .doesNotContainKey (
183
- new TabletMap .KeyspaceTableNamePair (handleId (KEYSPACE_NAME ), handleId (TABLE_NAME )));
184
-
185
- session .execute (session .prepare (SELECT_PK_WHERE ).bind (42 ));
186
- session .execute (session .prepare (SELECT_PK_WHERE ).bind (42 ));
177
+ await ()
178
+ .atMost (SHORT_TIMEOUT_MS , TimeUnit .MILLISECONDS )
179
+ .until (() -> !tabletMap .getMapping ().containsKey (TABLET_MAP_KEY ));
187
180
188
- assertThat (tabletMap .getMapping ())
189
- .containsKey (
190
- new TabletMap .KeyspaceTableNamePair (handleId (KEYSPACE_NAME ), handleId (TABLE_NAME )));
181
+ executeOnAllHosts (session .prepare (SELECT_PK_WHERE ).bind (42 ), session );
182
+ await ()
183
+ .atMost (SHORT_TIMEOUT_MS , TimeUnit .MILLISECONDS )
184
+ .until (() -> tabletMap .getMapping ().containsKey (TABLET_MAP_KEY ));
191
185
192
186
session .execute (DROP_KEYSPACE );
193
187
for (SchemaChangeListener listener : listeners ) {
194
188
ArgumentCaptor <KeyspaceMetadata > removed = ArgumentCaptor .forClass (KeyspaceMetadata .class );
195
189
verify (listener , timeout (NOTIF_TIMEOUT_MS ).times (1 )).onKeyspaceRemoved (removed .capture ());
196
190
assertThat (removed .getValue ()).hasName (handleId (KEYSPACE_NAME ));
197
191
}
198
- assertThat ( tabletMap . getMapping () )
199
- .doesNotContainKey (
200
- new TabletMap . KeyspaceTableNamePair ( handleId ( KEYSPACE_NAME ), handleId ( TABLE_NAME ) ));
192
+ await ( )
193
+ .atMost ( SHORT_TIMEOUT_MS , TimeUnit . MILLISECONDS )
194
+ . until (() -> ! tabletMap . getMapping (). containsKey ( TABLET_MAP_KEY ));
201
195
}
202
196
203
197
@ AfterMethod (groups = "short" , alwaysRun = true )
204
198
public void teardown () {
205
- if (session != null ) session .close ();
199
+ if (session != null ) {
200
+ session .execute (DROP_KEYSPACE );
201
+ session .close ();
202
+ }
206
203
if (cluster != null ) cluster .close ();
207
204
}
205
+
206
+ private void executeOnAllHosts (Statement statement , Session session ) {
207
+ for (Host host : session .getCluster ().getMetadata ().getAllHosts ()) {
208
+ session .execute (statement .setHost (host ));
209
+ }
210
+ }
208
211
}
0 commit comments