Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/main/java/ru/rt/restream/reindexer/ReindexerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import ru.rt.restream.reindexer.binding.cproto.DataSourceConfiguration;
import ru.rt.restream.reindexer.binding.cproto.DataSourceFactory;
import ru.rt.restream.reindexer.binding.cproto.DataSourceFactoryStrategy;
import ru.rt.restream.reindexer.convert.FieldConverterRegistry;
import ru.rt.restream.reindexer.convert.FieldConverterRegistryFactory;
import ru.rt.restream.reindexer.exceptions.UnimplementedException;

import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/**
* Represents approach for bootstrapping Reindexer.
Expand Down Expand Up @@ -103,6 +106,17 @@ public ReindexerConfiguration dataSourceFactory(DataSourceFactory dataSourceFact
return this;
}

/**
* Allows customizing a {@link FieldConverterRegistry}.
*
* @param customizer the {@link FieldConverterRegistry} customizer.
* @return the {@link ReindexerConfiguration} for further customizations
*/
public ReindexerConfiguration fieldConverterRegistry(Consumer<FieldConverterRegistry> customizer) {
customizer.accept(FieldConverterRegistryFactory.INSTANCE);
return this;
}

/**
* Configure reindexer connection pool size. Defaults to 8.
*
Expand Down
47 changes: 47 additions & 0 deletions src/main/java/ru/rt/restream/reindexer/annotations/Convert.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2020 Restream
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.rt.restream.reindexer.annotations;

import ru.rt.restream.reindexer.convert.FieldConverter;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Specifies how fields are converted between the Reindexer database type
* and the one used within the POJO representation.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface Convert {

/**
* Specifies a {@link FieldConverter} implementation to be used for converting
* fields between Reindexer database type and the one used within the POJO representation.
* @return the {@link FieldConverter} implementation to use
*/
Class<? extends FieldConverter> converterClass() default FieldConverter.class;

/**
* Specifies whether conversion should be disabled for the given field,
* useful in case of global converter should not be applied for specific fields.
* Defaults to {@literal false}.
* @return true, if conversion should be disabled for the given field, defaults to {@literal false}
*/
boolean disableConversion() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
import ru.rt.restream.reindexer.IndexType;
import ru.rt.restream.reindexer.ReindexScanner;
import ru.rt.restream.reindexer.ReindexerIndex;
import ru.rt.restream.reindexer.convert.FieldConverter;
import ru.rt.restream.reindexer.convert.util.ConversionUtils;
import ru.rt.restream.reindexer.convert.FieldConverterRegistryFactory;
import ru.rt.restream.reindexer.exceptions.IndexConflictException;
import ru.rt.restream.reindexer.fulltext.FullTextConfig;
import ru.rt.restream.reindexer.util.BeanPropertyUtils;
import ru.rt.restream.reindexer.convert.util.ResolvableType;
import ru.rt.restream.reindexer.util.Pair;
import ru.rt.restream.reindexer.vector.HnswConfig;
import ru.rt.restream.reindexer.vector.HnswConfigs;
import ru.rt.restream.reindexer.vector.IvfConfig;
Expand All @@ -33,11 +38,8 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -341,28 +343,24 @@ private ReindexerIndex createIndex(String reindexPath, List<String> jsonPath, In
}

