Skip to content

Commit ae1cd02

Browse files
committed
Rethrowing Exception from CassandraIO's ReadFn
1 parent 39892d1 commit ae1cd02

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
* (Python) Fixed occasional pipeline stuckness that was affecting Python 3.11 users ([#33966](https://github.com/apache/beam/issues/33966)).
8989
* (Java) Fixed TIME field encodings for BigQuery Storage API writes on GenericRecords ([#34059](https://github.com/apache/beam/pull/34059)).
9090
* (Java) Fixed a race condition in JdbcIO which could cause hangs trying to acquire a connection ([#34058](https://github.com/apache/beam/pull/34058)).
91+
* (Java) Fixed cassandraIO ReadAll does not let a pipeline handle or retry exceptions ([#34191](https://github.com/apache/beam/pull/34191)).
9192

9293
## Security Fixes
9394
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class ReadFn<T> extends DoFn<Read<T>, T> {
4242
private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);
4343

4444
@ProcessElement
45-
public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
45+
public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) throws Exception {
4646
try {
4747
Session session = ConnectionManager.getSession(read);
4848
Mapper<T> mapper = read.mapperFactoryFn().apply(session);
@@ -89,6 +89,7 @@ public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
8989
}
9090
} catch (Exception ex) {
9191
LOG.error("error", ex);
92+
throw ex;
9293
}
9394
}
9495

@@ -107,7 +108,7 @@ private static String getHighestSplitQuery(
107108
String finalHighQuery =
108109
(spec.query() == null)
109110
? buildInitialQuery(spec, true) + highestClause
110-
: spec.query() + getJoinerClause(spec.query().get()) + highestClause;
111+
: spec.query().get() + getJoinerClause(spec.query().get()) + highestClause;
111112
LOG.debug("CassandraIO generated a wrapAround query : {}", finalHighQuery);
112113
return finalHighQuery;
113114
}
@@ -117,7 +118,7 @@ private static String getLowestSplitQuery(Read<?> spec, String partitionKey, Big
117118
String finalLowQuery =
118119
(spec.query() == null)
119120
? buildInitialQuery(spec, true) + lowestClause
120-
: spec.query() + getJoinerClause(spec.query().get()) + lowestClause;
121+
: spec.query().get() + getJoinerClause(spec.query().get()) + lowestClause;
121122
LOG.debug("CassandraIO generated a wrapAround query : {}", finalLowQuery);
122123
return finalLowQuery;
123124
}

0 commit comments

Comments
 (0)