Skip to content

Commit 6bec406

Browse files
committed
Added stream tagging support
1 parent 4d50364 commit 6bec406

File tree

8 files changed

+293
-35
lines changed

8 files changed

+293
-35
lines changed

lib/stream.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,43 @@ bool Util::streamAlive(std::string &streamname){
471471
}
472472
}
473473

474+
/// Returns active tags for an exact-matching (already sanitized) streamname
475+
std::set<std::string> Util::streamTags(const std::string &streamname){
476+
std::set<std::string> ret;
477+
478+
IPC::sharedPage shmStreams(SHM_STATE_STREAMS, 0, false, false);
479+
// Abort silently if page cannot be loaded
480+
if (!shmStreams){return ret;}
481+
482+
Util::RelAccX rlxStreams(shmStreams.mapped);
483+
// Abort silently if page cannot be loaded
484+
if (!rlxStreams.isReady()){return ret;}
485+
486+
uint64_t startPos = rlxStreams.getDeleted();
487+
uint64_t endPos = rlxStreams.getEndPos();
488+
for (uint64_t cPos = startPos; cPos < endPos; ++cPos){
489+
const std::string & strm = rlxStreams.getPointer("stream", cPos);
490+
if (strm != streamname){continue;}
491+
492+
// Found it! Fill and break, since only one match can exist.
493+
std::string tags = rlxStreams.getPointer("tags", cPos);
494+
while (tags.size()){
495+
size_t endPos = tags.find(' ');
496+
if (!endPos){
497+
//extra space, ignore
498+
tags.erase(0, 1);
499+
continue;
500+
}
501+
if (endPos == std::string::npos){endPos = tags.size();}
502+
ret.insert(tags.substr(0, endPos));
503+
if (endPos == tags.size()){break;}
504+
tags.erase(0, endPos+1);
505+
}
506+
break;
507+
}
508+
return ret;
509+
}
510+
474511
/// Assures the input for the given stream name is active.
475512
/// Does stream name sanitation first, followed by a stream name length check (<= 100 chars).
476513
/// Then, checks if an input is already active by running streamAlive(). If yes, return true.

lib/stream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ namespace Util{
1818
std::string getTmpFolder();
1919
void sanitizeName(std::string &streamname);
2020
bool streamAlive(std::string &streamname);
21+
std::set<std::string> streamTags(const std::string &streamname);
2122
bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true,
2223
bool isProvider = false,
2324
const std::map<std::string, std::string> &overrides = std::map<std::string, std::string>(),

lib/triggers.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,15 @@ namespace Triggers{
203203
if ((streamName.size() == stringLen || splitter == stringLen) &&
204204
strncmp(strPtr + bPos + 4, streamName.data(), stringLen) == 0){
205205
isHandled = true;
206+
break;
207+
}
208+
// Tag-based? Check tags for this stream
209+
if (strPtr[bPos + 4] == '#'){
210+
std::set<std::string> tags = Util::streamTags(streamName);
211+
if (tags.count(std::string(strPtr + bPos + 5, stringLen - 1))){
212+
isHandled = true;
213+
break;
214+
}
206215
}
207216
bPos += stringLen + 4;
208217
}

src/controller/controller_api.cpp

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,15 +175,17 @@ class streamStat{
175175
viewers = rlx.getInt("viewers", entry);
176176
inputs = rlx.getInt("inputs", entry);
177177
outputs = rlx.getInt("outputs", entry);
178+
tags = rlx.getPointer("tags", entry);
178179
}
179180
bool operator==(const streamStat &b) const{
180-
return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs);
181+
return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs && tags == b.tags);
181182
}
182183
bool operator!=(const streamStat &b) const{return !(*this == b);}
183184
uint8_t status;
184185
uint64_t viewers;
185186
uint64_t inputs;
186187
uint64_t outputs;
188+
std::string tags;
187189
};
188190

189191
void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){
@@ -292,6 +294,7 @@ void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){
292294
tmp[1u].append(tmpStat.viewers);
293295
tmp[1u].append(tmpStat.inputs);
294296
tmp[1u].append(tmpStat.outputs);
297+
tmp[1u].append(tmpStat.tags);
295298
W.sendFrame(tmp.toString());
296299
}
297300
}
@@ -305,6 +308,7 @@ void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){
305308
tmp[1u].append(0u);
306309
tmp[1u].append(0u);
307310
tmp[1u].append(0u);
311+
tmp[1u].append("");
308312
W.sendFrame(tmp.toString());
309313
strmRemove.erase(strm);
310314
lastStrmStat.erase(strm);
@@ -1164,6 +1168,69 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
11641168
}
11651169
}
11661170

