1+ /*
2+ *
3+ * Licensed to the Apache Software Foundation (ASF) under one
4+ * or more contributor license agreements. See the NOTICE file
5+ * distributed with this work for additional information
6+ * regarding copyright ownership. The ASF licenses this file
7+ * to you under the Apache License, Version 2.0 (the
8+ * "License"); you may not use this file except in compliance
9+ * with the License. You may obtain a copy of the License at
10+ *
11+ * http://www.apache.org/licenses/LICENSE-2.0
12+ *
13+ * Unless required by applicable law or agreed to in writing,
14+ * software distributed under the License is distributed on an
15+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+ * KIND, either express or implied. See the License for the
17+ * specific language governing permissions and limitations
18+ * under the License.
19+ *
20+ */
21+ package org .apache .bookkeeper .client ;
22+
23+ import io .netty .buffer .ByteBuf ;
24+ import java .util .ArrayList ;
25+ import java .util .BitSet ;
26+ import java .util .List ;
27+ import java .util .concurrent .TimeUnit ;
28+ import org .apache .bookkeeper .client .api .LedgerEntry ;
29+ import org .apache .bookkeeper .client .impl .LedgerEntriesImpl ;
30+ import org .apache .bookkeeper .client .impl .LedgerEntryImpl ;
31+ import org .apache .bookkeeper .net .BookieId ;
32+ import org .apache .bookkeeper .proto .BookieProtocol ;
33+ import org .apache .bookkeeper .proto .BookkeeperInternalCallbacks .BatchedReadEntryCallback ;
34+ import org .apache .bookkeeper .proto .checksum .DigestManager ;
35+ import org .apache .bookkeeper .util .ByteBufList ;
36+ import org .apache .bookkeeper .util .MathUtils ;
37+ import org .slf4j .Logger ;
38+ import org .slf4j .LoggerFactory ;
39+
40+ public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallback {
41+
42+ private static final Logger LOG = LoggerFactory .getLogger (BatchedReadOp .class );
43+
44+ final int maxCount ;
45+ final long maxSize ;
46+
47+ BatchedLedgerEntryRequest request ;
48+
49+ BatchedReadOp (LedgerHandle lh ,
50+ ClientContext clientCtx ,
51+ long startEntryId ,
52+ int maxCount ,
53+ long maxSize ,
54+ boolean isRecoveryRead ) {
55+ super (lh , clientCtx , startEntryId , -1L , isRecoveryRead );
56+ this .maxCount = maxCount ;
57+ this .maxSize = maxSize ;
58+ }
59+
60+ @ Override
61+ void initiate () {
62+ this .requestTimeNanos = MathUtils .nowInNano ();
63+ List <BookieId > ensemble = getLedgerMetadata ().getEnsembleAt (startEntryId );
64+ request = new SequenceReadRequest (ensemble , lh .ledgerId , startEntryId , maxCount , maxSize );
65+ request .read ();
66+ if (clientCtx .getConf ().readSpeculativeRequestPolicy .isPresent ()) {
67+ speculativeTask = clientCtx .getConf ().readSpeculativeRequestPolicy .get ()
68+ .initiateSpeculativeRequest (clientCtx .getScheduler (), request );
69+ }
70+ }
71+
72+ @ Override
73+ protected void submitCallback (int code ) {
74+ // ensure callback once
75+ if (!complete .compareAndSet (false , true )) {
76+ return ;
77+ }
78+
79+ cancelSpeculativeTask (true );
80+
81+ long latencyNanos = MathUtils .elapsedNanos (requestTimeNanos );
82+ if (code != BKException .Code .OK ) {
83+ LOG .error (
84+ "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, "
85+ + "Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})" ,
86+ lh .getId (), startEntryId , endEntryId , sentToHosts , heardFromHosts , heardFromHostsBitSet ,
87+ BKException .getMessage (code ), startEntryId , code );
88+ clientCtx .getClientStats ().getReadOpLogger ().registerFailedEvent (latencyNanos , TimeUnit .NANOSECONDS );
89+ // release the entries
90+
91+ request .close ();
92+ future .completeExceptionally (BKException .create (code ));
93+ } else {
94+ clientCtx .getClientStats ().getReadOpLogger ().registerSuccessfulEvent (latencyNanos , TimeUnit .NANOSECONDS );
95+ future .complete (LedgerEntriesImpl .create (request .entries ));
96+ }
97+ }
98+
99+ @ Override
100+ public void readEntriesComplete (int rc , long ledgerId , long startEntryId , ByteBufList bufList , Object ctx ) {
101+ final ReadContext rctx = (ReadContext ) ctx ;
102+ final BatchedLedgerEntryRequest entry = (BatchedLedgerEntryRequest ) rctx .entry ;
103+
104+ if (rc != BKException .Code .OK ) {
105+ entry .logErrorAndReattemptRead (rctx .bookieIndex , rctx .to , "Error: " + BKException .getMessage (rc ), rc );
106+ return ;
107+ }
108+
109+ heardFromHosts .add (rctx .to );
110+ heardFromHostsBitSet .set (rctx .bookieIndex , true );
111+
112+ bufList .retain ();
113+ // if entry has completed don't handle twice
114+ if (entry .complete (rctx .bookieIndex , rctx .to , bufList )) {
115+ if (!isRecoveryRead ) {
116+ // do not advance LastAddConfirmed for recovery reads
117+ lh .updateLastConfirmed (rctx .getLastAddConfirmed (), 0L );
118+ }
119+ submitCallback (BKException .Code .OK );
120+ } else {
121+ bufList .release ();
122+ }
123+ }
124+
125+ void sendReadTo (int bookieIndex , BookieId to , BatchedLedgerEntryRequest entry ) throws InterruptedException {
126+ if (lh .throttler != null ) {
127+ lh .throttler .acquire ();
128+ }
129+ if (isRecoveryRead ) {
130+ int flags = BookieProtocol .FLAG_HIGH_PRIORITY | BookieProtocol .FLAG_DO_FENCING ;
131+ clientCtx .getBookieClient ().batchReadEntries (to , lh .ledgerId , entry .eId ,
132+ maxCount , maxSize , this , new ReadContext (bookieIndex , to , entry ), flags , lh .ledgerKey );
133+ } else {
134+ clientCtx .getBookieClient ().batchReadEntries (to , lh .ledgerId , entry .eId , maxCount , maxSize ,
135+ this , new ReadContext (bookieIndex , to , entry ), BookieProtocol .FLAG_NONE );
136+ }
137+ }
138+
139+ abstract class BatchedLedgerEntryRequest extends LedgerEntryRequest {
140+
141+ //Indicate which ledger the BatchedLedgerEntryRequest is reading.
142+ final long lId ;
143+ final int maxCount ;
144+ final long maxSize ;
145+
146+ final List <LedgerEntry > entries ;
147+
148+ BatchedLedgerEntryRequest (List <BookieId > ensemble , long lId , long eId , int maxCount , long maxSize ) {
149+ super (ensemble , eId );
150+ this .lId = lId ;
151+ this .maxCount = maxCount ;
152+ this .maxSize = maxSize ;
153+ this .entries = new ArrayList <>(maxCount );
154+ }
155+
156+ boolean complete (int bookieIndex , BookieId host , final ByteBufList bufList ) {
157+ if (isComplete ()) {
158+ return false ;
159+ }
160+ if (!complete .getAndSet (true )) {
161+ for (int i = 0 ; i < bufList .size (); i ++) {
162+ ByteBuf buffer = bufList .getBuffer (i );
163+ ByteBuf content ;
164+ try {
165+ content = lh .macManager .verifyDigestAndReturnData (eId + i , buffer );
166+ } catch (BKException .BKDigestMatchException e ) {
167+ clientCtx .getClientStats ().getReadOpDmCounter ().inc ();
168+ logErrorAndReattemptRead (bookieIndex , host , "Mac mismatch" ,
169+ BKException .Code .DigestMatchException );
170+ return false ;
171+ }
172+ rc = BKException .Code .OK ;
173+ /*
174+ * The length is a long and it is the last field of the metadata of an entry.
175+ * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
176+ */
177+ LedgerEntryImpl entryImpl = LedgerEntryImpl .create (lh .ledgerId , startEntryId + i );
178+ entryImpl .setLength (buffer .getLong (DigestManager .METADATA_LENGTH - 8 ));
179+ entryImpl .setEntryBuf (content );
180+ entries .add (entryImpl );
181+ }
182+ writeSet .recycle ();
183+ return true ;
184+ } else {
185+ writeSet .recycle ();
186+ return false ;
187+ }
188+ }
189+
190+ @ Override
191+ public String toString () {
192+ return String .format ("L%d-E%d~%d s-%d" , lh .getId (), eId , eId + maxCount , maxSize );
193+ }
194+ }
195+
196+ class SequenceReadRequest extends BatchedLedgerEntryRequest {
197+
198+ static final int NOT_FOUND = -1 ;
199+ int nextReplicaIndexToReadFrom = 0 ;
200+ final BitSet sentReplicas ;
201+ final BitSet erroredReplicas ;
202+ SequenceReadRequest (List <BookieId > ensemble ,
203+ long lId ,
204+ long eId ,
205+ int maxCount ,
206+ long maxSize ) {
207+ super (ensemble , lId , eId , maxCount , maxSize );
208+ this .sentReplicas = new BitSet (lh .getLedgerMetadata ().getWriteQuorumSize ());
209+ this .erroredReplicas = new BitSet (lh .getLedgerMetadata ().getWriteQuorumSize ());
210+ }
211+
212+ @ Override
213+ void read () {
214+ sendNextRead ();
215+ }
216+
217+ private synchronized int getNextReplicaIndexToReadFrom () {
218+ return nextReplicaIndexToReadFrom ;
219+ }
220+
221+ private BitSet getSentToBitSet () {
222+ BitSet b = new BitSet (ensemble .size ());
223+
224+ for (int i = 0 ; i < sentReplicas .length (); i ++) {
225+ if (sentReplicas .get (i )) {
226+ b .set (writeSet .get (i ));
227+ }
228+ }
229+ return b ;
230+ }
231+
232+ private boolean readsOutstanding () {
233+ return (sentReplicas .cardinality () - erroredReplicas .cardinality ()) > 0 ;
234+ }
235+
236+ @ Override
237+ synchronized BookieId maybeSendSpeculativeRead (BitSet heardFrom ) {
238+ if (nextReplicaIndexToReadFrom >= getLedgerMetadata ().getWriteQuorumSize ()) {
239+ return null ;
240+ }
241+
242+ BitSet sentTo = getSentToBitSet ();
243+ sentTo .and (heardFrom );
244+
245+ // only send another read if we have had no successful response at all
246+ // (even for other entries) from any of the other bookies we have sent the
247+ // request to
248+ if (sentTo .cardinality () == 0 ) {
249+ clientCtx .getClientStats ().getSpeculativeReadCounter ().inc ();
250+ return sendNextRead ();
251+ } else {
252+ return null ;
253+ }
254+ }
255+
256+ synchronized BookieId sendNextRead () {
257+ if (nextReplicaIndexToReadFrom >= getLedgerMetadata ().getWriteQuorumSize ()) {
258+ // we are done, the read has failed from all replicas, just fail the
259+ // read
260+ fail (firstError );
261+ return null ;
262+ }
263+
264+ // ToDo: pick replica with writable PCBC. ISSUE #1239
265+ // https://github.com/apache/bookkeeper/issues/1239
266+ int replica = nextReplicaIndexToReadFrom ;
267+ int bookieIndex = writeSet .get (nextReplicaIndexToReadFrom );
268+ nextReplicaIndexToReadFrom ++;
269+
270+ try {
271+ BookieId to = ensemble .get (bookieIndex );
272+ sendReadTo (bookieIndex , to , this );
273+ sentToHosts .add (to );
274+ sentReplicas .set (replica );
275+ return to ;
276+ } catch (InterruptedException ie ) {
277+ LOG .error ("Interrupted reading entry " + this , ie );
278+ Thread .currentThread ().interrupt ();
279+ fail (BKException .Code .InterruptedException );
280+ return null ;
281+ }
282+ }
283+
284+ @ Override
285+ synchronized void logErrorAndReattemptRead (int bookieIndex , BookieId host , String errMsg , int rc ) {
286+ super .logErrorAndReattemptRead (bookieIndex , host , errMsg , rc );
287+ int replica = writeSet .indexOf (bookieIndex );
288+ if (replica == NOT_FOUND ) {
289+ LOG .error ("Received error from a host which is not in the ensemble {} {}." , host , ensemble );
290+ return ;
291+ }
292+ erroredReplicas .set (replica );
293+ if (isRecoveryRead && (numBookiesMissingEntry >= requiredBookiesMissingEntryForRecovery )) {
294+ /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies report that they do not
295+ * have the entry */
296+ fail (BKException .Code .NoSuchEntryException );
297+ return ;
298+ }
299+
300+ if (!readsOutstanding ()) {
301+ sendNextRead ();
302+ }
303+ }
304+
305+ @ Override
306+ boolean complete (int bookieIndex , BookieId host , final ByteBufList bufList ) {
307+ boolean completed = super .complete (bookieIndex , host , bufList );
308+ if (completed ) {
309+ int numReplicasTried = getNextReplicaIndexToReadFrom ();
310+ // Check if any speculative reads were issued and mark any slow bookies before
311+ // the first successful speculative read as "slow"
312+ for (int i = 0 ; i < numReplicasTried - 1 ; i ++) {
313+ int slowBookieIndex = writeSet .get (i );
314+ BookieId slowBookieSocketAddress = ensemble .get (slowBookieIndex );
315+ clientCtx .getPlacementPolicy ().registerSlowBookie (slowBookieSocketAddress , eId );
316+ }
317+ }
318+ return completed ;
319+ }
320+ }
321+ }
0 commit comments