Skip to content
This repository was archived by the owner on Feb 9, 2019. It is now read-only.

Commit e080d86

Browse files
committed
RC2/3 changes;
better logging Pushlösung
1 parent bb25c7d commit e080d86

13 files changed

+148
-167
lines changed

conf/brs-default.properties

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,18 +79,11 @@ P2P.TimeoutIdle_ms = 30000
7979
# Blacklist peers for 600000 milliseconds (i.e. 10 minutes by default).
8080
P2P.BlacklistingTime_ms = 600000
8181

82-
# Enable re-broadcasting of new transactions until they are received back from at least one
83-
# peer, or found in the blockchain. This feature can optionally be disabled, to avoid the
84-
# risk of revealing that this node is the submitter of such re-broadcasted new transactions.
82+
# Enable priority (re-)broadcasting of transactions. When enabled incoming transactions
83+
# will be priority resent to the rebroadcast targets
8584
P2P.enableTxRebroadcast = yes
8685

87-
# Transactions that aren't confirmed for this many blocks start getting rebroadcast.
88-
P2P.rebroadcastTxAfter = 5
89-
90-
# Transactions being rebroadcast get rebroadcast every this many blocks until they are confirmed.
91-
P2P.rebroadcastTxEvery = 2
92-
93-
# Consider a new transaction or block sent after 10 peers have received it.
86+
# Amount of extra peers to send a transaction to after sending to all rebroadcast targets
9487
P2P.sendToLimit=10
9588

9689
# Max number of unconfirmed transactions that will be kept in cache.
@@ -99,13 +92,10 @@ P2P.maxUnconfirmedTransactions = 8192
9992
# Max percentage of unconfirmed transactions that have a full hash reference to another transaction kept in cache
10093
P2P.maxUnconfirmedTransactionsFullHashReferencePercentage = 5
10194

102-
# Max amount of unconfirmed transactions that will be asked to a peer at the same time
103-
P2P.limitUnconfirmedTransactionsToRetrieve = 1000
104-
10595
# JETTY pass-through options. See documentation at
10696
# https://www.eclipse.org/jetty/documentation/9.2.22.v20170531/dos-filter.html
10797
# P2P section:
108-
JETTY.P2P.DoSFilter = off
98+
JETTY.P2P.DoSFilter = on
10999
JETTY.P2P.DoSFilter.maxRequestsPerSec = 30
110100
JETTY.P2P.DoSFilter.delayMs = 500
111101
JETTY.P2P.DoSFilter.maxRequestMs = 300000
@@ -308,3 +298,6 @@ brs.debugTraceQuote =
308298

309299
# Log changes to unconfirmed balances.
310300
brs.debugLogUnconfirmed = false
301+
302+
# Timeout in Seconds to wait for a graceful shutdown
303+
brs.ShutdownTimeout = 180

