Skip to content

Commit f6ab47b

Browse files
IGNITE-27829 Java thin: Add cache name to exceptions with cache operations
1 parent 128d3f3 commit f6ab47b

File tree

6 files changed

+241
-27
lines changed

6 files changed

+241
-27
lines changed

modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.ignite.cache.QueryIndexType;
4949
import org.apache.ignite.cache.query.SqlFieldsQuery;
5050
import org.apache.ignite.client.ClientCacheConfiguration;
51+
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
5152
import org.apache.ignite.internal.binary.BinaryContext;
5253
import org.apache.ignite.internal.binary.BinaryFieldMetadata;
5354
import org.apache.ignite.internal.binary.BinaryMetadata;
@@ -369,16 +370,17 @@ void cacheConfiguration(ClientCacheConfiguration cfg, boolean sql, BinaryOutputS
369370
});
370371
}
371372
else if (cfg.getExpiryPolicy() != null) {
372-
throw new ClientProtocolError(String.format("Expire policies are not supported by the server " +
373-
"version %s, required version %s", protocolCtx.version(), EXPIRY_POLICY.verIntroduced()));
373+
throw new ClientFeatureNotSupportedByServerException(String.format(
374+
"Expire policies are not supported by the server version %s, required version %s",
375+
protocolCtx.version(), EXPIRY_POLICY.verIntroduced()));
374376
}
375377

376378
if (protocolCtx.isFeatureSupported(CACHE_STORAGES)) {
377379
itemWriter.accept(CfgItem.STORAGE_PATH, w -> w.writeStringArray(cfg.getStoragePaths()));
378380
itemWriter.accept(CfgItem.IDX_PATH, w -> w.writeString(cfg.getIndexPath()));
379381
}
380382
else if (!F.isEmpty(cfg.getStoragePaths()) || !F.isEmpty(cfg.getIndexPath()))
381-
throw new ClientProtocolError("Cache storages are not supported by the server");
383+
throw new ClientFeatureNotSupportedByServerException("Cache storages are not supported by the server");
382384

383385
writer.writeInt(origPos, out.position() - origPos - 4); // configuration length
384386
writer.writeInt(origPos + 4, propCnt.get()); // properties count

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java

Lines changed: 145 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Set;
2727
import java.util.SortedMap;
2828
import java.util.SortedSet;
29+
import java.util.UUID;
2930
import java.util.concurrent.CompletableFuture;
3031
import java.util.function.Consumer;
3132
import java.util.function.Function;
@@ -108,7 +109,7 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
108109
private final int cacheId;
109110

110111
/** Channel. */
111-
private final ReliableChannelImpl ch;
112+
private final ReliableChannelEx ch;
112113

113114
/** Cache name. */
114115
private final String name;
@@ -144,11 +145,13 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
144145
/** Constructor. */
145146
TcpClientCache(String name, ReliableChannelImpl ch, ClientBinaryMarshaller marsh, TcpClientTransactions transactions,
146147
ClientCacheEntryListenersRegistry lsnrsRegistry, IgniteLogger log) {
147-
this(name, ch, marsh, transactions, lsnrsRegistry, false, null, log);
148+
this(name, new ReliableChannelWrapper(ch, name), marsh, transactions, lsnrsRegistry, false, null, log);
149+
150+
ch.registerCacheIfCustomAffinity(name);
148151
}
149152

