1818
1919package org .apache .gora .hive .store ;
2020
21+ import static org .junit .Assert .assertEquals ;
2122import static org .junit .Assert .assertTrue ;
23+ import static org .junit .Assert .fail ;
2224
2325import java .nio .ByteBuffer ;
2426import java .nio .charset .Charset ;
2527import java .util .ArrayList ;
28+ import java .util .Arrays ;
29+ import java .util .Collections ;
30+ import java .util .HashSet ;
31+ import java .util .LinkedHashSet ;
32+ import java .util .List ;
2633import java .util .Set ;
2734import org .apache .avro .util .Utf8 ;
35+ import org .apache .gora .examples .WebPageDataCreator ;
2836import org .apache .gora .examples .generated .Employee ;
2937import org .apache .gora .examples .generated .Metadata ;
3038import org .apache .gora .examples .generated .WebPage ;
3139import org .apache .gora .hive .GoraHiveTestDriver ;
3240import org .apache .gora .persistency .impl .BeanFactoryImpl ;
41+ import org .apache .gora .query .Query ;
42+ import org .apache .gora .query .Result ;
3343import org .apache .gora .store .DataStoreTestBase ;
3444import org .apache .gora .store .DataStoreTestUtil ;
3545import org .apache .gora .util .GoraException ;
3646import org .apache .gora .util .StringUtils ;
47+ import org .apache .metamodel .query .parser .QueryParserException ;
3748import org .junit .Ignore ;
3849import org .junit .Test ;
3950
@@ -56,6 +67,146 @@ public void assertSchemaExists(String schemaName) throws Exception {
5667 assertTrue (employeeStore .schemaExists ());
5768 }
5869
70+ private void awaitWebPageSchema (String key ) throws Exception {
71+ // wait until Hive exposes the schema and specific record to avoid empty query results
72+ for (int attempt = 0 ; attempt < 100 ; attempt ++) {
73+ webPageStore .flush ();
74+ if (!webPageStore .schemaExists ()) {
75+ Thread .sleep (100L );
76+ continue ;
77+ }
78+ if (key == null ) {
79+ return ;
80+ }
81+ try {
82+ webPageStore .get (key , new String [] {"url" });
83+ return ;
84+ } catch (QueryParserException e ) {
85+ webPageStore .close ();
86+ webPageStore = testDriver .createDataStore (String .class , WebPage .class );
87+ }
88+ }
89+ fail ("Hive web page schema or record was not visible" );
90+ }
91+
92+ private void populateWebPages () throws Exception {
93+ // load deterministic fixtures and block until every inserted URL becomes queryable
94+ webPageStore .createSchema ();
95+ awaitWebPageSchema (null );
96+ WebPageDataCreator .createWebPageData (webPageStore );
97+ for (String url : WebPageDataCreator .URLS ) {
98+ awaitWebPageSchema (url );
99+ }
100+ }
101+
102+ private List <String > sortedWebPageUrls () {
103+ // copy and sort URLs to get a stable ordering for range expectations
104+ List <String > sorted = new ArrayList <>(Arrays .asList (WebPageDataCreator .URLS ));
105+ Collections .sort (sorted );
106+ return sorted ;
107+ }
108+
109+ private void assertResultSize (boolean setStartKey , boolean setEndKey , boolean setLimit )
110+ throws Exception {
111+ // exhaustively exercise start/end/limit combinations while guarding against stale reads
112+ populateWebPages ();
113+ List <String > urls = sortedWebPageUrls ();
114+ int total = urls .size ();
115+
116+ for (int i = 0 , iLimit = setStartKey ? total : 1 ; i < iLimit ; i ++) {
117+ // choose every permissible end index for the current start index
118+ int jStart = setEndKey ? i : total - 1 ;
119+ int jLimit = setEndKey ? total : jStart + 1 ;
120+ for (int j = jStart ; j < jLimit ; j ++) {
121+ Query <String , WebPage > query = webPageStore .newQuery ();
122+ if (setStartKey ) {
123+ query .setStartKey (urls .get (i ));
124+ }
125+ if (setEndKey ) {
126+ query .setEndKey (urls .get (j ));
127+ }
128+
129+ int startIndex = setStartKey ? i : 0 ;
130+ int endExclusive = setEndKey ? j + 1 : total ;
131+ List <String > eligibleUrls = new ArrayList <>(urls .subList (startIndex , endExclusive ));
132+ Set <String > allowed = new HashSet <>(eligibleUrls ); // membership guard for returned URLs
133+ LinkedHashSet <String > actual = new LinkedHashSet <>(); // record unique URLs actually read
134+
135+ int expectedCount = eligibleUrls .size ();
136+ if (setLimit ) {
137+ int limit = expectedCount / 2 ;
138+ if (limit == 0 ) {
139+ continue ;
140+ }
141+ query .setLimit (limit );
142+ expectedCount = Math .min (limit , expectedCount );
143+ }
144+
145+ Result <String , WebPage > result = query .execute ();
146+ int reportedSize = result .size ();
147+ while (result .next ()) {
148+ WebPage page = result .get ();
149+ DataStoreTestUtil .assertWebPage (
150+ page , WebPageDataCreator .URL_INDEXES .get (page .getUrl ().toString ()));
151+ String url = page .getUrl ().toString ();
152+ assertTrue ("Unexpected url returned: " + url , allowed .contains (url ));
153+ assertTrue ("Duplicate url returned: " + url , actual .add (url ));
154+ }
155+ result .close ();
156+
157+ if (!setLimit ) {
158+ // full-range scans should yield every eligible URL, in deterministic order
159+ assertEquals (new LinkedHashSet <>(eligibleUrls ), actual );
160+ } else {
161+ // limited queries only guarantee the expected cardinality
162+ assertEquals (expectedCount , actual .size ());
163+ }
164+ // Result.size() must match the number of rows we iterated over
165+ assertEquals (expectedCount , reportedSize );
166+ }
167+ }
168+ }
169+
170+ @ Override
171+ public void testResultSize () throws Exception {
172+ assertResultSize (false , false , false );
173+ }
174+
175+ @ Override
176+ public void testResultSizeEndKey () throws Exception {
177+ assertResultSize (false , true , false );
178+ }
179+
180+ @ Override
181+ public void testResultSizeEndKeyWithLimit () throws Exception {
182+ assertResultSize (false , true , true );
183+ }
184+
185+ @ Override
186+ public void testResultSizeWithLimit () throws Exception {
187+ assertResultSize (false , false , true );
188+ }
189+
190+ @ Override
191+ public void testResultSizeKeyRange () throws Exception {
192+ assertResultSize (true , true , false );
193+ }
194+
195+ @ Override
196+ public void testResultSizeKeyRangeWithLimit () throws Exception {
197+ assertResultSize (true , true , true );
198+ }
199+
200+ @ Override
201+ public void testResultSizeStartKey () throws Exception {
202+ assertResultSize (true , false , false );
203+ }
204+
205+ @ Override
206+ public void testResultSizeStartKeyWithLimit () throws Exception {
207+ assertResultSize (true , false , true );
208+ }
209+
59210 @ Override
60211 public void assertPut (Employee employee ) throws GoraException {
61212 employeeStore .put (employee .getSsn ().toString (), employee );
0 commit comments