Skip to content

Commit efaf473

Browse files
authored
CLARA I/O service development (#638)
* read variation/timestamp from yaml * cleanup * load with data-cv * cleanup * reduce verbosity * debug * fix schema path * debug * store full schema * cleanup * remove printouts * cleanup * yet another option * pass # events on macos too * cleanup * silence * minimize * cleanup * add HEL::scaler to tag-1 banks * env vars do not work * support another schema * use full schema * default to RCDB for torus/solenoid, add yaml support * optimize * reduce severity * reduce severity * remove unused dictionary access * remove unused evio dictionaries * cleanup * add dependency * store filename * add post-processor * add another usage * add another initialization * record scalers too * rename, remove re-reading * add accessibility * also store run number, cleanup * get delay from CCDB, reuse writer logic * cleanup * just require exact match, no shorthands * add logging properties files * cleanup * add verbosity option * print warning * add quiet option * support overriding -v * bugfix * remove clara * import frame io services * add dependency * resolve "used undeclared dependency" errors
1 parent f682cb5 commit efaf473

File tree

58 files changed

+718
-2312
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+718
-2312
lines changed

bin/run-clara

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ function error() {
2525
threads=2
2626
prefix=rec_
2727
CLARA_USER_DATA=.
28-
while getopts y:o:p:c:t:n:mh opt
28+
while getopts y:o:p:c:t:n:qmh opt
2929
do
3030
case $opt in
3131
y) yaml=$OPTARG ;;
@@ -35,6 +35,7 @@ do
3535
t) threads=$OPTARG && echo $threads | grep -q -E '^[0-9]+$' || error "-t must be an integer, threads" ;;
3636
n) nevents="-e $OPTARG" && echo "$nevents" | grep -q -E '^-e [0-9]+$' || error "-n must be an integer, events" ;;
3737
m) merge=1 ;;
38+
q) quiet=1 ;;
3839
h) echo -e "\n$usage" && echo -e $info && exit 0 ;;
3940
esac
4041
done
@@ -53,7 +54,7 @@ yaml=$(cd $(dirname $yaml) && pwd)/$(basename $yaml)
5354

5455
# Create the environment variables and directories required by CLARA:
5556
[ -e $CLARA_USER_DATA ] && echo "WARNING: Using existing directory: $CLARA_USER_DATA"
56-
mkdir -p -v $CLARA_USER_DATA || error "Cannot create -o output directory: $CLARA_USER_DATA"
57+
mkdir -p $CLARA_USER_DATA || error "Cannot create -o output directory: $CLARA_USER_DATA"
5758
mkdir -p $CLARA_USER_DATA/log $CLARA_USER_DATA/config $CLARA_USER_DATA/data/output
5859
export CLARA_USER_DATA=$(cd $CLARA_USER_DATA && pwd)
5960
export CLARA_HOME=$(cd $CLARA_HOME && pwd)
@@ -73,7 +74,9 @@ done
7374
# Set some JVM options:
7475
export JAVA_OPTS="$JAVA_OPTS -XX:+IgnoreUnrecognizedVMOptions"
7576
export JAVA_OPTS="$JAVA_OPTS -Djava.io.tmpdir=$CLARA_USER_DATA -Dorg.sqlite.tmpdir=$CLARA_USER_DATA"
76-
export JAVA_OPTS="$JAVA_OPTS -Djava.util.logging.config.file=$CLAS12DIR/etc/logging/debug.properties"
77+
# Set verbosity:
78+
[ -z ${quiet+x} ] && stub=fine || stub=info
79+
export JAVA_OPTS="$JAVA_OPTS -Djava.util.logging.config.file=$CLAS12DIR/etc/logging/$stub.properties"
7780

