Skip to content

Commit 49dee97

Browse files
committed
Refactored Start-Stop for output plugins
1 parent 3f2e48e commit 49dee97

File tree

9 files changed

+62
-60
lines changed

9 files changed

+62
-60
lines changed

SensorFlow/plugins/src/main/java/eu/fbk/mpba/sensorsflows/plugins/outputs/CsvOutput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public List<String> getFiles() {
5555
return files;
5656
}
5757

58-
public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSensors) {
58+
public void outputPluginStart(Object sessionTag, List<ISensor> streamingSensors) {
5959
List<String> nn = new ArrayList<>(streamingSensors.size());
6060
_reverseSensors = new HashMap<>(streamingSensors.size(), 1f);
6161
for (int i = 0; i < streamingSensors.size(); i++) {
@@ -90,7 +90,7 @@ public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSen
9090
files.add(f.getAbsolutePath());
9191
}
9292

93-
public void outputPluginFinalize() {
93+
public void outputPluginStop() {
9494
close();
9595
}
9696

SensorFlow/plugins/src/main/java/eu/fbk/mpba/sensorsflows/plugins/outputs/SQLiteOutput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public List<String> getFiles() {
3535
return Collections.singletonList(_sav.getPath());
3636
}
3737

38-
public void outputPluginInitialize(Object sessionTag, List<ISensor> linkedSensors) {
38+
public void outputPluginStart(Object sessionTag, List<ISensor> linkedSensors) {
3939
File f = new File(_path + "/" + sessionTag.toString());
4040
//noinspection ResultOfMethodCallIgnored
4141
f.mkdirs();
@@ -59,7 +59,7 @@ public void outputPluginInitialize(Object sessionTag, List<ISensor> linkedSensor
5959
}
6060
}
6161

62-
public void outputPluginFinalize() {
62+
public void outputPluginStop() {
6363
close();
6464
}
6565

SensorFlow/plugins/src/main/java/eu/fbk/mpba/sensorsflows/plugins/outputs/TCPClientOutput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public TCPClientOutput(InetAddress local, int port, String username, String pass
4141
}
4242

4343
@Override
44-
public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSensors) {
44+
public void outputPluginStart(Object sessionTag, List<ISensor> streamingSensors) {
4545
mSensors = streamingSensors;
4646
mTag = sessionTag.toString();
4747
try {
@@ -61,7 +61,7 @@ public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSen
6161
}
6262

6363
@Override
64-
public void outputPluginFinalize() {
64+
public void outputPluginStop() {
6565
close();
6666
}
6767

SensorFlow/plugins/src/main/java/eu/fbk/mpba/sensorsflows/plugins/outputs/TCPServerOutput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,14 @@ private void writeDescriptors(OutputStream o) throws IOException {
9393
}
9494

9595
@Override
96-
public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSensors) {
96+
public void outputPluginStart(Object sessionTag, List<ISensor> streamingSensors) {
9797
mSTh.start();
9898
mSensors = streamingSensors;
9999
mTag = sessionTag.toString();
100100
}
101101

102102
@Override
103-
public void outputPluginFinalize() {
103+
public void outputPluginStop() {
104104
close();
105105
}
106106

SensorFlow/plugins/src/main/java/eu/fbk/mpba/sensorsflows/plugins/outputs/litix/ProtobufferOutput.java

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222

2323
public class ProtobufferOutput implements OutputPlugin<Long, double[]> {
2424

25-
public static final String TS_PACKAGES = "splits-buffered ";
26-
public static final String TS_TOTAL_KB = "data-raw [KB]";
27-
public static final String TS_COMPRESSED_KB = "data-gzipped [KB]";
28-
public static final String TS_COMPRESSED = "data-gz-ratio [%]";
29-
public static final String TS_PACKTIMEOUT = "split-time-max [s]";
25+
private static final String TS_PACKAGES = "splits-count ";
26+
private static final String TS_TOTAL_KB = "data-raw [KB]";
27+
private static final String TS_COMPRESSED_KB = "data-gzipped [KB]";
28+
private static final String TS_COMPRESSED = "data-gz-ratio [%]";
29+
private static final String TS_PACKTIMEOUT = "split-time-max [s]";
3030
private final SplitEvent mOnSplit;
3131
private Integer mSessionID;
3232
private Integer mTrackID;
@@ -54,31 +54,31 @@ private class Queries {
5454

5555
private final SplitterParams mSplitter;
5656
protected final SQLiteDatabase buffer;
57-
protected List<SensorInfo> mSensorInfo = new ArrayList<>();
58-
protected HashMap<ISensor, Integer> mReverseSensors = new HashMap<>();
59-
protected List<Litix.SensorData> mSensorData = new ArrayList<>();
60-
protected List<Litix.SensorEvent> mSensorEvent = new ArrayList<>();
61-
protected List<Litix.SessionMeta> mSessionMeta = new ArrayList<>();
62-
protected Object mSessionTag = "undefined";
63-
protected int splits = 0;
57+
private List<SensorInfo> mSensorInfo = new ArrayList<>();
58+
private HashMap<ISensor, Integer> mReverseSensors = new HashMap<>();
59+
private List<Litix.SensorData> mSensorData = new ArrayList<>();
60+
private List<Litix.SensorEvent> mSensorEvent = new ArrayList<>();
61+
private List<Litix.SessionMeta> mSessionMeta = new ArrayList<>();
62+
private Object mSessionTag = "undefined";
63+
private int splits = 0;
6464
private String mName;
6565
private long mForwardedBytes = 0;
6666
private long mReceivedBytes = 0;
6767

6868
private TextStatusUpdater mUpd;
6969

7070
public void setTextStatusUpdater(TextStatusUpdater upd) {
71-
upd.textStatusPut(TS_PACKAGES, splits);
72-
upd.textStatusPut(TS_TOTAL_KB, 0);
73-
upd.textStatusPut(TS_COMPRESSED_KB, 0);
74-
upd.textStatusPut(TS_COMPRESSED, 0);
75-
upd.textStatusPut(TS_PACKTIMEOUT, mSplitter.maxSplitTime / 1000.);
71+
upd.put(TS_PACKAGES, splits);
72+
upd.put(TS_TOTAL_KB, 0);
73+
upd.put(TS_COMPRESSED_KB, 0);
74+
upd.put(TS_COMPRESSED, 0);
75+
upd.put(TS_PACKTIMEOUT, mSplitter.maxSplitTime / 1000.);
7676
this.mUpd = upd;
7777
}
7878

79-
void textStatusPut(String k, Object v) {
79+
private void textStatusPut(String k, Object v) {
8080
if (mUpd != null) {
81-
mUpd.textStatusPut(k, v);
81+
mUpd.put(k, v);
8282
}
8383
}
8484

@@ -102,18 +102,18 @@ public SplitterParams(float targetCompressedSize, float maxSplitTime, float minS
102102
lastSplitTime = SystemClock.elapsedRealtime();
103103
}
104104

105-
public void updateSize(float compressed, float raw) {
105+
void updateSize(float compressed, float raw) {
106106
Log.d("ProtoOut", "Updating size" + (timeout ? " after timeout" : ""));
107107
if (!timeout)
108108
adjust *= (float) Math.pow(targetCompressedSize / compressed, adjustBalance);
109109
compressionRatio = Math.min(1.f, compressionRatio * (1 - ratioBalance) + ratioBalance * compressed / raw);
110110
}
111111

112-
public float getFlushSize() {
112+
float getFlushSize() {
113113
return targetCompressedSize * adjust / compressionRatio;
114114
}
115115

116-
public boolean addAndPopFlushSuggested(int newSize) {
116+
boolean addAndPopFlushSuggested(int newSize) {
117117
size += newSize;
118118
if (size >= getFlushSize() || size >= minSplitSize && SystemClock.elapsedRealtime() - lastSplitTime > maxSplitTime) {
119119
size = 0;
@@ -161,7 +161,7 @@ public void setLitixID(int session, int track) {
161161
throw new NullPointerException("ProtobufferOutput already initialized.");
162162
}
163163

164-
public void flushTrackSplit() {
164+
private void flushTrackSplit() {
165165
Log.d("ProtoOut", "Flushing " + currentBacklogSize() + " SensorData/Event");
166166
final int split_id = splits;
167167

@@ -175,8 +175,6 @@ public void flushTrackSplit() {
175175

176176
final Litix.TrackSplit ts = sb.build();
177177

178-
textStatusPut(TS_PACKAGES, splits);
179-
180178
mSensorData.clear();
181179
mSensorEvent.clear();
182180
mSessionMeta.clear();
@@ -226,8 +224,13 @@ public void run() {
226224

227225
if (mOnSplit != null) {
228226
mOnSplit.newSplit(ProtobufferOutput.this, split_id, mSplitter);
229-
if (lastFlush)
227+
}
228+
if (lastFlush) {
229+
// The commit
230+
splits++;
231+
if (mOnSplit != null) {
230232
mOnSplit.noMoreBuffers(ProtobufferOutput.this, mSplitter);
233+
}
231234
}
232235
} catch (SQLiteConstraintException e) {
233236
Log.e("ProtoOut", "SQL error, splitID:" + split_id + " trackID:" + mTrackID);
@@ -236,14 +239,15 @@ public void run() {
236239
Log.e("ProtoOut", "Flush error");
237240
e.printStackTrace();
238241
}
242+
textStatusPut(TS_PACKAGES, splits);
239243
}
240244
}, "Flush").start();
241245
}
242246

243247
// OutputPlugIn implementation
244248

245249
@Override
246-
public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSensors) {
250+
public void outputPluginStart(Object sessionTag, List<ISensor> streamingSensors) {
247251
lastFlush = false;
248252
finalized = false;
249253
mReverseSensors = new HashMap<>(streamingSensors.size(), 1);
@@ -280,7 +284,7 @@ public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSen
280284
private boolean lastFlush = false;
281285

282286
@Override
283-
public void outputPluginFinalize() {
287+
public void outputPluginStop() {
284288
lastFlush = true;
285289
flushTrackSplit();
286290
finalized = true;
@@ -323,7 +327,7 @@ public String getName() {
323327
@Override
324328
public void close() {
325329
if (!finalized)
326-
outputPluginFinalize();
330+
outputPluginStop();
327331
}
328332

329333
@Override

SensorFlow/plugins/src/main/java/eu/fbk/mpba/sensorsflows/plugins/outputs/litix/TextStatusUpdater.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,24 @@
11
package eu.fbk.mpba.sensorsflows.plugins.outputs.litix;
22

3-
import android.os.SystemClock;
4-
53
import java.util.Map;
64
import java.util.TreeMap;
75

86
public abstract class TextStatusUpdater {
9-
Map<String, Object> tsParams = new TreeMap<>();
10-
long tsLastUpd = 0;
7+
private Map<String, Object> tsParams = new TreeMap<>();
118

12-
public void textStatusPut(String k, Object v) {
9+
public void put(String k, Object v) {
1310
tsParams.put(k, v);
14-
if (SystemClock.elapsedRealtime() - tsLastUpd > 33) {
15-
tsLastUpd = SystemClock.elapsedRealtime();
16-
StringBuilder text = new StringBuilder();
17-
for (Map.Entry<String, Object> e : tsParams.entrySet())
18-
text.append(e.getKey())
19-
.append(": \t")
20-
.append(e.getValue())
21-
.append('\n');
22-
updateTextStatus(text.toString());
23-
}
11+
StringBuilder text = new StringBuilder();
12+
for (Map.Entry<String, Object> e : tsParams.entrySet())
13+
text.append(e.getKey())
14+
.append(": \t")
15+
.append(e.getValue())
16+
.append('\n');
17+
updateTextStatus(text.toString());
18+
}
19+
20+
public Object remove(String k) {
21+
return tsParams.remove(k);
2422
}
2523

2624
public abstract void updateTextStatus(String text);

SensorFlow/plugins/src/main/java/eu/fbk/mpba/sensorsflows/plugins/outputs/skilo/ProtobufferOutput.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void run() {
119119
// OutputPlugIn implementation
120120

121121
@Override
122-
public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSensors) {
122+
public void outputPluginStart(Object sessionTag, List<ISensor> streamingSensors) {
123123
mSensors = streamingSensors;
124124
mSessionTag = sessionTag;
125125
mFolder = new File(mFolder, mSessionTag.toString() + "/" + getName());
@@ -146,7 +146,7 @@ public void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSen
146146
private boolean finalized = false;
147147

148148
@Override
149-
public void outputPluginFinalize() {
149+
public void outputPluginStop() {
150150
flushTrackSplit(mSensorData, getTrackSplitNameForNow(), true);
151151
finalized = true;
152152
}
@@ -196,7 +196,7 @@ public String getName() {
196196
@Override
197197
public void close() {
198198
if (!finalized)
199-
outputPluginFinalize();
199+
outputPluginStop();
200200
}
201201

202202
@Override

SensorFlow/src/main/java/eu/fbk/mpba/sensorsflows/OutputDecorator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ protected OutputDecorator(OutputPlugin<TimeT, ValueT> output, IOutputCallback<Ti
4444
private Thread _thread = new Thread(new Runnable() {
4545
@Override
4646
public void run() {
47-
outputPlugIn.outputPluginInitialize(sessionTag, linkedSensors);
47+
outputPlugIn.outputPluginStart(sessionTag, linkedSensors);
4848
changeStatus(OutputStatus.INITIALIZED);
4949
dispatchLoopWhileNotStopPending();
50-
outputPlugIn.outputPluginFinalize();
50+
outputPlugIn.outputPluginStop();
5151
changeStatus(OutputStatus.FINALIZED);
5252
}
5353
});
@@ -86,7 +86,7 @@ private void changeStatus(OutputStatus s) {
8686
public void initializeOutput(Object sessionTag) {
8787
this.sessionTag = sessionTag;
8888
changeStatus(OutputStatus.INITIALIZING);
89-
// outputPlugIn.outputPluginInitialize(...) in _thread
89+
// outputPlugIn.outputPluginStart(...) in _thread
9090
_thread.start();
9191
}
9292

SensorFlow/src/main/java/eu/fbk/mpba/sensorsflows/OutputPlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99

1010
public interface OutputPlugin<TimeT, ValueT> extends IPlugin {
1111

12-
void outputPluginInitialize(Object sessionTag, List<ISensor> streamingSensors);
12+
void outputPluginStart(Object sessionTag, List<ISensor> streamingSensors);
1313

14-
void outputPluginFinalize();
14+
void outputPluginStop();
1515

1616
void newSensorEvent(SensorEventEntry<TimeT> event);
1717

0 commit comments

Comments
 (0)