Skip to content

Commit f3fb2ec

Browse files
authored
DGS-22734 Add Accept-Version header (#395)
* DGS-22734 Add Accept-Version header * Add aws fix * Fix Avro bytes serialization
1 parent 9ff739c commit f3fb2ec

File tree

5 files changed

+60
-18
lines changed

5 files changed

+60
-18
lines changed

schemaregistry/rest-service.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ export class RestService {
7575
this.setHeaders({ 'X-Forward': 'true' });
7676
}
7777
this.setHeaders({ 'Content-Type': 'application/vnd.schemaregistry.v1+json' });
78+
this.setHeaders({ 'Accept-Version': '8.0' });
7879

7980
this.handleBasicAuth(basicAuthCredentials);
8081
this.handleBearerAuth(maxRetries ?? 2, retriesWaitMs ?? 1000, retriesMaxWaitMs ?? 20000, bearerAuthCredentials);

schemaregistry/rules/encryption/awskms/aws-driver.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@ export class AwsKmsDriver implements KmsDriver {
4141
if (roleExternalId == null) {
4242
roleExternalId = process.env['AWS_ROLE_EXTERNAL_ID']
4343
}
44+
let roleWebIdentityTokenFile = process.env['AWS_WEB_IDENTITY_TOKEN_FILE']
4445
let creds: AwsCredentialIdentity | AwsCredentialIdentityProvider | undefined
4546
if (key != null && secret != null) {
4647
creds = {accessKeyId: key, secretAccessKey: secret}
4748
} else if (profile != null) {
4849
creds = fromIni({profile})
4950
}
50-
if (roleArn != null) {
51+
// If roleWebIdentityTokenFile is set, use the DefaultCredentialsProvider
52+
if (roleArn != null && roleWebIdentityTokenFile == null) {
5153
let keyId = uriPrefix.substring(AwsKmsDriver.PREFIX.length)
5254
const tokens = keyId.split(':')
5355
if (tokens.length < 4) {

schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class DekRegistryClient implements DekClient {
148148
const cacheKey = stringify({ kekName, subject, version, algorithm, deleted: false });
149149

150150
return await this.dekMutex.runExclusive(async () => {
151-
const dek = this.dekCache.get(cacheKey);
151+
let dek = this.dekCache.get(cacheKey);
152152
if (dek) {
153153
return dek;
154154
}
@@ -159,21 +159,40 @@ class DekRegistryClient implements DekClient {
159159
algorithm,
160160
...encryptedKeyMaterial && { encryptedKeyMaterial },
161161
};
162-
kekName = encodeURIComponent(kekName);
163162

164-
const response = await this.restService.handleRequest<Dek>(
165-
`/dek-registry/v1/keks/${kekName}/deks`,
166-
'POST',
167-
request);
168-
this.dekCache.set(cacheKey, response.data);
163+
dek = await this.createDek(kekName, request);
164+
this.dekCache.set(cacheKey, dek);
169165

170166
this.dekCache.delete(stringify({ kekName, subject, version: -1, algorithm, deleted: false }));
171167
this.dekCache.delete(stringify({ kekName, subject, version: -1, algorithm, deleted: true }));
172168

173-
return response.data;
169+
return dek;
174170
});
175171
}
176172

173+
private async createDek(kekName: string, request: Dek): Promise<Dek> {
174+
try {
175+
// Try newer API with subject in the path
176+
const encodedKekName = encodeURIComponent(kekName);
177+
const encodedSubject = encodeURIComponent(request.subject || '');
178+
const path = `/dek-registry/v1/keks/${encodedKekName}/deks/${encodedSubject}`;
179+
180+
const response = await this.restService.handleRequest<Dek>(path, 'POST', request);
181+
return response.data;
182+
} catch (error: any) {
183+
if (error.response && error.response.status === 405) {
184+
// Try fallback to older API that does not have subject in the path
185+
const encodedKekName = encodeURIComponent(kekName);
186+
const path = `/dek-registry/v1/keks/${encodedKekName}/deks`;
187+
188+
const response = await this.restService.handleRequest<Dek>(path, 'POST', request);
189+
return response.data;
190+
} else {
191+
throw error;
192+
}
193+
}
194+
}
195+
177196
async getDek(kekName: string, subject: string,
178197
algorithm: string, version: number = 1, deleted: boolean = false): Promise<Dek> {
179198
const cacheKey = stringify({ kekName, subject, version, algorithm, deleted });

schemaregistry/serde/avro.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ export class AvroSerializer extends Serializer implements AvroSerde {
9797
throw new SerializationError(
9898
`Invalid message at ${path.join('.')}, expected ${type}, got ${stringify(any)}`)
9999
}})
100-
let msgBytes = avroType.toBuffer(msg)
100+
let msgBytes = avroType.typeName === 'bytes' ? msg : avroType.toBuffer(msg)
101101
msgBytes = await this.executeRulesWithPhase(
102102
subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null)
103103
return this.serializeSchemaId(topic, msgBytes, schemaId, headers)
@@ -204,18 +204,22 @@ export class AvroDeserializer extends Deserializer implements AvroSerde {
204204
let msg: any
205205
const msgBytes = payload
206206
if (migrations.length > 0) {
207-
msg = writer.fromBuffer(msgBytes)
207+
msg = writer.typeName === 'bytes' ? msgBytes : writer.fromBuffer(msgBytes)
208208
msg = await this.executeMigrations(migrations, subject, topic, msg)
209209
} else {
210-
if (readerMeta != null) {
211-
const [reader, ] = await this.toType(readerMeta)
212-
if (reader.equals(writer)) {
213-
msg = reader.fromBuffer(msgBytes)
210+
if (writer.typeName === 'bytes') {
211+
msg = msgBytes
212+
} else {
213+
if (readerMeta != null) {
214+
const [reader, ] = await this.toType(readerMeta)
215+
if (reader.equals(writer)) {
216+
msg = reader.fromBuffer(msgBytes)
217+
} else {
218+
msg = reader.fromBuffer(msgBytes, reader.createResolver(writer))
219+
}
214220
} else {
215-
msg = reader.fromBuffer(msgBytes, reader.createResolver(writer))
221+
msg = writer.fromBuffer(msgBytes)
216222
}
217-
} else {
218-
msg = writer.fromBuffer(msgBytes)
219223
}
220224
}
221225
let target: SchemaInfo

schemaregistry/test/serde/avro.spec.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,22 @@ describe('AvroSerializer', () => {
425425
expect(obj2.boolField).toEqual(obj.boolField);
426426
expect(obj2.bytesField).toEqual(obj.bytesField);
427427
})
428+
it('serialize bytes', async () => {
429+
let conf: ClientConfig = {
430+
baseURLs: [baseURL],
431+
cacheCapacity: 1000
432+
}
433+
let client = SchemaRegistryClient.newClient(conf)
434+
let ser = new AvroSerializer(client, SerdeType.VALUE, {autoRegisterSchemas: true})
435+
436+
let obj = Buffer.from([0x02, 0x03, 0x04])
437+
let bytes = await ser.serialize(topic, obj)
438+
expect(bytes).toEqual(Buffer.from([0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04]));
439+
440+
let deser = new AvroDeserializer(client, SerdeType.VALUE, {})
441+
let obj2 = await deser.deserialize(topic, bytes)
442+
expect(obj2).toEqual(obj);
443+
})
428444
it('serialize nested', async () => {
429445
let conf: ClientConfig = {
430446
baseURLs: [baseURL],

0 commit comments

Comments
 (0)