diff --git a/.kokoro/coerce_logs.sh b/.kokoro/coerce_logs.sh new file mode 100644 index 000000000..883ee0faa --- /dev/null +++ b/.kokoro/coerce_logs.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script finds and moves sponge logs so that they can be found by placer +# and are not flagged as flaky by sponge. + +set -eo pipefail + +## Get the directory of the build script +scriptDir=$(realpath $(dirname "${BASH_SOURCE[0]}")) +## cd to the parent directory, i.e. the root of the git repo +cd ${scriptDir}/.. + +job=$(basename ${KOKORO_JOB_NAME}) + +echo "coercing sponge logs..." +for xml in `find . -name *-sponge_log.xml` +do + class=$(basename ${xml} | cut -d- -f2) + dir=$(dirname ${xml})/${job}/${class} + text=$(dirname ${xml})/${class}-sponge_log.txt + mkdir -p ${dir} + mv ${xml} ${dir}/sponge_log.xml + mv ${text} ${dir}/sponge_log.txt +done \ No newline at end of file diff --git a/.kokoro/conformance.sh b/.kokoro/conformance.sh index 777cc4aef..6763daa45 100755 --- a/.kokoro/conformance.sh +++ b/.kokoro/conformance.sh @@ -41,5 +41,8 @@ cd cloud-bigtable-clients-test/tests eval "go test -v -proxy_addr=:9999" RETURN_CODE=$? +# fix output location of logs +bash .kokoro/coerce_logs.sh + echo "exiting with ${RETURN_CODE}" exit ${RETURN_CODE} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index dc4143c99..87d9767fd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -69,7 +69,6 @@ export interface RequestOptions { | 'BigtableTableAdminClient' | 'BigtableClient'; reqOpts?: {}; - retryOpts?: {}; gaxOpts?: {}; method?: string; } @@ -445,6 +444,11 @@ export class Bigtable { {}, baseOptions, { + // Setting gaxServerStreamingRetries to true ensures that for readrows, + // sampleRowKeys, mutateRows, generateInitialChangeStreamPartitions and + // readChangeStream calls in the data client that the new streaming + // retries functionality will be used. + gaxServerStreamingRetries: true, servicePath: customEndpointBaseUrl || defaultBaseUrl, 'grpc.callInvocationTransformer': grpcGcp.gcpCallInvocationTransformer, 'grpc.channelFactoryOverride': grpcGcp.gcpChannelFactoryOverride, @@ -846,18 +850,6 @@ export class Bigtable { } function makeRequestStream() { - const retryRequestOptions = Object.assign( - { - currentRetryAttempt: 0, - noResponseRetries: 0, - objectMode: true, - }, - config.retryOpts - ); - - config.gaxOpts = Object.assign(config.gaxOpts || {}, { - retryRequestOptions, - }); prepareGaxRequest((err, requestFn) => { if (err) { stream.destroy(err); diff --git a/src/mutation.ts b/src/mutation.ts index 0aef34202..456528c57 100644 --- a/src/mutation.ts +++ b/src/mutation.ts @@ -25,6 +25,24 @@ export type ISetCell = btTypes.bigtable.v2.Mutation.ISetCell; export type Bytes = string | Buffer; // eslint-disable-next-line @typescript-eslint/no-explicit-any export type Data = any; +/* +The Data type is expected to be in the following format: +{ + columnFamily1: { + column1: Cell, + column2: Cell + }, + columnFamily2: { + otherColumn1: Cell, + otherColumn2: Cell + } +} +Where the Cell data type has the following structure: +Uint8Array | string | { + value: Uint8Array|string, + timestamp: number|Long|string, +} +*/ export interface JsonObj { [k: string]: string | JsonObj; } diff --git a/src/table.ts b/src/table.ts index e7c286c0f..5f6757a5c 100644 --- a/src/table.ts +++ b/src/table.ts @@ -14,9 +14,9 @@ import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); -import {ServiceError} from 'google-gax'; +import {RetryOptions, ServiceError} from 'google-gax'; import {BackoffSettings} from 'google-gax/build/src/gax'; -import {PassThrough, Transform} from 'stream'; +import {PassThrough, Transform, TransformOptions} from 'stream'; // eslint-disable-next-line @typescript-eslint/no-var-requires const concat = require('concat-stream'); @@ -31,7 +31,7 @@ import { CreateFamilyResponse, IColumnFamily, } from './family'; -import {Filter, BoundData, RawFilter} from './filter'; +import {BoundData, RawFilter} from './filter'; import {Mutation} from './mutation'; import {Row} from './row'; import {ChunkTransformer} from './chunktransformer'; @@ -43,19 +43,16 @@ import {CreateBackupCallback, CreateBackupResponse} from './cluster'; import {google} from '../protos/protos'; import {Duplex} from 'stream'; import {TableUtils} from './utils/table'; +import { + DEFAULT_BACKOFF_SETTINGS, + DEFAULT_RETRY_COUNT, + RETRYABLE_STATUS_CODES, +} from './utils/retry-options'; +import {ReadRowsResumptionStrategy} from './utils/read-rows-resumption'; -// See protos/google/rpc/code.proto -// (4=DEADLINE_EXCEEDED, 8=RESOURCE_EXHAUSTED, 10=ABORTED, 14=UNAVAILABLE) -const RETRYABLE_STATUS_CODES = new Set([4, 8, 10, 14]); // (1=CANCELLED) const IGNORED_STATUS_CODES = new Set([1]); -const DEFAULT_BACKOFF_SETTINGS: BackoffSettings = { - initialRetryDelayMillis: 10, - retryDelayMultiplier: 2, - maxRetryDelayMillis: 60000, -}; - /** * @typedef {object} Policy * @property {number} [version] Specifies the format of the policy. @@ -287,6 +284,14 @@ export interface MutateOptions { // eslint-disable-next-line @typescript-eslint/no-explicit-any export type Entry = any; +/* +The Entry type is expected to be in the following format: +{ + key?: Uint8Array|string, + data?: Data, // The Data type is described in the Mutation class. + method?: typeof mutation.methods +} +*/ export type DeleteTableCallback = ( err: ServiceError | null, @@ -719,35 +724,11 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); * region_tag:bigtable_api_table_readstream */ createReadStream(opts?: GetRowsOptions) { - const options = opts || {}; - const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 10; + const options: GetRowsOptions = opts || {}; + const maxRetries = is.number(this.maxRetries) + ? this.maxRetries! + : DEFAULT_RETRY_COUNT; let activeRequestStream: AbortableDuplex | null; - let rowKeys: string[]; - let filter: {} | null; - const rowsLimit = options.limit || 0; - const hasLimit = rowsLimit !== 0; - - let numConsecutiveErrors = 0; - let numRequestsMade = 0; - let retryTimer: NodeJS.Timeout | null; - - rowKeys = options.keys || []; - - const ranges = TableUtils.getRanges(options); - - // If rowKeys and ranges are both empty, the request is a full table scan. - // Add an empty range to simplify the resumption logic. - if (rowKeys.length === 0 && ranges.length === 0) { - ranges.push({}); - } - - if (options.filter) { - filter = Filter.parse(options.filter); - } - - let chunkTransformer: ChunkTransformer; - let rowStream: Duplex; - let userCanceled = false; // The key of the last row that was emitted by the per attempt pipeline // Note: this must be updated from the operation level userStream to avoid referencing buffered rows that will be @@ -799,207 +780,108 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); rowStream?.removeListener('end', originalEnd); }; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => { + /* + This end function was redefined in a fix to a data loss issue with streams + to allow the user to cancel the stream and instantly stop receiving more + data in the stream. + */ + userStream.end = (chunkOrCb: () => void | Row) => { rowStreamUnpipe(rowStream, userStream); userCanceled = true; if (activeRequestStream) { activeRequestStream.abort(); } - if (retryTimer) { - clearTimeout(retryTimer); - } - return originalEnd(chunk, encoding, cb); + originalEnd(); + return originalEnd(chunkOrCb); // In practice, this code path is used. }; + // The chunk transformer is used for transforming raw readrows data from + // the server into data that can be consumed by the user. + const chunkTransformer: ChunkTransformer = new ChunkTransformer({ + decode: options.decode, + } as TransformOptions); + + // This defines a strategy object which is used for deciding if the client + // will retry and for deciding what request to retry with. + const strategy = new ReadRowsResumptionStrategy( + chunkTransformer, + options, + Object.assign( + {tableName: this.name}, + this.bigtable.appProfileId + ? {appProfileId: this.bigtable.appProfileId} + : {} + ) + ); - const makeNewRequest = () => { - // Avoid cancelling an expired timer if user - // cancelled the stream in the middle of a retry - retryTimer = null; + const gaxOpts = populateAttemptHeader(0, options.gaxOptions); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - chunkTransformer = new ChunkTransformer({decode: options.decode} as any); + // Attach retry options to gax if they are not provided in the function call. + gaxOpts.retry = strategy.toRetryOptions(gaxOpts); + if (gaxOpts.maxRetries === undefined) { + gaxOpts.maxRetries = maxRetries; + } - const reqOpts = { - tableName: this.name, - appProfileId: this.bigtable.appProfileId, - } as google.bigtable.v2.IReadRowsRequest; - - const retryOpts = { - currentRetryAttempt: 0, // was numConsecutiveErrors - // Handling retries in this client. Specify the retry options to - // make sure nothing is retried in retry-request. - noResponseRetries: 0, - shouldRetryFn: (_: any) => { - return false; - }, - }; + // This gets the first request to send to the readRows endpoint. + const reqOpts = strategy.getResumeRequest(); + const requestStream = this.bigtable.request({ + client: 'BigtableClient', + method: 'readRows', + reqOpts, + gaxOpts, + }); - if (lastRowKey) { - // Readjust and/or remove ranges based on previous valid row reads. - // Iterate backward since items may need to be removed. - for (let index = ranges.length - 1; index >= 0; index--) { - const range = ranges[index]; - const startValue = is.object(range.start) - ? (range.start as BoundData).value - : range.start; - const endValue = is.object(range.end) - ? (range.end as BoundData).value - : range.end; - const startKeyIsRead = - !startValue || - TableUtils.lessThanOrEqualTo( - startValue as string, - lastRowKey as string - ); - const endKeyIsNotRead = - !endValue || - (endValue as Buffer).length === 0 || - TableUtils.lessThan(lastRowKey as string, endValue as string); - if (startKeyIsRead) { - if (endKeyIsNotRead) { - // EndKey is not read, reset the range to start from lastRowKey open - range.start = { - value: lastRowKey, - inclusive: false, - }; - } else { - // EndKey is read, remove this range - ranges.splice(index, 1); - } - } + activeRequestStream = requestStream!; + + // After readrows data has been transformed by the chunk transformer, this + // transform can be used to prepare the data into row objects for the user + // or block more data from being emitted if the stream has been cancelled. + const toRowStream = new Transform({ + transform: (rowData, _, next) => { + if ( + userCanceled || + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (userStream as any)._writableState.ended + ) { + return next(); } + strategy.rowsRead++; + const row = this.row(rowData.key); + row.data = rowData.data; + next(null, row); + }, + objectMode: true, + }); - // Remove rowKeys already read. - rowKeys = rowKeys.filter(rowKey => - TableUtils.greaterThan(rowKey, lastRowKey as string) - ); - - // If there was a row limit in the original request and - // we've already read all the rows, end the stream and - // do not retry. - if (hasLimit && rowsLimit === rowsRead) { - userStream.end(); - return; - } - // If all the row keys and ranges are read, end the stream - // and do not retry. - if (rowKeys.length === 0 && ranges.length === 0) { + // This creates a row stream which is three streams connected in a series. + // Data and errors from the requestStream feed into the chunkTransformer + // and data/errors from the chunk transformer feed into toRowStream. + const rowStream: Duplex = pumpify.obj([ + requestStream, + chunkTransformer, + toRowStream, + ]); + // This code attaches handlers to the row stream to deal with special + // cases when data is received or errors are emitted. + rowStream + .on('error', (error: ServiceError) => { + // This ends the stream for errors that should be ignored. For other + // errors it sends the error to the user. + rowStreamUnpipe(rowStream, userStream); + activeRequestStream = null; + if (IGNORED_STATUS_CODES.has(error.code)) { + // We ignore the `cancelled` "error", since we are the ones who cause + // it when the user calls `.abort()`. userStream.end(); return; } - } - - // Create the new reqOpts - reqOpts.rows = {}; - - // TODO: preprocess all the keys and ranges to Bytes - reqOpts.rows.rowKeys = rowKeys.map( - Mutation.convertToBytes - ) as {} as Uint8Array[]; - - reqOpts.rows.rowRanges = ranges.map(range => - Filter.createRange( - range.start as BoundData, - range.end as BoundData, - 'Key' - ) - ); - - if (filter) { - reqOpts.filter = filter; - } - - if (hasLimit) { - reqOpts.rowsLimit = rowsLimit - rowsRead; - } - - const gaxOpts = populateAttemptHeader( - numRequestsMade, - options.gaxOptions - ); - - const requestStream = this.bigtable.request({ - client: 'BigtableClient', - method: 'readRows', - reqOpts, - gaxOpts, - retryOpts, + userStream.emit('error', error); + }) + .on('end', () => { + activeRequestStream = null; }); - - activeRequestStream = requestStream!; - - const toRowStream = new Transform({ - transform: (rowData, _, next) => { - if ( - userCanceled || - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (userStream as any)._writableState.ended - ) { - return next(); - } - const row = this.row(rowData.key); - row.data = rowData.data; - next(null, row); - }, - objectMode: true, - }); - - rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]); - - // Retry on "received rst stream" errors - const isRstStreamError = (error: ServiceError): boolean => { - if (error.code === 13 && error.message) { - const error_message = (error.message || '').toLowerCase(); - return ( - error.code === 13 && - (error_message.includes('rst_stream') || - error_message.includes('rst stream')) - ); - } - return false; - }; - - rowStream - .on('error', (error: ServiceError) => { - rowStreamUnpipe(rowStream, userStream); - activeRequestStream = null; - if (IGNORED_STATUS_CODES.has(error.code)) { - // We ignore the `cancelled` "error", since we are the ones who cause - // it when the user calls `.abort()`. - userStream.end(); - return; - } - numConsecutiveErrors++; - numRequestsMade++; - if ( - numConsecutiveErrors <= maxRetries && - (RETRYABLE_STATUS_CODES.has(error.code) || isRstStreamError(error)) - ) { - const backOffSettings = - options.gaxOptions?.retry?.backoffSettings || - DEFAULT_BACKOFF_SETTINGS; - const nextRetryDelay = getNextDelay( - numConsecutiveErrors, - backOffSettings - ); - retryTimer = setTimeout(makeNewRequest, nextRetryDelay); - } else { - userStream.emit('error', error); - } - }) - .on('data', _ => { - // Reset error count after a successful read so the backoff - // time won't keep increasing when as stream had multiple errors - numConsecutiveErrors = 0; - }) - .on('end', () => { - activeRequestStream = null; - }); - rowStreamPipe(rowStream, userStream); - }; - - makeNewRequest(); + // rowStreamPipe sends errors and data emitted by the rowStream to the + // userStream. + rowStreamPipe(rowStream, userStream); return userStream; } @@ -1606,20 +1488,25 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); : entryBatch.map(Mutation.parse), }; - const retryOpts = { - currentRetryAttempt: numRequestsMade, - // Handling retries in this client. Specify the retry options to - // make sure nothing is retried in retry-request. - noResponseRetries: 0, - shouldRetryFn: (_: any) => { - return false; - }, - }; - options.gaxOptions = populateAttemptHeader( numRequestsMade, options.gaxOptions ); + if ( + options.gaxOptions?.retry === undefined && + options.gaxOptions?.retryRequestOptions === undefined + ) { + // For now gax will not do any retries for table.mutate unless + // the user specifically provides retry or retryRequestOptions in the + // call. + // Moving retries to gax for table.mutate will be done in a + // separate scope of work. + options.gaxOptions.retry = new RetryOptions( + [], + DEFAULT_BACKOFF_SETTINGS, + () => false + ); + } this.bigtable .request({ @@ -1627,7 +1514,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); method: 'mutateRows', reqOpts, gaxOpts: options.gaxOptions, - retryOpts, }) .on('error', (err: ServiceError) => { onBatchResponse(err); @@ -1704,6 +1590,21 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; const gaxOptions = typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + if ( + gaxOptions?.retry === undefined && + gaxOptions?.retryRequestOptions === undefined + ) { + // For now gax will not do any retries for table.sampleRowKeys unless + // the user specifically provides retry or retryRequestOptions in the + // call. + // Moving retries to gax for table.sampleRowKeys will be done in a + // separate scope of work. + gaxOptions.retry = new RetryOptions( + [], + DEFAULT_BACKOFF_SETTINGS, + () => false + ); + } this.sampleRowKeysStream(gaxOptions) .on('error', callback) .pipe( diff --git a/src/utils/read-rows-resumption.ts b/src/utils/read-rows-resumption.ts new file mode 100644 index 000000000..26300a44c --- /dev/null +++ b/src/utils/read-rows-resumption.ts @@ -0,0 +1,275 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {GetRowsOptions, PrefixRange} from '../table'; +import {ChunkTransformer} from '../chunktransformer'; +import * as protos from '../../protos/protos'; +import {TableUtils} from './table'; +import {google} from '../../protos/protos'; +import {CallOptions, GoogleError, RetryOptions} from 'google-gax'; +import {Mutation} from '../mutation'; +import {BoundData, Filter} from '../filter'; +import {RequestType} from 'google-gax/build/src/apitypes'; +import { + DEFAULT_BACKOFF_SETTINGS, + isRstStreamError, + RETRYABLE_STATUS_CODES, +} from './retry-options'; +import * as is from 'is'; +import arrify = require('arrify'); + +// This interface contains the information that will be used in a request. +interface TableStrategyInfo { + tableName: string; + appProfileId?: string; +} + +// Gets the row keys for a readrows request by filtering out row keys that have +// already been read. +function getRowKeys( + rowKeys: string[], + lastRowKey: string | number | true | Uint8Array +) { + // Remove rowKeys already read. + return rowKeys.filter(rowKey => + TableUtils.greaterThan(rowKey, lastRowKey as string) + ); +} + +// Modifies ranges in place based on the lastRowKey to prepare +// a readrows request. +function spliceRanges( + ranges: PrefixRange[], + lastRowKey: string | number | true | Uint8Array +): void { + // Readjust and/or remove ranges based on previous valid row reads. + // Iterate backward since items may need to be removed. + for (let index = ranges.length - 1; index >= 0; index--) { + const range = ranges[index]; + const startValue = is.object(range.start) + ? (range.start as BoundData).value + : range.start; + const startKeyIsRead = + !startValue || + TableUtils.lessThanOrEqualTo(startValue as string, lastRowKey as string); + if (startKeyIsRead) { + const endValue = is.object(range.end) + ? (range.end as BoundData).value + : range.end; + const endKeyIsNotRead = + !endValue || + (endValue as Buffer).length === 0 || + TableUtils.lessThan(lastRowKey as string, endValue as string); + if (endKeyIsNotRead) { + // EndKey is not read, reset the range to start from lastRowKey open + range.start = { + value: lastRowKey, + inclusive: false, + }; + } else { + // EndKey is read, remove this range + ranges.splice(index, 1); + } + } + } +} + +/** + * Create a ReadRowsResumptionStrategy object to specify retry behaviour + * + * @class + * @param {ChunkTransformer} chunkTransformer A ChunkTransformer stream defined + * in chunktransformer.ts which is typically used for parsing chunked data from + * the server into a format ready for the user. The lastRowKey parameter of the + * chunkTransformer object is used for resumption logic to determine what keys + * and ranges should be included in the request for instance. + * @param {GetRowsOptions} options Options provided to createreadstream used for + * customizing the readRows call. + * @param {TableStrategyInfo} tableStrategyInfo Data passed about the table + * that is necessary for the readRows request. + * + */ +export class ReadRowsResumptionStrategy { + private chunkTransformer: ChunkTransformer; + private rowKeys: string[]; + private ranges: PrefixRange[]; + private rowsLimit: number; + private hasLimit: boolean; + private options: GetRowsOptions; + private tableStrategyInfo: TableStrategyInfo; + private retryCodes; + rowsRead = 0; + constructor( + chunkTransformer: ChunkTransformer, + options: GetRowsOptions, + tableStrategyInfo: TableStrategyInfo + ) { + this.chunkTransformer = chunkTransformer; + this.options = options; + this.rowKeys = options.keys || []; + this.ranges = TableUtils.getRanges(options); + this.rowsLimit = options.limit || 0; + this.hasLimit = this.rowsLimit !== 0; + this.rowsRead = 0; + if (this?.options?.gaxOptions?.retry?.retryCodes) { + // Clone the retry codes + this.retryCodes = this?.options?.gaxOptions?.retry?.retryCodes.slice(0); + } + + this.tableStrategyInfo = tableStrategyInfo; + // If rowKeys and ranges are both empty, the request is a full table scan. + // Add an empty range to simplify the resumption logic. + if (this.rowKeys.length === 0 && this.ranges.length === 0) { + this.ranges.push({}); + } + } + + /** + This function updates the row keys and row ranges based on the lastRowKey + value in the chunk transformer. This idempotent function is called in + canResume, but since canResume is only used when a retry function is not + provided, we need to also call it in getResumeRequest so that it is + guaranteed to be called before an outgoing request is made. + */ + private updateKeysAndRanges() { + const lastRowKey = this.chunkTransformer + ? this.chunkTransformer.lastRowKey + : ''; + if (lastRowKey) { + spliceRanges(this.ranges, lastRowKey); + this.rowKeys = getRowKeys(this.rowKeys, lastRowKey); + } + } + + /** + * Gets the next readrows request. + * + * This function computes the next readRows request that will be sent to the + * server. Based on the last row key calculated by data already passed through + * the chunk transformer, the set of row keys and row ranges is calculated and + * updated. The calculated row keys and ranges are used along with other + * properties provided by the user like limits and filters to compute and + * return a request that will be used in the next read rows call. + * + * @return {protos.google.bigtable.v2.IReadRowsRequest} The request options + * for the next readrows request. + */ + getResumeRequest(): protos.google.bigtable.v2.IReadRowsRequest { + this.updateKeysAndRanges(); + const reqOpts = this + .tableStrategyInfo as google.bigtable.v2.IReadRowsRequest; + + // Create the new reqOpts + reqOpts.rows = {}; + + // Preprocess all the keys and ranges to Bytes + reqOpts.rows.rowKeys = this.rowKeys.map( + Mutation.convertToBytes + ) as {} as Uint8Array[]; + + reqOpts.rows.rowRanges = this.ranges.map(range => + Filter.createRange( + range.start as BoundData, + range.end as BoundData, + 'Key' + ) + ); + + if (this.options.filter) { + reqOpts.filter = Filter.parse(this.options.filter); + } + + if (this.hasLimit) { + reqOpts.rowsLimit = this.rowsLimit - this.rowsRead; + } + return reqOpts; + } + + /** + * Decides if the client is going to retry a request. + * + * canResume contains the logic that will decide if the client will retry with + * another request when it receives an error. This logic is passed along to + * google-gax and used by google-gax to decide if the client should retry + * a request when google-gax receives an error. If canResume returns true then + * the client will retry with another request computed by getResumeRequest. If + * canResume request returns false then the error will bubble up from gax to + * the handwritten layer. + * + * @param {GoogleError} [error] The error that Google Gax receives. + * @return {boolean} True if the client will retry + */ + canResume(error: GoogleError): boolean { + // First update the row keys and the row ranges based on the last row key. + this.updateKeysAndRanges(); + if (error.statusDetails === 'RetryInfo') { + return true; + } + // If all the row keys and ranges are read, end the stream + // and do not retry. + if (this.rowKeys.length === 0 && this.ranges.length === 0) { + return false; + } + // If there was a row limit in the original request and + // we've already read all the rows and met/exceeded that limit, end the + // stream and do not retry. + if (this.hasLimit && this.rowsLimit === this.rowsRead) { + return false; + } + const retryCodesUsed = this.retryCodes + ? this.retryCodes + : arrify(RETRYABLE_STATUS_CODES); + if ( + error.code && + (retryCodesUsed.includes(error.code) || isRstStreamError(error)) + ) { + return true; + } + return false; + } + + /** + * Creates a RetryOptions object that can be used by google-gax. + * + * This class contains the business logic to specify retry behaviour of + * readrows requests and this function packages that logic into a RetryOptions + * object that google-gax expects. + * + * @param {CallOptions} [gaxOpts] The call options that will be used to + * specify retry behaviour. + * @return {RetryOptions} A RetryOptions object that google-gax expects that + * can determine retry behaviour. + * + */ + toRetryOptions(gaxOpts: CallOptions): RetryOptions { + // On individual calls, the user can override any of the default + // retry options. Overrides can be done on the retryCodes, backoffSettings, + // shouldRetryFn or getResumptionRequestFn. + const canResume = (error: GoogleError) => { + return this.canResume(error); + }; + const getResumeRequest = () => { + return this.getResumeRequest() as RequestType; + }; + // In RetryOptions, the 1st parameter, the retryCodes are ignored if a + // shouldRetryFn is provided. + // The 3rd parameter, the shouldRetryFn will determine if the client should retry. + return new RetryOptions( + [], + gaxOpts?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS, + canResume, + getResumeRequest + ); + } +} diff --git a/src/utils/retry-options.ts b/src/utils/retry-options.ts new file mode 100644 index 000000000..d02c4ba23 --- /dev/null +++ b/src/utils/retry-options.ts @@ -0,0 +1,45 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {BackoffSettings} from 'google-gax/build/src/gax'; +import {GoogleError, grpc, ServiceError} from 'google-gax'; + +export const RETRYABLE_STATUS_CODES = new Set([ + grpc.status.DEADLINE_EXCEEDED.valueOf(), + grpc.status.ABORTED.valueOf(), + grpc.status.UNAVAILABLE.valueOf(), +]); +export const DEFAULT_BACKOFF_SETTINGS: BackoffSettings = { + initialRetryDelayMillis: 10, + retryDelayMultiplier: 2, + maxRetryDelayMillis: 60000, +}; +export const DEFAULT_RETRY_COUNT = 10; +export const isRstStreamError = ( + error: GoogleError | ServiceError +): boolean => { + // Retry on "received rst stream" errors + if (error.code === grpc.status.INTERNAL && error.message) { + const error_message = (error.message || '').toLowerCase(); + return ( + error.code === grpc.status.INTERNAL && + (error_message.includes('rst_stream') || + error_message.includes('rst stream') || + error_message.includes( + 'Received unexpected EOS on DATA frame from server' + )) + ); + } + return false; +}; diff --git a/src/utils/table.ts b/src/utils/table.ts index 8785ad516..8dc334027 100644 --- a/src/utils/table.ts +++ b/src/utils/table.ts @@ -16,7 +16,7 @@ import {GetRowsOptions, PrefixRange} from '../table'; import {Mutation} from '../mutation'; export class TableUtils { - static getRanges(options: GetRowsOptions) { + static getRanges(options: GetRowsOptions): PrefixRange[] { const ranges = options.ranges || []; if (options.start || options.end) { if (options.ranges || options.prefix || options.prefixes) { diff --git a/system-test/bigtable.ts b/system-test/bigtable.ts index 10e70e6bc..225426847 100644 --- a/system-test/bigtable.ts +++ b/system-test/bigtable.ts @@ -33,6 +33,7 @@ import {Row} from '../src/row.js'; import {Table} from '../src/table.js'; import {RawFilter} from '../src/filter'; import {generateId, PREFIX} from './common'; +import {Mutation} from '../src/mutation'; describe('Bigtable', () => { const bigtable = new Bigtable(); @@ -1712,6 +1713,90 @@ describe('Bigtable', () => { }); }); }); + + describe('mutateRows entries tests', () => { + const table = INSTANCE.table(generateId('table')); + + afterEach(async () => { + await table.delete(); + }); + + it('should only insert one row in the table with mutate', async () => { + // Create table + const tableOptions = { + families: ['columnFamily'], + }; + await table.create(tableOptions); + // Add entries + const entry = { + columnFamily: { + column: 1, + }, + }; + const mutation = { + key: 'rowKey', + data: entry, + method: Mutation.methods.INSERT, + }; + const gaxOptions = {maxRetries: 4}; + await table.mutate(mutation, {gaxOptions}); + // Get rows and compare + const [rows] = await table.getRows(); + assert.strictEqual(rows.length, 1); + }); + + it('should insert one row in the table using mutate in a similar way to how the documentation says to use insert', async () => { + // Create table + const tableOptions = { + families: ['columnFamily'], + }; + await table.create(tableOptions); + // Add entries + const mutation = { + key: 'rowKey', + data: { + columnFamily: { + column: 1, + }, + }, + method: Mutation.methods.INSERT, + }; + const gaxOptions = {maxRetries: 4}; + await table.mutate(mutation, {gaxOptions}); + // Get rows and compare + const [rows] = await table.getRows(); + assert.strictEqual(rows.length, 1); + }); + + it('should only insert one row in the table with insert as described by the GCP documentation', async () => { + // Create table + const tableOptions = { + families: ['follows'], + }; + await table.create(tableOptions); + // Add entries + const greetings = ['Hello World!', 'Hello Bigtable!', 'Hello Node!']; + const rowsToInsert = greetings.map((greeting, index) => ({ + key: `greeting${index}`, + data: { + follows: { + // 'follows' is the column family + someColumn: { + // Setting the timestamp allows the client to perform retries. If + // server-side time is used, retries may cause multiple cells to + // be generated. + timestamp: new Date(), + value: greeting, + }, + }, + }, + })); + await table.insert(rowsToInsert); + // Get rows and compare + const [rows] = await table.getRows(); + assert.strictEqual(rows.length, 3); + }); + }); }); function createInstanceConfig( diff --git a/system-test/data/read-rows-retry-test.json b/system-test/data/read-rows-retry-test.json index aad5178c6..64f12a010 100644 --- a/system-test/data/read-rows-retry-test.json +++ b/system-test/data/read-rows-retry-test.json @@ -144,12 +144,12 @@ }] }, "request_options": [ - { "rowKeys": [], + { "rowKeys": [], "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "c" }, { "startKeyClosed": "x", "endKeyClosed": "z" } ] }, - { "rowKeys": [], + { "rowKeys": [], "rowRanges": [ { "startKeyClosed": "x", "endKeyClosed": "z" } ] } ], "responses": [ @@ -326,8 +326,298 @@ [ "y" ], [ "z" ] ] - } + }, + { + "name": "should do a retry the stream is interrupted", + "createReadStream_options": {}, + "request_options": [ + { + "rowKeys": [], + "rowRanges": [{}] + }, + { + "rowKeys": [], + "rowRanges": [{}] + } + ], + "responses": [ + { "row_keys": [], "end_with_error": 4 }, + { "row_keys": [ "z" ] } + ], + "row_keys_read": [[], ["z"]] + }, + { + "name": "should not retry CANCELLED errors", + "createReadStream_options": {}, + "request_options": [ + { + "rowKeys": [], + "rowRanges": [{}] + } + ], + "responses": [ + { "row_keys": [], "end_with_error": 1 } + ], + "row_keys_read": [[]] + }, + { + "name": "should have a range which starts after the last read key", + "createReadStream_options": {}, + "request_options": [ + { + "rowKeys": [], + "rowRanges": [{}] + }, + { + "rowKeys": [], + "rowRanges": [{ + "startKeyOpen": "a" + }] + } + ], + "responses": [ + { "row_keys": ["a"], "end_with_error": 4 }, + { "row_keys": ["z"]} + ], + "row_keys_read": [["a"], ["z"]] + }, + { + "name": "should move the active range start to after the last read key", + "createReadStream_options": {"ranges": [{"start": "a"}]}, + "request_options": [ + { + "rowKeys": [], + "rowRanges": [{ + "startKeyClosed": "a" + }] + }, + { + "rowKeys": [], + "rowRanges": [{ + "startKeyOpen": "a" + }] + } + ], + "responses": [ + { "row_keys": ["a"], "end_with_error": 4 }, + { "row_keys": ["z"]} + ], + "row_keys_read": [["a"], ["z"]] + }, + { + "name": "should remove ranges which were already read", + "createReadStream_options": { + "ranges": [{"start": "a", "end": "b"}, {"start": "c"}] + }, + "request_options": [ + { + "rowKeys": [], + "rowRanges": [{ + "startKeyClosed": "a", + "endKeyClosed": "b" + }, + { + "startKeyClosed": "c" + }] + }, + { + "rowKeys": [], + "rowRanges": [{ + "startKeyClosed": "c" + }] + } + ], + "responses": [ + { "row_keys": ["a", "b"], "end_with_error": 4 }, + { "row_keys": ["c"]} + ], + "row_keys_read": [["a", "b"], ["c"]] + }, + { + "name": "should remove the keys which were already read", + "createReadStream_options": { + "keys": ["a", "b"] + }, + "request_options": [ + { + "rowKeys": ["a", "b"], + "rowRanges": [] + }, + { + "rowKeys": ["b"], + "rowRanges": [] + } + ], + "responses": [ + { "row_keys": ["a"], "end_with_error": 4 }, + { "row_keys": ["c"]} + ], + "row_keys_read": [["a"], ["c"]] + }, + { + "name": "should not retry if limit is reached", + "createReadStream_options": { + "ranges": [{"start": "a", "end": "c"}], + "limit": 2 + }, + "request_options": [ + { + "rowKeys": [], + "rowRanges": [{ + "startKeyClosed": "a", + "endKeyClosed": "c" + }], + "rowsLimit": 2 + } + ], + "responses": [ + { "row_keys": ["a", "b"], "end_with_error": 4} + ], + "row_keys_read": [["a", "b"]], + "error": 4 + }, + { + "name": "should not retry if all the keys are read", + "createReadStream_options": { + "keys": ["a"] + }, + "request_options": [ + { + "rowKeys": ["a"], + "rowRanges": [] + } + ], + "responses": [ + { "row_keys": ["a"], "end_with_error": 4} + ], + "row_keys_read": [["a"]], + "error": 4 + }, + { + "name": "should not retry if all the ranges are read", + "createReadStream_options": { + "ranges": [{"start": "a", "end": "c", "endInclusive": true}] + }, + "request_options": [ + { + "rowKeys": [], + "rowRanges": [ + { + "startKeyClosed": "a", + "endKeyClosed": "c" + } + ] + } + ], + "responses": [ + { "row_keys": ["c"], "end_with_error": 4} + ], + "row_keys_read": [["c"]], + "error": 4 + }, + { + "name": "shouldn not retry with keys and ranges that are read", + "createReadStream_options": { + "ranges": [{"start": "a", "end": "b"}], + "keys": ["c", "d"] + }, + "request_options": [ + { + "rowKeys": [ + "c", + "d" + ], + "rowRanges": [ + { + "startKeyClosed": "a", + "endKeyClosed": "b" + } + ] + } + ], + "responses": [ + { "row_keys": ["a1", "d"], "end_with_error": 4} + ], + "row_keys_read": [["a1", "d"]], + "error": 4 + }, + { + "name": "should retry received rst stream errors", + "createReadStream_options": { + "keys": ["a"] + }, + "request_options": [ + { + "rowKeys": [ + "a" + ], + "rowRanges": [] + }, + { + "rowKeys": [ + "a" + ], + "rowRanges": [] + } + ], + "responses": [ + {"row_keys": [], "end_with_error": 13, "error_message": "rst_stream error"}, + {"row_keys": ["a"]} + ], + "row_keys_read": [[],["a"]] + }, + { + "name": "should not retry over maxRetries", + "createReadStream_options": { + "gaxOptions": { + "maxRetries": 0 + } + }, + "request_options": [ + { + "rowKeys": [], + "rowRanges": [{}] + } + ], + "responses": [ + { "row_keys": [],"end_with_error": 4 } + ], + "row_keys_read": [[]], + "error": 4 + }, + { + "name": "should use a different set of retry codes", + "createReadStream_options": { + "keys": ["a", "b", "c"], + "gaxOptions": { + "retry" : { + "retryCodes": [14, 13] + } + } + }, + "request_options": [ + { + "rowKeys": ["a", "b", "c"], + "rowRanges": [] + }, + { + "rowKeys": ["b", "c"], + "rowRanges": [] + }, + { + "rowKeys": ["b", "c"], + "rowRanges": [] + } + ], + "responses": [ + { "row_keys": ["a"], "end_with_error": 14}, + { "row_keys": [], "end_with_error": 13}, + { "row_keys": ["b"], "end_with_error": 4} + ], + "row_keys_read": [["a"], [], ["b"]], + "error": 4 + } ] } diff --git a/system-test/read-rows.ts b/system-test/read-rows.ts index 37beab74c..3e24a2671 100644 --- a/system-test/read-rows.ts +++ b/system-test/read-rows.ts @@ -12,78 +12,123 @@ // See the License for the specific language governing permissions and // limitations under the License. -import {Bigtable} from '../src'; -import {Mutation} from '../src/mutation.js'; +import {Bigtable, protos, Table} from '../src'; const {tests} = require('../../system-test/data/read-rows-retry-test.json') as { - tests: Test[]; + tests: ReadRowsTest[]; }; -import {google} from '../protos/protos'; import * as assert from 'assert'; -import {describe, it, afterEach, beforeEach} from 'mocha'; -import * as sinon from 'sinon'; -import {EventEmitter} from 'events'; -import {Test} from './testTypes'; -import {ServiceError, GrpcClient, GoogleError, CallOptions} from 'google-gax'; -import {PassThrough} from 'stream'; +import {describe, it, before} from 'mocha'; +import {CreateReadStreamRequest, ReadRowsTest} from './testTypes'; +import {ServiceError, GrpcClient, CallOptions, GoogleError} from 'google-gax'; +import {MockServer} from '../src/util/mock-servers/mock-server'; +import {MockService} from '../src/util/mock-servers/mock-service'; +import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service'; +import {ServerWritableStream} from '@grpc/grpc-js'; const {grpc} = new GrpcClient(); -// eslint-disable-next-line @typescript-eslint/no-explicit-any -function dispatch(emitter: EventEmitter, response: any) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const emits: any[] = [{name: 'request'}]; - if (response.row_keys) { - emits.push.apply(emits, [ - {name: 'response', arg: 200}, - { - name: 'data', - arg: {chunks: response.row_keys.map(rowResponse)}, - }, - ]); - } - if (response.end_with_error) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const error: any = new Error(); - error.code = response.end_with_error; - emits.push({name: 'error', arg: error}); - } else { - emits.push({name: 'end'}); - } - let index = 0; - setImmediate(next); - - function next() { - if (index < emits.length) { - const emit = emits[index]; - index++; - emitter.emit(emit.name, emit.arg); - setImmediate(next); - } - } -} - -function rowResponse(rowKey: {}) { +/** + * The read-rows-retry-test.json file contains row key data that represents + * data that the mock server should send back. This function converts this test + * data into responses that the mock server actually sends which match the + * format readrows responses should have. + * + * @param {string} rowKey The row key from read-rows-retry-test.json. + */ +function rowResponseFromServer(rowKey: string) { return { - rowKey: Mutation.convertToBytes(rowKey), + rowKey: Buffer.from(rowKey).toString('base64'), familyName: {value: 'family'}, - qualifier: {value: 'qualifier'}, - valueSize: 0, - timestampMicros: 0, - labels: [], + qualifier: {value: Buffer.from('qualifier').toString('base64')}, commitRow: true, - value: 'value', + value: Buffer.from(rowKey).toString('base64'), }; } +function isRowKeysWithFunction(array: unknown): array is RowKeysWithFunction { + return (array as RowKeysWithFunction).asciiSlice !== undefined; +} + +function isRowKeysWithFunctionArray( + array: unknown[] +): array is RowKeysWithFunction[] { + return array.every((element: unknown) => { + return isRowKeysWithFunction(element); + }); +} + +interface TestRowRange { + startKey?: 'startKeyClosed' | 'startKeyOpen'; + endKey?: 'endKeyOpen' | 'endKeyClosed'; + startKeyClosed?: Uint8Array | string | null; + startKeyOpen?: Uint8Array | string | null; + endKeyOpen?: Uint8Array | string | null; + endKeyClosed?: Uint8Array | string | null; +} +interface RowKeysWithFunction { + asciiSlice: () => string; +} +function getRequestOptions(request: { + rows?: { + rowRanges?: TestRowRange[] | null; + rowKeys?: Uint8Array[] | null; + } | null; + rowsLimit?: string | number | Long | null | undefined; +}): CreateReadStreamRequest { + const requestOptions = {} as CreateReadStreamRequest; + if (request.rows && request.rows.rowRanges) { + requestOptions.rowRanges = request.rows.rowRanges.map( + (range: TestRowRange) => { + const convertedRowRange = {} as {[index: string]: string}; + { + // startKey and endKey get filled in during the grpc request. + // They should be removed as the test data does not look + // for these properties in the request. + if (range.startKey) { + delete range.startKey; + } + if (range.endKey) { + delete range.endKey; + } + } + Object.entries(range).forEach( + ([key, value]) => (convertedRowRange[key] = value.asciiSlice()) + ); + return convertedRowRange; + } + ); + } + if ( + request.rows && + request.rows.rowKeys && + isRowKeysWithFunctionArray(request.rows.rowKeys) + ) { + requestOptions.rowKeys = request.rows.rowKeys.map( + (rowKeys: RowKeysWithFunction) => rowKeys.asciiSlice() + ); + } + // rowsLimit is set to '0' if rowsLimit is not provided in the + // grpc request. + // + // Do not append rowsLimit to collection of request options if received grpc + // rows limit is '0' so that test data in read-rows-retry-test.json remains + // shorter. + if ( + request.rowsLimit && + request.rowsLimit !== '0' && + typeof request.rowsLimit === 'string' + ) { + requestOptions.rowsLimit = parseInt(request.rowsLimit); + } + return requestOptions; +} + describe('Bigtable/Table', () => { const bigtable = new Bigtable(); const INSTANCE_NAME = 'fake-instance2'; // eslint-disable-next-line @typescript-eslint/no-explicit-any (bigtable as any).grpcCredentials = grpc.credentials.createInsecure(); - const INSTANCE = bigtable.instance('instance'); - const TABLE = INSTANCE.table('table'); - describe('close', () => { it('should fail when invoking readRows with closed client', async () => { const instance = bigtable.instance(INSTANCE_NAME); @@ -112,7 +157,8 @@ describe('Bigtable/Table', () => { assert.fail( 'An error should have been thrown because the client is closed' ); - } catch (err: any) { + } catch (err) { + assert(err instanceof GoogleError); assert.strictEqual(err.message, 'The client has already been closed.'); } }); @@ -123,97 +169,95 @@ describe('Bigtable/Table', () => { }); }); - describe('createReadStream', () => { - let clock: sinon.SinonFakeTimers; - let endCalled: boolean; - let error: ServiceError | null; - let requestedOptions: Array<{}>; - let responses: Array<{}> | null; - let rowKeysRead: Array>; - let stub: sinon.SinonStub; - - beforeEach(() => { - clock = sinon.useFakeTimers({ - toFake: [ - 'setTimeout', - 'clearTimeout', - 'setImmediate', - 'clearImmediate', - 'setInterval', - 'clearInterval', - 'Date', - 'nextTick', - ], + describe('createReadStream using mock server', () => { + let server: MockServer; + let service: MockService; + let bigtable = new Bigtable(); + let table: Table; + before(async () => { + // make sure we have everything initialized before starting tests + const port = await new Promise(resolve => { + server = new MockServer(resolve); }); - endCalled = false; - error = null; - responses = null; - rowKeysRead = []; - requestedOptions = []; - stub = sinon.stub(bigtable, 'request').callsFake(cfg => { - const reqOpts = cfg.reqOpts; - const requestOptions = {} as google.bigtable.v2.IRowSet; - if (reqOpts.rows && reqOpts.rows.rowRanges) { - requestOptions.rowRanges = reqOpts.rows.rowRanges.map( - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (range: any) => { - const convertedRowRange = {} as {[index: string]: string}; - Object.keys(range).forEach( - key => (convertedRowRange[key] = range[key].asciiSlice()) - ); - return convertedRowRange; - } - ); - } - if (reqOpts.rows && reqOpts.rows.rowKeys) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - requestOptions.rowKeys = reqOpts.rows.rowKeys.map((rowKeys: any) => - rowKeys.asciiSlice() - ); - } - if (reqOpts.rowsLimit) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (requestOptions as any).rowsLimit = reqOpts.rowsLimit; - } - requestedOptions.push(requestOptions); - rowKeysRead.push([]); - const requestStream = new PassThrough({objectMode: true}); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (requestStream as any).abort = () => {}; - dispatch(requestStream, responses!.shift()); - return requestStream; + bigtable = new Bigtable({ + apiEndpoint: `localhost:${port}`, }); + table = bigtable.instance('fake-instance').table('fake-table'); + service = new BigtableClientMockService(server); }); - afterEach(() => { - clock.restore(); - stub.restore(); + after(async () => { + server.shutdown(() => {}); }); tests.forEach(test => { - it(test.name, () => { - responses = test.responses; - TABLE.maxRetries = test.max_retries; - TABLE.createReadStream(test.createReadStream_options) - .on('data', row => rowKeysRead[rowKeysRead.length - 1].push(row.id)) - .on('end', () => (endCalled = true)) - .on('error', err => (error = err as ServiceError)); - clock.runAll(); - - if (test.error) { - assert(!endCalled, ".on('end') should not have been invoked"); - assert.strictEqual(error!.code, test.error); - } else { - assert(endCalled, ".on('end') shoud have been invoked"); - assert.ifError(error); + it(test.name, done => { + // These variables store request/response data capturing data sent + // and received when using readRows with retries. This data is evaluated + // in checkResults at the end of the test for correctness. + const requestedOptions: CreateReadStreamRequest[] = []; + const responses = test.responses; + const rowKeysRead: string[][] = []; + let endCalled = false; + let error: ServiceError | null = null; + function checkResults() { + if (test.error) { + assert(!endCalled, ".on('end') should not have been invoked"); + assert.strictEqual(error!.code, test.error); + } else { + assert(endCalled, ".on('end') should have been invoked"); + assert.ifError(error); + } + assert.deepStrictEqual(rowKeysRead, test.row_keys_read); + assert.strictEqual( + responses.length, + 0, + 'not all the responses were used' + ); + assert.deepStrictEqual(requestedOptions, test.request_options); + done(); } - assert.deepStrictEqual(rowKeysRead, test.row_keys_read); - assert.strictEqual( - responses.length, - 0, - 'not all the responses were used' - ); - assert.deepStrictEqual(requestedOptions, test.request_options); + + table.maxRetries = test.max_retries; + service.setService({ + ReadRows: ( + stream: ServerWritableStream< + protos.google.bigtable.v2.IReadRowsRequest, + protos.google.bigtable.v2.IReadRowsResponse + > + ) => { + const response = responses!.shift(); + assert(response); + rowKeysRead.push([]); + requestedOptions.push(getRequestOptions(stream.request)); + if (response.row_keys) { + stream.write({ + chunks: response.row_keys.map(rowResponseFromServer), + }); + } + if (response.end_with_error) { + const error: GoogleError = new GoogleError(); + if (response.error_message) { + error.message = response.error_message; + } + error.code = response.end_with_error; + stream.emit('error', error); + } else { + stream.end(); + } + }, + }); + table + .createReadStream(test.createReadStream_options) + .on('data', row => rowKeysRead[rowKeysRead.length - 1].push(row.id)) + .on('end', () => { + endCalled = true; + checkResults(); + }) + .on('error', err => { + error = err as ServiceError; + checkResults(); + }); }); }); }); diff --git a/system-test/testTypes.ts b/system-test/testTypes.ts index 43613cc67..210bb2f0d 100644 --- a/system-test/testTypes.ts +++ b/system-test/testTypes.ts @@ -14,6 +14,7 @@ import {ServiceError} from 'google-gax'; import {GetRowsOptions} from '../src/table'; +import {google} from '../protos/protos'; export interface Test { name: string; @@ -46,3 +47,24 @@ export interface Test { row_keys_read: {}; createReadStream_options: GetRowsOptions; } + +interface CreateReadStreamResponse { + row_keys?: string[]; + end_with_error?: number; + error_message?: string; +} + +export interface CreateReadStreamRequest { + rowKeys: string[]; + rowRanges: google.bigtable.v2.IRowRange[]; + rowsLimit?: number; +} +export interface ReadRowsTest { + createReadStream_options?: GetRowsOptions; + max_retries: number; + responses: CreateReadStreamResponse[]; + request_options: CreateReadStreamRequest[]; + error: number; + row_keys_read: string[][]; + name: string; +} diff --git a/test/table.ts b/test/table.ts index f0833ef77..ad39aafb9 100644 --- a/test/table.ts +++ b/test/table.ts @@ -1,4 +1,4 @@ -// Copyright 2016 Google LLC +// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ import {afterEach, before, beforeEach, describe, it} from 'mocha'; import * as proxyquire from 'proxyquire'; import * as pumpify from 'pumpify'; import * as sinon from 'sinon'; -import {PassThrough, Writable, Duplex} from 'stream'; +import {PassThrough, Writable} from 'stream'; import {ServiceError} from 'google-gax'; import * as inst from '../src/instance'; @@ -30,6 +30,9 @@ import * as tblTypes from '../src/table'; import {Bigtable, RequestOptions} from '../src'; import {EventEmitter} from 'events'; import {TableUtils} from '../src/utils/table'; +import {ReadRowsResumptionStrategy} from '../src/utils/read-rows-resumption'; +import {MockGapicLayer} from './util/mock-gapic-layer'; +import {Table} from '../src/table'; const sandbox = sinon.createSandbox(); const noop = () => {}; @@ -110,12 +113,22 @@ describe('Bigtable/Table', () => { let table: any; before(() => { + const FakeReadRowsResumptionStrategy: ReadRowsResumptionStrategy = + proxyquire('../src/utils/read-rows-resumption', { + '../family.js': {Family: FakeFamily}, + '../mutation.js': {Mutation: FakeMutation}, + '../filter.js': {Filter: FakeFilter}, + '../row.js': {Row: FakeRow}, + }).ReadRowsResumptionStrategy; Table = proxyquire('../src/table.js', { '@google-cloud/promisify': fakePromisify, './family.js': {Family: FakeFamily}, './mutation.js': {Mutation: FakeMutation}, './filter.js': {Filter: FakeFilter}, pumpify, + './utils/read-rows-resumption': { + ReadRowsResumptionStrategy: FakeReadRowsResumptionStrategy, + }, './row.js': {Row: FakeRow}, './chunktransformer.js': {ChunkTransformer: FakeChunkTransformer}, }).Table; @@ -547,6 +560,8 @@ describe('Bigtable/Table', () => { assert.strictEqual(config.reqOpts.appProfileId, undefined); assert.deepStrictEqual(config.gaxOpts, { otherArgs: {headers: {'bigtable-attempt': 0}}, + retry: config.gaxOpts.retry, + maxRetries: 10, }); done(); }; @@ -1114,364 +1129,109 @@ describe('Bigtable/Table', () => { }); }); - describe('retries', () => { - let callCreateReadStream: Function; - let emitters: EventEmitter[] | null; // = [((stream: Writable) => { stream.push([{ key: 'a' }]); - // stream.end(); }, ...]; - let makeRetryableError: Function; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let reqOptsCalls: any[]; - let setTimeoutSpy: sinon.SinonSpy; - - /* - setImmediate is required here to correctly mock events as they will - come in from the request function. It is required for tests to pass, - but it is not a problem that it is required because we never expect - a single Node event to emit data and then emit an error. That is, - a mock without setImmediate around the last error represents a scenario - that will never happen. - */ - function emitRetriableError(stream: Duplex) { - setImmediate(() => { - stream.emit('error', makeRetryableError()); - }); - } - - beforeEach(() => { - FakeChunkTransformer.prototype._transform = function ( - rows: Row[], - enc: {}, - next: Function - ) { - rows.forEach(row => this.push(row)); - this.lastRowKey = rows[rows.length - 1].key; - next(); - }; - - FakeChunkTransformer.prototype._flush = (cb: Function) => { - cb(); - }; - - callCreateReadStream = (options: {}, verify: Function) => { - table.createReadStream(options).on('end', verify).resume(); // The stream starts paused unless it has a `.data()` - // callback. - }; - - emitters = null; // This needs to be assigned in each test case. - - makeRetryableError = () => { - const error = new Error('retry me!') as ServiceError; - error.code = 4; - return error; - }; - - (sandbox.stub(FakeFilter, 'createRange') as sinon.SinonStub).callsFake( - (start, end) => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const range: any = {}; - if (start) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - range.start = (start as any).value || start; - range.startInclusive = - // eslint-disable-next-line @typescript-eslint/no-explicit-any - typeof start === 'object' ? (start as any).inclusive : true; - } - if (end) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - range.end = (end as any).value || end; - } - return range; - } - ); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (FakeMutation as any).convertToBytes = (value: string) => { - return Buffer.from(value); - }; - - reqOptsCalls = []; - - setTimeoutSpy = sandbox - .stub(global, 'setTimeout') - .callsFake(fn => (fn as Function)()); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - table.bigtable.request = (config: any) => { - reqOptsCalls.push(config.reqOpts); - - const stream = new PassThrough({ - objectMode: true, - }); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (stream as any).abort = () => {}; - - setImmediate(() => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (emitters!.shift() as any)(stream); - }); - return stream; - }; - }); - - afterEach(() => { - if (setTimeoutSpy) { - setTimeoutSpy.restore(); - } - }); - - it('should do a retry the stream is interrupted', done => { - emitters = [ - ((stream: Writable) => { - stream.emit('error', makeRetryableError()); - stream.end(); - }) as {} as EventEmitter, - ((stream: Writable) => { - stream.end(); - }) as {} as EventEmitter, - ]; - callCreateReadStream(null, () => { - assert.strictEqual(reqOptsCalls.length, 2); - done(); - }); - }); - - it('should not retry CANCELLED errors', done => { - emitters = [ - ((stream: Writable) => { - const cancelledError = new Error( - 'do not retry me!' - ) as ServiceError; - cancelledError.code = 1; - stream.emit('error', cancelledError); - stream.end(); - }) as {} as EventEmitter, - ]; - callCreateReadStream(null, () => { - assert.strictEqual(reqOptsCalls.length, 1); - done(); - }); - }); - - it('should not retry over maxRetries', done => { - const error = new Error('retry me!') as ServiceError; - error.code = 4; - - emitters = [ - ((stream: Writable) => { - stream.emit('error', error); - stream.end(); - }) as {} as EventEmitter, - ]; - - table.maxRetries = 0; - table - .createReadStream() - .on('error', (err: ServiceError) => { - assert.strictEqual(err, error); - assert.strictEqual(reqOptsCalls.length, 1); - done(); - }) - .on('end', done) - .resume(); - }); - - it('should have a range which starts after the last read key', done => { - emitters = [ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ((stream: any) => { - stream.push([{key: 'a'}]); - emitRetriableError(stream); - }) as {} as EventEmitter, - ((stream: Writable) => { - stream.end(); - }) as {} as EventEmitter, - ]; - - const fullScan = {rowKeys: [], rowRanges: [{}]}; - - callCreateReadStream(null, () => { - assert.deepStrictEqual(reqOptsCalls[0].rows, fullScan); - assert.deepStrictEqual(reqOptsCalls[1].rows, { - rowKeys: [], - rowRanges: [{start: 'a', startInclusive: false}], - }); - done(); - }); - }); - - it('should move the active range start to after the last read key', done => { - emitters = [ - ((stream: Duplex) => { - stream.push([{key: 'a'}]); - emitRetriableError(stream); - }) as {} as EventEmitter, - ((stream: Writable) => { - stream.end(); - }) as {} as EventEmitter, - ]; - - callCreateReadStream({ranges: [{start: 'a'}]}, () => { - assert.deepStrictEqual(reqOptsCalls[0].rows, { - rowKeys: [], - rowRanges: [{start: 'a', startInclusive: true}], - }); - assert.deepStrictEqual(reqOptsCalls[1].rows, { - rowKeys: [], - rowRanges: [{start: 'a', startInclusive: false}], - }); - done(); - }); + describe('createReadStream mocking out the gapic layer', () => { + const bigtable = new Bigtable({ + projectId: 'fake-project-id', }); + const tester = new MockGapicLayer(bigtable); + const table: Table = bigtable + .instance('fake-instance') + .table('fake-table'); + const tableName = + 'projects/fake-project-id/instances/fake-instance/tables/fake-table'; - it('should remove ranges which were already read', done => { - emitters = [ - ((stream: Duplex) => { - stream.push([{key: 'a'}]); - stream.push([{key: 'b'}]); - emitRetriableError(stream); - }) as {} as EventEmitter, - ((stream: Duplex) => { - stream.push([{key: 'c'}]); - stream.end(); - }) as {} as EventEmitter, - ]; - - const options = { - ranges: [{start: 'a', end: 'b'}, {start: 'c'}], - }; - - callCreateReadStream(options, () => { - const allRanges = [ - {start: 'a', end: 'b', startInclusive: true}, - {start: 'c', startInclusive: true}, - ]; - assert.deepStrictEqual(reqOptsCalls[0].rows, { - rowKeys: [], - rowRanges: allRanges, - }); - assert.deepStrictEqual(reqOptsCalls[1].rows, { - rowKeys: [], - rowRanges: allRanges.slice(1), - }); - done(); - }); - }); - - it('should remove the keys which were already read', done => { - emitters = [ - ((stream: Duplex) => { - stream.push([{key: 'a'}]); - emitRetriableError(stream); - }) as {} as EventEmitter, - ((stream: Duplex) => { - stream.end([{key: 'c'}]); - }) as {} as EventEmitter, - ]; - - callCreateReadStream({keys: ['a', 'b']}, () => { - assert.strictEqual(reqOptsCalls[0].rows.rowKeys.length, 2); - assert.strictEqual(reqOptsCalls[1].rows.rowKeys.length, 1); - done(); - }); - }); - - it('should not retry if limit is reached', done => { - emitters = [ - ((stream: Duplex) => { - stream.push([{key: 'a'}]); - stream.push([{key: 'b'}]); - emitRetriableError(stream); - }) as {} as EventEmitter, - ]; - - const options = { - ranges: [{start: 'a', end: 'c'}], - limit: 2, - }; - - callCreateReadStream(options, () => { - assert.strictEqual(reqOptsCalls.length, 1); - done(); - }); - }); - - it('should not retry if all the keys are read', done => { - emitters = [ - ((stream: Duplex) => { - stream.push([{key: 'a'}]); - emitRetriableError(stream); - }) as {} as EventEmitter, - ]; - - callCreateReadStream({keys: ['a']}, () => { - assert.strictEqual(reqOptsCalls.length, 1); - done(); - }); - }); - - it('shouldn not retry if all the ranges are read', done => { - emitters = [ - ((stream: Duplex) => { - stream.push([{key: 'c'}]); - emitRetriableError(stream); - }) as {} as EventEmitter, - ]; - - const options = { - ranges: [{start: 'a', end: 'c', endInclusive: true}], - }; - - callCreateReadStream(options, () => { - assert.strictEqual(reqOptsCalls.length, 1); - assert.deepStrictEqual(reqOptsCalls[0].rows, { - rowKeys: [], - rowRanges: [{start: 'a', end: 'c', startInclusive: true}], - }); - done(); - }); + it('should pass the right retry configuration to the gapic layer', done => { + const expectedOptions = tester.buildReadRowsGaxOptions(tableName, {}); + tester.testReadRowsGapicCall( + done, + { + rows: { + rowKeys: [], + rowRanges: [{}], + }, + tableName, + }, + expectedOptions + ); + table.createReadStream(); }); - - it('shouldn not retry with keys and ranges that are read', done => { - emitters = [ - ((stream: Duplex) => { - stream.push([{key: 'a1'}]); - stream.push([{key: 'd'}]); - emitRetriableError(stream); - }) as {} as EventEmitter, - ]; - - const options = { - ranges: [{start: 'a', end: 'b'}], - keys: ['c', 'd'], + it('should pass maxRetries to the gapic layer', done => { + const expectedOptions = Object.assign( + {maxRetries: 7}, + tester.buildReadRowsGaxOptions(tableName, {}) + ); + tester.testReadRowsGapicCall( + done, + { + rows: { + rowKeys: [], + rowRanges: [{}], + }, + tableName, + }, + expectedOptions + ); + const tableWithRetries: Table = bigtable + .instance('fake-instance') + .table('fake-table'); + tableWithRetries.maxRetries = 7; + tableWithRetries.createReadStream(); + }); + it('should pass gax options and readrows request data to the gapic layer in a complex example', done => { + const gaxOptions = { + timeout: 734, + autoPaginate: true, + maxResults: 565, + maxRetries: 477, }; - - callCreateReadStream(options, () => { - assert.strictEqual(reqOptsCalls.length, 1); - done(); - }); - }); - - it('should retry received rst stream errors', done => { - const rstStreamError = new Error('Received Rst_stream') as ServiceError; - rstStreamError.code = 13; - emitters = [ - ((stream: Duplex) => { - stream.emit('error', rstStreamError); - }) as {} as EventEmitter, - ((stream: Duplex) => { - stream.end([{key: 'a'}]); - }) as {} as EventEmitter, - ]; - const options = { - keys: ['a'], + filter: { + row: 'cn', + }, + gaxOptions, + keys: ['ey', 'gh'], + limit: 98, + ranges: [ + {start: 'cc', end: 'ef'}, + { + start: {inclusive: false, value: 'pq'}, + end: {inclusive: true, value: 'rt'}, + }, + ], }; - - callCreateReadStream(options, () => { - assert.strictEqual(reqOptsCalls.length, 2); - done(); - }); + const expectedOptions = Object.assign( + gaxOptions, + tester.buildReadRowsGaxOptions(tableName, {}) + ); + tester.testReadRowsGapicCall( + done, + { + rows: { + rowKeys: ['ey', 'gh'].map(key => Buffer.from(key)), + rowRanges: [ + { + startKeyClosed: Buffer.from('cc'), + endKeyClosed: Buffer.from('ef'), + }, + { + startKeyOpen: Buffer.from('pq'), + endKeyClosed: Buffer.from('rt'), + }, + ], + }, + tableName, + filter: { + rowKeyRegexFilter: Buffer.from('cn'), + }, + rowsLimit: 98, + }, + expectedOptions + ); + const tableWithRetries: Table = bigtable + .instance('fake-instance') + .table('fake-table'); + tableWithRetries.maxRetries = 7; + tableWithRetries.createReadStream(options); }); }); }); diff --git a/test/util/mock-gapic-layer.ts b/test/util/mock-gapic-layer.ts new file mode 100644 index 000000000..3a38e7808 --- /dev/null +++ b/test/util/mock-gapic-layer.ts @@ -0,0 +1,209 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {ChunkTransformer} from '../../src/chunktransformer'; +import {Bigtable, GetRowsOptions, protos} from '../../src'; +import * as v2 from '../../src/v2'; +import * as mocha from 'mocha'; +import {CallOptions, GoogleError, grpc, RetryOptions} from 'google-gax'; +import * as assert from 'assert'; +import * as gax from 'google-gax'; +import {StreamProxy} from 'google-gax/build/src/streamingCalls/streaming'; +import {ReadRowsResumptionStrategy} from '../../src/utils/read-rows-resumption'; +import {RequestType} from 'google-gax/build/src/apitypes'; +import {DEFAULT_BACKOFF_SETTINGS} from '../../src/utils/retry-options'; + +/** + * Create an MockGapicLayer object used for ensuring the right data reaches the Gapic layer when we are testing the handwritten layer. + * + * @param {Bigtable} An instance of the Bigtable client + * + * @example + * ``` + * const bigtable = new Bigtable({ + * projectId: 'fake-project-id', + * }); + * const tester = new MockGapicLayer(bigtable); + * const table: Table = bigtable.instance('fake-instance').table('fake-table'); + * tester.testReadRowsGapicCall( // Mocks out the readRows function + * done, + * { + * rows: { + * rowKeys: [], + * rowRanges: [{}], + * }, + * tableName, + * }, + * {} + * ); + * table.createReadStream(); + * ``` + */ +export class MockGapicLayer { + private gapicClient: v2.BigtableClient; + constructor(bigtable: Bigtable) { + const clientOptions = bigtable.options.BigtableClient; + this.gapicClient = new v2['BigtableClient'](clientOptions); + bigtable.api['BigtableClient'] = this.gapicClient; + } + + /** + * Returns gax options that contain a retry options object containing the + * usual retry conditions and the usual resumption strategy. + * + * @param {string} tableName The formatted table name + * @param {GetRowsOptions} options Options provided to createreadstream used for + * customizing the readRows call. + */ + buildReadRowsGaxOptions( + tableName: string, + options: GetRowsOptions + ): CallOptions { + const chunkTransformer: ChunkTransformer = new ChunkTransformer({ + decode: false, + } as any); + const expectedStrategy = new ReadRowsResumptionStrategy( + chunkTransformer, + options, + {tableName} + ); + const expectedResumptionRequest = () => { + return expectedStrategy.getResumeRequest() as RequestType; + }; + const expectedCanResume = (error: GoogleError) => { + return expectedStrategy.canResume(error); + }; + const expectedRetryOptions = new RetryOptions( + [], + DEFAULT_BACKOFF_SETTINGS, + expectedCanResume, + expectedResumptionRequest + ); + return { + otherArgs: { + headers: { + 'bigtable-attempt': 0, + }, + }, + retry: expectedRetryOptions, + }; + } + + /** + * Mocks out the ReadRows function in the Gapic layer with a function that + * ensures the data being received in the Gapic layer is correct. + * + * @param {mocha.Done} [done] This is the function that is called when the + * mocha test is completed. + * @param {protos.google.bigtable.v2.IReadRowsRequest} [expectedRequest] The + * request expected in the call to readrows in the Gapic layer + * @param {CallOptions} expectedOptions The gax options expected in the call + * to the readrows function in the Gapic layer. + * + */ + testReadRowsGapicCall( + done: mocha.Done, + expectedRequest: protos.google.bigtable.v2.IReadRowsRequest, + expectedOptions: CallOptions + ) { + this.gapicClient.readRows = ( + request: protos.google.bigtable.v2.IReadRowsRequest, + options: CallOptions + ) => { + try { + assert.deepStrictEqual(request, expectedRequest); + if (options || expectedOptions) { + // Do value comparison on options.retry since + // it won't be reference equal to expectedOptions.retry: + assert(options); + assert(expectedOptions); + const retry = options.retry; + const expectedRetry = expectedOptions.retry; + assert(retry); + assert(expectedRetry); + // This if statement is needed to satisfy the compiler. + // The previous asserts guarantee it evaluates to true. + if (retry && expectedRetry) { + // First check that the retry codes are correct + // These do not need to be reference equal for a passing check + assert.deepStrictEqual( + retry?.retryCodes, + expectedRetry?.retryCodes + ); + // Next check that the backoff settings are correct + // These do not need to be reference equal for a passing check + assert.deepStrictEqual( + retry?.backoffSettings, + expectedRetry?.backoffSettings + ); + // Next check that the shouldRetryFn gets the right result for + // each error type. + assert(retry); + assert(expectedRetry); + assert(retry.shouldRetryFn); + assert(expectedRetry.shouldRetryFn); + const grpcErrorCodes = Object.values(grpc.status) + .map((status, index) => index) + .slice(1); + // This function maps a shouldRetryFn in the retry parameter + // to an array of what its output would be for each grpc code. + const mapCodeToShouldRetryArray = ( + retryParameter: Partial + ) => + grpcErrorCodes + .map((code: number) => + Object.assign(new GoogleError('Test error'), {code: code}) + ) + .map((error: GoogleError) => { + retryParameter.shouldRetryFn + ? retryParameter.shouldRetryFn(error) + : undefined; + }); + // This check ensures that for each grpc error code, the boolean + // return value of the shouldRetryFn is correct. + assert.deepStrictEqual( + mapCodeToShouldRetryArray(retry), + mapCodeToShouldRetryArray(expectedRetry) + ); + // Check that the output of the resumption function is correct: + assert(retry.getResumptionRequestFn); + assert(expectedRetry.getResumptionRequestFn); + // This if statement is needed to satisfy the compiler. + // The previous asserts guarantee it evaluates to true. + if ( + retry.getResumptionRequestFn && + expectedRetry.getResumptionRequestFn + ) { + assert.deepStrictEqual( + retry.getResumptionRequestFn({}), + expectedRetry.getResumptionRequestFn({}) + ); + } + } + done(); + } + } catch (e: unknown) { + done(e); + } + // The following code is added just so the mocked gapic function will compile. + // A return value is provided to match the return value of the readrows + // Gapic function. + const duplex: gax.CancellableStream = new StreamProxy( + gax.StreamType.SERVER_STREAMING, + () => {} + ); + return duplex; + }; + } +} diff --git a/test/utils/read-rows-resumption.ts b/test/utils/read-rows-resumption.ts new file mode 100644 index 000000000..a026017e3 --- /dev/null +++ b/test/utils/read-rows-resumption.ts @@ -0,0 +1,244 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe, it} from 'mocha'; +import {ChunkTransformer} from '../../src/chunktransformer'; +import {ReadRowsResumptionStrategy} from '../../src/utils/read-rows-resumption'; +import * as assert from 'assert'; +import {GoogleError} from 'google-gax'; + +describe('Bigtable/Utils/ReadrowsResumptionStrategy', () => { + const tableName = 'fake-table-name'; + [ + { + name: 'should generate the right resumption request with no options each time', + shouldRetry: true, + expectedResumeRequest: { + rows: { + rowKeys: [], + rowRanges: [{}], + }, + tableName, + }, + options: {}, + }, + { + name: 'should generate the right resumption requests with a last row key', + shouldRetry: true, + expectedResumeRequest: { + rows: { + rowKeys: ['c'].map(key => Buffer.from(key)), + rowRanges: [], + }, + tableName, + }, + options: { + keys: ['a', 'b', 'c'], + }, + lastRowKey: 'b', + }, + { + name: 'should generate the right resumption request with the lastrow key in a row range', + shouldRetry: true, + expectedResumeRequest: { + rows: { + rowKeys: [], + rowRanges: [ + {startKeyOpen: Buffer.from('b'), endKeyClosed: Buffer.from('c')}, + {startKeyClosed: Buffer.from('e'), endKeyClosed: Buffer.from('g')}, + ], + }, + tableName, + }, + options: { + ranges: [ + {start: 'a', end: 'c'}, + {start: 'e', end: 'g'}, + ], + }, + lastRowKey: 'b', + }, + { + name: 'should generate the right resumption request with the lastrow key at the end of a row range', + shouldRetry: true, + expectedResumeRequest: { + rows: { + rowKeys: [], + rowRanges: [ + {startKeyClosed: Buffer.from('e'), endKeyClosed: Buffer.from('g')}, + ], + }, + tableName, + }, + options: { + ranges: [ + {start: 'a', end: 'c'}, + {start: 'e', end: 'g'}, + ], + }, + lastRowKey: 'c', + }, + { + name: 'should generate the right resumption request with start and end', + shouldRetry: true, + expectedResumeRequest: { + rows: { + rowKeys: [], + rowRanges: [ + { + startKeyOpen: Buffer.from('d'), + endKeyClosed: Buffer.from('m'), + }, + ], + }, + tableName, + }, + options: { + start: 'b', + end: 'm', + }, + lastRowKey: 'd', + }, + { + name: 'should generate the right resumption request with prefixes', + shouldRetry: true, + expectedResumeRequest: { + rows: { + rowKeys: [], + rowRanges: [ + { + startKeyClosed: Buffer.from('f'), + endKeyOpen: Buffer.from('g'), + }, + { + startKeyClosed: Buffer.from('h'), + endKeyOpen: Buffer.from('i'), + }, + ], + }, + tableName, + }, + options: { + prefixes: ['d', 'f', 'h'], + }, + lastRowKey: 'e', + }, + { + name: 'should generate the right resumption request with row ranges and row keys', + shouldRetry: true, + expectedResumeRequest: { + rows: { + rowKeys: ['d'].map(key => Buffer.from(key)), + rowRanges: [ + {startKeyClosed: Buffer.from('e'), endKeyClosed: Buffer.from('g')}, + ], + }, + tableName, + }, + options: { + keys: ['a', 'c', 'd'], + ranges: [ + {start: 'a', end: 'c'}, + {start: 'e', end: 'g'}, + ], + }, + lastRowKey: 'c', + }, + { + name: 'should generate the right resumption request without any filtering', + shouldRetry: true, + expectedResumeRequest: { + rows: { + rowKeys: ['c', 'd', 'e'].map(key => Buffer.from(key)), + rowRanges: [ + {startKeyClosed: Buffer.from('d'), endKeyClosed: Buffer.from('f')}, + {startKeyClosed: Buffer.from('g'), endKeyClosed: Buffer.from('h')}, + ], + }, + tableName, + }, + options: { + keys: ['c', 'd', 'e'], + ranges: [ + {start: 'd', end: 'f'}, + {start: 'g', end: 'h'}, + ], + }, + lastRowKey: 'b', + }, + { + name: 'should not retry again if the last row key exceeds all the row keys requested', + shouldRetry: false, + options: { + keys: ['a', 'b', 'c'], + }, + lastRowKey: 'e', + }, + ].forEach(test => { + it(test.name, () => { + const chunkTransformer = new ChunkTransformer({ + decode: false, + } as any); + if (test.lastRowKey) { + chunkTransformer.lastRowKey = test.lastRowKey; + } + const strategy = new ReadRowsResumptionStrategy( + chunkTransformer, + test.options, + { + tableName, + } + ); + const error = new GoogleError(); + error.code = 4; + const willRetry = strategy.canResume(error); // Updates strategy state. + // Do this check 2 times to make sure getResumeRequest is idempotent. + assert.strictEqual(willRetry, test.shouldRetry); + if (willRetry) { + assert.deepStrictEqual( + strategy.getResumeRequest(), + test.expectedResumeRequest + ); + assert.deepStrictEqual( + strategy.getResumeRequest(), + test.expectedResumeRequest + ); + } + }); + }); + it('should generate the right resumption request with the limit', () => { + const chunkTransformer = new ChunkTransformer({ + decode: false, + } as any); + const strategy = new ReadRowsResumptionStrategy( + chunkTransformer, + { + limit: 71, + }, + { + tableName, + } + ); + strategy.rowsRead = 37; + strategy.canResume(new GoogleError()); // Updates strategy state. + assert.deepStrictEqual(strategy.getResumeRequest(), { + rows: { + rowKeys: [], + rowRanges: [{}], + }, + rowsLimit: 34, + tableName, + }); + }); +}); diff --git a/testproxy/known_failures.txt b/testproxy/known_failures.txt new file mode 100644 index 000000000..cce1bd7fa --- /dev/null +++ b/testproxy/known_failures.txt @@ -0,0 +1 @@ +TestMutateRow_Generic_Headers\|TestMutateRow_Generic_DeadlineExceeded|TestMutateRows_Generic_CloseClient\|TestMutateRows_Retry_WithRoutingCookie\|TestReadModifyWriteRow_Generic_Headers\|TestReadModifyWriteRow_NoRetry_TransientError\|TestReadModifyWriteRow_Generic_DeadlineExceeded\|TestReadRow_Generic_DeadlineExceeded\|TestReadRow_Retry_WithRoutingCookie\|TestReadRow_Retry_WithRetryInfo\|TestReadRows_ReverseScans_FeatureFlag_Enabled\|TestReadRows_NoRetry_OutOfOrderError_Reverse\|TestReadRows_Retry_PausedScan\|TestReadRows_Retry_LastScannedRow\|TestReadRows_Retry_LastScannedRow_Reverse\|TestCheckAndMutateRow_NoRetry_TransientError\|TestCheckAndMutateRow_Generic_DeadlineExceeded \ No newline at end of file