Skip to content
This repository was archived by the owner on Apr 22, 2025. It is now read-only.

Commit 42b302c

Browse files
Refactor DiscoveryService usage (#608)
Discovery results parsing makes use of partial results, but these should not be set on the discovery service itself during the parsing process to ensure no callers ever see partial discovery results. In combination with the above behaviour, the way the Contract obtained discovery results was prone to race conditions that could cause multiple discovery services to be created and invoked under load, and for partial discovery results to be obtained. Signed-off-by: Mark S. Lewis <[email protected]>
1 parent a5da38f commit 42b302c

File tree

9 files changed

+741
-724
lines changed

9 files changed

+741
-724
lines changed

.github/workflows/build.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,17 @@ jobs:
4545

4646
- name: Run tests
4747
run: npm test
48+
49+
- name: "Archive unit test debug log"
50+
if: ${{ failure() }}
51+
uses: actions/upload-artifact@v3
52+
with:
53+
name: ${{ matrix.node-version }}-unit-test-debug.log
54+
path: test/temp/debug.log
55+
56+
- name: "Archive scenario test debug log"
57+
if: ${{ failure() }}
58+
uses: actions/upload-artifact@v3
59+
with:
60+
name: ${{ matrix.node-version }}-cucumber-debug.log
61+
path: test/temp/debugc.log
Lines changed: 355 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
/*
2+
* Copyright 2022 IBM All Rights Reserved.
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
const TYPE = 'DiscoveryResultsProcessor';
8+
const Long = require('long');
9+
10+
const {byteToNormalizedPEM, checkParameter, getLogger} = require('./Utils.js');
11+
12+
const logger = getLogger(TYPE);
13+
14+
const fabproto6 = require('fabric-protos');
15+
16+
class DiscoveryResultsProcessor {
17+
constructor(service, results) {
18+
this.service = service;
19+
this.results = results;
20+
this.parsedResults = {};
21+
}
22+
23+
async parseDiscoveryResults() {
24+
const method = `parseDiscoveryResults[${this.service.name}]`;
25+
logger.debug(`${method} - start`);
26+
27+
for (const index in this.results) {
28+
const result = this.results[index];
29+
if (result.result === 'error') {
30+
logger.error(`${method} - Channel:${this.service.channel.name} received discovery error:${result.error.content}`);
31+
throw Error(`DiscoveryService: ${this.service.name} error: ${result.error.content}`);
32+
}
33+
34+
logger.debug(`${method} - process result index:${index}`);
35+
if (result.config_result) {
36+
logger.debug(`${method} - process result - have configResult in ${index}`);
37+
const config = this._processConfig(result.config_result);
38+
this.parsedResults.msps = config.msps;
39+
this.parsedResults.orderers = await this._buildOrderers(config.orderers);
40+
}
41+
if (result.members) {
42+
logger.debug(`${method} - process result - have members in ${index}`);
43+
this.parsedResults.peers_by_org = await this._processMembership(result.members);
44+
}
45+
if (result.cc_query_res) {
46+
logger.debug(`${method} - process result - have ccQueryRes in ${index}`);
47+
this.parsedResults.endorsement_plan = await this._processChaincode(result.cc_query_res);
48+
}
49+
logger.debug(`${method} - completed processing result ${index}`);
50+
}
51+
52+
return this.parsedResults;
53+
}
54+
55+
_processConfig(q_config) {
56+
const method = `_processConfig[${this.service.name}]`;
57+
logger.debug(`${method} - start`);
58+
const config = {};
59+
config.msps = {};
60+
config.orderers = {};
61+
62+
try {
63+
if (q_config.msps) {
64+
for (const id in q_config.msps) {
65+
logger.debug(`${method} - found organization ${id}`);
66+
const q_msp = q_config.msps[id];
67+
const msp_config = {
68+
id: id,
69+
name: id,
70+
organizationalUnitIdentifiers: q_msp.organizational_unit_identifiers,
71+
rootCerts: byteToNormalizedPEM(q_msp.root_certs),
72+
intermediateCerts: byteToNormalizedPEM(q_msp.intermediate_certs),
73+
admins: byteToNormalizedPEM(q_msp.admins),
74+
tlsRootCerts: byteToNormalizedPEM(q_msp.tls_root_certs),
75+
tlsIntermediateCerts: byteToNormalizedPEM(q_msp.tls_intermediate_certs)
76+
};
77+
config.msps[id] = msp_config;
78+
this.service.channel.addMsp(msp_config, true);
79+
}
80+
} else {
81+
logger.debug(`${method} - no msps found`);
82+
}
83+
/*
84+
"orderers":{"OrdererMSP":{"endpoint":[{"host":"orderer.example.com","port":7050}]}}}
85+
*/
86+
if (q_config.orderers) {
87+
for (const mspid in q_config.orderers) {
88+
logger.debug(`${method} - found orderer org: ${mspid}`);
89+
config.orderers[mspid] = {};
90+
config.orderers[mspid].endpoints = [];
91+
for (const endpoint of q_config.orderers[mspid].endpoint) {
92+
config.orderers[mspid].endpoints.push(endpoint);
93+
}
94+
}
95+
} else {
96+
logger.debug(`${method} - no orderers found`);
97+
}
98+
} catch (err) {
99+
logger.error(`${method} - Problem with discovery config: ${err}`);
100+
}
101+
102+
return config;
103+
}
104+
105+
async _buildOrderers(orderers) {
106+
const method = `_buildOrderers[${this.service.name}]`;
107+
logger.debug(`${method} - start`);
108+
109+
if (!orderers) {
110+
logger.debug('%s - no orderers to build', method);
111+
} else {
112+
for (const msp_id in orderers) {
113+
logger.debug(`${method} - orderer msp:${msp_id}`);
114+
for (const endpoint of orderers[msp_id].endpoints) {
115+
endpoint.name = await this._buildOrderer(endpoint.host, endpoint.port, msp_id);
116+
}
117+
}
118+
}
119+
120+
return orderers;
121+
}
122+
123+
async _buildOrderer(host, port, msp_id) {
124+
const method = `_buildOrderer[${this.service.name}]`;
125+
logger.debug(`${method} - start mspid:${msp_id} endpoint:${host}:${port}`);
126+
127+
const name = `${host}:${port}`;
128+
const url = this.service._buildUrl(host, port);
129+
logger.debug(`${method} - create a new orderer ${url}`);
130+
const orderer = this.service.client.newCommitter(name, msp_id);
131+
const end_point = this.service.client.newEndpoint(this._buildOptions(name, url, host, msp_id));
132+
try {
133+
// first check to see if orderer is already on this channel
134+
let same;
135+
const channelOrderers = this.service.channel.getCommitters();
136+
for (const channelOrderer of channelOrderers) {
137+
logger.debug('%s - checking %s', method, channelOrderer);
138+
if (channelOrderer.endpoint && channelOrderer.endpoint.url === url) {
139+
same = channelOrderer;
140+
break;
141+
}
142+
}
143+
if (!same) {
144+
await orderer.connect(end_point);
145+
this.service.channel.addCommitter(orderer);
146+
} else {
147+
await same.checkConnection();
148+
logger.debug('%s - orderer already added to this channel', method);
149+
}
150+
} catch (error) {
151+
logger.error(`${method} - Unable to connect to the discovered orderer ${name} due to ${error}`);
152+
}
153+
154+
return name;
155+
}
156+
157+
_buildOptions(name, url, host, msp_id) {
158+
const method = `_buildOptions[${this.service.name}]`;
159+
logger.debug(`${method} - start`);
160+
const caroots = this._buildTlsRootCerts(msp_id);
161+
return {
162+
url: url,
163+
pem: caroots,
164+
'ssl-target-name-override': host,
165+
name: name
166+
};
167+
}
168+
169+
_buildTlsRootCerts(msp_id = checkParameter('msp_id')) {
170+
const method = `_buildTlsRootCerts[${this.service.name}]`;
171+
logger.debug(`${method} - start`);
172+
let ca_roots = '';
173+
174+
if (!this.parsedResults.msps) {
175+
logger.error('Missing MSPs discovery results');
176+
return ca_roots;
177+
}
178+
179+
const mspDiscovered = this.parsedResults.msps[msp_id];
180+
if (!mspDiscovered) {
181+
logger.error(`Missing msp ${msp_id} in discovery results`);
182+
return ca_roots;
183+
}
184+
185+
logger.debug(`Found msp ${msp_id}`);
186+
187+
if (mspDiscovered.tlsRootCerts) {
188+
ca_roots = ca_roots + mspDiscovered.tlsRootCerts;
189+
} else {
190+
logger.debug('%s - no tls root certs', method);
191+
}
192+
if (mspDiscovered.tlsIntermediateCerts) {
193+
ca_roots = ca_roots + mspDiscovered.tlsIntermediateCerts;
194+
} else {
195+
logger.debug('%s - no tls intermediate certs', method);
196+
}
197+
198+
return ca_roots;
199+
}
200+
201+
async _processMembership(q_members) {
202+
const method = `_processMembership[${this.service.name}]`;
203+
logger.debug(`${method} - start`);
204+
const peersByOrg = {};
205+
if (q_members.peers_by_org) {
206+
for (const mspid in q_members.peers_by_org) {
207+
logger.debug(`${method} - found org:${mspid}`);
208+
peersByOrg[mspid] = {};
209+
peersByOrg[mspid].peers = await this._processPeers(q_members.peers_by_org[mspid].peers);
210+
}
211+
} else {
212+
logger.debug(`${method} - missing peers by org`);
213+
}
214+
return peersByOrg;
215+
}
216+
217+
// message Peers
218+
async _processPeers(q_peers) {
219+
const method = `_processPeers[${this.service.name}]`;
220+
const peers = [];
221+
// message Peer
222+
for (const q_peer of q_peers) {
223+
const peer = {};
224+
// IDENTITY
225+
const q_identity = fabproto6.msp.SerializedIdentity.decode(q_peer.identity);
226+
peer.mspid = q_identity.mspid;
227+
228+
// MEMBERSHIP - Peer.membership_info
229+
// fabproto6.gossip.Envelope.payload
230+
const q_membership_message = fabproto6.gossip.GossipMessage.decode(q_peer.membership_info.payload);
231+
peer.endpoint = q_membership_message.alive_msg.membership.endpoint;
232+
peer.name = q_membership_message.alive_msg.membership.endpoint;
233+
logger.debug(`${method} - peer :${peer.endpoint}`);
234+
235+
// STATE
236+
if (q_peer.state_info) {
237+
const message_s = fabproto6.gossip.GossipMessage.decode(q_peer.state_info.payload);
238+
peer.ledgerHeight = Long.fromValue(message_s.state_info.properties.ledger_height);
239+
logger.debug(`${method} - ledgerHeight :${peer.ledgerHeight}`);
240+
peer.chaincodes = [];
241+
for (const index in message_s.state_info.properties.chaincodes) {
242+
const q_chaincode = message_s.state_info.properties.chaincodes[index];
243+
const chaincode = {};
244+
chaincode.name = q_chaincode.name;
245+
chaincode.version = q_chaincode.version;
246+
// TODO metadata ?
247+
logger.debug(`${method} - chaincode :${JSON.stringify(chaincode)}`);
248+
peer.chaincodes.push(chaincode);
249+
}
250+
} else {
251+
logger.debug(`${method} - no state info for peer ${peer.endpoint}`);
252+
}
253+
254+
// all done with this peer
255+
peers.push(peer);
256+
// build the GRPC instance
257+
await this._buildPeer(peer);
258+
}
259+
260+
return peers;
261+
}
262+
263+
async _buildPeer(discovery_peer) {
264+
const method = `_buildPeer[${this.service.name}]`;
265+
logger.debug(`${method} - start`);
266+
267+
if (!discovery_peer) {
268+
throw Error('Missing discovery_peer parameter');
269+
}
270+
const address = discovery_peer.endpoint;
271+
const msp_id = discovery_peer.mspid;
272+
273+
const host_port = address.split(':');
274+
const url = this.service._buildUrl(host_port[0], host_port[1]);
275+
276+
// first check to see if peer is already on this channel
277+
let peer;
278+
const channelPeers = this.service.channel.getEndorsers();
279+
for (const channelPeer of channelPeers) {
280+
logger.debug('%s - checking channel peer %s', method, channelPeer.name);
281+
if (channelPeer.endpoint && channelPeer.endpoint.url === url) {
282+
logger.debug('%s - url: %s - already added to this channel', method, url);
283+
peer = channelPeer;
284+
break;
285+
}
286+
}
287+
if (!peer) {
288+
logger.debug(`${method} - create a new endorser ${url}`);
289+
peer = this.service.client.newEndorser(address, msp_id);
290+
const end_point = this.service.client.newEndpoint(this._buildOptions(address, url, host_port[0], msp_id));
291+
try {
292+
logger.debug(`${method} - about to connect to endorser ${address} url:${url}`);
293+
await peer.connect(end_point);
294+
this.service.channel.addEndorser(peer);
295+
logger.debug(`${method} - connected to peer ${address} url:${url}`);
296+
} catch (error) {
297+
logger.error(`${method} - Unable to connect to the discovered peer ${address} due to ${error}`);
298+
}
299+
} else {
300+
// make sure the existing connect is still good
301+
await peer.checkConnection();
302+
}
303+
304+
// indicate that this peer has been touched by the discovery service
305+
peer.discovered = true;
306+
307+
// make sure that this peer has all the found installed chaincodes
308+
if (discovery_peer.chaincodes) {
309+
for (const chaincode of discovery_peer.chaincodes) {
310+
logger.debug(`${method} - adding chaincode ${chaincode.name} to peer ${peer.name}`);
311+
peer.addChaincode(chaincode.name);
312+
}
313+
}
314+
315+
logger.debug(`${method} - end`);
316+
return peer;
317+
}
318+
319+
// -- process the ChaincodeQueryResult - fabproto6.discovery.QueryResult.ChaincodeQueryResult
320+
async _processChaincode(q_chaincodes) {
321+
const method = '_processChaincode';
322+
logger.debug(`${method} - start`);
323+
const endorsement_plan = {};
324+
// repeated EndorsementDescriptor content, but we should only have one
325+
if (q_chaincodes && q_chaincodes.content && Array.isArray(q_chaincodes.content)) {
326+
for (const q_endors_desc of q_chaincodes.content) {
327+
endorsement_plan.chaincode = q_endors_desc.chaincode;
328+
329+
// named groups of Peers
330+
endorsement_plan.groups = {};
331+
for (const group_name in q_endors_desc.endorsers_by_groups) {
332+
logger.debug(`${method} - found group: ${group_name}`);
333+
const group = {};
334+
group.peers = await this._processPeers(q_endors_desc.endorsers_by_groups[group_name].peers);
335+
// all done with this group
336+
endorsement_plan.groups[group_name] = group;
337+
}
338+
339+
// LAYOUTS
340+
endorsement_plan.layouts = [];
341+
for (const q_layout of q_endors_desc.layouts) {
342+
const layout = Object.assign({}, q_layout.quantities_by_group);
343+
logger.debug(`${method} - layout :${layout}`);
344+
endorsement_plan.layouts.push(layout);
345+
}
346+
}
347+
} else {
348+
throw Error('Plan layouts are invalid');
349+
}
350+
351+
return endorsement_plan;
352+
}
353+
}
354+
355+
module.exports = DiscoveryResultsProcessor;

0 commit comments

Comments
 (0)