Skip to content
Open
17 changes: 16 additions & 1 deletion packages/nice-grpc-web/src/__tests__/unary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,27 @@ import {
} from '../../test-server/client';
import {NodeHttpTransport} from '../client/transports/nodeHttp';
import {defer} from './utils/defer';
import {XHRTransport} from "../client/transports/xhr";

const environment = detect();

[
...cartesianProduct([
['envoy' as const, 'grpcwebproxy' as const, 'traefik' as const],
['fetch' as const, 'node-http' as const],
['fetch' as const, 'node-http' as const, 'xhr' as const, 'fetch-blob' as const],
['http' as const, 'https' as const],
]),
['grpcwebproxy', 'websocket', 'http'] as const,
].forEach(([proxyType, transport, protocol]) => {
if (transport === 'node-http' && environment?.type !== 'node') {
return;
}
if (transport === 'xhr' && environment?.type === 'node'){
return;
}
if (transport === 'fetch-blob' && environment?.type === 'node') {
return;
}

describe(`unary / ${proxyType} / ${transport} / ${protocol}`, () => {
let server: RemoteTestServer;
Expand All @@ -60,6 +67,10 @@ const environment = detect();
? WebsocketTransport()
: transport === 'node-http'
? NodeHttpTransport()
: transport === 'xhr'
? XHRTransport()
: transport === 'fetch-blob'
? FetchTransport({blobMode: true})
: assertNever(transport),
),
);
Expand Down Expand Up @@ -449,6 +460,10 @@ const environment = detect();
? WebsocketTransport()
: transport === 'node-http'
? NodeHttpTransport()
: transport === 'fetch-blob'
? FetchTransport({blobMode: true})
: transport === 'xhr'
? XHRTransport()
: assertNever(transport),
),
);
Expand Down
54 changes: 46 additions & 8 deletions packages/nice-grpc-web/src/client/transports/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ import {Transport} from '../Transport';
export interface FetchTransportConfig {
credentials?: RequestCredentials;
cache?: RequestCache;
blobMode?: boolean;
}

/**
* Required because React Native (expo) does not support FileReader.arrayBuffer().
*/
export function blobReaderAsync(myBlob: Blob): Promise<Uint8Array> {
return new Promise((resolve, reject) => {
const fileReader = new FileReader();
fileReader.onloadend = function () {
const dataString = (fileReader.result as string);
const base64String = dataString.substring(dataString.indexOf(",")+1);
const uint8Data = Base64.toUint8Array(base64String);
resolve(uint8Data);
}
fileReader.readAsDataURL(myBlob);
});
}

