Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions schemaregistry/rest-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export class RestService {
this.setHeaders({ 'X-Forward': 'true' });
}
this.setHeaders({ 'Content-Type': 'application/vnd.schemaregistry.v1+json' });
this.setHeaders({ 'Accept-Version': '8.0' });

this.handleBasicAuth(basicAuthCredentials);
this.handleBearerAuth(maxRetries ?? 2, retriesWaitMs ?? 1000, retriesMaxWaitMs ?? 20000, bearerAuthCredentials);
Expand Down
4 changes: 3 additions & 1 deletion schemaregistry/rules/encryption/awskms/aws-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ export class AwsKmsDriver implements KmsDriver {
if (roleExternalId == null) {
roleExternalId = process.env['AWS_ROLE_EXTERNAL_ID']
}
let roleWebIdentityTokenFile = process.env['AWS_WEB_IDENTITY_TOKEN_FILE']
let creds: AwsCredentialIdentity | AwsCredentialIdentityProvider | undefined
if (key != null && secret != null) {
creds = {accessKeyId: key, secretAccessKey: secret}
} else if (profile != null) {
creds = fromIni({profile})
}
if (roleArn != null) {
// If roleWebIdentityTokenFile is set, use the DefaultCredentialsProvider
if (roleArn != null && roleWebIdentityTokenFile == null) {
let keyId = uriPrefix.substring(AwsKmsDriver.PREFIX.length)
const tokens = keyId.split(':')
if (tokens.length < 4) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class DekRegistryClient implements DekClient {
const cacheKey = stringify({ kekName, subject, version, algorithm, deleted: false });

return await this.dekMutex.runExclusive(async () => {
const dek = this.dekCache.get(cacheKey);
let dek = this.dekCache.get(cacheKey);
if (dek) {
return dek;
}
Expand All @@ -159,21 +159,40 @@ class DekRegistryClient implements DekClient {
algorithm,
...encryptedKeyMaterial && { encryptedKeyMaterial },
};
kekName = encodeURIComponent(kekName);

const response = await this.restService.handleRequest<Dek>(
`/dek-registry/v1/keks/${kekName}/deks`,
'POST',
request);
this.dekCache.set(cacheKey, response.data);
dek = await this.createDek(kekName, request);
this.dekCache.set(cacheKey, dek);

this.dekCache.delete(stringify({ kekName, subject, version: -1, algorithm, deleted: false }));
this.dekCache.delete(stringify({ kekName, subject, version: -1, algorithm, deleted: true }));

return response.data;
return dek;
});
}

private async createDek(kekName: string, request: Dek): Promise<Dek> {
try {
// Try newer API with subject in the path
const encodedKekName = encodeURIComponent(kekName);
const encodedSubject = encodeURIComponent(request.subject || '');
const path = `/dek-registry/v1/keks/${encodedKekName}/deks/${encodedSubject}`;

const response = await this.restService.handleRequest<Dek>(path, 'POST', request);
return response.data;
} catch (error: any) {
if (error.response && error.response.status === 405) {
// Try fallback to older API that does not have subject in the path
const encodedKekName = encodeURIComponent(kekName);
const path = `/dek-registry/v1/keks/${encodedKekName}/deks`;

const response = await this.restService.handleRequest<Dek>(path, 'POST', request);
return response.data;
} else {
throw error;
}
}
}

async getDek(kekName: string, subject: string,
algorithm: string, version: number = 1, deleted: boolean = false): Promise<Dek> {
const cacheKey = stringify({ kekName, subject, version, algorithm, deleted });
Expand Down
22 changes: 13 additions & 9 deletions schemaregistry/serde/avro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class AvroSerializer extends Serializer implements AvroSerde {
throw new SerializationError(
`Invalid message at ${path.join('.')}, expected ${type}, got ${stringify(any)}`)
}})
let msgBytes = avroType.toBuffer(msg)
let msgBytes = avroType.typeName === 'bytes' ? msg : avroType.toBuffer(msg)
msgBytes = await this.executeRulesWithPhase(
subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null)
return this.serializeSchemaId(topic, msgBytes, schemaId, headers)
Expand Down Expand Up @@ -204,18 +204,22 @@ export class AvroDeserializer extends Deserializer implements AvroSerde {
let msg: any
const msgBytes = payload
if (migrations.length > 0) {
msg = writer.fromBuffer(msgBytes)
msg = writer.typeName === 'bytes' ? msgBytes : writer.fromBuffer(msgBytes)
msg = await this.executeMigrations(migrations, subject, topic, msg)
} else {
if (readerMeta != null) {
const [reader, ] = await this.toType(readerMeta)
if (reader.equals(writer)) {
msg = reader.fromBuffer(msgBytes)
if (writer.typeName === 'bytes') {
msg = msgBytes
} else {
if (readerMeta != null) {
const [reader, ] = await this.toType(readerMeta)
if (reader.equals(writer)) {
msg = reader.fromBuffer(msgBytes)
} else {
msg = reader.fromBuffer(msgBytes, reader.createResolver(writer))
}
} else {
msg = reader.fromBuffer(msgBytes, reader.createResolver(writer))
msg = writer.fromBuffer(msgBytes)
}
} else {
msg = writer.fromBuffer(msgBytes)
}
}
let target: SchemaInfo
Expand Down
16 changes: 16 additions & 0 deletions schemaregistry/test/serde/avro.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,22 @@ describe('AvroSerializer', () => {
expect(obj2.boolField).toEqual(obj.boolField);
expect(obj2.bytesField).toEqual(obj.bytesField);
})
it('serialize bytes', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
cacheCapacity: 1000
}
let client = SchemaRegistryClient.newClient(conf)
let ser = new AvroSerializer(client, SerdeType.VALUE, {autoRegisterSchemas: true})

let obj = Buffer.from([0x02, 0x03, 0x04])
let bytes = await ser.serialize(topic, obj)
expect(bytes).toEqual(Buffer.from([0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04]));

let deser = new AvroDeserializer(client, SerdeType.VALUE, {})
let obj2 = await deser.deserialize(topic, bytes)
expect(obj2).toEqual(obj);
})
it('serialize nested', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
Expand Down