2121 */
2222package org .numenta .nupic .network ;
2323
24- import java .io .ByteArrayOutputStream ;
2524import java .io .File ;
2625import java .io .FileNotFoundException ;
2726import java .io .IOException ;
28- import java .io .InputStream ;
2927import java .nio .file .Files ;
3028import java .util .Arrays ;
3129import java .util .List ;
3937import org .joda .time .format .DateTimeFormatter ;
4038import org .numenta .nupic .Persistable ;
4139import org .nustaq .serialization .FSTConfiguration ;
40+ import org .nustaq .serialization .FSTObjectInput ;
4241import org .slf4j .Logger ;
4342import org .slf4j .LoggerFactory ;
4443
4544import com .esotericsoftware .kryo .Kryo ;
45+ import com .esotericsoftware .kryo .KryoException ;
4646import com .esotericsoftware .kryo .Serializer ;
4747import com .esotericsoftware .kryo .io .Input ;
4848import com .esotericsoftware .kryo .io .Output ;
4949
50- import de .javakaffee .kryoserializers .KryoReflectionFactorySupport ;
51-
5250/**
5351 * <p>
5452 * Low level serializer that wraps an FST serialization scheme in both a Kryo {@link Serializer}
6765 *
6866 * @param <T> the type which will be serialized
6967 */
70- class NetworkSerializerImpl <T extends Persistable > extends Serializer <T > implements NetworkSerializer <T > {
68+ class NetworkSerializerImpl <T extends Persistable > extends Serializer <T > implements NetworkSerializer <T >, Persistable {
69+ private static final long serialVersionUID = 1L ;
70+
7171 protected static final Logger LOGGER = LoggerFactory .getLogger (NetworkSerializerImpl .class );
7272
7373 /** Time stamped serialization file format */
74- public static final DateTimeFormatter CHECKPOINT_TIMESTAMP_FORMAT = DateTimeFormat .forPattern (SerialConfig .CHECKPOINT_FORMAT_STRING );
74+ public static transient DateTimeFormatter CHECKPOINT_TIMESTAMP_FORMAT = DateTimeFormat .forPattern (SerialConfig .CHECKPOINT_FORMAT_STRING );
75+ private transient DateTimeFormatter checkPointFormatter = CHECKPOINT_TIMESTAMP_FORMAT ;
76+ private String checkPointFormatString ;
7577
7678 /** Use of Fast Serialize https://github.com/RuedigerMoeller/fast-serialization */
7779 private final FSTConfiguration fastSerialConfig = FSTConfiguration .createDefaultConfiguration ();
@@ -89,9 +91,7 @@ class NetworkSerializerImpl<T extends Persistable> extends Serializer<T> impleme
8991 /** Stores the bytes of the last serialized object or null if there was a problem */
9092 private static AtomicReference <byte []> lastBytes = new AtomicReference <byte []>(null );
9193
92- private DateTimeFormatter checkPointFormatter = CHECKPOINT_TIMESTAMP_FORMAT ;
9394
94- private String checkPointFormatString ;
9595
9696 /**
9797 * All instances in this classloader will share the same atomic reference to the last checkpoint file name holder
@@ -134,23 +134,21 @@ class NetworkSerializerImpl<T extends Persistable> extends Serializer<T> impleme
134134 * @param klass The class of the object to be read in.
135135 * @return an instance of type <T>
136136 */
137+ @ SuppressWarnings ("unchecked" )
137138 @ Override
138139 public T read (Kryo kryo , Input in , Class <T > klass ) {
139- InputStream is = in .getInputStream ();
140- ByteArrayOutputStream buffer = new ByteArrayOutputStream ();
141-
140+ FSTObjectInput reader = fastSerialConfig .getObjectInput (in );
142141 try {
143- int nRead ;
144- byte [] data = new byte [ 16384 ];
145- while (( nRead = is . read ( data , 0 , data . length )) != - 1 ) {
146- buffer . write ( data , 0 , nRead );
142+ T t = ( T ) reader . readObject ( klass ) ;
143+
144+ if ( t instanceof Persistable ) {
145+ (( Persistable ) t ). postDeSerialize ( );
147146 }
148- buffer .flush ();
149- }catch (Exception e ) {
150- throw new RuntimeException (e );
147+ return t ;
148+ }
149+ catch (Exception e ) {
150+ throw new KryoException (e );
151151 }
152-
153- return deSerialize (klass , buffer .toByteArray ());
154152 }
155153
156154 /**
@@ -166,11 +164,18 @@ public T read(Kryo kryo, Input in, Class<T> klass) {
166164 * @param <T> instance to serialize
167165 */
168166 @ Override
169- public void write (Kryo kryo , Output out , T type ) {
170- byte [] bytes = serialize (type );
171- out .write (bytes );
172-
173- lastBytes .set (bytes );
167+ public void write (Kryo kryo , Output out , T t ) {
168+ try {
169+ if (t instanceof Persistable ) {
170+ ((Persistable ) t ).preSerialize ();
171+ }
172+
173+ byte [] bytes = fastSerialConfig .asByteArray (t );
174+ out .write (bytes );
175+ }
176+ catch (Exception e ) {
177+ throw new KryoException (e );
178+ }
174179 }
175180
176181 /**
@@ -547,22 +552,14 @@ File testFileExists(String fileName) throws IOException, FileNotFoundException {
547552 * @return an instance of Kryo
548553 */
549554 private Kryo createKryo () {
550- return new KryoReflectionFactorySupport () {
551- private NetworkSerializer <?> ser ;
552- private SerialConfig conf ;
553-
554- {
555- conf = new SerialConfig (
556- config .getFileName (), Scheme .FST ,
557- config .getRegistry (), config .getOpenOptions ());
558- ser = Network .serializer (conf , true );
559- }
560-
561- @ SuppressWarnings ("rawtypes" )
562- @ Override public Serializer <?> getDefaultSerializer (final Class clazz ) {
563- return (Serializer <?>)ser ;
564- }
565- };
555+ Kryo .DefaultInstantiatorStrategy initStrategy = new Kryo .DefaultInstantiatorStrategy ();
556+
557+ // use Objenesis to create classes without calling the constructor (Flink's technique)
558+ //initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
559+
560+ Kryo kryo = new Kryo ();
561+ kryo .setInstantiatorStrategy (initStrategy );
562+ return kryo ;
566563 }
567564
568565 /* (non-Javadoc)
0 commit comments