Skip to content

Commit 4cba6bf

Browse files
committed
Fix #183 by sorting multi-action RPCs by row key as well as region so that
if a later version of HBase is used that sorts the requests on receipt then the RPCs will still match up with the responses. Signed-off-by: Chris Larsen <clarsen@yahoo-inc.com>
1 parent 147b4e9 commit 4cba6bf

File tree

2 files changed

+66
-10
lines changed

2 files changed

+66
-10
lines changed

src/MultiAction.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2010-2016 The Async HBase Authors. All rights reserved.
2+
* Copyright (C) 2010-2018 The Async HBase Authors. All rights reserved.
33
* This file is part of Async HBase.
44
*
55
* Redistribution and use in source and binary forms, with or without
@@ -45,6 +45,11 @@
4545
* <p>
4646
* This RPC is guaranteed to be sent atomically (but HBase doesn't guarantee
4747
* that it will apply it atomically).
48+
* <p>
49+
* Before serializing, the RPCs are sorted by region and (as of 1.8.1)
50+
* by row key (to fix issues with
51+
* https://issues.apache.org/jira/browse/HBASE-17924) so that the
52+
* responses can be matched to the original RPC.
4853
*/
4954
final class MultiAction extends HBaseRpc implements HBaseRpc.IsEdit {
5055

@@ -70,7 +75,7 @@ final class MultiAction extends HBaseRpc implements HBaseRpc.IsEdit {
7075
private static final byte[] MULTI = { 'm', 'u', 'l', 't', 'i' };
7176

7277
/** RPC method name for HBase 0.95 and above. */
73-
private static final byte[] MMULTI = { 'M', 'u', 'l', 't', 'i' };
78+
static final byte[] MMULTI = { 'M', 'u', 'l', 't', 'i' };
7479

7580
/** Template for NSREs. */
7681
private static final NotServingRegionException NSRE =
@@ -214,7 +219,7 @@ ChannelBuffer serialize(final byte server_version) {
214219
}
215220

216221
// we create a new RegionAction for each region.
217-
Collections.sort(batch, SORT_BY_REGION);
222+
Collections.sort(batch, SORT_BY_REGION_AND_KEY);
218223
final MultiRequest.Builder req = MultiRequest.newBuilder();
219224
RegionAction.Builder actions = null;
220225
byte[] prev_region = HBaseClient.EMPTY_ARRAY;
@@ -508,24 +513,30 @@ public int compare(final BatchableRpc a, final BatchableRpc b) {
508513
}
509514

510515
/**
511-
* Sorts {@link BatchableRpc}s appropriately for HBase 0.95+ multi-action.
516+
* Sorts {@link BatchableRpc}s appropriately for HBase 0.95+ multi-action
517+
* as well as 1.3x where the response is also sorted by key.
512518
*/
513-
static final RegionComparator SORT_BY_REGION = new RegionComparator();
519+
static final RegionAndKeyComparator SORT_BY_REGION_AND_KEY =
520+
new RegionAndKeyComparator();
514521

515522
/**
516523
* Sorts {@link BatchableRpc}s by region.
517524
* Used with HBase 0.95+ only.
518525
*/
519-
private static final class RegionComparator
526+
private static final class RegionAndKeyComparator
520527
implements Comparator<BatchableRpc> {
521528

522-
private RegionComparator() { // Can't instantiate outside of this class.
529+
private RegionAndKeyComparator() { // Can't instantiate outside of this class.
523530
}
524531

525532
@Override
526533
/** Compares two RPCs. */
527534
public int compare(final BatchableRpc a, final BatchableRpc b) {
528-
return Bytes.memcmp(a.getRegion().name(), b.getRegion().name());
535+
int region_cmp = Bytes.memcmp(a.getRegion().name(), b.getRegion().name());
536+
if (region_cmp != 0) {
537+
return region_cmp;
538+
}
539+
return Bytes.memcmp(a.key, b.key);
529540
}
530541

531542
}

test/TestMultiAction.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*/
2727
package org.hbase.async;
2828

29+
import static org.junit.Assert.assertArrayEquals;
2930
import static org.junit.Assert.assertEquals;
3031
import static org.junit.Assert.assertSame;
3132
import static org.junit.Assert.assertTrue;
@@ -40,7 +41,9 @@
4041
import java.util.Collections;
4142
import java.util.List;
4243

44+
import org.hbase.async.generated.ClientPB.MultiRequest;
4345
import org.hbase.async.generated.ClientPB.MultiResponse;
46+
import org.hbase.async.generated.ClientPB.RegionAction;
4447
import org.hbase.async.generated.ClientPB.RegionActionResult;
4548
import org.hbase.async.generated.ClientPB.ResultOrException;
4649
import org.hbase.async.generated.ClientPB.RegionActionResult.Builder;
@@ -212,6 +215,48 @@ public void addErrors() throws Exception {
212215
}
213216

214217
// NOTE: The following are tests for HBase 0.96 and up
218+
219+
@Test
220+
public void serdesOrdering() throws Exception {
221+
PutRequest put1 = new PutRequest(TABLE, concat(KEY, new byte[] { 2 }),
222+
FAMILY, QUALIFIER, VALUE);
223+
put1.region = region;
224+
PutRequest put2 = new PutRequest(TABLE, KEY, FAMILY, "myqual".getBytes(), VALUE);
225+
put2.region = region;
226+
PutRequest put3 = new PutRequest(TABLE, concat(KEY, new byte[] { 2 }),
227+
FAMILY, "myqual".getBytes(), VALUE);
228+
put3.region = region2;
229+
PutRequest put4 = new PutRequest(TABLE, KEY, FAMILY, "myqual".getBytes(), VALUE);
230+
put4.region = region2;
231+
MultiAction multi = new MultiAction();
232+
multi.add(put1);
233+
multi.add(put2);
234+
multi.add(put3);
235+
multi.add(put4);
236+
237+
ChannelBuffer buffer = multi.serialize(RegionClient.SERVER_VERSION_095_OR_ABOVE);
238+
buffer.readerIndex(4 + 19 + MultiAction.MMULTI.length);
239+
HBaseRpc.readProtoBufVarint(buffer);
240+
byte[] bytes = new byte[buffer.writerIndex() - buffer.readerIndex()];
241+
buffer.readBytes(bytes);
242+
MultiRequest parsed = MultiRequest.parseFrom(bytes);
243+
assertEquals(2, parsed.getRegionActionCount());
244+
245+
RegionAction actions = parsed.getRegionAction(0);
246+
assertEquals(2, actions.getActionCount());
247+
assertArrayEquals(region.name(), actions.getRegion().getValue().toByteArray());
248+
assertArrayEquals(KEY, actions.getAction(0).getMutation().getRow().toByteArray());
249+
assertArrayEquals(concat(KEY, new byte[] { 2 }),
250+
actions.getAction(1).getMutation().getRow().toByteArray());
251+
252+
actions = parsed.getRegionAction(1);
253+
assertEquals(2, actions.getActionCount());
254+
assertArrayEquals(region2.name(), actions.getRegion().getValue().toByteArray());
255+
assertArrayEquals(KEY, actions.getAction(0).getMutation().getRow().toByteArray());
256+
assertArrayEquals(concat(KEY, new byte[] { 2 }),
257+
actions.getAction(1).getMutation().getRow().toByteArray());
258+
}
259+
215260
@Test
216261
public void deserializePuts() throws Exception {
217262
final List<ResultOrException> results = new ArrayList<ResultOrException>(2);
@@ -870,7 +915,7 @@ public void deserializeMultiRegionOneFailed() throws Exception {
870915
multi.add(put2);
871916
multi.add(put3);
872917
multi.add(put4);
873-
Collections.sort(multi.batch(), MultiAction.SORT_BY_REGION);
918+
Collections.sort(multi.batch(), MultiAction.SORT_BY_REGION_AND_KEY);
874919

875920
final MultiAction.Response decoded =
876921
(MultiAction.Response)multi.deserialize(
@@ -916,7 +961,7 @@ public void deserializeMultiRegionTwoFailed() throws Exception {
916961
multi.add(put2);
917962
multi.add(put3);
918963
multi.add(put4);
919-
Collections.sort(multi.batch(), MultiAction.SORT_BY_REGION);
964+
Collections.sort(multi.batch(), MultiAction.SORT_BY_REGION_AND_KEY);
920965

921966
final MultiAction.Response decoded =
922967
(MultiAction.Response)multi.deserialize(

0 commit comments

Comments
 (0)