Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ClientAtomicLongImpl implements ClientAtomicLong {
private final String groupName;

/** */
private final ReliableChannel ch;
private final ReliableChannelEx ch;

/** Cache id. */
private final int cacheId;
Expand All @@ -47,7 +47,7 @@ public class ClientAtomicLongImpl implements ClientAtomicLong {
* @param groupName Cache group name.
* @param ch Channel.
*/
public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableChannel ch) {
public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableChannelEx ch) {
// name and groupName uniquely identify the data structure.
this.name = name;
this.groupName = groupName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ClientCacheEntryListenerHandler<K, V> implements NotificationListen
private final Cache<K, V> jCacheAdapter;

/** */
private final ReliableChannel ch;
private final ReliableChannelEx ch;

/** */
private final boolean keepBinary;
Expand All @@ -76,7 +76,7 @@ public class ClientCacheEntryListenerHandler<K, V> implements NotificationListen
/** */
ClientCacheEntryListenerHandler(
Cache<K, V> jCacheAdapter,
ReliableChannel ch,
ReliableChannelEx ch,
ClientBinaryMarshaller marsh,
boolean keepBinary
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
*/
class ClientClusterGroupImpl implements ClientClusterGroup {
/** Channel. */
protected final ReliableChannel ch;
protected final ReliableChannelEx ch;

/** Marshaller utils. */
protected final ClientUtils utils;
Expand All @@ -79,7 +79,7 @@ class ClientClusterGroupImpl implements ClientClusterGroup {
/**
*
*/
ClientClusterGroupImpl(ReliableChannel ch, ClientBinaryMarshaller marsh) {
ClientClusterGroupImpl(ReliableChannelEx ch, ClientBinaryMarshaller marsh) {
this.ch = ch;

utils = new ClientUtils(marsh);
Expand All @@ -90,7 +90,7 @@ class ClientClusterGroupImpl implements ClientClusterGroup {
/**
*
*/
private ClientClusterGroupImpl(ReliableChannel ch, ClientUtils utils,
private ClientClusterGroupImpl(ReliableChannelEx ch, ClientUtils utils,
ProjectionFilters projectionFilters) {
this.ch = ch;
this.utils = utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ClientClusterImpl extends ClientClusterGroupImpl implements ClientC
/**
* Constructor.
*/
ClientClusterImpl(ReliableChannel ch, ClientBinaryMarshaller marsh) {
ClientClusterImpl(ReliableChannelEx ch, ClientBinaryMarshaller marsh) {
super(ch, marsh);

dfltClusterGrp = (ClientClusterGroupImpl)forServers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class ClientComputeImpl implements ClientCompute {
private static final byte NO_RESULT_CACHE_FLAG_MASK = 0x02;

/** Channel. */
private final ReliableChannel ch;
private final ReliableChannelEx ch;

/** Utils for serialization/deserialization. */
private final ClientUtils utils;
Expand All @@ -69,7 +69,7 @@ class ClientComputeImpl implements ClientCompute {
private final AtomicInteger tasksCnt = new AtomicInteger();

/** Constructor. */
ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl dfltGrp) {
ClientComputeImpl(ReliableChannelEx ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl dfltGrp) {
this.ch = ch;
this.dfltGrp = dfltGrp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ClientFieldsQueryPager extends GenericQueryPager<List<?>> implements Field

/** Constructor. */
ClientFieldsQueryPager(
ReliableChannel ch,
ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
Expand All @@ -59,7 +59,7 @@ class ClientFieldsQueryPager extends GenericQueryPager<List<?>> implements Field

/** Constructor. */
ClientFieldsQueryPager(
ReliableChannel ch,
ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ClientIgniteSetImpl<T> implements ClientIgniteSet<T> {
private final String name;

/** */
private final ReliableChannel ch;
private final ReliableChannelEx ch;

/** */
private final ClientUtils serDes;
Expand All @@ -69,7 +69,7 @@ class ClientIgniteSetImpl<T> implements ClientIgniteSet<T> {
* @param cacheId Cache id.
*/
public ClientIgniteSetImpl(
ReliableChannel ch,
ReliableChannelEx ch,
ClientUtils serDes,
String name,
boolean colocated,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ClientQueryPager<K, V> extends GenericQueryPager<Cache.Entry<K, V>> {

/** Constructor. */
ClientQueryPager(
ReliableChannel ch,
ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
Expand All @@ -53,7 +53,7 @@ class ClientQueryPager<K, V> extends GenericQueryPager<Cache.Entry<K, V>> {

/** Constructor. */
ClientQueryPager(
ReliableChannel ch,
ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class ClientServicesImpl implements ClientServices {
static final int SRV_TOP_UPDATE_PERIOD = 60_000;

/** Channel. */
private final ReliableChannel ch;
private final ReliableChannelImpl ch;

/** Binary marshaller. */
private final ClientBinaryMarshaller marsh;
Expand All @@ -75,7 +75,7 @@ class ClientServicesImpl implements ClientServices {
private final Map<String, ServiceTopology> servicesTopologies;

/** Constructor. */
ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl grp, IgniteLogger log) {
ClientServicesImpl(ReliableChannelImpl ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl grp, IgniteLogger log) {
this.ch = ch;
this.marsh = marsh;
this.grp = grp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {
private final Consumer<PayloadOutputChannel> qryWriter;

/** Channel. */
private final ReliableChannel ch;
private final ReliableChannelEx ch;

/** Client Transaction. */
private final @Nullable TcpClientTransaction tx;
Expand All @@ -64,7 +64,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {

/** Constructor. */
GenericQueryPager(
ReliableChannel ch,
ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
Expand All @@ -83,7 +83,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {

/** Constructor. */
GenericQueryPager(
ReliableChannel ch,
ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.client.thin;

import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.client.ClientAuthenticationException;
import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.IgniteClientFuture;

/**
* Interface for communication channel with failover and partition awareness.
*/
interface ReliableChannelEx extends AutoCloseable {
/**
* Send request and handle response.
*
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientAuthenticationException When user name or password is invalid.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
*/
public <T> T service(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError;

/**
* Send request to one of the passed nodes and handle response.
*
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientAuthenticationException When user name or password is invalid.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
*/
public <T> T service(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader,
List<UUID> targetNodes
) throws ClientException, ClientError;

/**
* Send request and handle response asynchronously.
*/
public <T> IgniteClientFuture<T> serviceAsync(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError;

/**
* Send request to affinity node and handle response.
*/
public <T> T affinityService(
int cacheId,
Object key,
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError;

/**
* Send request to affinity node and handle response.
*/
public <T> T affinityService(
int cacheId,
int part,
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError;

/**
* Send request to affinity node and handle response asynchronously.
*/
public <T> IgniteClientFuture<T> affinityServiceAsync(
int cacheId,
Object key,
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError;

/**
* Send request without payload and handle response.
*/
public default <T> T service(
ClientOperation op,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
return service(op, null, payloadReader);
}

/**
* Send request without payload and handle response asynchronously.
*/
public default <T> IgniteClientFuture<T> serviceAsync(
ClientOperation op,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
return serviceAsync(op, null, payloadReader);
}

/**
* Send request and handle response without payload.
*/
public default void request(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter
) throws ClientException, ClientError {
service(op, payloadWriter, null);
}

/**
* Send request and handle response without payload asynchronously.
*/
public default IgniteClientFuture<Void> requestAsync(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter
) throws ClientException, ClientError {
return serviceAsync(op, payloadWriter, null);
}
}
Loading
Loading