private FieldInfo getFieldInfo(Field field) {
Class<?> type = field.getType();
FieldInfo fieldInfo = new FieldInfo();
fieldInfo.isArray = type.isArray() || Collection.class.isAssignableFrom(type);
FieldType fieldType = null;
if (type.isArray()) {
Class<?> componentType = type.getComponentType();
FieldType fieldType;
FieldConverter<?, ?> converter = FieldConverterRegistryFactory.INSTANCE.getFieldConverter(field);
ResolvableType resolvableType;
if (converter != null) {
Pair<ResolvableType, ResolvableType> convertiblePair = converter.getConvertiblePair();
resolvableType = convertiblePair.getSecond();
} else {
resolvableType = ConversionUtils.resolveFieldType(field);
}
fieldInfo.isArray = resolvableType.isCollectionLike();
if (fieldInfo.isArray) {
Class<?> componentType = getFieldType(field, resolvableType.getComponentType());
fieldType = getFieldTypeByClass(componentType);
fieldInfo.componentType = componentType;
fieldInfo.isFloatVector = (fieldType == FLOAT);
} else if (field.getGenericType() instanceof ParameterizedType && fieldInfo.isArray) {
ParameterizedType parameterizedType = (ParameterizedType) field.getGenericType();
Type typeArgument = parameterizedType.getActualTypeArguments()[0];
if (typeArgument instanceof Class<?>) {
Class<?> componentType = (Class<?>) typeArgument;
fieldType = getFieldTypeByClass(componentType);
fieldInfo.componentType = componentType;
}
} else if (Enum.class.isAssignableFrom(type)) {
Enumerated enumerated = field.getAnnotation(Enumerated.class);
fieldType = enumerated != null && enumerated.value() == EnumType.STRING ? STRING : INT;
fieldInfo.isFloatVector = resolvableType.getType().isArray() && fieldType == FLOAT;
} else {
fieldType = getFieldTypeByClass(type);
fieldType = getFieldTypeByClass(getFieldType(field, resolvableType.getType()));
}

if (fieldType == null) {
Expand All @@ -372,6 +370,14 @@ private FieldInfo getFieldInfo(Field field) {
fieldInfo.fieldType = fieldType;
return fieldInfo;
}

private Class<?> getFieldType(Field field, Class<?> type) {
if (Enum.class.isAssignableFrom(type)) {
Enumerated enumerated = field.getAnnotation(Enumerated.class);
return enumerated != null && enumerated.value() == EnumType.STRING ? String.class : Integer.class;
}
return type;
}

private FieldType getFieldTypeByClass(Class<?> type) {
return MAPPED_TYPES.getOrDefault(type, COMPOSITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import ru.rt.restream.reindexer.binding.cproto.ByteBuffer;
import ru.rt.restream.reindexer.binding.cproto.ItemWriter;
import ru.rt.restream.reindexer.binding.cproto.cjson.encdec.CjsonEncoder;
import ru.rt.restream.reindexer.convert.FieldConverter;
import ru.rt.restream.reindexer.convert.FieldConverterRegistryFactory;
import ru.rt.restream.reindexer.util.BeanPropertyUtils;

import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.util.List;
import java.util.UUID;
Expand All @@ -43,11 +47,11 @@ public CJsonItemWriter(CtagMatcher ctagMatcher) {
@Override
public void writeItem(ByteBuffer buffer, T item) {
CjsonEncoder cjsonEncoder = new CjsonEncoder(ctagMatcher);
byte[] itemData = cjsonEncoder.encode(toCjson(item));
byte[] itemData = cjsonEncoder.encode(toCjson(item, CJsonItemWriter::defaultExtract));
buffer.writeBytes(itemData);
}

private CjsonElement toCjson(Object source) {
private CjsonElement toCjson(Object source, AnnotationExtractor annotationExtractor) {
if (source == null) {
return CjsonNull.INSTANCE;
}
Expand All @@ -70,19 +74,25 @@ private CjsonElement toCjson(Object source) {
return new CjsonPrimitive((Float) source);
} else if (source instanceof UUID) {
return new CjsonPrimitive((UUID) source);
} else if (source instanceof List) {
} else if (source instanceof Enum<?>) {
Enumerated enumerated = annotationExtractor.extract(Enumerated.class);
if (enumerated != null && enumerated.value() == EnumType.STRING) {
return new CjsonPrimitive(((Enum<?>) source).name());
}
int ordinal = ((Enum<?>) source).ordinal();
return new CjsonPrimitive((long) ordinal);
} else if (source instanceof Iterable<?>) {
CjsonArray cjsonArray = new CjsonArray();
List<?> sourceList = (List<?>) source;
for (Object element : sourceList) {
CjsonElement cjsonElement = toCjson(element);
for (Object element : (Iterable<?>) source) {
CjsonElement cjsonElement = toCjson(element, annotationExtractor);
cjsonArray.add(cjsonElement);
}
return cjsonArray;
} else if (source.getClass().isArray() && source.getClass().getComponentType() == float.class) {
float[] floatVector = (float[]) source;
} else if (source.getClass().isArray()) {
int length = Array.getLength(source);
CjsonArray cjsonArray = new CjsonArray();
for (float el : floatVector) {
cjsonArray.add(new CjsonPrimitive(el));
for (int i = 0; i < length; i++) {
cjsonArray.add(toCjson(Array.get(source, i), annotationExtractor));
}
return cjsonArray;
} else {
Expand All @@ -93,22 +103,18 @@ private CjsonElement toCjson(Object source) {
continue;
}
Object fieldValue = readFieldValue(source, field);
FieldConverter<Object, ?> converter = FieldConverterRegistryFactory.INSTANCE.getFieldConverter(field);
if (converter != null) {
fieldValue = converter.convertToDatabaseType(fieldValue);
}
if (fieldValue != null) {
CjsonElement cjsonElement;
// hack for serialization of String field with Reindex.isUuid() == true as UUID.
if (field.getType() == String.class && field.isAnnotationPresent(Reindex.class)
if (fieldValue instanceof String && field.isAnnotationPresent(Reindex.class)
&& field.getAnnotation(Reindex.class).isUuid()) {
cjsonElement = new CjsonPrimitive(UUID.fromString((String) fieldValue));
} else if (Enum.class.isAssignableFrom(field.getType())) {
Enumerated enumerated = field.getAnnotation(Enumerated.class);
if (enumerated != null && enumerated.value() == EnumType.STRING) {
cjsonElement = new CjsonPrimitive(((Enum<?>) fieldValue).name());
} else {
int ordinal = ((Enum<?>) fieldValue).ordinal();
cjsonElement = new CjsonPrimitive((long) ordinal);
}
} else {
cjsonElement = toCjson(fieldValue);
cjsonElement = toCjson(fieldValue, field::getAnnotation);
}
Json json = field.getAnnotation(Json.class);
String tagName = json == null ? field.getName() : json.value();
Expand All @@ -123,4 +129,11 @@ private Object readFieldValue(Object source, Field field) {
return BeanPropertyUtils.getProperty(source, field.getName());
}

private interface AnnotationExtractor {
<A extends Annotation> A extract(Class<A> annotationClass);
}

private static <A extends Annotation> A defaultExtract(Class<A> annotationClass) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import ru.rt.restream.reindexer.binding.cproto.ByteBuffer;
import ru.rt.restream.reindexer.binding.cproto.ItemReader;
import ru.rt.restream.reindexer.binding.cproto.cjson.encdec.CjsonDecoder;
import ru.rt.restream.reindexer.convert.FieldConverter;
import ru.rt.restream.reindexer.convert.FieldConverterRegistryFactory;
import ru.rt.restream.reindexer.util.BeanPropertyUtils;
import ru.rt.restream.reindexer.convert.util.ConversionUtils;
import ru.rt.restream.reindexer.convert.util.ResolvableType;
import ru.rt.restream.reindexer.util.CollectionUtils;
import ru.rt.restream.reindexer.util.Pair;

import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -71,46 +75,45 @@ private <V> V readObject(CjsonObject cjsonObject, Class<V> itemClass) {
}

private Object getTargetValue(Field field, CjsonElement property) {
Class<?> fieldType = field.getType();
FieldConverter<?, Object> converter = FieldConverterRegistryFactory.INSTANCE.getFieldConverter(field);
if (converter != null) {
Pair<ResolvableType, ResolvableType> convertiblePair = converter.getConvertiblePair();
return converter.convertToFieldType(getTargetValue(field, convertiblePair.getSecond(), property));
}
ResolvableType resolvableType = ConversionUtils.resolveFieldType(field);
return getTargetValue(field, resolvableType, property);
}

private Object getTargetValue(Field field, ResolvableType resolvableType, CjsonElement property) {
if (property.isNull()) {
if (fieldType == List.class) {
return new ArrayList<>();
if (resolvableType.isCollectionLike()) {
return resolvableType.getType().isArray() ? Array.newInstance(resolvableType.getComponentType(), 0)
: CollectionUtils.createCollection(resolvableType.getType(), resolvableType.getComponentType(), 0);
}
return null;
}

if (fieldType == List.class) {
CjsonArray array = property.getAsCjsonArray();
ArrayList<Object> elements = new ArrayList<>();
ParameterizedType genericType = (ParameterizedType) field.getGenericType();
Type elementType = genericType.getActualTypeArguments()[0];
for (CjsonElement cjsonElement : array) {
elements.add(convert(cjsonElement, (Class<?>) elementType));
if (resolvableType.isCollectionLike()) {
List<CjsonElement> elements = property.getAsCjsonArray().list();
if (resolvableType.getType().isArray()) {
Object array = Array.newInstance(resolvableType.getComponentType(), elements.size());
for (int i = 0; i < elements.size(); i++) {
Array.set(array, i, convert(elements.get(i), resolvableType.getComponentType(), field));
}
return array;
}
return elements;
} else if (Enum.class.isAssignableFrom(fieldType)) {
Enumerated enumerated = field.getAnnotation(Enumerated.class);
if (enumerated != null && enumerated.value() == EnumType.STRING) {
return Enum.valueOf(fieldType.asSubclass(Enum.class), property.getAsString());
Collection<Object> collection = CollectionUtils
.createCollection(resolvableType.getType(), resolvableType.getComponentType(), elements.size());
for (CjsonElement element : elements) {
collection.add(convert(element, resolvableType.getComponentType(), field));
}
return fieldType.getEnumConstants()[property.getAsInteger()];
} else if ( fieldType.isArray() && fieldType.getComponentType() == float.class) {
// float_vector
CjsonArray array = property.getAsCjsonArray();
int size = array.list().size();
float[] elements = new float[size];
int i = 0;
Iterator<CjsonElement> iterator = array.iterator();
while (iterator.hasNext()) {
elements[i++] = iterator.next().getAsFloat();
}
return elements;
return collection;
} else {
return convert(property, field.getType());
return convert(property, resolvableType.getType(), field);
}
}

private Object convert(CjsonElement element, Class<?> targetClass) {
private Object convert(CjsonElement element, Class<?> targetClass, Field field) {
if (element.isNull()) {
return null;
} else if (targetClass == Integer.class || targetClass == int.class) {
Expand All @@ -131,6 +134,12 @@ private Object convert(CjsonElement element, Class<?> targetClass) {
return element.getAsFloat();
} else if (targetClass == UUID.class) {
return element.getAsUuid();
} else if (Enum.class.isAssignableFrom(targetClass)) {
Enumerated enumerated = field.getAnnotation(Enumerated.class);
if (enumerated != null && enumerated.value() == EnumType.STRING) {
return Enum.valueOf(targetClass.asSubclass(Enum.class), element.getAsString());
}
return targetClass.getEnumConstants()[element.getAsInteger()];
} else if (element.isObject()) {
return readObject(element.getAsCjsonObject(), targetClass);
} else {
Expand Down
Loading