1171+
if (Request.isMember("tag_stream")){
1172+
if (Request["tag_stream"].isObject()){
1173+
jsonForEach(Request["tag_stream"], it){
1174+
if (it->isString()){
1175+
Controller::stream_tag(it.key(), it->asStringRef());
1176+
}else if (it->isArray()){
1177+
jsonForEach(*it, jt){
1178+
if (jt->isString()){
1179+
Controller::stream_tag(it.key(), jt->asStringRef());
1180+
}
1181+
}
1182+
}
1183+
}
1184+
}
1185+
}
1186+
1187+
if (Request.isMember("untag_stream")){
1188+
if (Request["untag_stream"].isObject()){
1189+
jsonForEach(Request["untag_stream"], it){
1190+
if (it->isString()){
1191+
Controller::stream_untag(it.key(), it->asStringRef());
1192+
}else if (it->isArray()){
1193+
jsonForEach(*it, jt){
1194+
if (jt->isString()){
1195+
Controller::stream_untag(it.key(), jt->asStringRef());
1196+
}
1197+
}
1198+
}
1199+
}
1200+
}
1201+
}
1202+
1203+
if (Request.isMember("stream_tags")){
1204+
JSON::Value & rT = Response["stream_tags"];
1205+
if (Request["stream_tags"].isArray()){
1206+
jsonForEach(Request["stream_tags"], it){
1207+
if (it->isString()){
1208+
std::set<std::string> tags = Controller::stream_tags(it->asStringRef());
1209+
JSON::Value & tRef = rT[it->asStringRef()];
1210+
for (std::set<std::string>::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);}
1211+
}
1212+
}
1213+
}else if (Request["stream_tags"].isObject()){
1214+
jsonForEach(Request["stream_tags"], it){
1215+
std::set<std::string> tags = Controller::stream_tags(it.key());
1216+
JSON::Value & tRef = rT[it.key()];
1217+
for (std::set<std::string>::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);}
1218+
}
1219+
}else if (Request["stream_tags"].isString() && Request["stream_tags"].asStringRef().size()){
1220+
std::set<std::string> tags = Controller::stream_tags(Request["stream_tags"].asStringRef());
1221+
JSON::Value & tRef = rT[Request["stream_tags"].asStringRef()];
1222+
for (std::set<std::string>::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);}
1223+
}else{
1224+
JSON::Value nullPkt, resp;
1225+
Controller::fillActive(nullPkt, resp);
1226+
jsonForEach(resp, it){
1227+
std::set<std::string> tags = Controller::stream_tags(it->asStringRef());
1228+
JSON::Value & tRef = rT[it->asStringRef()];
1229+
for (std::set<std::string>::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);}
1230+
}
1231+
}
1232+
}
1233+
11671234
if (Request.isMember("push_start")){
11681235
std::string stream;
11691236
std::string target;

src/controller/controller_push.cpp

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -333,19 +333,17 @@ namespace Controller{
333333
for (std::set<std::string>::iterator jt = activeStreams.begin();
334334
jt != activeStreams.end(); ++jt){
335335
std::string streamname = *jt;
336-
if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){
337-
if (!isPushActive(streamname, target)){
338-
if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){
339-
waitingPushes[streamname].erase(target);
340-
if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);}
341-
MEDIUM_MSG("Conditions of push `%s->%s` evaluate to true. Starting push...", stream.c_str(), target.c_str());
342-
startPush(streamname, target);
343-
curCount++;
344-
// If no end time is given but there is a start time, remove the push after starting it
345-
if (startTime && !endTime){
346-
removePush(*it);
347-
break;
348-
}
336+
if (!isPushActive(streamname, target)){
337+
if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){
338+
waitingPushes[streamname].erase(target);
339+
if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);}
340+
MEDIUM_MSG("Conditions of push `%s->%s` evaluate to true. Starting push...", stream.c_str(), target.c_str());
341+
startPush(streamname, target);
342+
curCount++;
343+
// If no end time is given but there is a start time, remove the push after starting it
344+
if (startTime && !endTime){
345+
removePush(*it);
346+
break;
349347
}
350348
}
351349
}
@@ -537,9 +535,7 @@ namespace Controller{
537535
for (std::set<std::string>::iterator jt = activeStreams.begin();
538536
jt != activeStreams.end(); ++jt){
539537
std::string streamname = *jt;
540-
if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){
541-
startPush(streamname, target);
542-
}
538+
startPush(streamname, target);
543539
}
544540
}
545541
// Return push list
@@ -588,7 +584,7 @@ namespace Controller{
588584
jsonForEach(Controller::Storage["autopushes"], it){
589585
if ((*it)[2u].asInt() && (*it)[2u].asInt() < Util::epoch()){continue;}
590586
const std::string &pStr = (*it)[0u].asStringRef();
591-
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
587+
if (Controller::streamMatches(streamname, pStr)){
592588
std::string stream = streamname;
593589
Util::sanitizeName(stream);
594590
// Check variable condition if it exists

0 commit comments

Comments
 (0)