diff --git a/schemaregistry/rest-service.ts b/schemaregistry/rest-service.ts index e183de90..a01e8692 100644 --- a/schemaregistry/rest-service.ts +++ b/schemaregistry/rest-service.ts @@ -75,6 +75,7 @@ export class RestService { this.setHeaders({ 'X-Forward': 'true' }); } this.setHeaders({ 'Content-Type': 'application/vnd.schemaregistry.v1+json' }); + this.setHeaders({ 'Accept-Version': '8.0' }); this.handleBasicAuth(basicAuthCredentials); this.handleBearerAuth(maxRetries ?? 2, retriesWaitMs ?? 1000, retriesMaxWaitMs ?? 20000, bearerAuthCredentials); diff --git a/schemaregistry/rules/encryption/awskms/aws-driver.ts b/schemaregistry/rules/encryption/awskms/aws-driver.ts index 564af065..8c1f9e1c 100644 --- a/schemaregistry/rules/encryption/awskms/aws-driver.ts +++ b/schemaregistry/rules/encryption/awskms/aws-driver.ts @@ -41,13 +41,15 @@ export class AwsKmsDriver implements KmsDriver { if (roleExternalId == null) { roleExternalId = process.env['AWS_ROLE_EXTERNAL_ID'] } + let roleWebIdentityTokenFile = process.env['AWS_WEB_IDENTITY_TOKEN_FILE'] let creds: AwsCredentialIdentity | AwsCredentialIdentityProvider | undefined if (key != null && secret != null) { creds = {accessKeyId: key, secretAccessKey: secret} } else if (profile != null) { creds = fromIni({profile}) } - if (roleArn != null) { + // If roleWebIdentityTokenFile is set, use the DefaultCredentialsProvider + if (roleArn != null && roleWebIdentityTokenFile == null) { let keyId = uriPrefix.substring(AwsKmsDriver.PREFIX.length) const tokens = keyId.split(':') if (tokens.length < 4) { diff --git a/schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts b/schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts index 8b4420b2..2914f6fd 100644 --- a/schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts +++ b/schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts @@ -148,7 +148,7 @@ class DekRegistryClient implements DekClient { const cacheKey = stringify({ kekName, subject, version, algorithm, deleted: false }); return await this.dekMutex.runExclusive(async () => { - const dek = this.dekCache.get(cacheKey); + let dek = this.dekCache.get(cacheKey); if (dek) { return dek; } @@ -159,21 +159,40 @@ class DekRegistryClient implements DekClient { algorithm, ...encryptedKeyMaterial && { encryptedKeyMaterial }, }; - kekName = encodeURIComponent(kekName); - const response = await this.restService.handleRequest( - `/dek-registry/v1/keks/${kekName}/deks`, - 'POST', - request); - this.dekCache.set(cacheKey, response.data); + dek = await this.createDek(kekName, request); + this.dekCache.set(cacheKey, dek); this.dekCache.delete(stringify({ kekName, subject, version: -1, algorithm, deleted: false })); this.dekCache.delete(stringify({ kekName, subject, version: -1, algorithm, deleted: true })); - return response.data; + return dek; }); } + private async createDek(kekName: string, request: Dek): Promise { + try { + // Try newer API with subject in the path + const encodedKekName = encodeURIComponent(kekName); + const encodedSubject = encodeURIComponent(request.subject || ''); + const path = `/dek-registry/v1/keks/${encodedKekName}/deks/${encodedSubject}`; + + const response = await this.restService.handleRequest(path, 'POST', request); + return response.data; + } catch (error: any) { + if (error.response && error.response.status === 405) { + // Try fallback to older API that does not have subject in the path + const encodedKekName = encodeURIComponent(kekName); + const path = `/dek-registry/v1/keks/${encodedKekName}/deks`; + + const response = await this.restService.handleRequest(path, 'POST', request); + return response.data; + } else { + throw error; + } + } + } + async getDek(kekName: string, subject: string, algorithm: string, version: number = 1, deleted: boolean = false): Promise { const cacheKey = stringify({ kekName, subject, version, algorithm, deleted }); diff --git a/schemaregistry/serde/avro.ts b/schemaregistry/serde/avro.ts index 54bc262f..a08b9ab1 100644 --- a/schemaregistry/serde/avro.ts +++ b/schemaregistry/serde/avro.ts @@ -97,7 +97,7 @@ export class AvroSerializer extends Serializer implements AvroSerde { throw new SerializationError( `Invalid message at ${path.join('.')}, expected ${type}, got ${stringify(any)}`) }}) - let msgBytes = avroType.toBuffer(msg) + let msgBytes = avroType.typeName === 'bytes' ? msg : avroType.toBuffer(msg) msgBytes = await this.executeRulesWithPhase( subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null) return this.serializeSchemaId(topic, msgBytes, schemaId, headers) @@ -204,18 +204,22 @@ export class AvroDeserializer extends Deserializer implements AvroSerde { let msg: any const msgBytes = payload if (migrations.length > 0) { - msg = writer.fromBuffer(msgBytes) + msg = writer.typeName === 'bytes' ? msgBytes : writer.fromBuffer(msgBytes) msg = await this.executeMigrations(migrations, subject, topic, msg) } else { - if (readerMeta != null) { - const [reader, ] = await this.toType(readerMeta) - if (reader.equals(writer)) { - msg = reader.fromBuffer(msgBytes) + if (writer.typeName === 'bytes') { + msg = msgBytes + } else { + if (readerMeta != null) { + const [reader, ] = await this.toType(readerMeta) + if (reader.equals(writer)) { + msg = reader.fromBuffer(msgBytes) + } else { + msg = reader.fromBuffer(msgBytes, reader.createResolver(writer)) + } } else { - msg = reader.fromBuffer(msgBytes, reader.createResolver(writer)) + msg = writer.fromBuffer(msgBytes) } - } else { - msg = writer.fromBuffer(msgBytes) } } let target: SchemaInfo diff --git a/schemaregistry/test/serde/avro.spec.ts b/schemaregistry/test/serde/avro.spec.ts index 7d450af3..0cd105df 100644 --- a/schemaregistry/test/serde/avro.spec.ts +++ b/schemaregistry/test/serde/avro.spec.ts @@ -425,6 +425,22 @@ describe('AvroSerializer', () => { expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); }) + it('serialize bytes', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let ser = new AvroSerializer(client, SerdeType.VALUE, {autoRegisterSchemas: true}) + + let obj = Buffer.from([0x02, 0x03, 0x04]) + let bytes = await ser.serialize(topic, obj) + expect(bytes).toEqual(Buffer.from([0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04])); + + let deser = new AvroDeserializer(client, SerdeType.VALUE, {}) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj); + }) it('serialize nested', async () => { let conf: ClientConfig = { baseURLs: [baseURL],