3434import static com .alipay .oceanbase .rpc .mutation .MutationFactory .*;
3535
3636public class ObTableHotkeyThrottleUtil extends Thread {
37- int testNum = 100 ;
37+ int threadIdx ;
38+ int testNum ;
39+ String tableName = null ;
40+ String [] rowKeyColumnName = null ;
41+
42+ public int threadNum ;
43+ public static List <Integer > unitOperationTimes = null ;
44+ public static List <Integer > unitBlockTimes = null ;
45+ public static List <Integer > totalOperationTimes = null ;
46+ public static List <Integer > totalBlockTimes = null ;
47+
3848
3949 public enum TestType {
4050 random , specifiedKey
@@ -51,11 +61,19 @@ public enum OperationType {
5161 int throttleNum = 0 ;
5262 int passNum = 0 ;
5363 int batchSize = 64 ;
64+ long startTime = 0 ;
65+ int unitBlockTime = 0 ;
66+ int unitOperationTime = 0 ;
5467
55- public void init (TestType testType , OperationType operationType , Table client , int batchSize ,
56- ColumnValue ... rowKeyColumnValues ) throws Exception {
68+ public void init (int threadNum ,int threadIdx , long startTime , String tableName , String [] rowKeyColumnName ,
69+ TestType testType , OperationType operationType , int testNum ,
70+ Table client , int batchSize , ColumnValue ... rowKeyColumnValues ) throws Exception {
5771 System .setProperty ("ob_table_min_rslist_refresh_interval_millis" , "1" );
58-
72+ this .threadNum = threadNum ;
73+ this .threadIdx = threadIdx ;
74+ this .startTime = startTime ;
75+ this .tableName = tableName ;
76+ this .testNum = testNum ;
5977 switch (testType ) {
6078 case random : {
6179 rowKey = null ;
@@ -92,8 +110,32 @@ public void init(TestType testType, OperationType operationType, Table client, i
92110 } else {
93111 this .client = client ;
94112 }
95- ((ObTableClient ) this .client )
96- .addRowKeyElement ("test_throttle" , new String [] { "c1" , "c2" });
113+ ((ObTableClient ) this .client ).addRowKeyElement (this .tableName , rowKeyColumnName );
114+
115+ if (null == unitOperationTimes || threadIdx == 0 ) {
116+ unitOperationTimes = new ArrayList <Integer >(threadNum );
117+ for (int i = 0 ; i < threadNum ; ++i ) {
118+ unitOperationTimes .add (0 );
119+ }
120+ }
121+ if (null == unitBlockTimes || threadIdx == 0 ) {
122+ unitBlockTimes = new ArrayList <Integer >(threadNum );
123+ for (int i = 0 ; i < threadNum ; ++i ) {
124+ unitBlockTimes .add (0 );
125+ }
126+ }
127+ if (null == totalOperationTimes || threadIdx == 0 ) {
128+ totalOperationTimes = new ArrayList <Integer >(threadNum );
129+ for (int i = 0 ; i < threadNum ; ++i ) {
130+ totalOperationTimes .add (0 );
131+ }
132+ }
133+ if (null == totalBlockTimes || threadIdx == 0 ) {
134+ totalBlockTimes = new ArrayList <Integer >(threadNum );
135+ for (int i = 0 ; i < threadNum ; ++i ) {
136+ totalBlockTimes .add (0 );
137+ }
138+ }
97139 }
98140
99141 @ Override
@@ -141,7 +183,17 @@ private void runRandom() throws Exception {
141183 batchOperationTest ();
142184 break ;
143185 }
186+ // record operation time for each 2s
187+ if (System .currentTimeMillis () - startTime > 2000 ) {
188+ unitOperationTimes .set (threadIdx , unitOperationTime );
189+ unitBlockTimes .set (threadIdx , unitBlockTime );
190+ unitOperationTime = 0 ;
191+ unitBlockTime = 0 ;
192+ while (System .currentTimeMillis () - startTime > 2000 ) startTime += 2000 ;
193+ }
144194 }
195+ totalOperationTimes .set (threadIdx , throttleNum + passNum );
196+ totalBlockTimes .set (threadIdx , throttleNum );
145197 }
146198
147199 private void runSpecifiedKey () throws Exception {
@@ -166,7 +218,17 @@ private void runSpecifiedKey() throws Exception {
166218 colVal ("c4" , 0L ));
167219 break ;
168220 }
221+ // record operation time for each 2s
222+ if (System .currentTimeMillis () - startTime > 2000 ) {
223+ unitOperationTimes .set (threadIdx , unitOperationTime );
224+ unitBlockTimes .set (threadIdx , unitBlockTime );
225+ unitOperationTime = 0 ;
226+ unitBlockTime = 0 ;
227+ while (System .currentTimeMillis () - startTime > 2000 ) startTime += 2000 ;
228+ }
169229 }
230+ totalOperationTimes .set (threadIdx , throttleNum + passNum );
231+ totalBlockTimes .set (threadIdx , throttleNum );
170232 }
171233
172234 private void insertTest (Row rowkey , ColumnValue ... columnValues ) throws Exception {
@@ -176,17 +238,32 @@ private void insertTest(Row rowkey, ColumnValue... columnValues) throws Exceptio
176238 }
177239
178240 private void updateTest (Row rowkey , ColumnValue ... columnValues ) throws Exception {
241+ ++unitOperationTime ;
179242 MutationResult updateResult = client .update ("test_throttle" ).setRowKey (rowKey )
180243 .addMutateColVal (columnValues ).execute ();
244+ if (updateResult .getAffectedRows () != 1 ) {
245+ ++unitBlockTime ;
246+ ++throttleNum ;
247+ } else {
248+ ++passNum ;
249+ }
181250 }
182251
183252 private void insertOrUpdateTest (Row rowkey , ColumnValue ... columnValues ) throws Exception {
253+ ++unitOperationTime ;
184254 MutationResult insertOrUpdateResult = client .insertOrUpdate ("test_throttle" )
185255 .setRowKey (rowkey ).addMutateColVal (columnValues ).execute ();
256+ if (insertOrUpdateResult .getAffectedRows () != 1 ) {
257+ ++unitBlockTime ;
258+ ++throttleNum ;
259+ } else {
260+ ++passNum ;
261+ }
186262 }
187263
188264 private void queryTest (Row rowkey ) throws Exception {
189265 try {
266+ ++unitOperationTime ;
190267 TableQuery tableQuery = client .query ("test_throttle" );
191268 tableQuery .addScanRange (rowkey .getValues (), rowkey .getValues ());
192269 tableQuery .select ("c1" , "c2" , "c3" , "c4" );
@@ -195,12 +272,8 @@ private void queryTest(Row rowkey) throws Exception {
195272 } catch (Exception e ) {
196273 if (e instanceof ObTableUnexpectedException ) {
197274 if (((ObTableUnexpectedException ) e ).getErrorCode () == -4039 ) {
198- if (++throttleNum % 50 == 0 ) {
199- System .out .println (Thread .currentThread ().getName () + " rowkey num is "
200- + rowkey .get ("c1" ) + " has pass " + passNum
201- + " operations, and has throttle " + throttleNum
202- + " operations" );
203- }
275+ ++throttleNum ;
276+ ++unitBlockTime ;
204277 } else {
205278 e .printStackTrace ();
206279 Assert .assertNull (e );
@@ -231,16 +304,16 @@ private List<Mutation> generateBatchOpertaionIoU() {
231304
232305 private void batchOperationTest () throws Exception {
233306 try {
307+ ++unitOperationTime ;
234308 BatchOperationResult batchResult = client .batchOperation ("test_throttle" )
235309 .addOperation (generateBatchOpertaionIoU ()).execute ();
236310 ++passNum ;
237311 } catch (Exception e ) {
238312 if (e instanceof ObTableUnexpectedException ) {
239313 if (((ObTableUnexpectedException ) e ).getErrorCode () == -4039 ) {
240314 if (++throttleNum % 50 == 0 ) {
241- System .out .println (Thread .currentThread ().getName () + " has pass "
242- + passNum + " batch operations, and has throttle "
243- + throttleNum + " batch operations" );
315+ ++throttleNum ;
316+ ++unitBlockTime ;
244317 }
245318 } else {
246319 e .printStackTrace ();
@@ -253,6 +326,22 @@ private void batchOperationTest() throws Exception {
253326 }
254327 }
255328
329+ public List <Integer > getUnitOperationTimes () {
330+ return unitOperationTimes ;
331+ }
332+
333+ public List <Integer > getUnitBlockTimes () {
334+ return unitBlockTimes ;
335+ }
336+
337+ public List <Integer > getTotalOperationTimes () {
338+ return totalOperationTimes ;
339+ }
340+
341+ public List <Integer > getTotalBlockTimes () {
342+ return totalBlockTimes ;
343+ }
344+
256345 public void syncRefreshMetaHelper (final ObTableClient obTableClient ) {
257346 if (obTableClient .isOdpMode ()) {
258347 // do noting
0 commit comments