Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.msgpack.core.MessageUnpacker;
import org.msgpack.value.ArrayValue;
import org.msgpack.value.Value;
import org.msgpack.value.ValueType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,6 +34,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public abstract class Deserializer<A> {
private final Logger logger = LoggerFactory.getLogger(getClass());
Expand All @@ -41,19 +43,25 @@ public abstract class Deserializer<A> {

protected abstract boolean elementRefRequiresAdjacentElements();

/** edgeId maps are passed dependent on `elementRefRequiresAdjacentElements`*/
/**
* edgeId maps are passed dependent on `elementRefRequiresAdjacentElements`
*/
protected abstract ElementRef createElementRef(long id,
String label,
Map<String, long[]> inEdgeIdsByLabel,
Map<String, long[]> outEdgeIdsByLabel);

protected abstract A createElement(long id,
String label,
Map<String, Object> properties,
Optional<Map<String, Object>> properties,
Map<String, long[]> inEdgeIdsByLabel,
Map<String, long[]> outEdgeIdsByLabel);

public A deserialize(byte[] bytes) throws IOException {
// TODO speedup by restructuring or memoizing the results
protected abstract Map<Integer, Class> propertyTypeByIndex(String label);
protected abstract Map<Integer, String> propertyNamesByIndex(String label);

public A deserialize(final byte[] bytes, final boolean readProperties) throws IOException {
long start = System.currentTimeMillis();
if (null == bytes)
return null;
Expand All @@ -63,7 +71,14 @@ public A deserialize(byte[] bytes) throws IOException {
final String label = unpacker.unpackString();
final Map<String, long[]> inEdgeIdsByLabel = unpackEdgeIdsByLabel(unpacker);
final Map<String, long[]> outEdgeIdsByLabel = unpackEdgeIdsByLabel(unpacker);
final Map<String, Object> properties = unpackProperties(unpacker);

final Optional<Map<String, Object>> properties;
if (readProperties) {
final Map<Integer, Object> valuesByPropertyIndex = unpackAllProperties(unpacker, propertyTypeByIndex(label));
properties = Optional.of(convertPropertyIndexToPropertyNames(valuesByPropertyIndex, propertyNamesByIndex(label)));
} else {
properties = Optional.empty();
}

A a = createElement(id, label, properties, inEdgeIdsByLabel, outEdgeIdsByLabel);

Expand All @@ -80,7 +95,7 @@ public A deserialize(byte[] bytes) throws IOException {
/**
* only deserialize the part we're keeping in memory, used during startup when initializing from disk
*/
public ElementRef deserializeRef(byte[] bytes) throws IOException {
public ElementRef deserializeRef(final byte[] bytes) throws IOException {
try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) {
long id = unpacker.unpackLong();
String label = unpacker.unpackString();
Expand All @@ -95,49 +110,73 @@ public ElementRef deserializeRef(byte[] bytes) throws IOException {
}
}

private Map<String, Object> unpackProperties(MessageUnpacker unpacker) throws IOException {
int propertyCount = unpacker.unpackMapHeader();
Map<String, Object> res = new THashMap<>(propertyCount);
for (int i = 0; i < propertyCount; i++) {
final String key = unpacker.unpackString();
final Object unpackedProperty = unpackProperty(unpacker.unpackValue().asArrayValue());
res.put(key, unpackedProperty);
/**
* @return propertyValue by propertyIndex. properties are complete, but (therefor) values may be `null`
*/
private Map<Integer, Object> unpackAllProperties(final MessageUnpacker unpacker, final Map<Integer, Class> propertyTypeByIndex) throws IOException {
int propertyCount = unpacker.unpackArrayHeader();
Map<Integer, Object> res = new THashMap<>(propertyCount);
for (int idx = 0; idx < propertyCount; idx++) {
final Class propertyType = propertyTypeByIndex.get(idx);
res.put(idx, unpackProperty(unpacker.unpackValue(), propertyType));
}
return res;
}

private Object unpackProperty(final ArrayValue packedValueAndType) {
final Iterator<Value> iter = packedValueAndType.iterator();
final byte valueTypeId = iter.next().asIntegerValue().asByte();
final Value value = iter.next();

switch (ValueTypes.lookup(valueTypeId)) {
case BOOLEAN:
return value.asBooleanValue().getBoolean();
case STRING:
return value.asStringValue().asString();
case BYTE:
return value.asIntegerValue().asByte();
case SHORT:
return value.asIntegerValue().asShort();
case INTEGER:
return value.asIntegerValue().asInt();
case LONG:
return value.asIntegerValue().asLong();
case FLOAT:
return value.asFloatValue().toFloat();
case DOUBLE:
return Double.valueOf(value.asFloatValue().toFloat());
case LIST:
final ArrayValue arrayValue = value.asArrayValue();
List deserializedArray = new ArrayList(arrayValue.size());
final Iterator<Value> valueIterator = arrayValue.iterator();
while (valueIterator.hasNext()) {
deserializedArray.add(unpackProperty(valueIterator.next().asArrayValue()));
}
return deserializedArray;
default:
throw new NotImplementedException("unknown valueTypeId=`" + valueTypeId);
/**
* only deserialize one specific property, identified by it's index
*/
public Object unpackSpecificProperty(byte[] bytes, int propertyIdx, final Class propertyType) throws IOException {
try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) {
// skip over values we don't care about
unpacker.skipValue(2); // id and label
unpacker.skipValue(2); // [in|out]EdgeIdsByLabel maps
unpacker.unpackArrayHeader(); // skip over array header (for property count)
unpacker.skipValue(propertyIdx); // skip to required property

return unpackProperty(unpacker.unpackValue(), propertyType);
}
}

/**
* `nil` in the binary is mapped to `null`
*/
private Object unpackProperty(final Value value, final Class propertyType) {
final ValueType valueType = value.getValueType();
if (value.isNilValue()) {
return null;
} else if (value.isArrayValue()) {
final ArrayValue arrayValue = value.asArrayValue();
List deserializedArray = new ArrayList(arrayValue.size());
final Iterator<Value> valueIterator = arrayValue.iterator();
while (valueIterator.hasNext()) deserializedArray.add(unpackProperty(valueIterator.next(), propertyType));
return deserializedArray;
} else if (propertyType.equals(Boolean.class)) {
if (!valueType.isBooleanType()) throw new UnexpectedPropertyTypeException(propertyType, value);
return value.asBooleanValue().getBoolean();
} else if (propertyType.equals(String.class)) {
if (!valueType.isStringType()) throw new UnexpectedPropertyTypeException(propertyType, value);
return value.asStringValue().asString();
} else if (propertyType.equals(Byte.class)) {
if (!valueType.isIntegerType()) throw new UnexpectedPropertyTypeException(propertyType, value);
return value.asIntegerValue().asByte();
} else if (propertyType.equals(Short.class)) {
if (!valueType.isIntegerType()) throw new UnexpectedPropertyTypeException(propertyType, value);
return value.asIntegerValue().asShort();
} else if (propertyType.equals(Integer.class)) {
if (!valueType.isIntegerType()) throw new UnexpectedPropertyTypeException(propertyType, value);
return value.asIntegerValue().asInt();
} else if (propertyType.equals(Long.class)) {
if (!valueType.isIntegerType()) throw new UnexpectedPropertyTypeException(propertyType, value);
return value.asIntegerValue().asLong();
} else if (propertyType.equals(Float.class)) {
if (!valueType.isFloatType()) throw new UnexpectedPropertyTypeException(propertyType, value);
return value.asFloatValue().toFloat();
} else if (propertyType.equals(Double.class)) {
if (!valueType.isFloatType()) throw new UnexpectedPropertyTypeException(propertyType, value);
return Double.valueOf(value.asFloatValue().toFloat());
} else {
throw new NotImplementedException("unknown propertyType=`" + propertyType + " for value=" + value);
}
}

Expand All @@ -164,20 +203,27 @@ protected Object[] toTinkerpopKeyValues(Map<String, Object> properties) {
for (Map.Entry<String, Object> entry : properties.entrySet()) {
final String key = entry.getKey();
final Object property = entry.getValue();
// special handling for lists: create separate key/value entry for each list entry
if (property instanceof List) {
for (Object value : (List) property) {
if (property != null) {
// special handling for lists: create separate key/value entry for each list entry
if (property instanceof List) {
for (Object value : (List) property) {
keyValues.add(key);
keyValues.add(value);
}
} else {
keyValues.add(key);
keyValues.add(value);
keyValues.add(property);
}
} else {
keyValues.add(key);
keyValues.add(property);
}
}
return keyValues.toArray();
}

private Map<String, Object> convertPropertyIndexToPropertyNames(Map<Integer, Object> valuesByPropertyIndex, Map<Integer, String> propertyNamesByIndex) {
Map<String, Object> valuesByPropertyName = new THashMap<>(valuesByPropertyIndex.size());
valuesByPropertyIndex.forEach((index, value) -> valuesByPropertyName.put(propertyNamesByIndex.get(index), value));
return valuesByPropertyName;
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.tinkerpop.gremlin.tinkergraph.structure.VertexRef;

import java.util.Map;
import java.util.Optional;

public class EdgeDeserializer extends Deserializer<Edge> {
protected final TinkerGraph graph;
Expand All @@ -51,16 +52,25 @@ protected ElementRef createElementRef(long id, String label, Map<String, long[]>
}

@Override
protected Edge createElement(long id, String label, Map<String, Object> properties, Map<String, long[]> inVertexIdsByLabel, Map<String, long[]> outVertexIdsByLabel) {
protected Edge createElement(long id, String label, Optional<Map<String, Object>> properties, Map<String, long[]> inVertexIdsByLabel, Map<String, long[]> outVertexIdsByLabel) {
VertexRef outVertexRef = getVertexRef(outVertexIdsByLabel, Direction.OUT);
VertexRef inVertexRef = getVertexRef(inVertexIdsByLabel, Direction.IN);
SpecializedTinkerEdge edge = edgeFactoryByLabel.get(label).createEdge(id, graph, outVertexRef, inVertexRef);
ElementHelper.attachProperties(edge, toTinkerpopKeyValues(properties));

properties.ifPresent(props -> ElementHelper.attachProperties(edge, toTinkerpopKeyValues(props)));
edge.setModifiedSinceLastSerialization(false);
return edge;
}

@Override
protected Map<Integer, Class> propertyTypeByIndex(String label) {
return edgeFactoryByLabel.get(label).propertyTypeByIndex();
}

@Override
protected Map<Integer, String> propertyNamesByIndex(String label) {
return edgeFactoryByLabel.get(label).propertyNamesByIndex();
}

private VertexRef getVertexRef(Map<String, long[]> vertexIdsByLabel, Direction direction) {
final long[] vertexIds = vertexIdsByLabel.get(direction.name());
assert vertexIds != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
import org.apache.commons.lang.NotImplementedException;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.ElementRef;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.SpecializedTinkerEdge;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge;

import java.util.Arrays;
import java.util.Map;
import java.util.SortedMap;

public class EdgeSerializer extends Serializer<Edge> {

Expand All @@ -41,10 +45,15 @@ protected String getLabel(Edge edge) {
}

@Override
protected Map<String, Object> getProperties(Edge edge) {
Map<String, Object> properties = new THashMap<>();
edge.properties().forEachRemaining(property -> properties.put(property.key(), property.value()));
return properties;
protected SortedMap<Integer, Object> getProperties(Edge edge) {
if (edge instanceof ElementRef) {
edge = ((ElementRef<TinkerEdge>) edge).get();
}
if (edge instanceof SpecializedTinkerEdge) {
return ((SpecializedTinkerEdge) edge).propertiesByStorageIdx();
} else {
throw new org.apache.commons.lang3.NotImplementedException("EdgeSerializer.getProperties for generic edges");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ public void persist(final TinkerElement element) throws IOException {
}
}

public <A extends TinkerVertex> A readVertex(final long id) throws IOException {
return (A) vertexDeserializer.get().deserialize(vertexMVMap.get(id));
public <A extends TinkerVertex> A readVertex(final long id, final boolean readProperties) throws IOException {
return (A) vertexDeserializer.get().deserialize(vertexMVMap.get(id), readProperties);
}

public <A extends TinkerEdge> A readEdge(final long id) throws IOException {
return (A) edgeDeserializer.get().deserialize(edgeMVMap.get(id));
public <A extends TinkerEdge> A readEdge(final long id, final boolean readProperties) throws IOException {
return (A) edgeDeserializer.get().deserialize(edgeMVMap.get(id), readProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

public abstract class Serializer<A> {
private final Logger logger = LoggerFactory.getLogger(getClass());
Expand All @@ -39,7 +40,12 @@ public abstract class Serializer<A> {

protected abstract long getId(A a);
protected abstract String getLabel(A a);
protected abstract Map<String, Object> getProperties(A a);

/**
* Map<PropertyIndex, PropertyValue>, sorted by it's index so we can write it efficiently
*/
protected abstract SortedMap<Integer, Object> getProperties(A a);

protected abstract Map<String, TLongSet> getEdgeIds(A a, Direction direction);

public byte[] serialize(A a) throws IOException {
Expand All @@ -63,48 +69,46 @@ public byte[] serialize(A a) throws IOException {
}

/**
* when deserializing, msgpack can't differentiate between e.g. int and long, so we need to encode the type as well - doing that with an array
* i.e. format is: Map[PropertyName, Array(TypeId, PropertyValue)]
* packing as Array[PropertyValue] - each property is identified by it's index
*/
private void packProperties(MessageBufferPacker packer, Map<String, Object> properties) throws IOException {
packer.packMapHeader(properties.size());
for (Map.Entry<String, Object> property : properties.entrySet()) {
packer.packString(property.getKey());
private void packProperties(MessageBufferPacker packer, SortedMap<Integer, Object> properties) throws IOException {
packer.packArrayHeader(properties.size());
int currentIdx = 0;
for (Map.Entry<Integer, Object> property : properties.entrySet()) {
// to ensure we write the properties with correct index, fill the void with Nil values
final Integer propertyIdx = property.getKey();
while (propertyIdx > currentIdx) {
packer.packNil();
currentIdx++;
}

packPropertyValue(packer, property.getValue());
currentIdx++;
}
}

/**
* format: `[ValueType.id, value]`
* writing just the value itself
* every element type hard-codes the index and type for each property, so it can be deserialized again
*/
private void packPropertyValue(final MessageBufferPacker packer, final Object value) throws IOException {
packer.packArrayHeader(2);
if (value instanceof Boolean) {
packer.packByte(ValueTypes.BOOLEAN.id);
packer.packBoolean((Boolean) value);
} else if (value instanceof String) {
packer.packByte(ValueTypes.STRING.id);
packer.packString((String) value);
} else if (value instanceof Byte) {
packer.packByte(ValueTypes.BYTE.id);
packer.packByte((byte) value);
} else if (value instanceof Short) {
packer.packByte(ValueTypes.SHORT.id);
packer.packShort((short) value);
} else if (value instanceof Integer) {
packer.packByte(ValueTypes.INTEGER.id);
packer.packInt((int) value);
} else if (value instanceof Long) {
packer.packByte(ValueTypes.LONG.id);
packer.packLong((long) value);
} else if (value instanceof Float) {
packer.packByte(ValueTypes.FLOAT.id);
packer.packFloat((float) value);
} else if (value instanceof Double) {
packer.packByte(ValueTypes.DOUBLE.id);
packer.packFloat((float) value); //msgpack doesn't support double, but we still want to deserialize it as a double later
} else if (value instanceof List) {
packer.packByte(ValueTypes.LIST.id);
List listValue = (List) value;
packer.packArrayHeader(listValue.size());
final Iterator listIter = listValue.iterator();
Expand Down
Loading