src/brs/BlockchainProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ enum Event {
2323

2424
int getMinRollbackHeight();
2525

26-
void processPeerBlock(JSONObject request) throws BurstException;
26+
void processPeerBlock(JSONObject request, Peer peer) throws BurstException;
2727

2828
void fullReset();
2929

src/brs/BlockchainProcessorImpl.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import brs.statistics.StatisticsManagerImpl;
2121
import brs.services.AccountService;
2222
import brs.transactionduplicates.TransactionDuplicatesCheckerImpl;
23-
import brs.transactionduplicates.TransactionDuplicationResult;
2423
import brs.unconfirmedtransactions.UnconfirmedTransactionStore;
2524
import brs.util.ThreadPool;
2625

@@ -32,11 +31,8 @@
3231
import java.util.ArrayList;
3332
import java.util.Arrays;
3433
import java.util.Collections;
35-
import java.util.HashMap;
3634
import java.util.LinkedList;
3735
import java.util.List;
38-
import java.util.Map;
39-
import java.util.Set;
4036
import java.util.SortedSet;
4137
import java.util.TreeSet;
4238
import java.util.concurrent.Semaphore;
@@ -744,10 +740,10 @@ public int getMinRollbackHeight() {
744740
}
745741

746742
@Override
747-
public void processPeerBlock(JSONObject request) throws BurstException {
743+
public void processPeerBlock(JSONObject request, Peer peer) throws BurstException {
748744
Block newBlock = Block.parseBlock(request, blockchain.getHeight());
749745
if (newBlock == null) {
750-
logger.debug("Peer has announced an unprocessable block.");
746+
logger.debug("Peer {} has announced an unprocessable block.", peer.getPeerAddress());
751747
return;
752748
}
753749
/*
@@ -760,10 +756,9 @@ public void processPeerBlock(JSONObject request) throws BurstException {
760756
newBlock.setByteLength(newBlock.toString().length());
761757
blockService.calculateBaseTarget(newBlock, chainblock);
762758
downloadCache.addBlock(newBlock);
763-
logger.debug("Added from Anounce: Id: " +newBlock.getId()+" Height: "+newBlock.getHeight());
759+
logger.debug("Peer {} added block from Announce: Id: {} Height: {}", peer.getPeerAddress(), newBlock.getId(), newBlock.getHeight());
764760
} else {
765-
logger.debug("Peer sent us block: " + newBlock.getPreviousBlockId()
766-
+ " that does not match our chain.");
761+
logger.debug("Peer {} sent us block: {} which is not the follow-up block for {}", peer.getPeerAddress(), newBlock.getPreviousBlockId(), chainblock.getId());
767762
}
768763
}
769764

src/brs/TransactionProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ enum Event {
2525

2626
void clearUnconfirmedTransactions();
2727

28-
void broadcast(Transaction transaction) throws BurstException.ValidationException;
28+
Integer broadcast(Transaction transaction) throws BurstException.ValidationException;
2929

3030
void processPeerTransactions(JSONObject request, Peer peer) throws BurstException.ValidationException;
3131

src/brs/TransactionProcessorImpl.java

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.function.BiConsumer;
2222
import java.util.function.Function;
23+
import java.util.stream.Collectors;
2324
import org.json.simple.JSONArray;
2425
import org.json.simple.JSONObject;
2526
import org.slf4j.Logger;
@@ -84,11 +85,12 @@ public TransactionProcessorImpl(PropertyService propertyService,
8485

8586
JSONArray transactionsData = (JSONArray) response.get(UNCONFIRMED_TRANSACTIONS_RESPONSE);
8687

87-
if (transactionsData == null || transactionsData.isEmpty()) {
88+
if (transactionsData == null) {
8889
return;
8990
}
9091
try {
9192
List<Transaction> addedTransactions = processPeerTransactions(transactionsData, peer);
93+
Peers.feedingTime(peer, foodDispenser, doneFeedingLog);
9294

9395
if(! addedTransactions.isEmpty()) {
9496
List<Peer> activePrioPlusExtra = Peers.getAllActivePriorityPlusSomeExtraPeers();
@@ -180,40 +182,25 @@ public Transaction.Builder newTransactionBuilder(byte[] senderPublicKey, long am
180182
}
181183

182184
@Override
183-
public void broadcast(Transaction transaction) throws BurstException.ValidationException {
185+
public Integer broadcast(Transaction transaction) throws BurstException.ValidationException {
184186
if (! transaction.verifySignature()) {
185187
throw new BurstException.NotValidException("Transaction signature verification failed");
186188
}
187189
List<Transaction> processedTransactions;
188190
if (dbs.getTransactionDb().hasTransaction(transaction.getId())) {
189191
logger.info("Transaction " + transaction.getStringId() + " already in blockchain, will not broadcast again");
190-
return;
192+
return null;
191193
}
192194

193195
if (unconfirmedTransactionStore.exists(transaction.getId())) {
194-
/*
195-
if (enableTransactionRebroadcasting) {
196-
nonBroadcastedTransactions.add(transaction);
197-
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will re-broadcast");
198-
} else {*/
199-
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will not broadcast again");
200-
///}
201-
return;
196+
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will not broadcast again");
197+
return null;
202198
}
203199

204200
processedTransactions = processTransactions(Collections.singleton(transaction), null);
205201

206202
if(! processedTransactions.isEmpty()) {
207-
broadcastToPeers();
208-
}
209-
210-
if (processedTransactions.contains(transaction)) {
211-
/*
212-
if (enableTransactionRebroadcasting) {
213-
nonBroadcastedTransactions.add(transaction);
214-
}
215-
*/
216-
logger.debug("Accepted new transaction " + transaction.getStringId());
203+
return broadcastToPeers(true);
217204
} else {
218205
logger.debug("Could not accept new transaction " + transaction.getStringId());
219206
throw new BurstException.NotValidException("Invalid transaction " + transaction.getStringId());
@@ -226,7 +213,7 @@ public void processPeerTransactions(JSONObject request, Peer peer) throws BurstE
226213
List<Transaction> processedTransactions = processPeerTransactions(transactionsData, peer);
227214

228215
if(! processedTransactions.isEmpty()) {
229-
broadcastToPeers();
216+
broadcastToPeers(false);
230217
}
231218
}
232219

@@ -373,10 +360,16 @@ private List<Transaction> processTransactions(Collection<Transaction> transactio
373360
}
374361
}
375362

376-
private void broadcastToPeers() {
377-
for(Peer p: Peers.getAllActivePriorityPlusSomeExtraPeers()) {
363+
private int broadcastToPeers(boolean toAll) {
364+
List<? extends Peer> peersToSendTo = toAll ? Peers.getActivePeers().stream().limit(100).collect(Collectors.toList()) : Peers.getAllActivePriorityPlusSomeExtraPeers();
365+
366+
logger.info("Queueing up {} Peers for feeding", peersToSendTo.size());
367+
368+
for(Peer p: peersToSendTo) {
378369
Peers.feedingTime(p, foodDispenser, doneFeedingLog);
379370
}
371+
372+
return peersToSendTo.size();
380373
}
381374

382375
public void revalidateUnconfirmedTransactions() {

src/brs/http/APITransactionManagerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static brs.http.common.ResultFields.BROADCASTED_RESPONSE;
2424
import static brs.http.common.ResultFields.ERROR_RESPONSE;
2525
import static brs.http.common.ResultFields.FULL_HASH_RESPONSE;
26+
import static brs.http.common.ResultFields.NUMBER_PEERS_SENT_TO_RESPONSE;
2627
import static brs.http.common.ResultFields.SIGNATURE_HASH_RESPONSE;
2728
import static brs.http.common.ResultFields.TRANSACTION_BYTES_RESPONSE;
2829
import static brs.http.common.ResultFields.TRANSACTION_JSON_RESPONSE;
@@ -38,7 +39,6 @@
3839
import brs.Blockchain;
3940
import brs.Burst;
4041
import brs.BurstException;
41-
import brs.Constants;
4242
import brs.Transaction;
4343
import brs.Transaction.Builder;
4444
import brs.TransactionProcessor;
@@ -187,7 +187,7 @@ public JSONStreamAware createTransaction(HttpServletRequest req, Account senderA
187187
response.put(TRANSACTION_BYTES_RESPONSE, Convert.toHexString(transaction.getBytes()));
188188
response.put(SIGNATURE_HASH_RESPONSE, Convert.toHexString(Crypto.sha256().digest(transaction.getSignature())));
189189
if (broadcast) {
190-
transactionProcessor.broadcast(transaction);
190+
response.put(NUMBER_PEERS_SENT_TO_RESPONSE, transactionProcessor.broadcast(transaction));
191191
response.put(BROADCASTED_RESPONSE, true);
192192
} else {
193193
response.put(BROADCASTED_RESPONSE, false);

src/brs/http/BroadcastTransaction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static brs.http.common.ResultFields.ERROR_DESCRIPTION_RESPONSE;
77
import static brs.http.common.ResultFields.ERROR_RESPONSE;
88
import static brs.http.common.ResultFields.FULL_HASH_RESPONSE;
9+
import static brs.http.common.ResultFields.NUMBER_PEERS_SENT_TO_RESPONSE;
910
import static brs.http.common.ResultFields.TRANSACTION_RESPONSE;
1011

1112
import brs.BurstException;
@@ -45,7 +46,7 @@ JSONStreamAware processRequest(HttpServletRequest req) throws BurstException {
4546
JSONObject response = new JSONObject();
4647
try {
4748
transactionService.validate(transaction);
48-
transactionProcessor.broadcast(transaction);
49+
response.put(NUMBER_PEERS_SENT_TO_RESPONSE, transactionProcessor.broadcast(transaction));
4950
response.put(TRANSACTION_RESPONSE, transaction.getStringId());
5051
response.put(FULL_HASH_RESPONSE, transaction.getFullHash());
5152
} catch (BurstException.ValidationException | RuntimeException e) {

src/brs/http/common/ResultFields.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class ResultFields {
2121
public static final String DONE_RESPONSE = "done";
2222
public static final String SCAN_TIME_RESPONSE = "scanTime";
2323
public static final String BROADCASTED_RESPONSE = "broadcasted";
24+
public static final String NUMBER_PEERS_SENT_TO_RESPONSE = "numberPeersSentTo";
2425
public static final String UNSIGNED_TRANSACTION_BYTES_RESPONSE = "unsignedTransactionBytes";
2526
public static final String TRANSACTION_JSON_RESPONSE = "transactionJSON";
2627
public static final String TRANSACTION_BYTES_RESPONSE = "transactionBytes";

src/brs/peer/Peers.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static brs.props.Props.P2P_ENABLE_TX_REBROADCAST;
44
import static brs.props.Props.P2P_SEND_TO_LIMIT;
5+
import static brs.util.JSON.prepareRequest;
56

67
import brs.*;
78
import brs.props.Props;
@@ -164,7 +165,7 @@ public static void init(TimeService timeService, AccountService accountService,
164165
logger.debug("My peer info:\n" + json.toJSONString());
165166
myPeerInfoResponse = JSON.prepare(json);
166167
json.put("requestType", "getInfo");
167-
myPeerInfoRequest = JSON.prepareRequest(json);
168+
myPeerInfoRequest = prepareRequest(json);
168169

169170
if(propertyService.getBoolean(P2P_ENABLE_TX_REBROADCAST)) {
170171
rebroadcastPeers = Collections
@@ -509,7 +510,7 @@ private void updateSavedPeers() {
509510
{
510511
JSONObject request = new JSONObject();
511512
request.put("requestType", "getPeers");
512-
getPeersRequest = JSON.prepareRequest(request);
513+
getPeersRequest = prepareRequest(request);
513514
}
514515

515516
private volatile boolean addedNewPeer;
@@ -575,7 +576,7 @@ public void run() {
575576
JSONObject request = new JSONObject();
576577
request.put("requestType", "addPeers");
577578
request.put("peers", myPeers);
578-
peer.send(JSON.prepareRequest(request));
579+
peer.send(prepareRequest(request));
579580
}
580581

581582
} catch (Exception e) {
@@ -639,7 +640,7 @@ public static Collection<? extends Peer> getAllPeers() {
639640
return allPeers;
640641
}
641642

642-
public static Collection<? extends Peer> getActivePeers() {
643+
public static List<? extends Peer> getActivePeers() {
643644
List<PeerImpl> activePeers = new ArrayList<>();
644645
for (PeerImpl peer : peers.values()) {
645646
if (peer.getState() != Peer.State.NON_CONNECTED) {
@@ -752,7 +753,7 @@ public static void sendToSomePeers(Block block) {
752753
request.put("requestType", "processBlock");
753754

754755
blocksSendingService.submit(() -> {
755-
final JSONStreamAware jsonRequest = JSON.prepareRequest(request);
756+
final JSONStreamAware jsonRequest = prepareRequest(request);
756757

757758
int successful = 0;
758759
List<Future<JSONObject>> expectedResponses = new ArrayList<>();
@@ -789,7 +790,7 @@ public static void sendToSomePeers(Block block) {
789790
static {
790791
JSONObject request = new JSONObject();
791792
request.put("requestType", "getUnconfirmedTransactions");
792-
getUnconfirmedTransactionsRequest = JSON.prepareRequest(request);
793+
getUnconfirmedTransactionsRequest = prepareRequest(request);
793794
}
794795

795796
private static final ExecutorService utReceivingService = Executors.newCachedThreadPool();
@@ -814,26 +815,30 @@ public synchronized static void feedingTime(Peer peer, Function<Peer, List<Trans
814815

815816
private static void feedPeer(Peer peer, Function<Peer, List<Transaction>> foodDispenser, BiConsumer<Peer, List<Transaction>> doneFeedingLog) {
816817
List<Transaction> transactionsToSend = foodDispenser.apply(peer);
818+
817819
if(! transactionsToSend.isEmpty()) {
818-
logger.debug("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size());
819-
peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend));
820+
logger.info("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size());
821+
JSONObject response = peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend));
822+
823+
if(response != null && response.get("error") == null) {
824+
doneFeedingLog.accept(peer, transactionsToSend);
825+
} else {
826+
logger.error("Error feeding {} transactions: {} error: {}", peer.getPeerAddress(), transactionsToSend.stream().map(t -> t.getId()), response);
827+
}
820828
} else {
821-
logger.debug("No need to feed {}", peer.getPeerAddress());
829+
logger.info("No need to feed {}", peer.getPeerAddress());
822830
}
823831

824832
beingProcessed.remove(peer);
825833

826-
doneFeedingLog.accept(peer, transactionsToSend);
827-
828834
if(processingQueue.contains(peer)) {
829835
processingQueue.remove(peer);
830836
beingProcessed.add(peer);
831837
feedPeer(peer, foodDispenser, doneFeedingLog);
832838
}
833839
}
834840

835-
836-
private static JSONObject sendUnconfirmedTransactionsRequest(List<Transaction> transactions) {
841+
private static JSONStreamAware sendUnconfirmedTransactionsRequest(List<Transaction> transactions) {
837842
JSONObject request = new JSONObject();
838843
JSONArray transactionsData = new JSONArray();
839844

@@ -844,7 +849,7 @@ private static JSONObject sendUnconfirmedTransactionsRequest(List<Transaction> t
844849
request.put("requestType", "processTransactions");
845850
request.put("transactions", transactionsData);
846851

847-
return request;
852+
return prepareRequest(request);
848853
}
849854

850855
private static boolean peerEligibleForSending(Peer peer, boolean sendSameBRSclass) {

src/brs/peer/ProcessBlock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public JSONStreamAware processRequest(JSONObject request, Peer peer) {
4141
// when loading blockchain from scratch
4242
return NOT_ACCEPTED;
4343
}
44-
blockchainProcessor.processPeerBlock(request);
44+
blockchainProcessor.processPeerBlock(request, peer);
4545
return ACCEPTED;
4646

4747
} catch (BurstException|RuntimeException e) {

0 commit comments

Comments
 (0)