Skip to content

Commit 4b2c281

Browse files
committed
Multi-subscribe requests
This allows the /subscribe and /unsubscribe HTTP endpoints and the push.subscribe/push.unsubscribe OMQ endpoints to be given a JSON array of objects to submit multiple subscriptions at once.
1 parent c3dc2a8 commit 4b2c281

File tree

3 files changed

+216
-116
lines changed

3 files changed

+216
-116
lines changed

spns/hivemind.cpp

Lines changed: 198 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <oxen/log.hpp>
1414
#include <oxenmq/zmq.hpp>
1515
#include <set>
16+
#include <stdexcept>
1617

1718
#include "blake2b.hpp"
1819
#include "hive/signature.hpp"
@@ -168,6 +169,8 @@ HiveMind::HiveMind(Config conf_in) :
168169
// "enc_key": "abcdef..." (32 bytes: 64 hex or 43 base64).
169170
// }
170171
//
172+
// or a JSON array of such objects.
173+
//
171174
// The `service_info` argument is passed along to the underlying notification provider
172175
// and must contain whatever info is required to send notifications to the device:
173176
// typically some device ID, and possibly other data. It is specific to each
@@ -185,6 +188,9 @@ HiveMind::HiveMind(Config conf_in) :
185188
//
186189
// { "success": true, "updated": true, "message": "Resubscription successful" }
187190
//
191+
// If given an array of objects then the response is an array of such return values for
192+
// each individual subscription item.
193+
//
188194
// Note that the "message" strings are subject to change and should not be relied on
189195
// programmatically; instead rely on the "error" or "success" values.
190196
.add_request_command(
@@ -917,7 +923,29 @@ void HiveMind::log_stats(std::string_view pre_cmd) {
917923
}
918924
}
919925

926+
static void sub_json_set_one_response(
927+
oxenmq::Message::DeferredSend&& m,
928+
nlohmann::json& response,
929+
size_t i,
930+
std::atomic<int>& remaining,
931+
bool multi,
932+
nlohmann::json val) {
933+
response[i] = std::move(val);
934+
935+
if (--remaining == 0) {
936+
// This is the last response set, so we have to send all the responses
937+
if (!multi)
938+
m(response[0].dump());
939+
else
940+
m(response.dump());
941+
}
942+
}
943+
920944
void HiveMind::on_notifier_validation(
945+
nlohmann::json& final_response,
946+
size_t i,
947+
std::atomic<int>& remaining,
948+
bool multi,
921949
bool success,
922950
oxenmq::Message::DeferredSend replier,
923951
std::string service,
@@ -1020,7 +1048,13 @@ void HiveMind::on_notifier_validation(
10201048
if (!message.empty())
10211049
response["message"] = std::move(message);
10221050

1023-
replier(response.dump());
1051+
sub_json_set_one_response(
1052+
std::move(replier),
1053+
final_response,
1054+
i,
1055+
remaining,
1056+
multi,
1057+
std::move(response));
10241058
}
10251059

10261060
std::tuple<SwarmPubkey, std::optional<Subaccount>, int64_t, Signature, std::string, nlohmann::json>
@@ -1062,123 +1096,174 @@ oxenmq::ConnectionID HiveMind::sub_unsub_service_conn(const std::string& service
10621096
service + " notification service not currently available"};
10631097
}
10641098

