Skip to content

Commit fa76c74

Browse files
Merge pull request #29 from keyko-io/feature/flat_event_record
flatten eventBlock record
2 parents cdc10eb + cd36f36 commit fa76c74

File tree

3 files changed

+105
-2
lines changed

3 files changed

+105
-2
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>io.keyko.monitoring</groupId>
55
<artifactId>web3-event-streamer</artifactId>
6-
<version>0.2.6</version>
6+
<version>0.2.7</version>
77
<name>Web3 Monitoring Event Streamer</name>
88
<url>https://github.com/keyko-io/web3-event-streamer</url>
99
<inceptionYear>2019</inceptionYear>
@@ -20,7 +20,7 @@
2020
<org.json.version>20190722</org.json.version>
2121
<typesafe.config.version>1.4.0</typesafe.config.version>
2222
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
23-
<keyko.schemas.version>0.3.1</keyko.schemas.version>
23+
<keyko.schemas.version>0.3.3</keyko.schemas.version>
2424
<junit.version>4.4</junit.version>
2525
</properties>
2626

src/main/java/io/keyko/monitoring/preprocessing/Transformations.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.apache.kafka.streams.kstream.KTable;
1313
import org.apache.log4j.Logger;
1414

15+
import java.lang.reflect.InvocationTargetException;
16+
import java.lang.reflect.Method;
1517
import java.util.ArrayList;
1618
import java.util.Arrays;
1719
import java.util.List;
@@ -247,4 +249,99 @@ public static KStream<String, LogRecordTopicsFlattened> flatLogs(KStream<String,
247249

248250
}
249251

