Skip to content
This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 7dc9aef

Browse files
committed
Pulled in Guava RateLimiter from sources
1 parent 84103aa commit 7dc9aef

File tree

11 files changed

+1499
-52
lines changed

11 files changed

+1499
-52
lines changed

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ scalaVersion := "2.11.12"
1010

1111
crossScalaVersions := Seq("2.11.12", "2.12.7")
1212

13+
compileOrder := CompileOrder.JavaThenScala
14+
1315
resolvers += "DynamoDBLocal" at "https://s3-us-west-2.amazonaws.com/dynamodb-local/release"
1416

1517
libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.678"

src/main/java/com/audienceproject/com/google/common/base/Preconditions.java

Lines changed: 451 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.audienceproject.com.google.common.base;
2+
3+
/*
4+
* Notice:
5+
* This file was modified at AudienceProject ApS by Cosmin Catalin Sanda ([email protected])
6+
*/
7+
8+
/*
9+
* Copyright (C) 2011 The Guava Authors
10+
*
11+
* Licensed under the Apache License, Version 2.0 (the "License");
12+
* you may not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*/
23+
24+
/**
25+
* A time source; returns a time value representing the number of nanoseconds elapsed since some
26+
* fixed but arbitrary point in time.
27+
*
28+
* <p><b>Warning:</b> this interface can only be used to measure elapsed time, not wall time.
29+
*
30+
* @author Kevin Bourrillion
31+
* @since 10.0
32+
* (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
33+
* >mostly source-compatible</a> since 9.0)
34+
*/
35+
public abstract class Ticker {
36+
/**
37+
* Constructor for use by subclasses.
38+
*/
39+
protected Ticker() {}
40+
41+
/**
42+
* Returns the number of nanoseconds elapsed since this ticker's fixed
43+
* point of reference.
44+
*/
45+
public abstract long read();
46+
47+
/**
48+
* A ticker that reads the current time using {@link System#nanoTime}.
49+
*
50+
* @since 10.0
51+
*/
52+
public static Ticker systemTicker() {
53+
return SYSTEM_TICKER;
54+
}
55+
56+
private static final Ticker SYSTEM_TICKER = new Ticker() {
57+
@Override
58+
public long read() {
59+
return System.nanoTime();
60+
}
61+
};
62+
}
63+

src/main/java/com/audienceproject/com/google/common/util/concurrent/RateLimiter.java