7881
function get_host_ip() {
7982
if command -v ip >/dev/null 2>&1
@@ -127,7 +130,7 @@ then
127130
$CLARA_HOME/bin/clara-orchestrator \
128131
-F -f ${ip}%${port}_java -s recon \
129132
-i $CLARA_USER_DATA -o $CLARA_USER_DATA -z $prefix \
130-
-p $threads -t $threads \
133+
-p $threads -t $threads $nevents \
131134
$yaml $CLARA_USER_DATA/filelist.txt
132135
set +v
133136
else

build-coatjava.sh

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,4 @@ for pom in $(find common-tools -name pom.xml); do
182182
done
183183
echo "installed coatjava to: $prefix_dir"
184184

185-
# install clara
186-
#rm -rf clara-home && ./install-clara -c ./coatjava ./clara-home
187-
188185
echo "COATJAVA SUCCESSFULLY BUILT !"

common-tools/clara-io/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,18 @@
5353
<version>12.0.6t-SNAPSHOT</version>
5454
</dependency>
5555

56+
<dependency>
57+
<groupId>org.jlab.clas</groupId>
58+
<artifactId>clas-analysis</artifactId>
59+
<version>12.0.6t-SNAPSHOT</version>
60+
</dependency>
61+
62+
<dependency>
63+
<groupId>org.jlab.clas</groupId>
64+
<artifactId>clas-utils</artifactId>
65+
<version>12.0.6t-SNAPSHOT</version>
66+
</dependency>
67+
5668
<dependency>
5769
<groupId>org.apache.commons</groupId>
5870
<artifactId>commons-text</artifactId>

common-tools/clara-io/src/main/java/org/jlab/io/clara/Clas12Types.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.jlab.clara.engine.ClaraSerializer;
55
import org.jlab.clara.engine.EngineDataType;
66
import org.jlab.jnp.hipo4.data.Event;
7+
import org.jlab.jnp.hipo4.data.DataFrame;
78

89
import java.nio.ByteBuffer;
910

@@ -29,9 +30,26 @@ public Object read(ByteBuffer buffer) throws ClaraException {
2930
}
3031
}
3132

33+
private static class FrameSerializer implements ClaraSerializer {
34+
35+
@Override
36+
public ByteBuffer write(Object o) throws ClaraException {
37+
DataFrame stream = (DataFrame) o;
38+
return stream.getFrameBuffer();
39+
}
40+
41+
@Override
42+
public Object read(ByteBuffer bb) throws ClaraException {
43+
return new DataFrame(bb);
44+
}
45+
}
46+
3247
public static final EngineDataType EVIO =
3348
new EngineDataType("binary/data-evio", EngineDataType.BYTES.serializer());
3449

3550
public static final EngineDataType HIPO =
3651
new EngineDataType("binary/data-hipo", new HipoSerializer());
52+
53+
public static final EngineDataType HIPOFRAME =
54+
new EngineDataType("binary/data-hipo-frame", new FrameSerializer());
3755
}