252+
public static KStream<String, FlatEventBlockRecord> flatEventBlockRecord(KStream<String, EventBlockRecord> eventBlockStream) {
253+
254+
return eventBlockStream.mapValues( eventBlock -> {
255+
256+
FlatEventBlockRecord flatEventBlockRecord = new FlatEventBlockRecord();
257+
258+
flatEventBlockRecord.setAddress(eventBlock.getEvent().getAddress());
259+
flatEventBlockRecord.setBlockHash(eventBlock.getEvent().getBlockHash());
260+
flatEventBlockRecord.setBlockNumber(eventBlock.getEvent().getBlockNumber());
261+
flatEventBlockRecord.setContractName(eventBlock.getEvent().getContractName());
262+
flatEventBlockRecord.setEventSpecificationSignature(eventBlock.getEvent().getEventSpecificationSignature());
263+
flatEventBlockRecord.setFilterId(eventBlock.getEvent().getFilterId());
264+
flatEventBlockRecord.setId(eventBlock.getEvent().getId());
265+
flatEventBlockRecord.setLogIndex(eventBlock.getEvent().getLogIndex());
266+
flatEventBlockRecord.setName(eventBlock.getEvent().getName());
267+
flatEventBlockRecord.setNetworkName(eventBlock.getEvent().getNetworkName());
268+
flatEventBlockRecord.setNodeName(eventBlock.getEvent().getNodeName());
269+
flatEventBlockRecord.setRetries(eventBlock.getEvent().getRetries());
270+
flatEventBlockRecord.setStatus(getEventStatus(eventBlock.getEvent().getStatus()));
271+
flatEventBlockRecord.setTimestamp(eventBlock.getBlock().getTimestamp());
272+
273+
List<Object> indexedParameteres = eventBlock.getEvent().getIndexedParameters();
274+
List<Object> nonIndexedParameteres = eventBlock.getEvent().getNonIndexedParameters();
275+
276+
setParametersOfFlatEvent(flatEventBlockRecord, indexedParameteres, true);
277+
setParametersOfFlatEvent(flatEventBlockRecord, nonIndexedParameteres, false);
278+
279+
return flatEventBlockRecord;
280+
281+
});
282+
283+
}
284+
285+
private static void setParametersOfFlatEvent(FlatEventBlockRecord flatEvent, List<Object> parameters, Boolean indexed) {
286+
287+
288+
Integer maxIndexParameters = 4;
289+
Integer maxNonIndexParameters = 5;
290+
291+
Integer max = indexed?maxIndexParameters:maxNonIndexParameters;
292+
293+
String nameMethod = indexed?"setIndexedParamName":"setNonIndexedParamName";
294+
String typeMethod = indexed?"setIndexedParamType":"setNonIndexedParamType";
295+
String valueMethod = indexed?"setIndexedParamValue":"setNonIndexedParamValue";
296+
297+
Method setNameMethod;
298+
Method setTypeMethod;
299+
Method setValueMethod;
300+
301+
String paramName = "";
302+
String paramType = "";
303+
String paramValue = "";
304+
305+
for (int i=0; i< parameters.size() && i< max; i++){
306+
307+
try {
308+
setNameMethod = FlatEventBlockRecord.class.getMethod(nameMethod + i, String.class);
309+
setTypeMethod = FlatEventBlockRecord.class.getMethod(typeMethod + i, String.class);
310+
setValueMethod = FlatEventBlockRecord.class.getMethod(valueMethod + i, String.class);
311+
312+
if (parameters.get(i) instanceof NumberParameter) {
313+
NumberParameter nParam = (NumberParameter) parameters.get(i);
314+
paramName = nParam.getName();
315+
paramType = nParam.getType();
316+
paramValue = nParam.getValue();
317+
} else {
318+
StringParameter sParam = (StringParameter) parameters.get(i);
319+
paramName = sParam.getName();
320+
paramType = sParam.getType();
321+
paramValue = sParam.getValue();
322+
}
323+
324+
setNameMethod.invoke(flatEvent, paramName);
325+
setTypeMethod.invoke(flatEvent, paramType);
326+
setValueMethod.invoke(flatEvent, paramValue);
327+
328+
}catch (Exception e){
329+
log.error("Error setting parameter " + e.getMessage());
330+
continue;
331+
}
332+
333+
}
334+
335+
}
336+
337+
private static FlatContractEventBlockStatus getEventStatus(ContractEventStatus status){
338+
switch(status){
339+
case CONFIRMED: return FlatContractEventBlockStatus.CONFIRMED;
340+
case UNCONFIRMED: return FlatContractEventBlockStatus.UNCONFIRMED;
341+
case INVALIDATED: return FlatContractEventBlockStatus.INVALIDATED;
342+
}
343+
344+
return FlatContractEventBlockStatus.UNCONFIRMED;
345+
}
346+
250347
}

src/main/java/io/keyko/monitoring/serde/Web3MonitoringSerdes.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class Web3MonitoringSerdes {
2020
private final static SpecificAvroSerde<TimeSeriesRecord> timeSeriesSerde = new SpecificAvroSerde<>();
2121
private final static SpecificAvroSerde<LogRecord> logSerde = new SpecificAvroSerde<>();
2222
private final static SpecificAvroSerde<LogRecordTopicsFlattened> logFlattenedSerde = new SpecificAvroSerde<>();
23+
private final static SpecificAvroSerde<FlatEventBlockRecord> flatEventBlockSerde = new SpecificAvroSerde<>();
2324
protected static Map<String, String> serdeConfig;
2425

2526

@@ -36,6 +37,7 @@ public static void configureSerdes(String schemaRegistryUrl) {
3637
timeSeriesSerde.configure(serdeConfig, false);
3738
logSerde.configure(serdeConfig, false);
3839
logFlattenedSerde.configure(serdeConfig, false);
40+
flatEventBlockSerde.configure(serdeConfig, false);
3941
}
4042

4143
protected static void configureSerde(SpecificAvroSerde serde) {
@@ -74,5 +76,9 @@ public static SpecificAvroSerde<LogRecordTopicsFlattened> getLogFlattenedSerde()
7476
return logFlattenedSerde;
7577
}
7678

79+
public static SpecificAvroSerde<FlatEventBlockRecord> getFlatEventBlockSerde() {
80+
return flatEventBlockSerde;
81+
}
82+
7783

7884
}

0 commit comments

Comments
 (0)