Lines changed: 687 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
package com.audienceproject.com.google.common.util.concurrent;
2+
3+
/*
4+
* Notice:
5+
* This file was modified at AudienceProject ApS by Cosmin Catalin Sanda ([email protected])
6+
*/
7+
8+
/*
9+
* Copyright (C) 2011 The Guava Authors
10+
*
11+
* Licensed under the Apache License, Version 2.0 (the "License");
12+
* you may not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*/
23+
24+
import com.audienceproject.com.google.common.base.Preconditions;
25+
26+
import java.util.concurrent.*;
27+
28+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
29+
30+
/**
31+
* Utilities for treating interruptible operations as uninterruptible.
32+
* In all cases, if a thread is interrupted during such a call, the call
33+
* continues to block until the result is available or the timeout elapses,
34+
* and only then re-interrupts the thread.
35+
*
36+
* @author Anthony Zana
37+
* @since 10.0
38+
*/
39+
public final class Uninterruptibles {
40+
41+
// Implementation Note: As of 3-7-11, the logic for each blocking/timeout
42+
// methods is identical, save for method being invoked.
43+
44+
/**
45+
* Invokes {@code latch.}{@link CountDownLatch#await() await()}
46+
* uninterruptibly.
47+
*/
48+
public static void awaitUninterruptibly(CountDownLatch latch) {
49+
boolean interrupted = false;
50+
try {
51+
while (true) {
52+
try {
53+
latch.await();
54+
return;
55+
} catch (InterruptedException e) {
56+
interrupted = true;
57+
}
58+
}
59+
} finally {
60+
if (interrupted) {
61+
Thread.currentThread().interrupt();
62+
}
63+
}
64+
}
65+
66+
/**
67+
* Invokes
68+
* {@code latch.}{@link CountDownLatch#await(long, TimeUnit)
69+
* await(timeout, unit)} uninterruptibly.
70+
*/
71+
public static boolean awaitUninterruptibly(CountDownLatch latch,
72+
long timeout, TimeUnit unit) {
73+
boolean interrupted = false;
74+
try {
75+
long remainingNanos = unit.toNanos(timeout);
76+
long end = System.nanoTime() + remainingNanos;
77+
78+
while (true) {
79+
try {
80+
// CountDownLatch treats negative timeouts just like zero.
81+
return latch.await(remainingNanos, NANOSECONDS);
82+
} catch (InterruptedException e) {
83+
interrupted = true;
84+
remainingNanos = end - System.nanoTime();
85+
}
86+
}
87+
} finally {
88+
if (interrupted) {
89+
Thread.currentThread().interrupt();
90+
}
91+
}
92+
}
93+
94+
/**
95+
* Invokes {@code toJoin.}{@link Thread#join() join()} uninterruptibly.
96+
*/
97+
public static void joinUninterruptibly(Thread toJoin) {
98+
boolean interrupted = false;
99+
try {
100+
while (true) {
101+
try {
102+
toJoin.join();
103+
return;
104+
} catch (InterruptedException e) {
105+
interrupted = true;
106+
}
107+
}
108+
} finally {
109+
if (interrupted) {
110+
Thread.currentThread().interrupt();
111+
}
112+
}
113+
}
114+
115+
/**
116+
* Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
117+
* To get uninterruptibility and remove checked exceptions.
118+
*
119+
* <p>If instead, you wish to treat {@link InterruptedException} uniformly
120+
* with other exceptions.
121+
*
122+
* @throws ExecutionException if the computation threw an exception
123+
* @throws CancellationException if the computation was cancelled
124+
*/
125+
public static <V> V getUninterruptibly(Future<V> future)
126+
throws ExecutionException {
127+
boolean interrupted = false;
128+
try {
129+
while (true) {
130+
try {
131+
return future.get();
132+
} catch (InterruptedException e) {
133+
interrupted = true;
134+
}
135+
}
136+
} finally {
137+
if (interrupted) {
138+
Thread.currentThread().interrupt();
139+
}
140+
}
141+
}
142+
143+
/**
144+
* Invokes
145+
* {@code future.}{@link Future#get(long, TimeUnit) get(timeout, unit)}
146+
* uninterruptibly.
147+
*
148+
* <p>If instead, you wish to treat {@link InterruptedException} uniformly
149+
* with other exceptions.
150+
*
151+
* @throws ExecutionException if the computation threw an exception
152+
* @throws CancellationException if the computation was cancelled
153+
* @throws TimeoutException if the wait timed out
154+
*/
155+
public static <V> V getUninterruptibly(
156+
Future<V> future, long timeout, TimeUnit unit)
157+
throws ExecutionException, TimeoutException {
158+
boolean interrupted = false;
159+
try {
160+
long remainingNanos = unit.toNanos(timeout);
161+
long end = System.nanoTime() + remainingNanos;
162+
163+
while (true) {
164+
try {
165+
// Future treats negative timeouts just like zero.
166+
return future.get(remainingNanos, NANOSECONDS);
167+
} catch (InterruptedException e) {
168+
interrupted = true;
169+
remainingNanos = end - System.nanoTime();
170+
}
171+
}
172+
} finally {
173+
if (interrupted) {
174+
Thread.currentThread().interrupt();
175+
}
176+
}
177+
}
178+
179+
/**
180+
* Invokes
181+
* {@code unit.}{@link TimeUnit#timedJoin(Thread, long)
182+
* timedJoin(toJoin, timeout)} uninterruptibly.
183+
*/
184+
public static void joinUninterruptibly(Thread toJoin,
185+
long timeout, TimeUnit unit) {
186+
Preconditions.checkNotNull(toJoin);
187+
boolean interrupted = false;
188+
try {
189+
long remainingNanos = unit.toNanos(timeout);
190+
long end = System.nanoTime() + remainingNanos;
191+
while (true) {
192+
try {
193+
// TimeUnit.timedJoin() treats negative timeouts just like zero.
194+
NANOSECONDS.timedJoin(toJoin, remainingNanos);
195+
return;
196+
} catch (InterruptedException e) {
197+
interrupted = true;
198+
remainingNanos = end - System.nanoTime();
199+
}
200+
}
201+
} finally {
202+
if (interrupted) {
203+
Thread.currentThread().interrupt();
204+
}
205+
}
206+
}
207+
208+
/**
209+
* Invokes {@code queue.}{@link BlockingQueue#take() take()} uninterruptibly.
210+
*/
211+
public static <E> E takeUninterruptibly(BlockingQueue<E> queue) {
212+
boolean interrupted = false;
213+
try {
214+
while (true) {
215+
try {
216+
return queue.take();
217+
} catch (InterruptedException e) {
218+
interrupted = true;
219+
}
220+
}
221+
} finally {
222+
if (interrupted) {
223+
Thread.currentThread().interrupt();
224+
}
225+
}
226+
}
227+
228+
/**
229+
* Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)}
230+
* uninterruptibly.
231+
*
232+
* @throws ClassCastException if the class of the specified element prevents
233+
* it from being added to the given queue
234+
* @throws IllegalArgumentException if some property of the specified element
235+
* prevents it from being added to the given queue
236+
*/
237+
public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
238+
boolean interrupted = false;
239+
try {
240+
while (true) {
241+
try {
242+
queue.put(element);
243+
return;
244+
} catch (InterruptedException e) {
245+
interrupted = true;
246+
}
247+
}
248+
} finally {
249+
if (interrupted) {
250+
Thread.currentThread().interrupt();
251+
}
252+
}
253+
}
254+
255+
// TODO(user): Support Sleeper somehow (wrapper or interface method)?
256+
/**
257+
* Invokes {@code unit.}{@link TimeUnit#sleep(long) sleep(sleepFor)}
258+
* uninterruptibly.
259+
*/
260+
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
261+
boolean interrupted = false;
262+
try {
263+
long remainingNanos = unit.toNanos(sleepFor);
264+
long end = System.nanoTime() + remainingNanos;
265+
while (true) {
266+
try {
267+
// TimeUnit.sleep() treats negative timeouts just like zero.
268+
NANOSECONDS.sleep(remainingNanos);
269+
return;
270+
} catch (InterruptedException e) {
271+
interrupted = true;
272+
remainingNanos = end - System.nanoTime();
273+
}
274+
}
275+
} finally {
276+
if (interrupted) {
277+
Thread.currentThread().interrupt();
278+
}
279+
}
280+
}
281+
282+
// TODO(user): Add support for waitUninterruptibly.
283+
284+
private Uninterruptibles() {}
285+
}
286+

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoWritable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
package com.audienceproject.spark.dynamodb.connector
2222