150153
/** Constructor. */
151-
TcpClientCache(String name, ReliableChannelImpl ch, ClientBinaryMarshaller marsh,
154+
private TcpClientCache(String name, ReliableChannelEx ch, ClientBinaryMarshaller marsh,
152155
TcpClientTransactions transactions, ClientCacheEntryListenersRegistry lsnrsRegistry, boolean keepBinary,
153156
ExpiryPolicy expiryPlc, IgniteLogger log) {
154157
this.name = name;
@@ -165,8 +168,6 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
165168

166169
jCacheAdapter = new ClientJCacheAdapter<>(this);
167170

168-
this.ch.registerCacheIfCustomAffinity(this.name);
169-
170171
this.log = log;
171172
}
172173

@@ -1428,6 +1429,9 @@ private <T> T txAwareService(
14281429
throw new ClientException("Transaction context has been lost due to connection errors. " +
14291430
"Cache operations are prohibited until current transaction closed.", e);
14301431
}
1432+
catch (Exception e) {
1433+
throw convertException(e, name);
1434+
}
14311435
}
14321436
else if (affKey != null)
14331437
return ch.affinityService(cacheId, affKey, op, payloadWriter, payloadReader);
@@ -1459,7 +1463,7 @@ private <T> IgniteClientFuture<T> txAwareServiceAsync(
14591463
"Cache operations are prohibited until current transaction closed.", err));
14601464
}
14611465
else if (err != null)
1462-
fut.completeExceptionally(err);
1466+
fut.completeExceptionally(convertException((Exception)err, name));
14631467
else
14641468
fut.complete(res);
14651469
});
@@ -1540,8 +1544,9 @@ private void writeCacheInfo(PayloadOutputChannel payloadCh, TcpClientTransaction
15401544
ProtocolContext protocolCtx = payloadCh.clientChannel().protocolCtx();
15411545

15421546
if (!protocolCtx.isFeatureSupported(EXPIRY_POLICY)) {
1543-
throw new ClientProtocolError(String.format("Expire policies are not supported by the server " +
1544-
"version %s, required version %s", protocolCtx.version(), EXPIRY_POLICY.verIntroduced()));
1547+
throw new ClientFeatureNotSupportedByServerException(String.format(
1548+
"Expire policies are not supported by the server version %s, required version %s",
1549+
protocolCtx.version(), EXPIRY_POLICY.verIntroduced()));
15451550
}
15461551

15471552
flags |= WITH_EXPIRY_POLICY_FLAG_MASK;
@@ -1754,4 +1759,136 @@ private boolean canBlockTx(boolean isGetOp, TransactionConcurrency concurrency,
17541759

17551760
return true;
17561761
}
1762+
1763+
/** */
1764+
private static ClientException convertException(Exception e, String cacheName) {
1765+
String msg = "Failed to perform cache operation [cacheName=" + cacheName + "]: " + e.getMessage();
1766+
1767+
// Exception can be ClientProtocolError - it's an internal exception, can't be thrown to user.
1768+
if (!(e instanceof ClientException))
1769+
return new ClientException(msg, e);
1770+
else if (X.hasCause(e, ClientServerError.class)) // Wrap server errors.
1771+
return new ClientException(msg, e);
1772+
else // Don't wrap authentication, authorization, connection errors.
1773+
return (ClientException)e;
1774+
}
1775+
1776+
/** */
1777+
private static class ReliableChannelWrapper implements ReliableChannelEx {
1778+
/** */
1779+
private final ReliableChannelImpl delegate;
1780+
1781+
/** */
1782+
private final String cacheName;
1783+
1784+
/** */
1785+
public ReliableChannelWrapper(ReliableChannelImpl delegate, String cacheName) {
1786+
this.delegate = delegate;
1787+
this.cacheName = cacheName;
1788+
}
1789+
1790+
/** {@inheritDoc} */
1791+
@Override public <T> T service(
1792+
ClientOperation op,
1793+
Consumer<PayloadOutputChannel> payloadWriter,
1794+
Function<PayloadInputChannel, T> payloadReader
1795+
) throws ClientException, ClientError {
1796+
try {
1797+
return delegate.service(op, payloadWriter, payloadReader);
1798+
}
1799+
catch (ClientException e) {
1800+
throw convertException(e, cacheName);
1801+
}
1802+
}
1803+
1804+
/** {@inheritDoc} */
1805+
@Override public <T> T service(
1806+
ClientOperation op,
1807+
Consumer<PayloadOutputChannel> payloadWriter,
1808+
Function<PayloadInputChannel, T> payloadReader,
1809+
List<UUID> targetNodes
1810+
) throws ClientException, ClientError {
1811+
try {
1812+
return delegate.service(op, payloadWriter, payloadReader, targetNodes);
1813+
}
1814+
catch (ClientException e) {
1815+
throw convertException(e, cacheName);
1816+
}
1817+
}
1818+
1819+
/** {@inheritDoc} */
1820+
@Override public <T> IgniteClientFuture<T> serviceAsync(
1821+
ClientOperation op,
1822+
Consumer<PayloadOutputChannel> payloadWriter,
1823+
Function<PayloadInputChannel, T> payloadReader
1824+
) throws ClientException, ClientError {
1825+
CompletableFuture<T> fut = new CompletableFuture<>();
1826+
1827+
delegate.serviceAsync(op, payloadWriter, payloadReader).whenComplete((res, err) -> {
1828+
if (err != null)
1829+
fut.completeExceptionally(convertException((Exception)err, cacheName));
1830+
else
1831+
fut.complete(res);
1832+
});
1833+
1834+
return new IgniteClientFutureImpl<>(fut);
1835+
}
1836+
1837+
/** {@inheritDoc} */
1838+
@Override public <T> T affinityService(
1839+
int cacheId,
1840+
Object key,
1841+
ClientOperation op,
1842+
Consumer<PayloadOutputChannel> payloadWriter,
1843+
Function<PayloadInputChannel, T> payloadReader
1844+
) throws ClientException, ClientError {
1845+
try {
1846+
return delegate.affinityService(cacheId, key, op, payloadWriter, payloadReader);
1847+
}
1848+
catch (ClientException e) {
1849+
throw convertException(e, cacheName);
1850+
}
1851+
}
1852+
1853+
/** {@inheritDoc} */
1854+
@Override public <T> T affinityService(
1855+
int cacheId,
1856+
int part,
1857+
ClientOperation op,
1858+
Consumer<PayloadOutputChannel> payloadWriter,
1859+
Function<PayloadInputChannel, T> payloadReader
1860+
) throws ClientException, ClientError {
1861+
try {
1862+
return delegate.affinityService(cacheId, part, op, payloadWriter, payloadReader);
1863+
}
1864+
catch (ClientException e) {
1865+
throw convertException(e, cacheName);
1866+
}
1867+
}
1868+
1869+
/** {@inheritDoc} */
1870+
@Override public <T> IgniteClientFuture<T> affinityServiceAsync(
1871+
int cacheId,
1872+
Object key,
1873+
ClientOperation op,
1874+
Consumer<PayloadOutputChannel> payloadWriter,
1875+
Function<PayloadInputChannel, T> payloadReader
1876+
) throws ClientException, ClientError {
1877+
CompletableFuture<T> fut = new CompletableFuture<>();
1878+
1879+
delegate.affinityServiceAsync(cacheId, key, op, payloadWriter, payloadReader).whenComplete((res, err) -> {
1880+
if (err != null)
1881+
fut.completeExceptionally(convertException((Exception)err, cacheName));
1882+
else
1883+
fut.complete(res);
1884+
});
1885+
1886+
return new IgniteClientFutureImpl<>(fut);
1887+
}
1888+
1889+
/** {@inheritDoc} */
1890+
@Override public void close() {
1891+
delegate.close();
1892+
}
1893+
}
17571894
}

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.atomic.AtomicLong;
2323
import org.apache.ignite.client.ClientConnectionException;
2424
import org.apache.ignite.client.ClientException;
25+
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
2526
import org.apache.ignite.client.ClientTransaction;
2627
import org.apache.ignite.client.ClientTransactions;
2728
import org.apache.ignite.configuration.ClientTransactionConfiguration;
@@ -95,8 +96,9 @@ private ClientTransaction txStart0(TransactionConcurrency concurrency, Transacti
9596
ProtocolContext protocolCtx = req.clientChannel().protocolCtx();
9697

9798
if (!protocolCtx.isFeatureSupported(TRANSACTIONS)) {
98-
throw new ClientProtocolError(String.format("Transactions are not supported by the server's " +
99-
"protocol version %s, required version %s", protocolCtx.version(), TRANSACTIONS.verIntroduced()));
99+
throw new ClientFeatureNotSupportedByServerException(String.format(
100+
"Transactions are not supported by the server's protocol version %s, required version %s",
101+
protocolCtx.version(), TRANSACTIONS.verIntroduced()));
100102
}
101103

102104
try (BinaryWriterEx writer = BinaryUtils.writer(marsh.context(), req.out(), null)) {
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.client.thin;
19+
20+
import java.util.Objects;
21+
import org.apache.ignite.cache.query.ScanQuery;
22+
import org.apache.ignite.client.ClientCache;
23+
import org.apache.ignite.client.ClientException;
24+
import org.apache.ignite.client.ClientTransaction;
25+
import org.apache.ignite.client.IgniteClient;
26+
import org.apache.ignite.testframework.GridTestUtils;
27+
import org.junit.Test;
28+
29+
/**
30+
* Thin client cache exceptions tests.
31+
*/
32+
public class CacheExceptionsTest extends AbstractThinClientTest {
33+
/** {@inheritDoc} */
34+
@Override protected void beforeTestsStarted() throws Exception {
35+
super.beforeTestsStarted();
36+
37+
startGrid(0);
38+
}
39+
40+
/**
41+
* Tests cache name in wrapped cache exception for server errors.
42+
*/
43+
@Test
44+
public void testCacheExceptionWrapped() throws Exception {
45+
try (IgniteClient client = startClient(0)) {
46+
String cacheName = "testCacheName";
47+
48+
ClientCache<Object, Objects> cache = client.cache(cacheName);
49+
50+
// Affinity call.
51+
GridTestUtils.assertThrowsAnyCause(log, () -> cache.get(0), ClientException.class, cacheName);
52+
53+
// Async affinity call.
54+
GridTestUtils.assertThrowsAnyCause(log, () -> cache.getAsync(0).get(), ClientException.class, cacheName);
55+
56+
// Non-affinity call.
57+
GridTestUtils.assertThrowsAnyCause(log, () -> cache.size(), ClientException.class, cacheName);
58+
59+
// Async non-affinity call.
60+
GridTestUtils.assertThrowsAnyCause(log, () -> cache.sizeAsync().get(), ClientException.class, cacheName);
61+
62+
// Transactional call.
63+
GridTestUtils.assertThrowsAnyCause(log, () -> {
64+
try (ClientTransaction ignore = client.transactions().txStart()) {
65+
return cache.get(0);
66+
}
67+
}, ClientException.class, cacheName);
68+
69+
// Async transactional call.
70+
GridTestUtils.assertThrowsAnyCause(log, () -> {
71+
try (ClientTransaction ignore = client.transactions().txStart()) {
72+
return cache.getAsync(0).get();
73+
}
74+
}, ClientException.class, cacheName);
75+
76+
// Query.
77+
GridTestUtils.assertThrowsAnyCause(log, () -> cache.query(new ScanQuery<>()).getAll(), ClientException.class, cacheName);
78+
79+
// Affinity query.
80+
GridTestUtils.assertThrowsAnyCause(log, () -> cache.query(new ScanQuery<>().setPartition(0)).getAll(),
81+
ClientException.class, cacheName);
82+
}
83+
}
84+
}

modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,19 +1671,6 @@ protected <T extends Event> IgniteFuture<T> waitForLocalEvent(IgniteEvents evts,
16711671
return fut;
16721672
}
16731673

1674-
/**
1675-
* @param e Exception.
1676-
* @param exCls Ex class.
1677-
*/
1678-
protected <T extends IgniteException> void assertCacheExceptionWithCause(RuntimeException e, Class<T> exCls) {
1679-
if (exCls.isAssignableFrom(e.getClass()))
1680-
return;
1681-
1682-
if (e.getClass() != CacheException.class
1683-
|| e.getCause() == null || !exCls.isAssignableFrom(e.getCause().getClass()))
1684-
throw e;
1685-
}
1686-
16871674
/**
16881675
* @param cache Cache.
16891676
*/

modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.ignite.internal.client.thin.BlockingTxOpsTest;
2424
import org.apache.ignite.internal.client.thin.CacheAsyncTest;
2525
import org.apache.ignite.internal.client.thin.CacheEntryListenersTest;
26+
import org.apache.ignite.internal.client.thin.CacheExceptionsTest;
2627
import org.apache.ignite.internal.client.thin.ClusterApiTest;
2728
import org.apache.ignite.internal.client.thin.ClusterGroupClusterRestartTest;
2829
import org.apache.ignite.internal.client.thin.ClusterGroupTest;
@@ -108,7 +109,8 @@
108109
InvokeTest.class,
109110
ExtraColumnInH2RowsTest.class,
110111
RecoveryModeTest.class,
111-
ReliableChannelDuplicationTest.class
112+
ReliableChannelDuplicationTest.class,
113+
CacheExceptionsTest.class,
112114
})
113115
public class ClientTestSuite {
114116
// No-op.

0 commit comments

Comments
 (0)