Skip to content

Commit 8c2f77d

Browse files
committed
Use --type=TxGroupingParallelGatewaySender... as suggested in jbarrett's review
1 parent cbe859d commit 8c2f77d

File tree

49 files changed

+524
-213
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+524
-213
lines changed

geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
210210
clonedEvent.setEventId(newEventId);
211211
}
212212

213+
@Override
214+
public String getType() {
215+
return "ParallelAsyncEventQueue";
216+
}
217+
213218
private ThreadsMonitoring getThreadMonitorObj() {
214219
DistributionManager distributionManager = cache.getDistributionManager();
215220
if (distributionManager != null) {
@@ -218,4 +223,5 @@ private ThreadsMonitoring getThreadMonitorObj() {
218223
return null;
219224
}
220225
}
226+
221227
}

geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
264264
clonedEvent.setEventId(newEventId);
265265
}
266266

267+
@Override
268+
public String getType() {
269+
return "SerialAsyncEventQueue";
270+
}
271+
267272
private ThreadsMonitoring getThreadMonitorObj() {
268273
DistributionManager distributionManager = cache.getDistributionManager();
269274
if (distributionManager != null) {

geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2588,6 +2588,7 @@ public void setOverflowDirectory(String value) {
25882588
* <attribute name="id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" />
25892589
* <attribute name="remote-distributed-system-id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" />
25902590
* <attribute name="parallel" type="{http://www.w3.org/2001/XMLSchema}boolean" />
2591+
* <attribute name="type" type="{http://www.w3.org/2001/XMLSchema}string" />
25912592
* <attribute name="manual-start" type="{http://www.w3.org/2001/XMLSchema}boolean" />
25922593
* <attribute name="socket-buffer-size" type="{http://www.w3.org/2001/XMLSchema}string" />
25932594
* <attribute name="socket-read-timeout" type="{http://www.w3.org/2001/XMLSchema}string" />
@@ -2628,6 +2629,8 @@ public static class GatewaySender {
26282629
protected String remoteDistributedSystemId;
26292630
@XmlAttribute(name = "parallel")
26302631
protected Boolean parallel;
2632+
@XmlAttribute(name = "type")
2633+
protected String type;
26312634
@XmlAttribute(name = "manual-start")
26322635
protected Boolean manualStart;
26332636
@XmlAttribute(name = "socket-buffer-size")
@@ -2820,6 +2823,29 @@ public void setParallel(Boolean value) {
28202823
parallel = value;
28212824
}
28222825

2826+
/**
2827+
* Gets the value of the parallel property.
2828+
*
2829+
* possible object is
2830+
* {@link String }
2831+
*
2832+
*/
2833+
2834+
public String getType() {
2835+
return type;
2836+
}
2837+
2838+
/**
2839+
* Sets the value of the type property.
2840+
*
2841+
* allowed object is
2842+
* {@link String }
2843+
*
2844+
*/
2845+
public void setType(String value) {
2846+
this.type = value;
2847+
}
2848+
28232849
/**
28242850
* Gets the value of the manualStart property.
28252851
*

geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
import java.util.List;
1818

1919
import org.apache.geode.annotations.Immutable;
20-
import org.apache.geode.internal.lang.SystemProperty;
21-
import org.apache.geode.internal.lang.SystemPropertyHelper;
2220
import org.apache.geode.util.internal.GeodeGlossary;
2321

2422
/**
@@ -155,31 +153,7 @@ public interface GatewaySender {
155153
int CONNECTION_RETRY_INTERVAL = Integer
156154
.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "gateway-connection-retry-interval", 1000);
157155

158-
/**
159-
* Number of times to retry to get events for a transaction from the gateway sender queue when
160-
* group-transaction-events is set to true.
161-
* When group-transaction-events is set to true and a batch ready to be sent does not contain
162-
* all the events for all the transactions to which the events belong, the gateway sender will try
163-
* to get the missing events of the transactions from the queue to add them to the batch
164-
* before sending it.
165-
* If the missing events are not in the queue when the gateway sender tries to get them
166-
* it will retry for a maximum of times equal to the value set in this parameter before
167-
* delivering the batch without the missing events and logging an error.
168-
* Setting this parameter to a very low value could cause that under heavy load and
169-
* group-transaction-events set to true, batches are sent with incomplete transactions. Setting it
170-
* to a high value could cause that under heavy load and group-transaction-events set to true,
171-
* batches are held for some time before being sent.
172-
*/
173-
int GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES =
174-
Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-retries",
175-
10);
176-
/**
177-
* Milliseconds to wait before retrying to get events for a transaction from the
178-
* gateway sender queue when group-transaction-events is true.
179-
*/
180-
int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS =
181-
SystemProperty.getProductIntegerProperty(
182-
SystemPropertyHelper.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS).orElse(1);
156+
String DEFAULT_TYPE = "SerialGatewaySender";
183157

184158
/**
185159
* The order policy. This enum is applicable only when concurrency-level is > 1.
@@ -418,10 +392,13 @@ enum OrderPolicy {
418392
*/
419393
boolean isParallel();
420394

395+
String getType();
396+
421397
/**
422398
* Returns groupTransactionEvents boolean property for this GatewaySender.
423399
*
424400
* @return groupTransactionEvents boolean property for this GatewaySender
401+
* @deprecated use {@link #getType()}.
425402
*
426403
*/
427404
boolean mustGroupTransactionEvents();

geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public interface GatewaySenderFactory {
4949
*/
5050
GatewaySenderFactory setGroupTransactionEvents(boolean groupTransactionEvents);
5151

52+
GatewaySenderFactory setType(String type);
53+
5254
/**
5355
* Adds a <code>GatewayEventFilter</code>
5456
*

geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,6 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
126126

127127
protected boolean isParallel;
128128

129-
protected int retriesToGetTransactionEventsFromQueue;
130-
131129
protected boolean isForInternalUse;
132130

133131
protected boolean isDiskSynchronous;
@@ -255,7 +253,6 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
255253
alertThreshold = attrs.getAlertThreshold();
256254
copyDeprecatedAttributes(attrs);
257255
isParallel = attrs.isParallel();
258-
retriesToGetTransactionEventsFromQueue = attrs.getRetriesToGetTransactionEventsFromQueue();
259256
isForInternalUse = attrs.isForInternalUse();
260257
diskStoreName = attrs.getDiskStoreName();
261258
remoteDSId = attrs.getRemoteDSId();
@@ -559,21 +556,12 @@ public boolean isParallel() {
559556
return isParallel;
560557
}
561558

559+
@Deprecated
562560
@Override
563561
public boolean mustGroupTransactionEvents() {
564562
return false;
565563
}
566564

567-
/**
568-
* Returns retriesToGetTransactionEventsFromQueue int property for this GatewaySender.
569-
*
570-
* @return retriesToGetTransactionEventsFromQueue int property for this GatewaySender
571-
*
572-
*/
573-
public int getRetriesToGetTransactionEventsFromQueue() {
574-
return retriesToGetTransactionEventsFromQueue;
575-
}
576-
577565
public boolean isForInternalUse() {
578566
return isForInternalUse;
579567
}
@@ -1008,11 +996,6 @@ private boolean checkForDistribution(EntryEventImpl event) {
1008996
return true;
1009997
}
1010998

1011-
public void distribute(EnumListenerEvent operation, EntryEventImpl event,
1012-
List<Integer> allRemoteDSIds) {
1013-
distribute(operation, event, allRemoteDSIds, false);
1014-
}
1015-
1016999
public void distribute(EnumListenerEvent operation, EntryEventImpl event,
10171000
List<Integer> allRemoteDSIds, boolean isLastEventInTransaction) {
10181001

geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ public interface GatewaySenderAttributes {
6666

6767
boolean mustGroupTransactionEvents();
6868

69-
int getRetriesToGetTransactionEventsFromQueue();
70-
7169
boolean isForInternalUse();
7270

7371
String getId();
@@ -91,4 +89,6 @@ public interface GatewaySenderAttributes {
9189
boolean isForwardExpirationDestroy();
9290

9391
boolean getEnforceThreadsConnectSameReceiver();
92+
93+
String getType();
9494
}

geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ public class GatewaySenderAttributesImpl implements MutableGatewaySenderAttribut
8181

8282
private boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS;
8383

84-
private int retriesToGetTransactionEventsFromQueue =
85-
GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES;
84+
private String type = GatewaySender.DEFAULT_TYPE;
8685

8786
private boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE;
8887

@@ -178,8 +177,9 @@ public void setGroupTransactionEvents(boolean groupTransEvents) {
178177
groupTransactionEvents = groupTransEvents;
179178
}
180179

181-
public void setRetriesToGetTransactionEventsFromQueue(int retries) {
182-
retriesToGetTransactionEventsFromQueue = retries;
180+
public void setType(String type) {
181+
this.type = type;
182+
isParallel = type.equals("Parallel") ? true : false;
183183
}
184184

185185
public void setForInternalUse(boolean forInternalUse) {
@@ -288,11 +288,6 @@ public boolean mustGroupTransactionEvents() {
288288
return groupTransactionEvents;
289289
}
290290

291-
@Override
292-
public int getRetriesToGetTransactionEventsFromQueue() {
293-
return retriesToGetTransactionEventsFromQueue;
294-
}
295-
296291
@Override
297292
public boolean isForInternalUse() {
298293
return isForInternalUse;
@@ -360,4 +355,9 @@ public boolean getEnforceThreadsConnectSameReceiver() {
360355
return enforceThreadsConnectSameReceiver;
361356
}
362357

358+
@Override
359+
public String getType() {
360+
return type;
361+
}
362+
363363
}

geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySenderFactory.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,4 @@ public interface InternalGatewaySenderFactory extends GatewaySenderFactory {
3030
void configureGatewaySender(GatewaySender senderCreation);
3131

3232
GatewaySenderFactory setLocatorDiscoveryCallback(LocatorDiscoveryCallback myLocatorCallback);
33-
34-
/**
35-
* Sets the maximum number of retries to get events from the queue
36-
* to complete a transaction when groupTransactionEvents is true.
37-
*
38-
* @param retries the maximum number of retries.
39-
*/
40-
GatewaySenderFactory setRetriesToGetTransactionEventsFromQueue(int retries);
4133
}

geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
1818

19-
import java.util.List;
20-
2119
import org.apache.geode.CancelCriterion;
2220
import org.apache.geode.cache.wan.GatewaySender;
2321
import org.apache.geode.distributed.internal.DistributionAdvisee;
@@ -26,7 +24,6 @@
2624
import org.apache.geode.distributed.internal.DistributionManager;
2725
import org.apache.geode.distributed.internal.InternalDistributedSystem;
2826
import org.apache.geode.internal.cache.EntryEventImpl;
29-
import org.apache.geode.internal.cache.EnumListenerEvent;
3027
import org.apache.geode.internal.cache.InternalCache;
3128
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
3229
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
@@ -38,10 +35,6 @@ public ParallelAsyncEventQueueCreation(InternalCache cache, GatewaySenderAttribu
3835
super(cache, disabledClock(), attrs);
3936
}
4037

41-
@Override
42-
public void distribute(EnumListenerEvent operation, EntryEventImpl event,
43-
List<Integer> remoteDSIds) {}
44-
4538
@Override
4639
public void start() {}
4740

@@ -56,6 +49,11 @@ public void rebalance() {
5649
throw new UnsupportedOperationException();
5750
}
5851

52+
@Override
53+
public String getType() {
54+
return "ParallelAsyncEventQueue";
55+
}
56+
5957
@Override
6058
public void fillInProfile(Profile profile) {}
6159

0 commit comments

Comments
 (0)