1065-
void HiveMind::on_subscribe(oxenmq::Message& m) {
1066-
ready_or_defer();
1099+
static void json_error(oxenmq::Message& m, hive::SUBSCRIBE err, std::string_view msg) {
1100+
int code = static_cast<int>(err);
1101+
log::debug(cat, "Replying with error code {}: {}", code, msg);
1102+
m.send_reply(nlohmann::json{{"error", code}, {"message", msg}}.dump());
1103+
}
10671104

1068-
// If these are set at the end we send them in reply.
1069-
std::optional<std::pair<hive::SUBSCRIBE, std::string>> error;
1105+
void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) {
1106+
ready_or_defer();
10701107

1108+
nlohmann::json args;
10711109
try {
1072-
auto args = nlohmann::json::parse(m.data.at(0));
1073-
1074-
auto [pubkey, subaccount, sig_ts, sig, service, service_info] = sub_unsub_args(args);
1075-
1076-
auto enc_key = from_hex_or_b64<EncKey>(args.at("enc_key").get<std::string_view>());
1077-
auto namespaces = args.at("namespaces").get<std::vector<int16_t>>();
1078-
1079-
auto conn = sub_unsub_service_conn(service);
1080-
1081-
auto reply_handler = [this,
1082-
service = service,
1083-
sub = std::make_shared<hive::Subscription>( // Throws on bad sig
1084-
pubkey,
1085-
std::move(subaccount),
1086-
args.at("namespaces").get<std::vector<int16_t>>(),
1087-
args.at("data").get<bool>(),
1088-
args.at("sig_ts").get<int64_t>(),
1089-
std::move(sig)),
1090-
pubkey = pubkey,
1091-
enc_key = std::move(enc_key),
1092-
replier = m.send_later()](
1093-
bool success, std::vector<std::string> data) mutable {
1094-
on_notifier_validation(
1095-
success,
1096-
std::move(replier),
1097-
std::move(service),
1098-
std::move(pubkey),
1099-
std::move(sub),
1100-
std::move(enc_key),
1101-
std::move(data));
1102-
};
1103-
1104-
// We handle everything else (including the response) in `_on_notifier_validation`
1105-
// when/if the notifier service comes back to us with the unique identifier:
1106-
omq_.request(
1107-
conn, "notifier.validate", std::move(reply_handler), service, service_info.dump());
1108-
1110+
args = nlohmann::json::parse(m.data.at(0));
11091111
} catch (const nlohmann::json::exception&) {
11101112
log::debug(cat, "Subscription failed: bad json");
1111-
error = {hive::SUBSCRIBE::BAD_INPUT, "Invalid JSON"};
1112-
} catch (const std::out_of_range& e) {
1113-
log::debug(cat, "Sub failed: missing param {}", e.what());
1114-
error = {hive::SUBSCRIBE::BAD_INPUT, "Missing required parameter"};
1115-
} catch (const hive::subscribe_error& e) {
1116-
error = {e.code, e.what()};
1117-
} catch (const std::exception& e) {
1118-
log::debug(cat, "Exception handling input: {}", e.what());
1119-
error = {hive::SUBSCRIBE::ERROR, e.what()};
1113+
return json_error(m, hive::SUBSCRIBE::BAD_INPUT, "Invalid JSON");
1114+
} catch (const std::out_of_range&) {
1115+
log::debug(cat, "Subscription failed: no request data provided");
1116+
return json_error(m, hive::SUBSCRIBE::BAD_INPUT, "Invalid request: missing request data");
1117+
}
1118+
if (!(args.is_array() || args.is_object())) {
1119+
log::debug(cat, "Subscription failed: bad json -- expected object or array");
1120+
return json_error(
1121+
m, hive::SUBSCRIBE::BAD_INPUT, "Invalid JSON: expected object or array of objects");
11201122
}
11211123

1122-
if (error) {
1123-
int code = static_cast<int>(error->first);
1124-
log::debug(cat, "Replying with error code {}: {}", code, error->second);
1125-
m.send_reply(nlohmann::json{{"error", code}, {"message", error->second}}.dump());
1124+
const bool multi = args.is_array();
1125+
1126+
auto response = std::make_shared<nlohmann::json>();
1127+
*response = nlohmann::json::array();
1128+
auto remaining = std::make_shared<std::atomic<int>>(multi ? args.size() : 1);
1129+
1130+
if (!multi) {
1131+
// If given an object, convert to a single-element array (to make life easier below)
1132+
auto single = nlohmann::json::array();
1133+
single.push_back(std::move(args));
1134+
args = std::move(single);
11261135
}
1127-
// Otherwise the reply is getting deferred and handled later in on_notifier_validation
1128-
}
11291136

1130-
void HiveMind::on_unsubscribe(oxenmq::Message& m) {
1131-
ready_or_defer();
1137+
for (auto& e : args)
1138+
response->push_back(nlohmann::json::object());
11321139

1133-
// If these are set at the end we send them in reply.
1134-
std::optional<std::pair<hive::SUBSCRIBE, std::string>> error;
1140+
for (size_t i = 0; i < args.size(); i++) {
1141+
auto& e = args[i];
11351142

1136-
try {
1137-
auto args = nlohmann::json::parse(m.data.at(0));
1138-
1139-
auto [pubkey, subaccount, sig_ts, sig, service, service_info] = sub_unsub_args(args);
1140-
1141-
auto conn = sub_unsub_service_conn(service);
1142-
1143-
auto reply_handler = [this,
1144-
service = service,
1145-
pubkey = pubkey,
1146-
unsub = UnsubData{std::move(sig), std::move(subaccount), sig_ts},
1147-
replier = m.send_later()](
1148-
bool success, std::vector<std::string> data) mutable {
1149-
on_notifier_validation(
1150-
success,
1151-
std::move(replier),
1152-
std::move(service),
1153-
std::move(pubkey),
1154-
nullptr,
1155-
std::nullopt,
1156-
std::move(data),
1157-
std::move(unsub));
1158-
};
1143+
std::optional<std::pair<hive::SUBSCRIBE, std::string>> error;
11591144

1160-
omq_.request(
1161-
conn, "notifier.validate", std::move(reply_handler), service, service_info.dump());
1145+
try {
1146+
auto [pubkey, subaccount, sig_ts, sig, service, service_info] = sub_unsub_args(e);
1147+
1148+
auto conn = sub_unsub_service_conn(service);
1149+
1150+
oxenmq::OxenMQ::ReplyCallback reply_handler;
1151+
1152+
if (subscribe) {
1153+
auto enc_key = from_hex_or_b64<EncKey>(e.at("enc_key").get<std::string_view>());
1154+
auto namespaces = e.at("namespaces").get<std::vector<int16_t>>();
1155+
1156+
reply_handler = [this,
1157+
response,
1158+
i,
1159+
remaining,
1160+
multi,
1161+
service = service,
1162+
sub = std::make_shared<hive::Subscription>( // Throws on bad sig
1163+
pubkey,
1164+
std::move(subaccount),
1165+
e.at("namespaces").get<std::vector<int16_t>>(),
1166+
e.at("data").get<bool>(),
1167+
e.at("sig_ts").get<int64_t>(),
1168+
std::move(sig)),
1169+
pubkey = pubkey,
1170+
enc_key = std::move(enc_key),
1171+
replier = m.send_later()](
1172+
bool success, std::vector<std::string> data) mutable {
1173+
on_notifier_validation(
1174+
*response,
1175+
i,
1176+
*remaining,
1177+
multi,
1178+
success,
1179+
std::move(replier),
1180+
std::move(service),
1181+
std::move(pubkey),
1182+
std::move(sub),
1183+
std::move(enc_key),
1184+
std::move(data));
1185+
};
1186+
1187+
// We handle everything else (including the response) in `_on_notifier_validation`
1188+
// when/if the notifier service comes back to us with the unique identifier:
1189+
omq_.request(
1190+
conn,
1191+
"notifier.validate",
1192+
std::move(reply_handler),
1193+
service,
1194+
service_info.dump());
1195+
} else {
1196+
// unsubscribe
1197+
1198+
reply_handler = [this,
1199+
response,
1200+
i,
1201+
remaining,
1202+
multi,
1203+
service = service,
1204+
pubkey = pubkey,
1205+
unsub = UnsubData{std::move(sig), std::move(subaccount), sig_ts},
1206+
replier = m.send_later()](
1207+
bool success, std::vector<std::string> data) mutable {
1208+
on_notifier_validation(
1209+
*response,
1210+
i,
1211+
*remaining,
1212+
multi,
1213+
success,
1214+
std::move(replier),
1215+
std::move(service),
1216+
std::move(pubkey),
1217+
nullptr,
1218+
std::nullopt,
1219+
std::move(data),
1220+
std::move(unsub));
1221+
};
1222+
}
11621223

1163-
} catch (const nlohmann::json::exception&) {
1164-
log::debug(cat, "Unsubscription failed: bad json");
1165-
error = {hive::SUBSCRIBE::BAD_INPUT, "Invalid JSON"};
1166-
} catch (const std::out_of_range& e) {
1167-
log::debug(cat, "Unsub failed: missing param {}", e.what());
1168-
error = {hive::SUBSCRIBE::BAD_INPUT, "Missing required parameter"};
1169-
} catch (const hive::subscribe_error& e) {
1170-
error = {e.code, e.what()};
1171-
} catch (const std::exception& e) {
1172-
log::debug(cat, "Exception handling input: {}", e.what());
1173-
error = {hive::SUBSCRIBE::ERROR, e.what()};
1174-
}
1224+
omq_.request(
1225+
conn,
1226+
"notifier.validate",
1227+
std::move(reply_handler),
1228+
service,
1229+
service_info.dump());
11751230

1176-
if (error) {
1177-
int code = static_cast<int>(error->first);
1178-
log::debug(cat, "Replying with error code {}: {}", code, error->second);
1179-
m.send_reply(nlohmann::json{{"error", code}, {"message", error->second}}.dump());
1231+
} catch (const std::out_of_range& e) {
1232+
log::debug(cat, "Sub failed: missing param {}", e.what());
1233+
error = {hive::SUBSCRIBE::BAD_INPUT, "Missing required parameter"};
1234+
} catch (const hive::subscribe_error& e) {
1235+
error = {e.code, e.what()};
1236+
} catch (const std::exception& e) {
1237+
log::debug(cat, "Exception handling input: {}", e.what());
1238+
error = {hive::SUBSCRIBE::ERROR, e.what()};
1239+
}
1240+
1241+
if (error) {
1242+
int code = static_cast<int>(error->first);
1243+
log::debug(
1244+
cat,
1245+
"Replying with {}subscribe error code {}: {}",
1246+
subscribe ? "" : "un",
1247+
code,
1248+
error->second);
1249+
sub_json_set_one_response(
1250+
m.send_later(),
1251+
*response,
1252+
i,
1253+
*remaining,
1254+
multi,
1255+
nlohmann::json{{"error", code}, {"message", error->second}});
1256+
}
1257+
// Otherwise the reply is getting deferred and handled later in on_notifier_validation
11801258
}
1181-
// Otherwise the reply is getting deferred and handled later in on_notifier_validation
1259+
}
1260+
1261+
void HiveMind::on_subscribe(oxenmq::Message& m) {
1262+
on_sub_unsub_impl(m, true);
1263+
}
1264+
1265+
void HiveMind::on_unsubscribe(oxenmq::Message& m) {
1266+
on_sub_unsub_impl(m, false);
11821267
}
11831268

11841269
void HiveMind::db_cleanup() {
@@ -1492,15 +1577,16 @@ void HiveMind::load_saved_subscriptions() {
14921577
log::info(cat, "Loading {} stored subscriptions from database", total);
14931578

14941579
int64_t count = 0, unique = 0;
1495-
for (auto [acc, ed, sub_tag, sub_sig, sig, sigts, wd, ns_arr] : txn
1496-
.stream<AccountID,
1497-
std::optional<Ed25519PK>,
1498-
std::optional<SubaccountTag>,
1499-
std::optional<Signature>,
1500-
Signature,
1501-
int64_t,
1502-
bool,
1503-
Int16ArrayLoader>(R"(
1580+
for (auto [acc, ed, sub_tag, sub_sig, sig, sigts, wd, ns_arr] :
1581+
txn
1582+
.stream<AccountID,
1583+
std::optional<Ed25519PK>,
1584+
std::optional<SubaccountTag>,
1585+
std::optional<Signature>,
1586+
Signature,
1587+
int64_t,
1588+
bool,
1589+
Int16ArrayLoader>(R"(
15041590
SELECT account, session_ed25519, subaccount_tag, subaccount_sig, signature, signature_ts, want_data,
15051591
ARRAY(SELECT namespace FROM sub_namespaces WHERE subscription = id ORDER BY namespace)
15061592
FROM subscriptions)")) {

spns/hivemind.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ class HiveMind {
220220

221221
using UnsubData = std::tuple<Signature, std::optional<Subaccount>, int64_t>;
222222
void on_notifier_validation(
223+
nlohmann::json& final_response,
224+
size_t i,
225+
std::atomic<int>& remaining,
226+
bool multi,
223227
bool success,
224228
oxenmq::Message::DeferredSend replier,
225229
std::string service,
@@ -241,8 +245,8 @@ class HiveMind {
241245
oxenmq::ConnectionID sub_unsub_service_conn(const std::string& service);
242246

243247
void on_subscribe(oxenmq::Message& m);
244-
245248
void on_unsubscribe(oxenmq::Message& m);
249+
void on_sub_unsub_impl(oxenmq::Message& m, bool subscribe);
246250

247251
void db_cleanup();
248252

0 commit comments

Comments
 (0)