Skip to content

Commit eb966c8

Browse files
authored
[Event Hubs] Support Development Emulator (Azure#29195)
### Packages impacted by this PR @azure/core-amqp & @azure/event-hubs ### Issues associated with this PR Azure#28548 ### Describe the problem that is addressed by this PR Supporting local development without hitting the service. ### What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen? The service is rolling out their own emulator and this PR adds support for connecting to it. Design is documented in https://gist.github.com/conniey/680afa1cb66fb3ca56f202241911ba91 ### Are there test cases added in this PR? _(If not, why?)_ Yes ### Provide a list of related PRs _(if any)_ N/A ### Command used to generate this PR:**_(Applicable only to SDK release request PRs)_ ### Checklists - [x] Added impacted package name to the issue description - [ ] Does this PR needs any fixes in the SDK Generator?** _(If so, create an Issue in the [Autorest/typescript](https://github.com/Azure/autorest.typescript) repository and link it here)_ - [x] Added a changelog (if necessary)
1 parent 6b099a2 commit eb966c8

File tree

15 files changed

+217
-11
lines changed

15 files changed

+217
-11
lines changed

sdk/core/core-amqp/CHANGELOG.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
## 4.3.0 (Unreleased)
44

5-
### Other Changes
6-
5+
### Breaking Changes
76
- Moved to ESM core with builds for ESM, CommonJS, React-Native and Browser.
87
- Moved unit tests from mocha to vitest.
98

9+
### Other Changes
10+
11+
- Adds support for connecting to the development emulator. The connection string for the development emulator should have the `";UseDevelopmentEmulator=true"` slug.
12+
1013
## 4.2.2 (2024-05-02)
1114

1215
### Bugs Fixed

sdk/core/core-amqp/review/core-amqp.api.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ export interface ConnectionConfig {
190190
port?: number;
191191
sharedAccessKey: string;
192192
sharedAccessKeyName: string;
193+
useDevelopmentEmulator?: boolean;
193194
webSocket?: WebSocketImpl;
194195
webSocketConstructorOptions?: any;
195196
webSocketEndpointPath?: string;
@@ -259,6 +260,7 @@ export const Constants: {
259260
readonly partitionId: "partitionId";
260261
readonly readOperation: "READ";
261262
readonly TLS: "tls";
263+
readonly TCP: "tcp";
262264
readonly establishConnection: "establishConnection";
263265
readonly defaultConsumerGroup: "$default";
264266
readonly eventHub: "eventhub";
@@ -425,6 +427,9 @@ export enum ErrorNameConditionMapper {
425427
UnauthorizedError = "amqp:unauthorized-access"
426428
}
427429

430+
// @public
431+
export function isLoopbackAddress(address: string): boolean;
432+
428433
// @public
429434
export function isMessagingError(error: Error | MessagingError): error is MessagingError;
430435

sdk/core/core-amqp/src/ConnectionContextBase.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,11 @@ export const ConnectionContextBase = {
177177
}
178178

179179
const connectionOptions: ConnectionOptions = {
180-
transport: Constants.TLS,
180+
transport: (parameters.config.useDevelopmentEmulator ? Constants.TCP : Constants.TLS) as any,
181181
host: parameters.config.host,
182182
hostname: parameters.config.amqpHostname ?? parameters.config.host,
183183
username: parameters.config.sharedAccessKeyName,
184-
port: parameters.config.port ?? 5671,
184+
port: parameters.config.port ?? (parameters.config.useDevelopmentEmulator ? 5672 : 5671),
185185
reconnect: false,
186186
properties: {
187187
product: parameters.connectionProperties.product,

sdk/core/core-amqp/src/connectionConfig/connectionConfig.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import { WebSocketImpl } from "rhea-promise";
55
import { isDefined } from "@azure/core-util";
6-
import { parseConnectionString } from "../util/utils.js";
6+
import { isLoopbackAddress, parseConnectionString } from "../util/utils.js";
77

88
/**
99
* Describes the options that can be provided while creating a connection config.
@@ -78,6 +78,20 @@ export interface ConnectionConfig {
7878
* Options to be passed to the WebSocket constructor
7979
*/
8080
webSocketConstructorOptions?: any;
81+
/**
82+
* This should be true only if the connection string contains the slug ";UseDevelopmentEmulator=true"
83+
* and the endpoint is a loopback address.
84+
*/
85+
useDevelopmentEmulator?: boolean;
86+
}
87+
88+
function getHost(endpoint: string): string {
89+
const matches = /.*:\/\/([^/]*)/.exec(endpoint);
90+
const match = matches?.[1];
91+
if (!match) {
92+
return isLoopbackAddress(endpoint) ? endpoint : "";
93+
}
94+
return match;
8195
}
8296

8397
/**
@@ -102,6 +116,7 @@ export const ConnectionConfig = {
102116
SharedAccessKeyName: string;
103117
SharedAccessKey: string;
104118
EntityPath?: string;
119+
UseDevelopmentEmulator?: string;
105120
}>(connectionString);
106121
if (!parsedCS.Endpoint) {
107122
throw new TypeError("Missing Endpoint in Connection String.");
@@ -112,9 +127,10 @@ export const ConnectionConfig = {
112127
const result: ConnectionConfig = {
113128
connectionString: connectionString,
114129
endpoint: parsedCS.Endpoint,
115-
host: parsedCS && parsedCS.Endpoint ? (parsedCS.Endpoint.match(".*://([^/]*)") || [])[1] : "",
130+
host: getHost(parsedCS.Endpoint),
116131
sharedAccessKeyName: parsedCS.SharedAccessKeyName,
117132
sharedAccessKey: parsedCS.SharedAccessKey,
133+
useDevelopmentEmulator: parsedCS.UseDevelopmentEmulator === "true",
118134
};
119135

120136
if (path || parsedCS.EntityPath) {
@@ -140,6 +156,12 @@ export const ConnectionConfig = {
140156
}
141157
config.endpoint = String(config.endpoint);
142158

159+
if (config.useDevelopmentEmulator && !isLoopbackAddress(config.endpoint)) {
160+
throw new TypeError(
161+
`When using the development environment, the endpoint should be a localhost. Given endpoint is "${config.endpoint}".`,
162+
);
163+
}
164+
143165
if (!config.host) {
144166
throw new TypeError("Missing 'host' in configuration");
145167
}

sdk/core/core-amqp/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export {
3030
export {
3131
delay,
3232
parseConnectionString,
33+
isLoopbackAddress,
3334
defaultCancellableLock,
3435
ParsedOutput,
3536
WebSocketOptions,

sdk/core/core-amqp/src/util/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export const Constants = {
2929
partitionId: "partitionId",
3030
readOperation: "READ",
3131
TLS: "tls",
32+
TCP: "tcp",
3233
establishConnection: "establishConnection",
3334
defaultConsumerGroup: "$default",
3435
eventHub: "eventhub",

sdk/core/core-amqp/src/util/utils.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ export async function delay<T>(
180180
}
181181
}
182182

183+
/**
184+
* Checks if an address is localhost.
185+
* @param address - The address to check.
186+
* @returns true if the address is localhost, false otherwise.
187+
*/
188+
export function isLoopbackAddress(address: string): boolean {
189+
return /^(.*:\/\/)?(127\.[\d.]+|[0:]+1|localhost)/.test(address.toLowerCase());
190+
}
191+
183192
/**
184193
* @internal
185194
*

sdk/core/core-amqp/test/config.spec.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,15 @@ describe("ConnectionConfig", function () {
8686
}, /Missing Endpoint/);
8787
});
8888

89+
it("Parses the connection string for the development emulator", async function () {
90+
const config = ConnectionConfig.create(
91+
"Endpoint=sb://localhost:6765;SharedAccessKeyName=<< REDACTED >>;SharedAccessKey=<< REDACTED >>;UseDevelopmentEmulator=true",
92+
);
93+
assert.equal(config.endpoint, "sb://localhost:6765/");
94+
assert.equal(config.host, "localhost:6765");
95+
assert.isTrue(config.useDevelopmentEmulator);
96+
});
97+
8998
describe("Throws error if required connection config properties are not present", function () {
9099
const connectionString = `
91100
Endpoint=sb://hostname.servicebus.windows.net/;
@@ -211,6 +220,94 @@ describe("ConnectionConfig", function () {
211220
assert.equal(config.entityPath, "entityPath", `EntityPath is not a string`);
212221
});
213222
});
223+
224+
describe("Development Emulator Validation", function () {
225+
it("Accepts localhost", async function () {
226+
const connectionString =
227+
"Endpoint=sb://localhost:6765;SharedAccessKeyName=<< REDACTED >>;SharedAccessKey=<< REDACTED >>;UseDevelopmentEmulator=true";
228+
const config: ConnectionConfig = {
229+
connectionString,
230+
endpoint: "localhost:6765/",
231+
host: "sb://localhost:6765/",
232+
sharedAccessKeyName: "sakName",
233+
sharedAccessKey: "abcd",
234+
useDevelopmentEmulator: true,
235+
};
236+
ConnectionConfig.validate(config);
237+
});
238+
239+
it("Accepts 127.0.0.1", async function () {
240+
const connectionString =
241+
"Endpoint=sb://127.0.0.1:6765;SharedAccessKeyName=<< REDACTED >>;SharedAccessKey=<< REDACTED >>;UseDevelopmentEmulator=true";
242+
const config: ConnectionConfig = {
243+
connectionString,
244+
endpoint: "127.0.0.1:6765/",
245+
host: "sb://127.0.0.1:6765",
246+
sharedAccessKeyName: "sakName",
247+
sharedAccessKey: "abcd",
248+
useDevelopmentEmulator: true,
249+
};
250+
ConnectionConfig.validate(config);
251+
});
252+
253+
it("Accepts 0:0:0:0:0:0:0:1", async function () {
254+
const connectionString =
255+
"Endpoint=sb://0:0:0:0:0:0:0:1;SharedAccessKeyName=<< REDACTED >>;SharedAccessKey=<< REDACTED >>;UseDevelopmentEmulator=true";
256+
const config: ConnectionConfig = {
257+
connectionString,
258+
endpoint: "0:0:0:0:0:0:0:1/",
259+
host: "sb://0:0:0:0:0:0:0:1",
260+
sharedAccessKeyName: "sakName",
261+
sharedAccessKey: "abcd",
262+
useDevelopmentEmulator: true,
263+
};
264+
ConnectionConfig.validate(config);
265+
});
266+
267+
it("Accepts ::1", async function () {
268+
const connectionString =
269+
"Endpoint=sb://::1;SharedAccessKeyName=<< REDACTED >>;SharedAccessKey=<< REDACTED >>;UseDevelopmentEmulator=true";
270+
const config: ConnectionConfig = {
271+
connectionString,
272+
endpoint: "::1/",
273+
host: "sb://::1",
274+
sharedAccessKeyName: "sakName",
275+
sharedAccessKey: "abcd",
276+
useDevelopmentEmulator: true,
277+
};
278+
ConnectionConfig.validate(config);
279+
});
280+
281+
it("Accepts localhost with missing scheme", async function () {
282+
const connectionString =
283+
"Endpoint=localhost:6765;SharedAccessKeyName=<< REDACTED >>;SharedAccessKey=<< REDACTED >>;UseDevelopmentEmulator=true";
284+
const config: ConnectionConfig = {
285+
connectionString,
286+
endpoint: "localhost:6765/",
287+
host: "localhost:6765/",
288+
sharedAccessKeyName: "sakName",
289+
sharedAccessKey: "abcd",
290+
useDevelopmentEmulator: true,
291+
};
292+
ConnectionConfig.validate(config);
293+
});
294+
295+
it("Rejects non-local host", async function () {
296+
const connectionString =
297+
"Endpoint=sb://hostname.servicebus.windows.net/;SharedAccessKeyName=<< REDACTED >>;SharedAccessKey=<< REDACTED >>;UseDevelopmentEmulator=true";
298+
const config: ConnectionConfig = {
299+
connectionString,
300+
endpoint: "sb://hostname.servicebus.windows.net/",
301+
host: "hostname.servicebus.windows.net",
302+
sharedAccessKeyName: "sakName",
303+
sharedAccessKey: "abcd",
304+
useDevelopmentEmulator: true,
305+
};
306+
assert.throw(() => {
307+
ConnectionConfig.validate(config);
308+
}, new RegExp("When using the development environment"));
309+
});
310+
});
214311
});
215312

216313
describe("SharedAccessSignature", () => {

sdk/core/core-amqp/test/context.spec.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// Licensed under the MIT license.
33

44
import { describe, it, assert } from "vitest";
5-
import { CbsClient, ConnectionConfig, ConnectionContextBase } from "../src/index.js";
5+
import { CbsClient, ConnectionConfig, ConnectionContextBase, Constants } from "../src/index.js";
66
import { Connection } from "rhea-promise";
77
import type { ConnectionOptions as TlsConnectionOptions } from "node:tls";
88

@@ -27,6 +27,7 @@ describe("ConnectionContextBase", function () {
2727
assert.isDefined(context.negotiateClaimLock);
2828
assert.isFalse(context.wasConnectionCloseCalled);
2929
assert.instanceOf(context.connection, Connection);
30+
assert.equal(context.connection.options.transport, "tls");
3031
assert.equal(context.connection.options.properties!.product, "MSJSClient");
3132
assert.equal(context.connection.options.properties!["user-agent"], "/js-amqp-client");
3233
assert.equal(context.connection.options.properties!.version, "1.0.0");
@@ -283,6 +284,25 @@ describe("ConnectionContextBase", function () {
283284
}, /user-agent string cannot be more than 512 characters/);
284285
});
285286

287+
it("disables tls when connecting to the development emulator", async function () {
288+
const connectionString =
289+
"Endpoint=sb://localhost;SharedAccessKeyName=sakName;SharedAccessKey=sak;EntityPath=ep;UseDevelopmentEmulator=true";
290+
const path = "mypath";
291+
const config = ConnectionConfig.create(connectionString, path);
292+
const context = ConnectionContextBase.create({
293+
config: config,
294+
connectionProperties: {
295+
product: "MSJSClient",
296+
userAgent: "/js-amqp-client",
297+
version: "1.0.0",
298+
},
299+
});
300+
assert.isDefined(context.connection);
301+
assert.instanceOf(context.connection, Connection);
302+
assert.equal(context.connection.options.transport, Constants.TCP);
303+
assert.equal((context.connection.options as TlsConnectionOptions).port, 5672);
304+
});
305+
286306
describe("#refreshConnection", function () {
287307
it("should update fields on the context", function () {
288308
const connectionString =
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
import { describe, it, assert } from "vitest";
5+
import { isLoopbackAddress } from "../src/util/utils.js";
6+
7+
describe("isLoopbackAddress", () => {
8+
it("returns true for localhost", () => {
9+
assert.isTrue(isLoopbackAddress("sb://localhost"));
10+
});
11+
12+
it("returns true for 127-prefix addresses", () => {
13+
assert.isTrue(isLoopbackAddress("sb://127.0.0.1"));
14+
assert.isTrue(isLoopbackAddress("sb://127.0.0.2"));
15+
});
16+
17+
it("returns true for 0:0:0:0:0:1", () => {
18+
assert.isTrue(isLoopbackAddress("sb://0:0:0:0:0:1"));
19+
});
20+
21+
it("returns true for ::1", () => {
22+
assert.isTrue(isLoopbackAddress("sb://::1"));
23+
});
24+
25+
it("returns true for localhost with missing scheme", () => {
26+
assert.isTrue(isLoopbackAddress("localhost"));
27+
});
28+
29+
it("returns false for other addresses", () => {
30+
assert.isFalse(isLoopbackAddress("sb://test.servicebus.windows.net"));
31+
});
32+
});

0 commit comments

Comments
 (0)