2323
import com.amazonaws.services.dynamodbv2.document.DynamoDB
24-
import com.audienceproject.spark.dynamodb.util.RateLimiter
24+
import com.audienceproject.com.google.common.util.concurrent.RateLimiter
2525
import org.apache.spark.sql.catalyst.InternalRow
2626

2727
private[dynamodb] trait DynamoWritable {

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import com.amazonaws.services.dynamodbv2.document._
2424
import com.amazonaws.services.dynamodbv2.document.spec.{BatchWriteItemSpec, ScanSpec, UpdateItemSpec}
2525
import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity
2626
import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder
27+
import com.audienceproject.com.google.common.util.concurrent.RateLimiter
2728
import com.audienceproject.spark.dynamodb.catalyst.JavaConverter
28-
import com.audienceproject.spark.dynamodb.util.RateLimiter
2929
import org.apache.spark.sql.catalyst.InternalRow
3030
import org.apache.spark.sql.sources.Filter
3131

@@ -168,7 +168,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
168168
// Update item and rate limit on write capacity.
169169
val response = client.getTable(tableName).updateItem(updateItemSpec)
170170
Option(response.getUpdateItemResult.getConsumedCapacity)
171-
.foreach(cap => rateLimiter.acquire(cap.getCapacityUnits))
171+
.foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt))
172172
}
173173

174174
@tailrec

0 commit comments

Comments
 (0)