Skip to content

Commit be82e4c

Browse files
authored
232 error not supported format code 83 when messsage is published using amqplibamqp 091 (#236)
* managed header * it works! * removed comments * npm check * added tests * better test * fix last red test
1 parent 07da189 commit be82e4c

File tree

10 files changed

+106
-92
lines changed

10 files changed

+106
-92
lines changed

package-lock.json

Lines changed: 4 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"@types/node": "^20.11.5",
3838
"@typescript-eslint/eslint-plugin": "^6.19.0",
3939
"@typescript-eslint/parser": "^6.19.0",
40-
"amqplib": "^0.10.3",
40+
"amqplib": "^0.10.5",
4141
"chai": "^4.3.7",
4242
"chai-as-promised": "^7.1.1",
4343
"chai-spies": "^1.1.0",

src/amqp10/applicationProperties.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ export class ApplicationProperties {
1010
return range(numEntries).reduce((acc: MessageApplicationProperties, _) => {
1111
const propertyKey = readUTF8String(dataReader)
1212
const nextByteType = dataReader.readUInt8()
13-
dataReader.rewind(1)
14-
const propertyValue = decodeFormatCode(dataReader, nextByteType, true)
13+
const propertyValue = decodeFormatCode(dataReader, nextByteType)
1514
if (!propertyValue) throw new Error(`invalid nextByteType %#02x: ${nextByteType}`)
1615
acc[propertyKey] = propertyValue as string | number
1716
return acc

src/amqp10/messageAnnotations.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ export class Annotations {
1010
return range(numEntries).reduce((acc: MessageAnnotations, _) => {
1111
const propertyKey = readUTF8String(dataReader)
1212
const nextByteType = dataReader.readUInt8()
13-
dataReader.rewind(1)
14-
const propertyValue = decodeFormatCode(dataReader, nextByteType, true)
13+
const propertyValue = decodeFormatCode(dataReader, nextByteType)
1514
if (propertyValue === undefined) throw new Error(`invalid nextByteType %#02x: ${nextByteType}`)
1615
acc[propertyKey] = propertyValue as string | number
1716
return acc

src/amqp10/messageHeader.ts

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,34 @@ import { DataReader } from "../responses/raw_response"
22
import { MessageHeader } from "../publisher"
33
import { range } from "../util"
44
import { decodeFormatCode, decodeBooleanType } from "../response_decoder"
5+
import { FormatCode } from "./decoder"
56

67
export class Header {
78
public static parse(dataResponse: DataReader, fields: number): MessageHeader {
89
return range(fields).reduce((acc: MessageHeader, index) => {
910
if (dataResponse.isAtEnd()) return acc
10-
switch (index) {
11-
case 0:
12-
const formatCode = dataResponse.readUInt8()
13-
dataResponse.rewind(1)
14-
const decodedBoolean = decodeFormatCode(dataResponse, formatCode)
15-
if (!decodedBoolean) throw new Error(`invalid formatCode %#02x: ${formatCode}`)
16-
acc.durable = decodedBoolean as boolean
17-
break
18-
case 1:
19-
dataResponse.readUInt8() // Read type
20-
acc.priority = dataResponse.readUInt8()
21-
break
22-
case 2:
23-
const type = dataResponse.readUInt8()
24-
acc.ttl = decodeFormatCode(dataResponse, type) as number
25-
break
26-
case 3:
27-
acc.firstAcquirer = decodeBooleanType(dataResponse, true)
28-
break
29-
case 4:
30-
acc.deliveryCount = dataResponse.readUInt32()
31-
break
32-
default:
33-
throw new Error(`PropertiesError`)
11+
12+
const type = dataResponse.readUInt8()
13+
if (type !== FormatCode.Null) {
14+
switch (index) {
15+
case 0:
16+
acc.durable = decodeBooleanType(dataResponse, type)
17+
break
18+
case 1:
19+
acc.priority = decodeFormatCode(dataResponse, type) as number
20+
break
21+
case 2:
22+
acc.ttl = decodeFormatCode(dataResponse, type) as number
23+
break
24+
case 3:
25+
acc.firstAcquirer = decodeBooleanType(dataResponse, type)
26+
break
27+
case 4:
28+
acc.deliveryCount = decodeFormatCode(dataResponse, type) as number
29+
break
30+
default:
31+
throw new Error(`HeaderError`)
32+
}
3433
}
3534
return acc
3635
}, {})

src/amqp10/properties.ts

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { MessageProperties } from "../publisher"
2-
import { readUTF8String } from "../response_decoder"
2+
import { decodeFormatCode } from "../response_decoder"
33
import { DataReader } from "../responses/raw_response"
44
import { range } from "../util"
55
import { FormatCode } from "./decoder"
@@ -12,52 +12,47 @@ export class Properties {
1212
if (formatCode === FormatCode.Null) {
1313
return acc
1414
}
15-
dataResponse.rewind(1)
1615
switch (index) {
1716
case 0:
18-
acc.messageId = readUTF8String(dataResponse)
17+
acc.messageId = decodeFormatCode(dataResponse, formatCode) as string
1918
break
2019
case 1:
2120
// Reading of binary type
22-
dataResponse.readUInt8()
2321
const userIdLength = dataResponse.readUInt8()
2422
acc.userId = dataResponse.readBufferOf(userIdLength)
2523
break
2624
case 2:
27-
acc.to = readUTF8String(dataResponse)
25+
acc.to = decodeFormatCode(dataResponse, formatCode) as string
2826
break
2927
case 3:
30-
acc.subject = readUTF8String(dataResponse)
28+
acc.subject = decodeFormatCode(dataResponse, formatCode) as string
3129
break
3230
case 4:
33-
acc.replyTo = readUTF8String(dataResponse)
31+
acc.replyTo = decodeFormatCode(dataResponse, formatCode) as string
3432
break
3533
case 5:
36-
acc.correlationId = readUTF8String(dataResponse)
34+
acc.correlationId = decodeFormatCode(dataResponse, formatCode) as string
3735
break
3836
case 6:
39-
acc.contentType = readUTF8String(dataResponse)
37+
acc.contentType = decodeFormatCode(dataResponse, formatCode) as string
4038
break
4139
case 7:
42-
acc.contentEncoding = readUTF8String(dataResponse)
40+
acc.contentEncoding = decodeFormatCode(dataResponse, formatCode) as string
4341
break
4442
case 8:
45-
dataResponse.readUInt8()
4643
acc.absoluteExpiryTime = new Date(Number(dataResponse.readInt64()))
4744
break
4845
case 9:
49-
dataResponse.readUInt8()
5046
acc.creationTime = new Date(Number(dataResponse.readInt64()))
5147
break
5248
case 10:
53-
acc.groupId = readUTF8String(dataResponse)
49+
acc.groupId = decodeFormatCode(dataResponse, formatCode) as string
5450
break
5551
case 11:
56-
dataResponse.readUInt8()
5752
acc.groupSequence = dataResponse.readUInt32()
5853
break
5954
case 12:
60-
acc.replyToGroupId = readUTF8String(dataResponse)
55+
acc.replyToGroupId = decodeFormatCode(dataResponse, formatCode) as string
6156
break
6257
default:
6358
throw new Error(`PropertiesError`)

src/response_decoder.ts

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ function decodeMessageProperties(dataResponse: DataReader) {
328328
dataResponse.rewind(3)
329329
const type = dataResponse.readInt8()
330330
if (type !== 0) {
331-
throw new Error(`invalid composite header: ${type}`)
331+
throw new Error(`invalid message properties: ${type}`)
332332
}
333333

334334
const nextType = dataResponse.readInt8()
@@ -347,8 +347,7 @@ function decodeMessageHeader(dataResponse: DataReader) {
347347
throw new Error(`invalid composite header: ${type}`)
348348
}
349349

350-
// next, the composite type is encoded as an AMQP uint8
351-
dataResponse.readUInt64()
350+
decodeAmqpValue(dataResponse)
352351

353352
const formatCode = dataResponse.readUInt8()
354353
const headerLength = decodeFormatCode(dataResponse, formatCode)
@@ -365,8 +364,7 @@ function decodeApplicationData(dataResponse: DataReader) {
365364

366365
function decodeAmqpValue(dataResponse: DataReader) {
367366
const amqpFormatCode = dataResponse.readUInt8()
368-
dataResponse.rewind(1)
369-
return decodeFormatCode(dataResponse, amqpFormatCode, true) as string
367+
return decodeFormatCode(dataResponse, amqpFormatCode) as string
370368
}
371369

372370
function readFormatCodeType(dataResponse: DataReader) {
@@ -379,13 +377,12 @@ function readFormatCodeType(dataResponse: DataReader) {
379377
export function readUTF8String(dataResponse: DataReader) {
380378
const formatCode = dataResponse.readUInt8()
381379
const decodedString = decodeFormatCode(dataResponse, formatCode)
382-
if (!decodedString) throw new Error(`invalid formatCode %#02x: ${formatCode}`)
380+
if (!decodedString) throw new Error(`invalid formatCode 0x${formatCode.toString(16)}`)
383381

384382
return decodedString as string
385383
}
386384

387-
export function decodeBooleanType(dataResponse: DataReader, defaultValue: boolean) {
388-
const boolType = dataResponse.readInt8()
385+
export function decodeBooleanType(dataResponse: DataReader, boolType: number) {
389386
switch (boolType) {
390387
case FormatCode.Bool:
391388
const boolValue = dataResponse.readInt8()
@@ -395,11 +392,11 @@ export function decodeBooleanType(dataResponse: DataReader, defaultValue: boolea
395392
case FormatCode.BoolFalse:
396393
return false
397394
default:
398-
return defaultValue
395+
throw new Error(`Expected boolean format code, got 0x${boolType.toString(16)}`)
399396
}
400397
}
401398

402-
export function decodeFormatCode(dataResponse: DataReader, formatCode: number, skipByte = false) {
399+
export function decodeFormatCode(dataResponse: DataReader, formatCode: number) {
403400
switch (formatCode) {
404401
case FormatCode.Map8:
405402
// Read first empty byte
@@ -412,7 +409,6 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s
412409
case FormatCode.SmallUlong:
413410
return dataResponse.readInt8() // Read a SmallUlong
414411
case FormatCode.Ubyte:
415-
dataResponse.forward(1)
416412
return dataResponse.readUInt8()
417413
case FormatCode.ULong:
418414
return dataResponse.readUInt64() // Read an ULong
@@ -430,35 +426,30 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s
430426
return dataResponse.readUInt32()
431427
case FormatCode.Str8:
432428
case FormatCode.Sym8:
433-
if (skipByte) dataResponse.forward(1)
434429
return dataResponse.readString8()
435430
case FormatCode.Str32:
436431
case FormatCode.Sym32:
437-
if (skipByte) dataResponse.forward(1)
438432
return dataResponse.readString32()
439433
case FormatCode.Uint0:
440434
return 0
441435
case FormatCode.SmallUint:
442-
dataResponse.forward(1) // Skipping formatCode
443436
return dataResponse.readUInt8()
444437
case FormatCode.Uint:
445-
dataResponse.forward(1) // Skipping formatCode
446438
return dataResponse.readUInt32()
447439
case FormatCode.SmallInt:
448-
dataResponse.forward(1) // Skipping formatCode
449440
return dataResponse.readInt8()
450441
case FormatCode.Int:
451-
dataResponse.forward(1) // Skipping formatCode
452442
return dataResponse.readInt32()
453443
case FormatCode.Bool:
454444
case FormatCode.BoolTrue:
455445
case FormatCode.BoolFalse:
456-
return decodeBooleanType(dataResponse, true)
446+
return decodeBooleanType(dataResponse, formatCode)
457447
case FormatCode.Null:
458-
dataResponse.forward(1) // Skipping formatCode
448+
return 0
449+
case FormatCode.ULong0:
459450
return 0
460451
default:
461-
throw new Error(`ReadCompositeHeader Invalid type ${formatCode}`)
452+
throw new Error(`FormatCode Invalid type ${formatCode}`)
462453
}
463454
}
464455

0 commit comments

Comments
 (0)