/**
Expand All @@ -29,6 +46,7 @@ export function FetchTransport(config?: FetchTransportConfig): Transport {
let iterator: AsyncIterator<Uint8Array> | undefined;

requestBody = new ReadableStream({
// @ts-ignore
type: 'bytes',
start() {
iterator = body[Symbol.asyncIterator]();
Expand Down Expand Up @@ -76,17 +94,32 @@ export function FetchTransport(config?: FetchTransportConfig): Transport {

throwIfAborted(signal);

const reader = response.body!.getReader();
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined = undefined;
let value: Uint8Array | undefined = undefined;
let done: boolean = false;
let abortListener: any;
if (config?.blobMode ?? false) {
const dataBlob = await response.blob();
value = await blobReaderAsync(dataBlob);
done = true;
} else {
reader = response.body!.getReader();

const abortListener = () => {
reader.cancel().catch(() => {});
};
abortListener = () => {
reader!.cancel().catch(() => {
});
};

signal.addEventListener('abort', abortListener);
signal.addEventListener('abort', abortListener);
}

try {
while (true) {
const {done, value} = await reader.read();
if (config?.blobMode ?? false) {
const readResult = await reader!.read();
value = readResult.value;
done = readResult.done;
}

if (value != null) {
yield {
Expand All @@ -100,7 +133,9 @@ export function FetchTransport(config?: FetchTransportConfig): Transport {
}
}
} finally {
signal.removeEventListener('abort', abortListener);
if (config?.blobMode ?? false) {
signal.removeEventListener('abort', abortListener);
}

throwIfAborted(signal);
}
Expand All @@ -114,7 +149,7 @@ function metadataToHeaders(metadata: Metadata): Headers {
for (const value of values) {
headers.append(
key,
typeof value === 'string' ? value : Base64.fromUint8Array(value),
typeof value === 'string' ? value : Base64.fromUint8Array(value, true),
);
}
}
Expand All @@ -125,6 +160,7 @@ function metadataToHeaders(metadata: Metadata): Headers {
function headersToMetadata(headers: Headers): Metadata {
const metadata = new Metadata();

// @ts-ignore
for (const [key, value] of headers) {
if (key.endsWith('-bin')) {
for (const item of value.split(/,\s?/)) {
Expand All @@ -140,6 +176,8 @@ function headersToMetadata(headers: Headers): Metadata {

function getStatusFromHttpCode(statusCode: number): Status {
switch (statusCode) {
case 200:
return Status.OK;
case 400:
return Status.INTERNAL;
case 401:
Expand Down
193 changes: 193 additions & 0 deletions packages/nice-grpc-web/src/client/transports/xhr.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import {throwIfAborted} from 'abort-controller-x';
import {Base64} from 'js-base64';
import {ClientError, Metadata, Status} from 'nice-grpc-common';
import {Transport} from "../Transport";

class GrpcCallData {
responseHeaders: Metadata = new Metadata();
responseChunks: Uint8Array[] = [];
grpcStatus: Status = Status.UNKNOWN;
statusMessage: string = "";
}

export interface XHRTransportConfig {
credentials?: boolean;
}

async function xhrPost(url: string, metadata: Metadata, requestBody: BodyInit, config?: XHRTransportConfig): Promise<GrpcCallData> {
const callData: GrpcCallData = new GrpcCallData();
return new Promise(function(resolve, reject) {
// TODO - Support fallback for node?
const xhr = new XMLHttpRequest();
xhr.open("POST", url, true);
xhr.withCredentials = config?.credentials ?? true;
xhr.responseType = "arraybuffer";

for (const [key, values] of metadata) {
for (const value of values) {
xhr.setRequestHeader(
key,
typeof value === 'string' ? value : Base64.fromUint8Array(value),
);
}
}

xhr.onreadystatechange = function() {
if (xhr.readyState === XMLHttpRequest.HEADERS_RECEIVED) {
callData.responseHeaders = headersToMetadata(xhr.getAllResponseHeaders());
} else if (xhr.readyState === XMLHttpRequest.DONE) {
resolve(callData);
}
}
xhr.onerror = function() {
callData.statusMessage = getErrorDetailsFromHttpResponse(xhr.status, xhr.statusText);
}
xhr.onloadend = function() {
callData.responseChunks.push(new Uint8Array(xhr.response as ArrayBuffer));
callData.grpcStatus = getStatusFromHttpCode(xhr.status);
}

// Tested, this works.
// @ts-ignore
xhr.send(requestBody);
});
}

function concatenateChunks(chunks: Uint8Array[]): Uint8Array {
// Using the performant method vs spread syntax: https://stackoverflow.com/a/60590943
let totalSize = 0;
for (const chunk of chunks) {
totalSize += chunk.length;
}
const newData = new Uint8Array(totalSize)
let setIndex = 0;
for (const chunk of chunks) {
newData.set(chunk, setIndex);
setIndex += chunk.length;
}
return newData;
}

/**
* Transport for browsers based on `XMLHttpRequest` API.
*/
export function XHRTransport(config?: XHRTransportConfig): Transport {
return async function* fetchTransport({url, body, metadata, signal, method}) {
let requestBody: BodyInit;

if (!method.requestStream) {
let bodyBuffer: Uint8Array | undefined;

for await (const chunk of body) {
bodyBuffer = chunk;

break;
}

requestBody = bodyBuffer!;
} else {
let iterator: AsyncIterator<Uint8Array> | undefined;

requestBody = new ReadableStream({
// @ts-ignore
type: 'bytes',
start() {
iterator = body[Symbol.asyncIterator]();
},

async pull(controller) {
const {done, value} = await iterator!.next();

if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
async cancel() {
await iterator!.return?.();
},
});
}

const xhrData = await xhrPost(url, metadata, requestBody, config);

yield {
type: 'header',
header: xhrData.responseHeaders,
};

if (xhrData.grpcStatus !== Status.OK) {
const decoder = new TextDecoder();
const message = decoder.decode(concatenateChunks(xhrData.responseChunks));
console.warn(message, xhrData.statusMessage);
throw new ClientError(
method.path,
xhrData.grpcStatus,
`status=${xhrData.statusMessage}, message=${message}`
);
}

throwIfAborted(signal);

try {
for (const xhrChunk of xhrData.responseChunks) {
if (xhrChunk != null) {
yield {
type: 'data',
data: xhrChunk,
};
}
}
} finally {
throwIfAborted(signal);
}
};
}

function headersToMetadata(headers: string): Metadata {
const metadata = new Metadata();
const arr = headers.trim().split(/[\r\n]+/);

arr.forEach((line) => {
const parts = line.split(': ');
const header = parts.shift() ?? "";
const value = parts.join(': ');
metadata.set(header, value);
});
return metadata;
}

function getStatusFromHttpCode(statusCode: number): Status {
switch (statusCode) {
case 200:
return Status.OK;
case 400:
return Status.INTERNAL;
case 401:
return Status.UNAUTHENTICATED;
case 403:
return Status.PERMISSION_DENIED;
case 404:
return Status.UNIMPLEMENTED;
case 429:
case 502:
case 503:
case 504:
return Status.UNAVAILABLE;
default:
return Status.UNKNOWN;
}
}

function getErrorDetailsFromHttpResponse(
statusCode: number,
responseText: string,
): string {
return (
`Received HTTP ${statusCode} response: ` +
(responseText.length > 1000
? responseText.slice(0, 1000) + '... (truncated)'
: responseText)
);
}