common-tools/clara-io/src/main/java/org/jlab/io/clara/EvioToEvioSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public Object readEvent(int eventNumber) throws EventReaderException {
5050
if (eventNumber >= maxEvents) return null;
5151
try {
5252
ByteBuffer bb = reader.getEventBuffer(++eventNumber, true);
53-
EvioDataEvent event = new EvioDataEvent(bb.array(), byteOrder, reader.getDictionary());
53+
EvioDataEvent event = new EvioDataEvent(bb.array(), byteOrder);
5454
return event;
5555
} catch (EvioException e) {
5656
throw new EventReaderException(e);

common-tools/clara-io/src/main/java/org/jlab/io/clara/EvioToHipoReader.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,19 @@ public class EvioToHipoReader extends AbstractEventReaderService<EvioSource> {
2323

2424
CLASDecoder4 decoder;
2525
private long maxEvents;
26-
27-
String variation = "default";
28-
String timestamp = "";
29-
30-
protected void configure(EvioToHipoReader reader, JSONObject opts) {
31-
if (opts.has("variation"))
32-
variation = opts.getString("variation");
33-
if (opts.has("timestamp"))
34-
timestamp = opts.getString("timestamp");
35-
}
26+
private Double torus;
27+
private Double solenoid;
3628

3729
@Override
3830
protected EvioSource createReader(Path file, JSONObject opts) throws EventReaderException {
3931
EvioSource s = new EvioSource();
4032
s.open(file.toString());
4133
maxEvents = s.getEventCount();
4234
decoder = new CLASDecoder4();
43-
decoder.setVariation(variation);
44-
decoder.setTimestamp(timestamp);
35+
torus = opts.has("torus") ? opts.getDouble("torus") : null;
36+
solenoid = opts.has("solenoid") ? opts.getDouble("solenoid") : null;
37+
if (opts.has("variation")) decoder.setVariation(opts.getString("variation"));
38+
if (opts.has("timestamp")) decoder.setTimestamp(opts.getString("timestamp"));
4539
return s;
4640
}
4741

@@ -65,8 +59,8 @@ public Object readEvent(int eventNumber) throws EventReaderException {
6559
if (eventNumber >= maxEvents) return null;
6660
try {
6761
ByteBuffer bb = reader.getEventBuffer(++eventNumber, true);
68-
EvioDataEvent evio = new EvioDataEvent(bb.array(), readByteOrder(), reader.getDictionary());
69-
Event hipo = decoder.getDecodedEvent(evio, -1, eventNumber, -1.0, -1.0);
62+
EvioDataEvent evio = new EvioDataEvent(bb.array(), readByteOrder());
63+
Event hipo = decoder.getDecodedEvent(evio, -1, eventNumber, torus, solenoid);
7064
if (eventNumber % 25000 == 0 && collectGarbage) System.gc();
7165
return hipo;
7266
} catch (EvioException e) {
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package org.jlab.io.clara;
2+
3+
import java.nio.ByteOrder;
4+
import java.nio.file.Path;
5+
import org.jlab.clara.engine.EngineDataType;
6+
import org.jlab.clara.std.services.AbstractEventReaderService;
7+
import org.jlab.clara.std.services.EventReaderException;
8+
import org.jlab.jnp.hipo4.data.Bank;
9+
import org.jlab.jnp.hipo4.data.DataFrame;
10+
import org.jlab.jnp.hipo4.data.DataFrameBuilder;
11+
import org.jlab.jnp.hipo4.data.Event;
12+
import org.jlab.jnp.hipo4.io.HipoReader;
13+
import org.json.JSONObject;
14+
15+
/**
16+
*
17+
* @author gavalian
18+
*/
19+
public class HipoFrameReader extends AbstractEventReaderService<HipoReader> {
20+
21+
private int maxEventsFrame = 10;
22+
private int maxSizeFrame = 1024*1024;
23+
24+
25+
public void setMaxEvents(int __nevents){ maxEventsFrame = __nevents;}
26+
public void setMaxSize(int __nsize){ maxSizeFrame = __nsize;}
27+
28+
@Override
29+
protected HipoReader createReader(Path file, JSONObject opts)
30+
throws EventReaderException {
31+
try {
32+
HipoReader reader = new HipoReader();
33+
reader.open(file.toString());
34+
return reader;
35+
} catch (Exception e) {
36+
throw new EventReaderException(e);
37+
}
38+
}
39+
40+
@Override
41+
protected void closeReader() {
42+
reader.close();
43+
}
44+
45+
@Override
46+
public int readEventCount() throws EventReaderException {
47+
int numberOfEvents = reader.getEventCount()/this.maxEventsFrame;
48+
int leftOver = reader.getEventCount()%this.maxEventsFrame;
49+
if(leftOver>0) numberOfEvents++;
50+
return numberOfEvents;
51+
}
52+
53+
@Override
54+
public ByteOrder readByteOrder() throws EventReaderException {
55+
return ByteOrder.LITTLE_ENDIAN;
56+
}
57+
58+
@Override
59+
public Object readEvent(int eventNumber) throws EventReaderException {
60+
try {
61+
int startEvent = eventNumber*this.maxEventsFrame;
62+
DataFrameBuilder builder = new DataFrameBuilder(this.maxEventsFrame,this.maxSizeFrame);
63+
Event event = new Event();
64+
65+
if(eventNumber==0){
66+
event.reset();
67+
Bank runConfig = new Bank(reader.getSchemaFactory().getSchema("RUN::config"),1);
68+
event.write(runConfig);
69+
event.setEventTag(1001);
70+
builder.addEvent(event.getEventBuffer().array(), 0,
71+
event.getEventBufferSize());
72+
System.out.println("[FRAMEREADER] --> COMPOSING START FILE EVENT...");
73+
}
74+
75+
for(int i = 0; i < this.maxEventsFrame; i++){
76+
if(reader.hasNext()==true){
77+
reader.nextEvent(event);
78+
event.clearEventBitMask();
79+
80+
boolean status = builder.addEvent(event.getEventBuffer().array(), 0,
81+
event.getEventBufferSize());
82+
if(status==false){
83+
System.out.println("[HipoFrameReader] >>>>>> out of space on event # " + i + " from banch # " + eventNumber);
84+
}
85+
}
86+
}
87+
DataFrame frame = builder.build();
88+
return frame;
89+
} catch (Exception e) {
90+
throw new EventReaderException(e);
91+
}
92+
}
93+
94+
@Override
95+
protected EngineDataType getDataType() {
96+
return Clas12Types.HIPOFRAME;
97+
}
98+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package org.jlab.io.clara;
2+
3+
import java.nio.file.Path;
4+
import org.jlab.clara.engine.EngineDataType;
5+
import org.jlab.clara.std.services.AbstractEventWriterService;
6+
import org.jlab.clara.std.services.EventWriterException;
7+
import org.jlab.jnp.hipo4.data.DataFrame;
8+
import org.jlab.jnp.hipo4.data.Event;
9+
import org.jlab.jnp.hipo4.io.HipoWriterStream;
10+
import org.jlab.jnp.utils.file.FileUtils;
11+
import org.json.JSONObject;
12+
13+
/**
14+
*
15+
* @author gavalian
16+
*/
17+
public class HipoFrameWriter extends AbstractEventWriterService<HipoWriterStream> {
18+
19+
private static final String CONF_COMPRESSION = "compression";
20+
private static final String CONF_SCHEMA = "schema_dir";
21+
private static final String CONF_FILTER = "filter";
22+
23+
@Override
24+
protected HipoWriterStream createWriter(Path file, JSONObject opts) throws EventWriterException {
25+
try {
26+
HipoWriterStream writer = new HipoWriterStream();
27+
int compression = getCompression(opts);
28+
System.out.printf("%s service: using compression level %d%n", getName(), compression);
29+
writer.getSchemaFactory().initFromDirectory(getSchemaDirectory(opts));
30+
String[] filters = getFilterString(opts);
31+
if(filters.length>0){
32+
for(int i = 0; i < filters.length; i+=2){
33+
int order = Integer.parseInt(filters[i]);
34+
writer.addEventFilter(order, filters[i+1]);
35+
}
36+
}
37+
writer.setFileName(file.toString());
38+
writer.open();
39+
return writer;
40+
} catch (Exception e) {
41+
throw new EventWriterException(e);
42+
}
43+
}
44+
45+
private int getCompression(JSONObject opts) {
46+
return opts.has(CONF_COMPRESSION) ? opts.getInt(CONF_COMPRESSION) : 2;
47+
}
48+
49+
private String getSchemaDirectory(JSONObject opts) {
50+
return opts.has(CONF_SCHEMA)
51+
? opts.getString(CONF_SCHEMA)
52+
: FileUtils.getEnvironmentPath("CLAS12DIR", "etc/bankdefs/hipo4");
53+
}
54+
55+
private String[] getFilterString(JSONObject opts){
56+
if(opts.has(CONF_FILTER)==false) return new String[0];
57+
String filterString = opts.getString(CONF_FILTER);
58+
return filterString.split("-");
59+
}
60+
61+
@Override
62+
protected void closeWriter() {
63+
writer.close();
64+
}
65+
66+
@Override
67+
protected void writeEvent(Object event) throws EventWriterException {
68+
try {
69+
DataFrame dataFrame = (DataFrame) event;
70+
71+
int count = dataFrame.getEntries();
72+
Event hipoEvent = new Event();
73+
74+
for(int i = 0; i < count; i++){
75+
76+
byte[] buffer = dataFrame.getEventCopy(i);
77+
hipoEvent.initFrom(buffer);
78+
79+
int eventTag = hipoEvent.getEventTag();
80+
if(eventTag==1){
81+
writer.writeEventAll(hipoEvent);
82+
} else {
83+
hipoEvent.setEventTag(0);
84+
for(int k = 0; k < 32; k++){
85+
int status = hipoEvent.getEventBitMask(k);
86+
if(status>0) {
87+
writer.writeEvent(k,hipoEvent);
88+
}
89+
}
90+
}
91+
}
92+
} catch (Exception e) {
93+
throw new EventWriterException(e);
94+
}
95+
}
96+
97+
@Override
98+
protected EngineDataType getDataType() {
99+
return Clas12Types.HIPOFRAME;
100+
}
101+
}

0 commit comments

Comments
 (0)