Skip to content

Extend serialized record encoding to allow for multiple encryption keys #3522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ public static Cipher borrowCipher(@Nonnull String cipherName) throws GeneralSecu
// Returns a previously borrowed Cipher to the shared pool so it can be reused.
// The cipher's own algorithm name is used as the pool key, matching the key
// used by borrowCipher(cipherName).
public static void returnCipher(@Nonnull Cipher cipher) {
MAPPED_POOL.offer(cipher.getAlgorithm(), cipher);
}

// Discards every pooled Cipher instance across all algorithms by delegating
// to the underlying mapped pool.
// NOTE(review): presumably invoked when encryption keys are rotated or
// invalidated so stale ciphers are not reused -- confirm against callers.
public static void invalidateAll() {
MAPPED_POOL.invalidateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ public int getPoolSize(K key) {
return queue == null ? 0 : queue.size();
}

/**
 * Invalidate all entries in the pool.
 * Delegates to {@code pool.invalidateAll()}, discarding every currently
 * pooled object for every key.
 */
public void invalidateAll() {
pool.invalidateAll();
}

/**
* Function with Exceptions to provide the pool.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,13 @@
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.metadata.RecordType;
import com.apple.foundationdb.tuple.Tuple;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
Expand All @@ -50,7 +47,8 @@
* added in the future.
*
* <p>
* This serializer will begin each serialized string with a one-byte prefix
* This serializer will begin each serialized string with a prefix
* (see {@link TransformedRecordSerializerPrefix} for details)
* containing information about which transformations were performed. This
* way, when deserializing, it can detect which transformations were applied
* so it knows which ones it needs to use to restore the original record.
Expand Down Expand Up @@ -78,15 +76,6 @@
*/
@API(API.Status.UNSTABLE)
public class TransformedRecordSerializer<M extends Message> implements RecordSerializer<M> {
@VisibleForTesting
protected static final int ENCODING_ENCRYPTED = 1;
@VisibleForTesting
protected static final int ENCODING_CLEAR = 2;
@VisibleForTesting
protected static final int ENCODING_COMPRESSED = 4;
// TODO: Can remove this after transition to write everything with _CLEAR.
protected static final int ENCODING_PROTO_MESSAGE_FIELD = 0x02;
protected static final int ENCODING_PROTO_TYPE_MASK = 0x07;
protected static final int DEFAULT_COMPRESSION_LEVEL = Deflater.BEST_COMPRESSION;
protected static final int MIN_COMPRESSION_VERSION = 1;
protected static final int MAX_COMPRESSION_VERSION = 1;
Expand All @@ -110,94 +99,48 @@ protected TransformedRecordSerializer(@Nonnull RecordSerializer<M> inner,
this.writeValidationRatio = writeValidationRatio;
}

@SpotBugsSuppressWarnings("EI_EXPOSE_REP")
protected static class TransformState {
public boolean compressed;
public boolean encrypted;

@Nonnull public byte[] data;
public int offset;
public int length;

public TransformState(@Nonnull byte[] data) {
this(data, 0, data.length);
}

public TransformState(@Nonnull byte[] data, int offset, int length) {
this.compressed = false;
this.encrypted = false;
this.data = data;
this.offset = offset;
this.length = length;
}

@Nonnull
public byte[] getDataArray() {
if (offset == 0 && length == data.length) {
return data;
} else {
byte[] newData = Arrays.copyOfRange(data, offset, offset + length);
offset = 0;
length = newData.length;
data = newData;
return newData;
}
}


public void setDataArray(@Nonnull byte[] data) {
setDataArray(data, 0, data.length);
}

public void setDataArray(@Nonnull byte[] data, int offset, int length) {
this.data = data;
this.offset = offset;
this.length = length;
}
}

protected void compress(@Nonnull TransformState state, @Nullable StoreTimer timer) {
protected void compress(@Nonnull TransformedRecordSerializerState state, @Nullable StoreTimer timer) {
long startTime = System.nanoTime();

increment(timer, Counts.RECORD_BYTES_BEFORE_COMPRESSION, state.length);
increment(timer, Counts.RECORD_BYTES_BEFORE_COMPRESSION, state.getLength());

// Compressed data stores 5 bytes of header info. Hence, it is only fruitful to compress if the uncompressed data
// has more than 5 bytes; otherwise the compressed data will always be larger than the original.
if (state.length > 5) {
if (state.getLength() > 5) {
// Compressed bytes have 5 bytes of prefixed information about the compression state.
byte[] compressed = new byte[state.length];
byte[] compressed = new byte[state.getLength()];

// Actually compress. If we end up filling the buffer, then just
// return the uncompressed value because it's pointless to compress
// if we actually increase the amount of data.
Deflater compressor = new Deflater(compressionLevel);
int compressedLength;
try {
compressor.setInput(state.data, state.offset, state.length);
compressor.setInput(state.getData(), state.getOffset(), state.getLength());
compressor.finish(); // necessary to include checksum
compressedLength = compressor.deflate(compressed, 5, compressed.length - 5, Deflater.FULL_FLUSH);
} finally {
compressor.end();
}
if (compressedLength == compressed.length - 5) {
increment(timer, Counts.RECORD_BYTES_AFTER_COMPRESSION, state.length);
state.compressed = false;
increment(timer, Counts.RECORD_BYTES_AFTER_COMPRESSION, state.getLength());
state.setCompressed(false);
} else {
// Write compression version number and uncompressed size as these
// meta-data are needed when decompressing.
compressed[0] = (byte)MAX_COMPRESSION_VERSION;
ByteBuffer.wrap(compressed, 1, 4).order(ByteOrder.BIG_ENDIAN).putInt(state.length);
state.compressed = true;
ByteBuffer.wrap(compressed, 1, 4).order(ByteOrder.BIG_ENDIAN).putInt(state.getLength());
state.setCompressed(true);
increment(timer, Counts.RECORD_BYTES_AFTER_COMPRESSION, compressedLength + 5);
state.setDataArray(compressed, 0, compressedLength + 5);
}
} else {
increment(timer, Counts.RECORD_BYTES_AFTER_COMPRESSION, state.length);
increment(timer, Counts.RECORD_BYTES_AFTER_COMPRESSION, state.getLength());
}

if (timer != null) {
timer.recordSinceNanoTime(Events.COMPRESS_SERIALIZED_RECORD, startTime);
if (!state.compressed) {
if (!state.isCompressed()) {
timer.increment(Counts.ESCHEW_RECORD_COMPRESSION);
}
}
Expand All @@ -209,7 +152,7 @@ private void increment(@Nullable StoreTimer timer, StoreTimer.Count counter, int
}
}

protected void encrypt(@Nonnull TransformState state, @Nullable StoreTimer timer) throws GeneralSecurityException {
protected void encrypt(@Nonnull TransformedRecordSerializerState state, @Nullable StoreTimer timer) throws GeneralSecurityException {
throw new RecordSerializationException("this serializer cannot encrypt");
}

Expand All @@ -225,7 +168,7 @@ public byte[] serialize(@Nonnull RecordMetaData metaData,
@Nullable StoreTimer timer) {
byte[] innerSerialized = inner.serialize(metaData, recordType, rec, timer);

TransformState state = new TransformState(innerSerialized);
TransformedRecordSerializerState state = new TransformedRecordSerializerState(innerSerialized);

if (compressWhenSerializing) {
compress(state, timer);
Expand All @@ -241,50 +184,34 @@ public byte[] serialize(@Nonnull RecordMetaData metaData,
}
}

int code;
if (state.compressed || state.encrypted) {
code = 0;
if (state.compressed) {
code = code | ENCODING_COMPRESSED;
}
if (state.encrypted) {
code = code | ENCODING_ENCRYPTED;
}
} else {
code = ENCODING_CLEAR;
}

int size = state.length + 1;
byte[] serialized = new byte[size];
serialized[0] = (byte) code;
System.arraycopy(state.data, state.offset, serialized, 1, state.length);
TransformedRecordSerializerPrefix.encodePrefix(state);

if (shouldValidateSerialization()) {
validateSerialization(metaData, recordType, rec, serialized, timer);
validateSerialization(metaData, recordType, rec, state.getDataArray(), timer);
}

return serialized;
return state.getDataArray();
}

protected void decompress(@Nonnull TransformState state, @Nullable StoreTimer timer) throws DataFormatException {
protected void decompress(@Nonnull TransformedRecordSerializerState state, @Nullable StoreTimer timer) throws DataFormatException {
final long startTime = System.nanoTime();

// At the moment, there is only one compression version, so
// after we've verified it is in the right range, we
// can just move on. If we ever introduce a new format version,
// we will need to make this code more complicated.
int compressionVersion = state.data[state.offset];
int compressionVersion = state.getData()[state.getOffset()];
if (compressionVersion < MIN_COMPRESSION_VERSION || compressionVersion > MAX_COMPRESSION_VERSION) {
throw new RecordSerializationException("unknown compression version")
.addLogInfo("compressionVersion", compressionVersion);
}

int decompressedLength = ByteBuffer.wrap(state.data, state.offset + 1, 4).order(ByteOrder.BIG_ENDIAN).getInt();
int decompressedLength = ByteBuffer.wrap(state.getData(), state.getOffset() + 1, 4).order(ByteOrder.BIG_ENDIAN).getInt();
byte[] decompressed = new byte[decompressedLength];

Inflater decompressor = new Inflater();
try {
decompressor.setInput(state.data, state.offset + 5, state.length - 5);
decompressor.setInput(state.getData(), state.getOffset() + 5, state.getLength() - 5);
int actualDecompressedSize = decompressor.inflate(decompressed);
if (actualDecompressedSize < decompressedLength) {
throw new RecordSerializationException("decompressed record too small")
Expand All @@ -305,7 +232,7 @@ protected void decompress(@Nonnull TransformState state, @Nullable StoreTimer ti
}
}

protected void decrypt(@Nonnull TransformState state, @Nullable StoreTimer timer) throws GeneralSecurityException {
protected void decrypt(@Nonnull TransformedRecordSerializerState state, @Nullable StoreTimer timer) throws GeneralSecurityException {
throw new RecordSerializationException("this serializer cannot decrypt");
}

Expand All @@ -316,52 +243,35 @@ public M deserialize(@Nonnull RecordMetaData metaData,
@Nonnull Tuple primaryKey,
@Nonnull byte[] serialized,
@Nullable StoreTimer timer) {
int encoding = serialized[0];
if (encoding != ENCODING_CLEAR && (encoding & ENCODING_PROTO_TYPE_MASK) == ENCODING_PROTO_MESSAGE_FIELD) {
// TODO: Can remove this after transition to write everything with _CLEAR.
TransformedRecordSerializerState state = new TransformedRecordSerializerState(serialized);
if (!TransformedRecordSerializerPrefix.decodePrefix(state, primaryKey)) {
return inner.deserialize(metaData, primaryKey, serialized, timer);
} else {
TransformState state = new TransformState(serialized, 1, serialized.length - 1);
if (encoding != ENCODING_CLEAR) {
if ((encoding & ENCODING_COMPRESSED) == ENCODING_COMPRESSED) {
state.compressed = true;
}
if ((encoding & ENCODING_ENCRYPTED) == ENCODING_ENCRYPTED) {
state.encrypted = true;
}
if ((encoding & ~(ENCODING_COMPRESSED | ENCODING_ENCRYPTED)) != 0) {
throw new RecordSerializationException("unrecognized transformation encoding")
.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey)
.addLogInfo("encoding", encoding);
}
}
if (state.encrypted) {
try {
decrypt(state, timer);
} catch (RecordCoreException ex) {
throw ex.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey);
} catch (GeneralSecurityException ex) {
throw new RecordSerializationException("decryption error", ex)
.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey);
}
}
if (state.isEncrypted()) {
try {
decrypt(state, timer);
} catch (RecordCoreException ex) {
throw ex.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey);
} catch (GeneralSecurityException ex) {
throw new RecordSerializationException("decryption error", ex)
.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey);
}
if (state.compressed) {
try {
decompress(state, timer);
} catch (RecordCoreException ex) {
throw ex.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey);
} catch (DataFormatException ex) {
throw new RecordSerializationException("decompression error", ex)
.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey);
}
}
if (state.isCompressed()) {
try {
decompress(state, timer);
} catch (RecordCoreException ex) {
throw ex.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey);
} catch (DataFormatException ex) {
throw new RecordSerializationException("decompression error", ex)
.addLogInfo(LogMessageKeys.META_DATA_VERSION, metaData.getVersion())
.addLogInfo(LogMessageKeys.PRIMARY_KEY, primaryKey);
}
return inner.deserialize(metaData, primaryKey, state.getDataArray(), timer);
}
return inner.deserialize(metaData, primaryKey, state.getDataArray(), timer);
}

@Nonnull
Expand Down
Loading