diff --git a/daemon/Makefile b/daemon/Makefile index 7d0b112545..2f339e6b3c 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -127,7 +127,8 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ - codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c + codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c \ + redis-json.c json-helpers.c LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c diff --git a/daemon/call.c b/daemon/call.c index 48bbeaf631..801aeb3008 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -151,7 +151,7 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) { } if (c->deleted && rtpe_now.tv_sec >= c->deleted - && c->last_signal <= c->deleted) + && c->last_signal.tv_sec <= c->deleted) goto delete; if (c->ml_deleted && rtpe_now.tv_sec >= c->ml_deleted) { @@ -1800,6 +1800,18 @@ static void __update_media_id(struct call_media *media, struct call_media *other } } +static void __update_rtpe_address(struct call_media* media, struct sdp_ng_flags *flags) { + struct packet_stream *ps; + + if (media->rtpe_connection_addr.len || !media->streams.head) + return; + + ps = media->streams.head->data; + media->rtpe_connection_addr.s = call_malloc(media->call, 64); + format_network_address(&media->rtpe_connection_addr, ps, flags, 0); + rlog(LOG_INFO, "Stored media address %s",media->rtpe_connection_addr.s); +} + /* called with call->master_lock held in W */ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, struct sdp_ng_flags *flags) @@ -1822,7 +1834,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, monologue = other_ml->active_dialogue; call = monologue->call; - call->last_signal = rtpe_now.tv_sec; + call->last_signal = rtpe_now; call->deleted = 0; __C_DBG("this="STR_FORMAT" other="STR_FORMAT, STR_FMT(&monologue->tag), STR_FMT(&other_ml->tag)); @@ -2022,6 +2034,8 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, ice_update(media->ice_agent, NULL); /* this is in case rtcp-mux has changed */ recording_setup_media(media); + __update_rtpe_address(media, flags); + __update_rtpe_address(other_media, flags); } return 0; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 92ef5d3aa3..3905ba514b 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -965,22 +965,8 @@ static const char *call_offer_answer_ng(bencode_item_t *input, /* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */ call = call_get(&flags.call_id); - /* Failover scenario because of timeout on offer response: siprouter tries - * to establish session with another rtpengine2 even though rtpengine1 - * might have persisted part of the session. rtpengine2 deletes previous - * call in memory and recreates an OWN call in redis */ - // SDP fragments for trickle ICE must always operate on an existing call if (opmode == OP_OFFER && !flags.fragment) { - if (call) { - if (IS_FOREIGN_CALL(call)) { - /* destroy call and create new one */ - rwlock_unlock_w(&call->master_lock); - call_destroy(call); - obj_put(call); - call = call_get_or_create(&flags.call_id, CT_OWN_CALL); - } - } - else { + if (!call) { /* call == NULL, should create call */ call = call_get_or_create(&flags.call_id, CT_OWN_CALL); } @@ -1361,7 +1347,7 @@ void ng_call_stats(struct call *call, const str *fromtag, const str *totag, benc bencode_dictionary_add_integer(output, "created", call->created.tv_sec); bencode_dictionary_add_integer(output, "created_us", call->created.tv_usec); - bencode_dictionary_add_integer(output, "last signal", call->last_signal); + bencode_dictionary_add_integer(output, "last signal", call->last_signal.tv_sec); ng_stats_ssrc(bencode_dictionary_add_dictionary(output, "SSRC"), call->ssrc_hash); tags = bencode_dictionary_add_dictionary(output, "tags"); @@ -1945,3 +1931,15 @@ int call_interfaces_init() { return 0; } + + +void format_network_address(str* o, struct packet_stream *ps, struct sdp_ng_flags *flags, int keep_unspec) { + if (!is_addr_unspecified(&flags->parsed_media_address)) + o->len = sprintf(o->s, "%s %s", + flags->parsed_media_address.family->rfc_name, + sockaddr_print_buf(&flags->parsed_media_address)); + else if (IS_FOREIGN_CALL(ps->call) && ps->media && ps->media->rtpe_connection_addr.len) + o->len = sprintf(o->s, "%s", ps->media->rtpe_connection_addr.s); + else + call_stream_address46(o->s, ps, SAF_NG, &o->len, NULL, keep_unspec); +} diff --git a/daemon/cdr.c b/daemon/cdr.c index be13145692..59d619f080 100644 --- a/daemon/cdr.c +++ b/daemon/cdr.c @@ -58,7 +58,7 @@ void cdr_update_entry(struct call* c) { ADJUSTLEN(printlen,cdrbufend,cdrbufcur); printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"created_from=%s, ", c->created_from); ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal); + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal.tv_sec); ADJUSTLEN(printlen,cdrbufend,cdrbufcur); printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"tos=%u, ", (unsigned int)c->tos); ADJUSTLEN(printlen,cdrbufend,cdrbufcur); diff --git a/daemon/cli.c b/daemon/cli.c index 97e28b2e6b..7bfeb45634 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -617,7 +617,7 @@ static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) "\ncallid: %s\ndeletionmark: %s\ncreated: %i\nproxy: %s\ntos: %u\nlast_signal: %llu\n" "redis_keyspace: %i\nforeign: %s\n\n", c->callid.s, c->ml_deleted ? "yes" : "no", (int) c->created.tv_sec, c->created_from, - (unsigned int) c->tos, (unsigned long long) c->last_signal, c->redis_hosted_db, + (unsigned int) c->tos, (unsigned long long) c->last_signal.tv_sec, c->redis_hosted_db, IS_FOREIGN_CALL(c) ? "yes" : "no"); for (l = c->monologues.head; l; l = l->next) { diff --git a/daemon/codec.c b/daemon/codec.c index f548ac1cf6..085b561c50 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -1165,7 +1165,17 @@ void codec_packet_free(void *pp) { g_slice_free1(sizeof(*p), p); } - +str *codec_print_payload_type(const struct rtp_payload_type* pt) { + return str_sprintf( + "%s/" /* encoding */ + "%u/" /* clock_rate */ + "%i/" /* channels */ + "%i/" /* bitrate (opts) */ + "%i/" /* ptime (extra_opts) */ + "%s/" /* format_parameters(fmt_params) */ + /* the last part must end with '/', otherwise codec_make_payload_type won't read it*/ + ,pt->encoding.s, pt->clock_rate, pt->channels, pt->bitrate, pt->ptime, pt->format_parameters.s); +} struct rtp_payload_type *codec_make_payload_type(const str *codec_str, struct call_media *media) { str codec_fmt = *codec_str; diff --git a/daemon/json-helpers.c b/daemon/json-helpers.c new file mode 100644 index 0000000000..b132693207 --- /dev/null +++ b/daemon/json-helpers.c @@ -0,0 +1,167 @@ +#include "json-helpers.h" + +/** + * Helper method to create `str` instances from C-strings that glib uses often + * @param c C-style string where `strlen()` reports correct length. + * @return `str` string. Release using `free()`. + */ +static str *str_dup_charptr(const char *c) { + str temp = { .s = (char*)c, .len = strlen(c) }; + return str_dup(&temp); +} + +/** + * Helper method to test if the current node the reader is pointing to is a string value. + * Similar to `json_reader_is_value()` but will only return non-zero for values that are string values. + * @param reader JSON reader that is not in an error state + * @return non-zero if the current node is a string value and the reader is not in an error state. zero otherwise. + */ +static int json_reader_is_string(JsonReader* reader) { + JsonNode *node; + + node = json_reader_get_value(reader); + if (json_node_get_value_type(node) == G_TYPE_STRING) + return 1; + return 0; +} + +str *json_reader_get_str(JsonReader *reader, const char *key) { + const gchar *strval; + str *out = NULL; + json_reader_read_member(reader, key); + strval = json_reader_get_string_value(reader); + json_reader_end_member(reader); + if (strval) + out = str_dup_charptr(strval); + return out; +} + +str *json_object_get_str(JsonObject* json, const char *key) { + const gchar *strval = NULL; + str *out = NULL; + if (json_object_has_member(json, key)) + strval = json_object_get_string_member(json, key); + if (strval) + out = str_dup_charptr(strval); + return out; +} + +str *json_object_get_str_uri_enc(JsonObject *json, const char *key) { + str *strval = NULL; + strval = json_object_get_str(json, key); + if (!strval) + return NULL; + return str_uri_decode_len(strval->s, strval->len); +} + +str *json_array_get_str(JsonArray *json, unsigned idx) { + const gchar *strval; + str *out = NULL; + strval = json_array_get_string_element(json, idx); + if (strval) + out = str_dup_charptr(strval); + return out; +} + +str *json_reader_get_str_element(JsonReader *reader, unsigned idx) { + const gchar *strval = NULL; + str *out = NULL; + json_reader_read_element(reader, idx); + strval = json_reader_get_string_value(reader); + json_reader_end_element(reader); + if (strval) + out = str_dup_charptr(strval); + return out; +} + +long long json_reader_get_ll_element(JsonReader *reader, unsigned idx) { + str *strval; + long long out = -1; + if (json_reader_read_element(reader, idx)) { + if (json_reader_is_string(reader)) { + json_reader_end_element(reader); + strval = json_reader_get_str_element(reader, idx); + if (strval) { + out = strtoll(strval->s, NULL, 10); + free(strval); + } + } else { + out = json_reader_get_int_value(reader); + json_reader_end_element(reader); + } + } else + json_reader_end_element(reader); + return out; +} + +long long json_array_get_ll(JsonArray *json, unsigned idx) { + long long out = -1; + JsonNode *member; + + if (idx >= json_array_get_length(json)) + return out; + + member = json_array_get_element(json, idx); + if (json_node_get_value_type(member) == G_TYPE_STRING) { + str *strval = json_array_get_str(json, idx); + out = strtoll(strval->s, NULL, 10); + free(strval); + return out; + } + + return json_array_get_int_element(json, idx); // returns gint64 +} + +str *json_reader_get_string_value_uri_enc(JsonReader *reader) { + const char *s = json_reader_get_string_value(reader); + if (!s) + return NULL; + str *out = str_uri_decode_len(s, strlen(s)); + return out; +} + +long long json_reader_get_ll(JsonReader *reader, const char *key) { + long long r = -1; + + if (!json_reader_read_member(reader, key)) { + json_reader_end_member(reader); + return r; + } + if (json_reader_is_string(reader)) { + str *ret = json_reader_get_string_value_uri_enc(reader); + json_reader_end_member(reader); + r = strtoll(ret->s, NULL, 10); + free(ret); + return r; + } + /* not a string, lets assume integer */ + r = json_reader_get_int_value(reader); + json_reader_end_member(reader); + return r; +} + +long long json_object_get_ll(JsonObject *json, const char *key) { + long long r = -1; + JsonNode *member; + + if (!json_object_has_member(json, key)) + return r; + + member = json_object_get_member(json, key); + if (json_node_get_value_type(member) == G_TYPE_STRING) { + str *ret = json_object_get_str(json, key); + r = strtoll(ret->s, NULL, 10); + free(ret); + return r; + } + /* not a string, lets assume integer */ + return json_object_get_int_member(json, key); // returns gint64 +} + +JsonNode* json_reader_get_node(JsonReader *reader, const char *key) { + JsonNode* nodeval; + json_reader_read_member(reader, key); + nodeval = json_reader_get_value(reader); + json_reader_end_member(reader); + return nodeval; +} diff --git a/daemon/media_socket.c b/daemon/media_socket.c index b172017eb3..8450bcfb95 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -755,7 +755,9 @@ static void release_port(socket_t *r, struct intf_spec *spec) { iptables_del_rule(r); - if (close_socket(r) == 0) { + if (r->is_foreign) { + __C_DBG("port %u is foreign so release is not needed"); + } else if (close_socket(r) == 0) { __C_DBG("port %u is released", port); bit_array_clear(pp->ports_used, port); g_atomic_int_inc(&pp->free_ports); @@ -1634,7 +1636,7 @@ static int media_packet_address_check(struct packet_handler_ctx *phc) /* wait at least 3 seconds after last signal before committing to a particular * endpoint address */ - if (!phc->mp.call->last_signal || rtpe_now.tv_sec <= phc->mp.call->last_signal + 3) + if (!phc->mp.call->last_signal.tv_sec || rtpe_now.tv_sec <= phc->mp.call->last_signal.tv_sec + 3) goto update_peerinfo; confirm_now: @@ -2000,14 +2002,16 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo __C_DBG("stream_fd_new localport=%d", sfd->socket.local.port); - ZERO(pi); - pi.fd = sfd->socket.fd; - pi.obj = &sfd->obj; - pi.readable = stream_fd_readable; - pi.closed = stream_fd_closed; + if (!sfd->socket.is_foreign) { + ZERO(pi); + pi.fd = sfd->socket.fd; + pi.obj = &sfd->obj; + pi.readable = stream_fd_readable; + pi.closed = stream_fd_closed; - if (poller_add_item(rtpe_poller, &pi)) - ilog(LOG_ERR, "Failed to add stream_fd to poller"); + if (poller_add_item(rtpe_poller, &pi)) + ilog(LOG_ERR, "Failed to add stream_fd to poller"); + } return sfd; } diff --git a/daemon/redis-json.c b/daemon/redis-json.c new file mode 100644 index 0000000000..7476489e8d --- /dev/null +++ b/daemon/redis-json.c @@ -0,0 +1,901 @@ +#include "redis-json.h" + +#include "json-helpers.h" +#include "log_funcs.h" + +#define JSON_UPDATE_NUM_FIELD_IF_SET(json, key, field) {\ + long long llval = json_object_get_ll(json, key); \ + if (llval >= 0) field = llval; \ +} + +#define JSON_UPDATE_BOOL_FIELD_IF_SET(json, key, field) {\ + long long llval = json_object_get_ll(json, key); \ + if (llval >= 0) field = llval ? TRUE : FALSE; \ +} + +/** For use with fields that support -1 (for "not set"), but are stored in JSON as unsigned int */ +#define JSON_UPDATE_SIGNED_NUM_FIELD_IF_SET(json, key, field) {\ + long long llval = json_object_get_ll(json, key); \ + if (llval >= 0) field = llval < 1000 ? llval : -1; \ +} + +#define JSON_UPDATE_NUM_FIELD_IF_SET_OR_FAIL(json, key, field) {\ + long long llval = json_object_get_ll(json, key); \ + if (llval >= 0) field = llval; \ + else goto fail; \ +} + +#define JSON_UPDATE_TVAL_FIELD_IF_SET(json, key, field) {\ + long long llval = json_object_get_ll(json, key); \ + if (llval >= 0) timeval_from_us(&field, llval); \ +} + +#define JSON_UPDATE_TVAL_FIELD_IF_SET_OR_FAIL(json, key, field) {\ + long long llval = json_object_get_ll(json, key); \ + if (llval >= 0) timeval_from_us(&field, llval); \ + else goto fail; \ +} + +/** + * Helper for using obj_put as a (*GDestroyNotify) parameter for glib. + * + * Use it to cleanup `GQueue*`s returned from redis-json calls. + * @param o gpointerdata that references a struct that extends `struct obj` + */ +void gdestroy_obj_put(void* o) { + obj_put_o(o); +} + +/** + * Retrieve a list of all `redis_call_media_stream_t` across all media in the call. + * @param callref a pointer to the `redis_call_t` data + */ +GQueue* redis_call_get_streams(redis_call_t* callref) { + GQueue* streams; + redis_call_media_t *media; + unsigned midx, sidx; + + streams = g_queue_new(); + for (midx = 0; midx < g_queue_get_length(callref->media); midx++) { + media = g_queue_peek_nth(callref->media, midx); + for (sidx = 0; sidx < g_queue_get_length(media->streams); sidx++) { + g_queue_push_tail(streams, obj_get((redis_call_media_stream_t*)g_queue_peek_nth(media->streams, sidx))); + } + } + + return streams; +} + + +static void redis_call_media_stream_fd_free(void *rcmsf) { + redis_call_media_stream_fd_t *streamfdref = rcmsf; + if (!streamfdref) + return; + if (streamfdref->pref_family) + free(streamfdref->pref_family); + if (streamfdref->logical_intf) + free(streamfdref->logical_intf); +} + +static redis_call_media_stream_fd_t *redis_call_media_stream_fd_create(unsigned unique_id, JsonObject *json) { + redis_call_media_stream_fd_t *streamfdref = NULL; + + streamfdref = obj_alloc0("redis_call_media_stream_fd", sizeof(*streamfdref), redis_call_media_stream_fd_free); + streamfdref->unique_id = unique_id; + JSON_UPDATE_NUM_FIELD_IF_SET_OR_FAIL(json, "stream", streamfdref->stream_unique_id); + streamfdref->pref_family = json_object_get_str(json, "pref_family"); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "localport", streamfdref->localport); + streamfdref->logical_intf = json_object_get_str(json, "logical_intf"); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "local_intf_uid", streamfdref->logical_intf_uid); + + goto done; + +fail: + if (streamfdref) { + obj_put(streamfdref); + streamfdref = NULL; + } + +done: + return streamfdref; +} + +static void redis_call_media_stream_free(void *rcms) { + redis_call_media_stream_t *streamref = rcms; + if (!streamref) + return; + if (streamref->endpoint) + free(streamref->endpoint); + if (streamref->advertised_endpoint) + free(streamref->advertised_endpoint); + if (streamref->fds) + g_queue_free_full(streamref->fds, gdestroy_obj_put); +} + +static redis_call_media_stream_t *redis_call_media_stream_create(unsigned unique_id, JsonObject *json, GQueue *sfds) { + redis_call_media_stream_t *streamref = NULL; + redis_call_media_stream_fd_t *streamfdref; + + unsigned idx; + + streamref = obj_alloc0("redis_call_media_stream", sizeof(*streamref), redis_call_media_stream_free); + streamref->unique_id = unique_id; + JSON_UPDATE_NUM_FIELD_IF_SET_OR_FAIL(json, "media", streamref->media_unique_id); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "sfd", streamref->selected_sfd); + JSON_UPDATE_SIGNED_NUM_FIELD_IF_SET(json, "rtp_sink", streamref->rtp_sink); + JSON_UPDATE_SIGNED_NUM_FIELD_IF_SET(json, "rtcp_sink", streamref->rtcp_sink); + JSON_UPDATE_SIGNED_NUM_FIELD_IF_SET(json, "rtcp_sibling", streamref->rtcp_sibling); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "last_packet", streamref->last_packet); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "ps_flags", streamref->ps_flags); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "component", streamref->component); + streamref->endpoint = json_object_get_str(json, "endpoint"); + streamref->advertised_endpoint = json_object_get_str(json, "advertised_endpoint"); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "stats-packets", streamref->stats_packets); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "stats-bytes", streamref->stats_bytes); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "stats-errors", streamref->stats_errors); + + /* grab my fds */ + streamref->fds = g_queue_new(); + for (idx = 0; idx < g_queue_get_length(sfds); idx++) { + streamfdref = g_queue_peek_nth(sfds, idx); + if (streamfdref && streamfdref->stream_unique_id == streamref->unique_id) + g_queue_push_tail(streamref->fds, obj_get(streamfdref)); + } + + goto done; + +fail: + if (streamref) { + obj_put(streamref); + streamref = NULL; + } + +done: + return streamref; +} + +static void redis_call_rtp_payload_type_free(void *rcrpt) { + redis_call_rtp_payload_type_t *payloadref = rcrpt; + if (!payloadref) + return; + if (payloadref->codec_str) + free(payloadref->codec_str); +} + +static redis_call_rtp_payload_type_t *redis_call_rtp_payload_type_create(unsigned payload_type, str* payload_string) { + redis_call_rtp_payload_type_t *payloadref; + + payloadref = obj_alloc0("redis_call_rtp_payload_type", sizeof(*payloadref), redis_call_rtp_payload_type_free); + payloadref->payload_type = payload_type; + payloadref->codec_str = str_dup(payload_string); + return payloadref; +} + +static void redis_call_media_endpoint_map_free(void *rcmem) { + redis_call_media_endpoint_map_t *mapref = rcmem; + if (!mapref) + return; + if (mapref->intf_preferred_family) + free(mapref->intf_preferred_family); + if (mapref->logical_intf) + free(mapref->logical_intf); + if (mapref->endpoint) + free(mapref->endpoint); +} + +static redis_call_media_endpoint_map_t *redis_call_media_endpoint_map_create(unsigned unique_id, JsonObject *json) { + redis_call_media_endpoint_map_t *mapref; + + mapref = obj_alloc0("redis_call_media_endpoint_map", sizeof(*mapref), redis_call_media_endpoint_map_free); + mapref->unique_id = unique_id; + mapref->wildcard = json_object_get_ll(json, "wildcard"); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "num_ports", mapref->num_ports); + mapref->intf_preferred_family = json_object_get_str(json, "intf_preferred_family"); + mapref->logical_intf = json_object_get_str(json, "logical_intf"); + mapref->endpoint = json_object_get_str(json, "endpoint"); + return mapref; +} + +static void redis_call_media_tag_free(void *rcmt) { + redis_call_media_tag_t *tagref = rcmt; + if (!tagref) + return; + if (tagref->tag) + free(tagref->tag); + if (tagref->viabranch) + free(tagref->viabranch); + if (tagref->label) + free(tagref->label); + if (tagref->other_tag) + obj_put(tagref->other_tag); +} + +static redis_call_media_tag_t *redis_call_media_tag_create(unsigned unique_id, JsonObject *json) { + redis_call_media_tag_t *tagref = NULL; + + tagref = obj_alloc0("redis_call_media_tag", sizeof(*tagref), redis_call_media_tag_free); + tagref->unique_id = unique_id; + JSON_UPDATE_NUM_FIELD_IF_SET_OR_FAIL(json, "created", tagref->created); + JSON_UPDATE_BOOL_FIELD_IF_SET(json, "active", tagref->active); + JSON_UPDATE_BOOL_FIELD_IF_SET(json, "deleted", tagref->deleted); + JSON_UPDATE_BOOL_FIELD_IF_SET(json, "block_dtmf", tagref->block_dtmf); + JSON_UPDATE_BOOL_FIELD_IF_SET(json, "block_media", tagref->block_media); + tagref->tag = json_object_get_str(json, "tag"); + tagref->viabranch = json_object_get_str(json, "viabranch"); + tagref->label = json_object_get_str(json, "label"); + + goto done; + +fail: + if (tagref) { + obj_put(tagref); + tagref = NULL; + } + +done: + return tagref; +} + +static void redis_call_media_sdes_free(void *rcms) { + redis_call_media_sdes_t *sdesref = rcms; + if (!sdesref) + return; + if (sdesref->crypto_suite_name) + free(sdesref->crypto_suite_name); + if (sdesref->master_key) + free(sdesref->master_key); + if (sdesref->master_salt) + free(sdesref->master_salt); + if (sdesref->mki) + free(sdesref->mki); +} + +static redis_call_media_sdes_t *redis_call_media_sdes_create(const char *prefix, JsonObject* json) { + redis_call_media_sdes_t *sdesref; + str *fieldname; + + sdesref = obj_alloc0("redis_call_media_sdes", sizeof(*sdesref), redis_call_media_sdes_free); + fieldname = str_sprintf("%s_tag", prefix); + JSON_UPDATE_NUM_FIELD_IF_SET_OR_FAIL(json, fieldname->s, sdesref->tag); + free(fieldname); + fieldname = str_sprintf("%s-crypto_suite", prefix); + sdesref->crypto_suite_name = json_object_get_str(json, fieldname->s); + free(fieldname); + fieldname = str_sprintf("%s-master_key", prefix); + sdesref->master_key = json_object_get_str_uri_enc(json, fieldname->s); + free(fieldname); + fieldname = str_sprintf("%s-master_salt", prefix); + sdesref->master_salt = json_object_get_str_uri_enc(json, fieldname->s); + free(fieldname); + fieldname = str_sprintf("%s-mki", prefix); + sdesref->mki = json_object_get_str(json, fieldname->s); + free(fieldname); + fieldname = str_sprintf("%s-unenc-srtp", prefix); + JSON_UPDATE_NUM_FIELD_IF_SET(json, fieldname->s, sdesref->session_params.unencrypted_srtp); + free(fieldname); + fieldname = str_sprintf("%s-unenc-srtcp", prefix); + JSON_UPDATE_NUM_FIELD_IF_SET(json, fieldname->s, sdesref->session_params.unencrypted_srtcp); + free(fieldname); + fieldname = str_sprintf("%s-unauth-srtp", prefix); + JSON_UPDATE_NUM_FIELD_IF_SET(json, fieldname->s, sdesref->session_params.unauthenticated_srtp); + + goto done; + +fail: + ilog(LOG_WARNING, "Failed to read crypto params %s from Redis", prefix); + if (sdesref) { + obj_put(sdesref); + sdesref = NULL; + } + +done: + if (fieldname) + free(fieldname); + return sdesref; +} + +static void redis_call_media_free(void* rcm) { + redis_call_media_t *mediaref = rcm; + if (!mediaref) + return; + if (mediaref->type) + free(mediaref->type); + if (mediaref->protocol) + free(mediaref->protocol); + if (mediaref->desired_family) + free(mediaref->desired_family); + if (mediaref->logical_intf) + free(mediaref->logical_intf); + if (mediaref->rtpe_addr) + free(mediaref->rtpe_addr); + if (mediaref->tag) + obj_put(mediaref->tag); + if (mediaref->endpoint_maps) + g_queue_free_full(mediaref->endpoint_maps, gdestroy_obj_put); + if (mediaref->streams) + g_queue_free_full(mediaref->streams, gdestroy_obj_put); + if (mediaref->codec_prefs_recv) + g_queue_free_full(mediaref->codec_prefs_recv, gdestroy_obj_put); + if (mediaref->codec_prefs_send) + g_queue_free_full(mediaref->codec_prefs_send, gdestroy_obj_put); + if (mediaref->sdes_in) + g_queue_free_full(mediaref->sdes_in, gdestroy_obj_put); + if (mediaref->sdes_out) + g_queue_free_full(mediaref->sdes_out, gdestroy_obj_put); + if (mediaref->fingerprint.hash_func_name) + free(mediaref->fingerprint.hash_func_name); + if (mediaref->fingerprint.fingerprint) + free(mediaref->fingerprint.fingerprint); +} + +static GQueue *redis_call_media_read_payloads(JsonArray* payload_types) { + GQueue *out; + JsonReader* reader = NULL; + redis_call_rtp_payload_type_t *payload; + unsigned payload_count; + str* payload_str = NULL; + str ptype; + unsigned idx, pt; + + char *err = 0; + + /* read payloads */ + out = g_queue_new(); + payload_count = json_array_get_length(payload_types); + for (idx = 0; idx < payload_count; idx++) { + payload_str = json_array_get_str(payload_types, idx); + if (str_token(&ptype, payload_str, '/')) { + err = "No payload type in payload data"; + goto fail; + } + + pt = str_to_ui(&ptype, 0); + payload = redis_call_rtp_payload_type_create(pt, payload_str); + if (!payload) { + err = "Failed to create payload"; + goto fail; + } + g_queue_push_tail(out, payload); + + free(payload_str); + payload_str = NULL; + } + + goto done; + +fail: + if (out) { + g_queue_free_full(out, gdestroy_obj_put); + out = NULL; + } + if (err) { + ilog(LOG_WARNING, "Failed to read call data from Redis: %s", err); + } + +done: + if (payload_str) + free(payload_str); + if (reader) + g_object_unref(reader); + return out; +} + +static GQueue* redis_call_media_try_read_sdes(const char* prefix, JsonObject *json) { + /* unlike all the other shit not-really-JSON that redis.c pulls, this time it encodes a list of items into the media + * object itself, where the list index is encoded as "-%u" between the "type prefix" and the field name - but only + * if its the second or later element. That code is fscking insane */ + GQueue *out = NULL; + redis_call_media_sdes_t *sdesref = NULL; + str *testfield = NULL, *prefixfield = NULL; + int idx; + + /* check if we have any sdes for prefix */ + testfield = str_sprintf("%s_tag", prefix); + if (!json_object_has_member(json, testfield->s)) + goto done; /* nope */ + + out = g_queue_new(); + sdesref = redis_call_media_sdes_create(prefix, json); + if (!sdesref) { /* shouldn't happen, because we tested, but JSON might be broken */ + ilog(LOG_WARNING, "crypto params %s are broken", prefix); + goto fail; + } + + g_queue_push_tail(out, sdesref); + for (idx = 1; ; idx++) { + free(testfield); + prefixfield = str_sprintf("%s-%u", prefix, idx); + /* check if we have more sdes for prefix */ + testfield = str_sprintf("%s_tag", prefixfield->s); + if (!json_object_has_member(json, testfield->s)) + goto done; /* nope */ + sdesref = redis_call_media_sdes_create(prefixfield->s, json); + if (!sdesref) { + ilog(LOG_WARNING, "crypto params %s are broken", prefixfield->s); + break; + } + free(prefixfield); + g_queue_push_tail(out, sdesref); + } + + goto done; + +fail: + if (out) { + g_queue_free_full(out, gdestroy_obj_put); + out = NULL; + } + +done: + if (testfield) + free(testfield); + if (prefixfield) + free(prefixfield); + return out; +} + +static redis_call_media_t *redis_call_media_create(unsigned unique_id, JsonObject *json, GQueue *tags, GQueue *streams, + JsonArray *stream_ids_ar, GQueue* endpoint_maps, JsonArray *endpoint_maps_ar, JsonArray *payload_types_recv_ar, + JsonArray *payload_types_send_ar) { + redis_call_media_t *mediaref = NULL; + redis_call_media_tag_t *tagref = NULL; + redis_call_media_stream_t *streamref = NULL; + redis_call_media_endpoint_map_t *mapref = NULL; + + char *err = 0; + long long llval = 0; + unsigned idx; + + mediaref = obj_alloc0("redis_call_media", sizeof(*mediaref), redis_call_media_free); + mediaref->unique_id = unique_id; + if ((llval = json_object_get_ll(json, "tag")) >= 0) { + tagref = g_queue_peek_nth(tags, llval); + if (!tagref) { + err = "Failed to find referenced tag when creating media"; + goto fail; + } + mediaref->tag = obj_get(tagref); + } + JSON_UPDATE_NUM_FIELD_IF_SET(json, "index", mediaref->index); + mediaref->type = json_object_get_str(json, "type"); + mediaref->protocol = json_object_get_str(json, "protocol"); + mediaref->desired_family = json_object_get_str(json, "desired_family"); + mediaref->logical_intf = json_object_get_str(json, "logical_intf"); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "ptime", mediaref->ptime); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "media_flags", mediaref->media_flags); + mediaref->rtpe_addr = json_object_get_str(json, "rtpe_addr"); + + /* try to read crypto params, if exist */ + mediaref->sdes_in = redis_call_media_try_read_sdes("sdes_in", json); /* we get NULL on failure, which is what we want */ + mediaref->sdes_out = redis_call_media_try_read_sdes("sdes_out", json); + if (json_object_has_member(json, "hash_func")) { + mediaref->fingerprint.hash_func_name = json_object_get_str(json, "hash_func"); + mediaref->fingerprint.fingerprint = json_object_get_str_uri_enc(json, "fingerprint"); + } + + /* grab my streams */ + mediaref->streams = g_queue_new(); + for (idx = 0; idx < g_queue_get_length(streams); idx++) { + streamref = g_queue_peek_nth(streams, idx); + if (streamref && streamref->media_unique_id == mediaref->unique_id) + g_queue_push_tail(mediaref->streams, obj_get(streamref)); + } + + if (!(mediaref->codec_prefs_recv = redis_call_media_read_payloads(payload_types_recv_ar))) { + err = "Failed to read recv payloads"; + goto fail; + } + if (!(mediaref->codec_prefs_send = redis_call_media_read_payloads(payload_types_send_ar))) { + err = "Failed to read send payloads"; + goto fail; + } + mediaref->endpoint_maps = g_queue_new(); + for (idx = 0; idx < json_array_get_length(endpoint_maps_ar); idx++) { + unsigned map_id = json_array_get_ll(endpoint_maps_ar, idx); + mapref = g_queue_peek_nth(endpoint_maps, map_id); + if (!mapref) { + err = "Failed to find endpoint map for media"; + goto fail; + } + g_queue_push_tail(mediaref->endpoint_maps, obj_get(mapref)); + } + + goto done; + +fail: + if (mediaref) { + obj_put(mediaref); + mediaref = NULL; + } + if (err) { + ilog(LOG_WARNING, "Failed to read call data from Redis: %s", err); + } + +done: + return mediaref; +} + +static void redis_call_free(void* rc) { + redis_call_t *callref = rc; + if (!callref) + return; + if (callref->media) + g_queue_free_full(callref->media, gdestroy_obj_put); + if (callref->call_id) + free(callref->call_id); + if (callref->created_from) + free(callref->created_from); + if (callref->created_from_addr) + free(callref->created_from_addr); + if (callref->recording_metadata) + free(callref->recording_metadata); +} + +static redis_call_t* redis_call_create_from_metadata(const str* callid, JsonObject* json) { + redis_call_t *callref = NULL; + + callref = obj_alloc0("redis_call", sizeof(*callref), redis_call_free); + callref->call_id = str_dup(callid); + JSON_UPDATE_TVAL_FIELD_IF_SET_OR_FAIL(json, "created", callref->created); + JSON_UPDATE_TVAL_FIELD_IF_SET(json, "last_signal", callref->last_signal); + JSON_UPDATE_BOOL_FIELD_IF_SET(json, "deleted", callref->deleted); + JSON_UPDATE_BOOL_FIELD_IF_SET(json, "ml_deleted", callref->ml_deleted); + callref->created_from = json_object_get_str(json, "created_from"); + callref->created_from_addr = json_object_get_str(json, "created_from_addr"); + JSON_UPDATE_NUM_FIELD_IF_SET(json, "redis_hosted_db", callref->redis_hosted_db); + JSON_UPDATE_BOOL_FIELD_IF_SET(json, "block_dtmf", callref->block_dtmf); + JSON_UPDATE_BOOL_FIELD_IF_SET(json, "block_media", callref->block_media); + + goto done; + +fail: + if (callref) { + obj_put(callref); + callref = NULL; + } + +done: + return callref; +} + +static int redis_call_match_tags(redis_call_media_tag_t *tag, GQueue *call_tags, JsonArray *json) { + redis_call_media_tag_t *other_tag; + int status = 1; + unsigned num_others, other_idx, other_tagid; + + char *err = 0; + + num_others = json_array_get_length(json); + if (num_others < 0) { + err = "Negative number of other tags members?!?"; + goto fail; + } + + for (other_idx = 0; other_idx < num_others; other_idx++) { + other_tagid = json_array_get_ll(json, other_idx); + if (other_tagid < 0) { + err = "Failed to read other tag id from tag list"; + goto fail; + } + other_tag = g_queue_peek_nth(call_tags, other_tagid); + if (!other_tag) { + err = "Other tag id doesn't exist in tag list!"; + goto fail; + } + tag->other_tag = obj_get(other_tag); + other_tag->other_tag = obj_get(tag); + } + + goto done; + +fail: + status = 0; + if (err) { + ilog(LOG_WARNING, "Failed to read call data from Redis: %s", err); + } + +done: + return status; +} + +static GQueue *redis_call_read_tags(JsonObject *json) { + GQueue *call_tags = NULL; + unsigned tag_idx, other_tags_idx; + str *tag_field = NULL; + redis_call_media_tag_t *tag = NULL; + JsonObject *tag_object = NULL; + JsonArray *othertags_ar = NULL; + + char *err = 0; + + call_tags = g_queue_new(); + for (tag_idx = 0; ; tag_idx++) { + tag_field = str_sprintf("tag-%u", tag_idx); + if (!json_object_has_member(json, tag_field->s)) + break; /* no more tags */ + tag_object = json_object_get_object_member(json, tag_field->s); + tag = redis_call_media_tag_create(tag_idx, tag_object); + if (!tag) { + err = "Failed to create media tag"; + goto fail; + } + g_queue_push_tail(call_tags, tag); + free(tag_field); + } + + for (other_tags_idx = 0; other_tags_idx < tag_idx; other_tags_idx++) { + free(tag_field); + tag_field = str_sprintf("other_tags-%d", other_tags_idx); + othertags_ar = json_object_get_array_member(json, tag_field->s); + if (othertags_ar) { + tag = g_queue_peek_nth(call_tags, other_tags_idx); + if (!tag) {/* shouldn't actually happen, but we're sanity-first! */ + err = "The world is insane - no matching tag and other tag"; + goto fail; + } + if (!redis_call_match_tags(tag, call_tags, othertags_ar)) { + err = "Failed to match call tags and other tags"; + goto fail; + } + } /* missing other_tags list is treated like an empty list */ + } + + goto done; + +fail: + if (call_tags) { + g_queue_free_full(call_tags, gdestroy_obj_put); + call_tags = NULL; + } + if (err) { + ilog(LOG_WARNING, "Failed to read call data from Redis: %s", err); + } + +done: + if (tag_field) + free(tag_field); + return call_tags; +} + +static GQueue *redis_call_read_stream_fds(JsonObject *json) { + GQueue *call_sfds; + unsigned sfd_idx; + str* sfd_field; + JsonObject *sfd_object; + redis_call_media_stream_fd_t* sfd; + + char *err = 0; + + call_sfds = g_queue_new(); + for (sfd_idx = 0; ; sfd_idx++) { + sfd_field = str_sprintf("sfd-%u", sfd_idx); + if (!json_object_has_member(json, sfd_field->s)) + goto done; /* no more sfds */ + sfd_object = json_object_get_object_member(json, sfd_field->s); + sfd = redis_call_media_stream_fd_create(sfd_idx, sfd_object); + if (!sfd) { + err = "Failed to create stream fd"; + goto fail; + } + g_queue_push_tail(call_sfds, sfd); + free(sfd_field); + } + + /* we shouldn't reach this point, but just playing it safe */ + sfd_field = NULL; + goto done; + +fail: + if (call_sfds) { + g_queue_free_full(call_sfds, gdestroy_obj_put); + call_sfds = NULL; + } + if (err) { + ilog(LOG_WARNING, "Failed to read call data from Redis: %s", err); + } + +done: + if (sfd_field) + free(sfd_field); + return call_sfds; +} + +static GQueue *redis_call_read_streams(JsonObject *json) { + GQueue *call_streams = NULL, *call_sfds = NULL; + unsigned stream_idx; + str *stream_field = NULL; + JsonObject *stream_object = NULL; + redis_call_media_stream_t *stream; + + char *err = 0; + + if (!(call_sfds = redis_call_read_stream_fds(json))) { + err = "Failed to read stream fds"; + goto fail; + } + call_streams = g_queue_new(); + for (stream_idx = 0; ; stream_idx++) { + stream_field = str_sprintf("stream-%u", stream_idx); + if (!json_object_has_member(json, stream_field->s)) + goto done; /* no more streams */ + stream_object = json_object_get_object_member(json, stream_field->s); + stream = redis_call_media_stream_create(stream_idx, stream_object, call_sfds); + if (!stream) { + err = "Failed to create call media stream"; + goto fail; + } + g_queue_push_tail(call_streams, stream); + free(stream_field); + } + + /* we shouldn't reach this point, but just playing it safe */ + stream_field = NULL; + goto done; + +fail: + if (call_streams) { + g_queue_free_full(call_streams, gdestroy_obj_put); + call_streams = NULL; + } + if (err) { + ilog(LOG_WARNING, "Failed to read call data from Redis: %s", err); + } + +done: + if (stream_field) + free(stream_field); + if (call_sfds) + g_queue_free_full(call_sfds, gdestroy_obj_put); + return call_streams; +} + +static GQueue *redis_call_read_media_endpoint_maps(JsonObject *json) { + GQueue *media_endpoint_maps; + unsigned endpoint_map_idx; + str *endpoint_map_field = NULL; + JsonObject *endpoint_map_object = NULL; + redis_call_media_endpoint_map_t *map; + + char *err = 0; + + media_endpoint_maps = g_queue_new(); + for (endpoint_map_idx =0; ; endpoint_map_idx++) { + endpoint_map_field = str_sprintf("map-%u", endpoint_map_idx); + if (!json_object_has_member(json, endpoint_map_field->s)) + goto done; /* no more maps */ + endpoint_map_object = json_object_get_object_member(json, endpoint_map_field->s); + map = redis_call_media_endpoint_map_create(endpoint_map_idx, endpoint_map_object); + if (!map) { + err = "Failed to create call media endpoint map"; + goto fail; + } + g_queue_push_tail(media_endpoint_maps, map); + free(endpoint_map_field); + } + + /* we shouldn't reach this point, but just playing it safe */ + endpoint_map_field = NULL; + goto done; + +fail: + if (media_endpoint_maps) { + g_queue_free_full(media_endpoint_maps, gdestroy_obj_put); + media_endpoint_maps = NULL; + } + if (err) { + ilog(LOG_WARNING, "Failed to read call data from Redis: %s", err); + } + +done: + if (endpoint_map_field) + free(endpoint_map_field); + return media_endpoint_maps; +} + +static GQueue *redis_call_read_media(JsonObject *json) { + int media_idx; + GQueue *call_media = NULL, *call_tags = NULL, *call_streams = NULL, *media_endpoint_maps = NULL; + JsonObject *media_object = NULL; + JsonArray *stream_ids_ar = NULL, *endpoint_maps_ar = NULL, *payload_types_recv_ar = NULL, *payload_types_send_ar = NULL; + redis_call_media_t *media = NULL; + + char *err = 0; + char fieldname[50]; + + if (!(call_tags = redis_call_read_tags(json))) { + err = "Failed to read call tags"; + goto fail; + } + if (!(call_streams = redis_call_read_streams(json))) { + err = "Failed to read call streams"; + goto fail; + } + if (!(media_endpoint_maps = redis_call_read_media_endpoint_maps(json))) { + err = "Failed to read call media endpoint maps"; + goto fail; + } + call_media = g_queue_new(); + for (media_idx = 0; ; media_idx++) { + snprintf(fieldname, sizeof(fieldname), "media-%u", media_idx); + if (!json_object_has_member(json, fieldname)) /* no more media */ + goto done; + media_object = json_object_get_object_member(json, fieldname); + snprintf(fieldname, sizeof(fieldname), "streams-%u", media_idx); + stream_ids_ar = json_object_get_array_member(json, fieldname); + snprintf(fieldname, sizeof(fieldname), "maps-%u", media_idx); + endpoint_maps_ar = json_object_get_array_member(json, fieldname); + snprintf(fieldname, sizeof(fieldname), "payload_types-%u", media_idx); + payload_types_recv_ar = json_object_get_array_member(json, fieldname); + snprintf(fieldname, sizeof(fieldname), "payload_types_send-%u", media_idx); + payload_types_send_ar = json_object_get_array_member(json, fieldname); + if (!stream_ids_ar || !endpoint_maps_ar || !payload_types_recv_ar || !payload_types_send_ar) { + err = "Missing streams, maps or payloads"; + goto fail; + } + media = redis_call_media_create(media_idx, media_object, call_tags, call_streams, stream_ids_ar, + media_endpoint_maps, endpoint_maps_ar, payload_types_recv_ar, payload_types_send_ar); + if (!media) { + err = "Failed to create call media"; + goto fail; + } + g_queue_push_tail(call_media, media); + } + + /* not supposed to get here, but just making sure */ + goto done; + +fail: + if (call_media) + g_queue_free_full(call_media, gdestroy_obj_put); + if (err) { + ilog(LOG_WARNING, "Failed to read call data from Redis: %s", err); + } + +done: + if (call_tags) + g_queue_free_full(call_tags, gdestroy_obj_put); + if (call_streams) + g_queue_free_full(call_streams, gdestroy_obj_put); + if (media_endpoint_maps) + g_queue_free_full(media_endpoint_maps, gdestroy_obj_put); + return call_media; +} + +redis_call_t* redis_call_create(const str* callid, JsonNode* json) { + redis_call_t *callref = NULL; + JsonObject *root = NULL; + JsonObject *metadata = NULL; + + char *err = 0; + + root = json_node_get_object(json); + metadata = json_object_get_object_member(root, "json"); + if (!metadata) { + err = "Could not find call data"; + goto fail; + } + callref = redis_call_create_from_metadata(callid, metadata); + if (!callref) { + err = "Failed to read call data"; + goto fail; + } + if (!(callref->media = redis_call_read_media(root))) { + err = "Failed to read call media"; + goto fail; + } + + goto done; + +fail: + if (callref) { + obj_put(callref); + callref = NULL; + } + if (err) { + ilog(LOG_WARNING, "Failed to read call data '" STR_FORMAT_M "' from Redis: %s", + STR_FMT_M(callid), err); + } + +done: + return callref; +} diff --git a/daemon/redis.c b/daemon/redis.c index fb43d3ccd7..ab6934f595 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -32,6 +32,8 @@ #include "ssrc.h" #include "main.h" #include "codec.h" +#include "redis-json.h" +#include "json-helpers.h" struct redis *rtpe_redis; struct redis *rtpe_redis_write; @@ -80,8 +82,10 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) #define REDIS_FMT(x) (int) (x)->len, (x)->str static int redis_check_conn(struct redis *r); -static void json_restore_call(struct redis *r, const str *id, enum call_type type); +static void redis_update_call(str *callid, struct redis *r, struct call *call); +static int redis_update_call_crypto(struct call_media *m, redis_call_media_t *media); static int redis_connect(struct redis *r, int wait); +static char* redis_encode_json(struct call *c); static void redis_pipe(struct redis *r, const char *fmt, ...) { va_list ap; @@ -362,14 +366,8 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) c = call_get(&callid); if (c) { rwlock_unlock_w(&c->master_lock); - if (IS_FOREIGN_CALL(c)) - call_destroy(c); - else { - rlog(LOG_WARN, "Redis-Notifier: Ignoring SET received for OWN call: %s\n", rr->element[2]->str); - goto err; - } } - json_restore_call(r, &callid, CT_FOREIGN_CALL); + redis_update_call(&callid, r, c); } if (strncmp(rr->element[3]->str,"del",3)==0) { @@ -762,23 +760,6 @@ INLINE void json_builder_add_string_value_uri_enc(JsonBuilder *builder, const ch str_uri_encode_len(enc, tmp, len); json_builder_add_string_value(builder,enc); } -INLINE str *json_reader_get_string_value_uri_enc(JsonReader *root_reader) { - const char *s = json_reader_get_string_value(root_reader); - if (!s) - return NULL; - str *out = str_uri_decode_len(s, strlen(s)); - return out; // must be free'd -} -// XXX rework restore procedure to use functions like this everywhere and eliminate the GHashTable -INLINE long long json_reader_get_ll(JsonReader *root_reader, const char *key) { - if (!json_reader_read_member(root_reader, key)) - return -1; - str *ret = json_reader_get_string_value_uri_enc(root_reader); - long long r = strtoll(ret->s, NULL, 10); - free(ret); - json_reader_end_member(root_reader); - return r; -} static int json_get_hash(struct redis_hash *out, const char *key, unsigned int id, JsonReader *root_reader) @@ -886,7 +867,7 @@ static struct timeval strtotimeval(const char *c, char **endp, int base) { define_get_int_type(time_t, time_t, strtoull); define_get_int_type(timeval, struct timeval, strtotimeval); define_get_int_type(int, int, strtol); -define_get_int_type(unsigned, unsigned int, strtol); +define_get_int_type(unsigned, unsigned int, strtoul); //define_get_int_type(u16, u_int16_t, strtol); //define_get_int_type(u64, u_int64_t, strtoull); define_get_int_type(a64, atomic64, strtoa64); @@ -936,8 +917,8 @@ static int redis_hash_get_endpoint(struct endpoint *out, const struct redis_hash if (redis_hash_get_str(&s, h, k)) return -1; - if (endpoint_parse_any(out, s.s)) - return -1; + if (s.len && endpoint_parse_any(out, s.s)) + return -1; return 0; } @@ -1089,7 +1070,7 @@ static int redis_hash_get_sdes_params(GQueue *out, const struct redis_hash *h, c char key[32], tagkey[64]; const char *kk = k; unsigned int tag; - unsigned int iter = 0; + unsigned int iter = 1; while (1) { snprintf(tagkey, sizeof(tagkey), "%s_tag", kk); @@ -1104,8 +1085,8 @@ static int redis_hash_get_sdes_params(GQueue *out, const struct redis_hash *h, c return 0; return -1; } - g_queue_push_tail(out, cps); + snprintf(key, sizeof(key), "%s-%u", k, iter++); kk = key; } @@ -1155,14 +1136,21 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) { if (!loc) goto err; - err = "failed to open ports"; - if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid)) - goto err; - err = "no port returned"; - sock = g_queue_pop_head(&q); - if (!sock) - goto err; - set_tos(sock, c->tos); + if (IS_FOREIGN_CALL(c)) { + sock = g_slice_alloc0(sizeof(*sock)); + err = "failed to register foreign port"; + if (create_foreign_socket(sock, SOCK_DGRAM, port, &loc->spec->local_address.addr)) + goto err; + } else { + err = "failed to open ports"; + if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid)) + goto err; + err = "no port returned"; + sock = g_queue_pop_head(&q); + if (!sock) + goto err; + set_tos(sock, c->tos); + } sfd = stream_fd_new(sock, c, loc); sfds->ptrs[i] = sfd; @@ -1178,6 +1166,7 @@ static int redis_streams(struct call *c, struct redis_list *streams) { unsigned int i; struct redis_hash *rh; struct packet_stream *ps; + unsigned int ps_last_packet; for (i = 0; i < streams->len; i++) { rh = &streams->rh[i]; @@ -1186,7 +1175,8 @@ static int redis_streams(struct call *c, struct redis_list *streams) { if (!ps) return -1; - atomic64_set_na(&ps->last_packet, time(NULL)); + redis_hash_get_unsigned(&ps_last_packet, rh, "last_packet"); + atomic64_set_na(&ps->last_packet, ps_last_packet); if (redis_hash_get_unsigned((unsigned int *) &ps->ps_flags, rh, "ps_flags")) return -1; if (redis_hash_get_unsigned((unsigned int *) &ps->component, rh, "component")) @@ -1303,6 +1293,9 @@ static int json_medias(struct call *c, struct redis_list *medias, JsonReader *ro "media_flags")) return -1; + if (!redis_hash_get_str(&s, rh, "rtpe_addr")) + call_str_cpy(c, &med->rtpe_connection_addr, &s); + if (redis_hash_get_sdes_params(&med->sdes_in, rh, "sdes_in") < 0) return -1; if (redis_hash_get_sdes_params(&med->sdes_out, rh, "sdes_out") < 0) @@ -1527,8 +1520,7 @@ static int json_build_ssrc(struct call *c, JsonReader *root_reader) { return 0; } -static void json_restore_call(struct redis *r, const str *callid, enum call_type type) { - redisReply* rr_jsonStr; +static void json_restore_call(JsonParser *parser, const str *callid, enum call_type type) { struct redis_hash call; struct redis_list tags, sfds, streams, medias, maps; struct call *c = NULL; @@ -1537,17 +1529,7 @@ static void json_restore_call(struct redis *r, const str *callid, enum call_type const char *err = 0; int i; JsonReader *root_reader =0; - JsonParser *parser =0; - rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET " PB, STR(callid)); - err = "could not retrieve JSON data from redis"; - if (!rr_jsonStr) - goto err1; - - parser = json_parser_new(); - err = "could not parse JSON data"; - if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL)) - goto err1; root_reader = json_reader_new (json_parser_get_root (parser)); err = "could not read JSON data"; if (!root_reader) @@ -1559,7 +1541,7 @@ static void json_restore_call(struct redis *r, const str *callid, enum call_type goto err1; err = "call already exists"; - if (c->last_signal) + if (c->last_signal.tv_sec) goto err2; err = "'call' data incomplete"; @@ -1585,7 +1567,7 @@ static void json_restore_call(struct redis *r, const str *callid, enum call_type if (redis_hash_get_timeval(&c->created, &call, "created")) goto err8; err = "missing 'last signal' timestamp"; - if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal")) + if (redis_hash_get_timeval(&c->last_signal, &call, "last_signal")) goto err8; if (redis_hash_get_int(&i, &call, "tos")) c->tos = 184; @@ -1647,6 +1629,25 @@ static void json_restore_call(struct redis *r, const str *callid, enum call_type recording_start(c, s.s, &meta); } + /* because json_medias() failed to implement dtls updates, and I can't suffer to re-implement it their way, just use our new parser */ + redis_call_t *redis_call = redis_call_create(&c->callid, json_parser_get_root(parser)); + err = "new parser failed to parse JSON data"; + if (!redis_call) + goto err1; + for (GList *ml = c->medias.head; ml; ml = ml->next) { + struct call_media *m = ml->data; + redis_call_media_t *media = g_queue_peek_nth(redis_call->media, m->unique_id); + if (!media) { + rlog(LOG_WARNING, "Failed to update data for call ID '" STR_FORMAT_M "' from Redis: missing media %u", + STR_FMT_M(&c->callid), m->unique_id); + continue; /* weird... */ + } + if (redis_update_call_crypto(m, media)) { + rlog(LOG_WARNING, "Failed to update data for call ID '" STR_FORMAT_M "' from Redis: error in update crypto", + STR_FMT_M(&c->callid)); + } + } + err = NULL; err8: @@ -1666,11 +1667,6 @@ static void json_restore_call(struct redis *r, const str *callid, enum call_type err1: if (root_reader) g_object_unref (root_reader); - if (parser) - g_object_unref (parser); - if (rr_jsonStr) - freeReplyObject(rr_jsonStr); - log_info_clear(); if (err) { rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT_M "' from Redis: %s", STR_FMT_M(callid), @@ -1687,6 +1683,367 @@ static void json_restore_call(struct redis *r, const str *callid, enum call_type obj_put(c); } +static int redis_update_call_streams(struct call *c, redis_call_t *redis_call) { + GQueue* redis_call_streams; + redis_call_media_stream_t *stream; + unsigned int updated = 0; + struct packet_stream *ps; + GList *pk; + struct endpoint endpoint, advertised_endpoint; + + if (!(redis_call_streams = redis_call_get_streams(redis_call))) + return -1; + // review call streams and only update where needed + for (pk = c->streams.head; pk; pk = pk->next) { + ps = pk->data; + ZERO(endpoint); + stream = g_queue_peek_nth(redis_call_streams, ps->unique_id); + if (stream->endpoint->len) + endpoint_parse_any(&endpoint, stream->endpoint->s); + if (stream->advertised_endpoint->len) + endpoint_parse_any(&advertised_endpoint, stream->advertised_endpoint->s); + + if (!ps->endpoint.port && endpoint.port && endpoint.address.family->af) { + ps->endpoint = endpoint; + ps->advertised_endpoint = advertised_endpoint; + ps->ps_flags = stream->ps_flags; + updated = 1; + } + } + + if (updated) + rlog(LOG_INFO, "Updated stream endpoints and flags from Redis"); + g_queue_free_full(redis_call_streams, gdestroy_obj_put); + return 0; +} + +static int redis_update_call_tags(struct call *c, redis_call_t *redis_call) { + unsigned midx, updated = 0; + redis_call_media_t *media; + // need to combine redis_tags and json_link_tags: + // - read sets of linked tags + // - for each tag that doesn't exist in g_hash_table_lookup(call->tags, tag), and that has a linked tag that does, + // call call_get_monologue(call, exist_tag, not_exist_tag, NULL) + + for (midx = 0; midx < redis_call->media->length; midx++) { + media = g_queue_peek_nth(redis_call->media, midx); + if (media->tag && media->tag->tag && media->tag->other_tag && media->tag->other_tag->tag && + g_hash_table_lookup(c->tags, media->tag->tag) && + !g_hash_table_lookup(c->tags, media->tag->other_tag->tag)) { + call_get_mono_dialogue(c, media->tag->tag, media->tag->other_tag->tag, media->tag->viabranch); + updated = 1; + } + } + + if (updated) + rlog(LOG_INFO, "Updated monologue tags from Redis"); + return 0; +} + +static int redis_update_call_media_codecs(struct call_media *cm, GQueue* redis_codec_list, + void(*adder_func)(struct call_media *media, struct rtp_payload_type *pt)) { + unsigned pidx, updates = 0; + redis_call_rtp_payload_type_t *payload; + + if (!redis_codec_list) + return updates; /* nothing to add */ + for (pidx = 0; pidx < redis_codec_list->length; pidx++) { + payload = g_queue_peek_nth(redis_codec_list, pidx); + struct rtp_payload_type *pt = codec_make_payload_type(payload->codec_str, cm); + if (!pt) + continue; /* oops? */ + pt->payload_type = payload->payload_type; + adder_func(cm, pt); + updates++; + } + return updates; +} + +static void redis_update_call_codec_handlers(struct call_media *media) { + struct call_monologue *ml = media->monologue; + struct call_monologue *other_ml = ml->active_dialogue; + + for (GList *l = other_ml->medias.head; l; l = l->next) { + struct call_media *other_m = l->data; + if (other_m->index == media->index) { + rlog(LOG_INFO, "['" STR_FORMAT_M "'] media %u: updating codec handlers", + STR_FMT_M(&media->call->callid), media->unique_id); + codec_handlers_update(media, other_m, NULL); + break; + } + } +} + +static int redis_update_call_payloads(struct call_media *m, redis_call_media_t *media) { + unsigned updated = 0; + + /* replace codec prefs with those loaded from the database. */ + /* TODO: ATM the database does not encode them correctly, so we lose some data. */ + /* maybe convert codec prefs cleanup code to use __delete_x_codec (which is currently static) */ + if (g_queue_get_length(media->codec_prefs_recv) != g_queue_get_length(&m->codecs_prefs_recv)) { + rlog(LOG_INFO, "['" STR_FORMAT_M "'] media %u: replacing %d local codec prefs recv with %d remote codec prefs", + STR_FMT_M(&m->call->callid), m->unique_id, g_queue_get_length(&m->codecs_prefs_recv), + g_queue_get_length(media->codec_prefs_recv)); + g_hash_table_remove_all(m->codecs_recv); + g_hash_table_remove_all(m->codec_names_recv); + g_queue_clear_full(&m->codecs_prefs_recv, (GDestroyNotify) payload_type_free); + updated += redis_update_call_media_codecs(m, media->codec_prefs_recv, __rtp_payload_type_add_recv); + } + if (g_queue_get_length(media->codec_prefs_send) != g_queue_get_length(&m->codecs_prefs_send)) { + rlog(LOG_INFO, "['" STR_FORMAT_M "'] media %u: replacing %d local codec prefs send with %d remote codec prefs", + STR_FMT_M(&m->call->callid), m->unique_id, g_queue_get_length(&m->codecs_prefs_send), + g_queue_get_length(media->codec_prefs_send)); + g_hash_table_remove_all(m->codecs_send); + g_hash_table_remove_all(m->codec_names_send); + g_queue_clear_full(&m->codecs_prefs_send, (GDestroyNotify) payload_type_free); + updated += redis_update_call_media_codecs(m, media->codec_prefs_send, __rtp_payload_type_add_send); + } + if (updated) { + redis_update_call_codec_handlers(m); + rlog(LOG_INFO, "Updated media %u codecs from Redis", m->unique_id); + } + return 0; +} + +static int redis_update_call_maps(struct call_media *m, redis_call_media_t *media) { + struct endpoint_map *ep; + redis_call_media_endpoint_map_t *rcep; + GList *epl, *rcepl; + + for (epl = m->endpoint_maps.head; epl; epl = epl->next) { + ep = epl->data; + for (rcepl = media->endpoint_maps->head; rcepl; rcepl = rcepl->next) { + rcep = rcepl->data; + if (rcep->unique_id != ep->unique_id) + continue; + ep->wildcard = rcep->wildcard; + if (rcep->endpoint->len) + endpoint_parse_any(&ep->endpoint, rcep->endpoint->s); + } + } + /* update some media fields here, while we have the media */ + if (!m->ptime) + m->ptime = media->ptime; + m->media_flags = media->media_flags; + return 0; +} + +static void redis_update_call_crypto_sync_sdes_params(GQueue *m_sdes_q, GQueue *redis_sdes_q) { + redis_call_media_sdes_t *redis_sdes; + + crypto_params_sdes_queue_clear(m_sdes_q); + for (GList *l = redis_sdes_q->head; l; l = l->next) { + redis_sdes = l->data; + /** copied and modified from sdp.c:sdp_streams() */ + struct crypto_params_sdes *cps = g_slice_alloc0(sizeof(*cps)); + g_queue_push_tail(m_sdes_q, cps); + + if (redis_sdes->crypto_suite_name) + cps->params.crypto_suite = crypto_find_suite(redis_sdes->crypto_suite_name); + if (redis_sdes->mki) { + cps->params.mki_len = redis_sdes->mki->len; + if (cps->params.mki_len) { + cps->params.mki = malloc(cps->params.mki_len); + memcpy(cps->params.mki, redis_sdes->mki->s, cps->params.mki_len); + } + } + cps->tag = redis_sdes->tag; + if (redis_sdes->master_key) + memcpy(cps->params.master_key, redis_sdes->master_key->s, redis_sdes->master_key->len); + if (redis_sdes->master_salt) + memcpy(cps->params.master_salt, redis_sdes->master_salt->s, redis_sdes->master_salt->len); + cps->params.session_params = redis_sdes->session_params; + } +} + +static int redis_update_call_crypto(struct call_media *m, redis_call_media_t *media) { + const struct dtls_hash_func *found_hash_func; + int needReinitCrypto = 0; + AUTO_CLEANUP_BUF(paramsbuf); + + if (media->fingerprint.hash_func_name && !m->fingerprint.hash_func) { + found_hash_func = dtls_find_hash_func(media->fingerprint.hash_func_name); + if (found_hash_func) { + rlog(LOG_DEBUG, "Updating crypto for call ID '" STR_FORMAT_M "', media %u from Redis", + STR_FMT_M(&m->call->callid), m->unique_id); + m->fingerprint.hash_func = found_hash_func; + memcpy(m->fingerprint.digest, media->fingerprint.fingerprint->s, media->fingerprint.fingerprint->len); + } + } + + if (media->sdes_in && m->sdes_in.length != media->sdes_in->length) { + rlog(LOG_DEBUG, "Need update input crypto"); + redis_update_call_crypto_sync_sdes_params(&m->sdes_in, media->sdes_in); + ++needReinitCrypto; + } + if (media->sdes_out && m->sdes_out.length != media->sdes_out->length) { + rlog(LOG_DEBUG, "Need update output crypto"); + redis_update_call_crypto_sync_sdes_params(&m->sdes_out, media->sdes_out); + ++needReinitCrypto; + } + + if (!needReinitCrypto) + return 0; + + /* re-init crypto context after update. No easy access to call.c's methods so here is a copy of __init_stream() */ + for (GList *ml = m->streams.head; ml; ml = ml->next) { + struct packet_stream* ps = ml->data; + for (GList *l = ps->sfds.head; l; l = l->next) { + struct stream_fd *sfd = l->data; + struct crypto_params_sdes *cps = m->sdes_in.head ? m->sdes_in.head->data : NULL; + crypto_init(&sfd->crypto, cps ? &cps->params : NULL); + ilog(LOG_DEBUG, "[%s] Initialized incoming SRTP with SDES crypto params: %s%s%s", + endpoint_print_buf(&sfd->socket.local), + FMT_M(crypto_params_sdes_dump(cps, ¶msbuf))); + } + struct crypto_params_sdes *cps = m->sdes_out.head ? m->sdes_out.head->data : NULL; + crypto_init(&ps->crypto, cps ? &cps->params : NULL); + ilog(LOG_DEBUG, "[%i] Initialized outgoing SRTP with SDES crypto params: %s%s%s", + ps->component, FMT_M(crypto_params_sdes_dump(cps, ¶msbuf))); + } + + return 0; +} + +static int redis_update_call_media(struct call *c, redis_call_t* redis_call) { + struct call_media *m = NULL; + redis_call_media_t *media; + GList *ml; + + for (ml = c->medias.head; ml; ml = ml->next) { + m = ml->data; + media = g_queue_peek_nth(redis_call->media, m->unique_id); + if (!media) { + rlog(LOG_WARNING, "Failed to update data for call ID '" STR_FORMAT_M "' from Redis: missing media %u", + STR_FMT_M(&c->callid), m->unique_id); + continue; /* weird... */ + } + if (redis_update_call_maps(m, media)) { + rlog(LOG_WARNING, "Failed to update data for call ID '" STR_FORMAT_M "' from Redis: error in update maps", + STR_FMT_M(&c->callid)); + return -1; + } + if (redis_update_call_payloads(m, media)) { + rlog(LOG_WARNING, "Failed to update data for call ID '" STR_FORMAT_M "' from Redis: error in update payloads", + STR_FMT_M(&c->callid)); + return -1; + } + if (redis_update_call_crypto(m, media)) { + rlog(LOG_WARNING, "Failed to update data for call ID '" STR_FORMAT_M "' from Redis: error in update crypto", + STR_FMT_M(&c->callid)); + return -1; + } + } + + return 0; +} + +static void redis_update_call_details(redis_call_t *redis_call, struct call *c) { + const char *err = NULL; + + if (timeval_us(&c->last_signal) >= timeval_us(&redis_call->last_signal)) { + rlog(LOG_INFO, "Ignoring Redis notification without update"); + return; + } + + c->last_signal = redis_call->last_signal; + + err = "failed to update stream data"; + if (redis_update_call_streams(c, redis_call)) + goto fail; + + err = "failed to update tag data"; + if (redis_update_call_tags(c, redis_call)) + goto fail; + + err = "failed to update payload data"; + if (redis_update_call_media(c, redis_call)) + goto fail; + + return; + +fail: + rlog(LOG_WARNING, "Failed to update data for call ID '" STR_FORMAT_M "' from Redis: %s", + STR_FMT_M(&c->callid), + err); + +} + +static void redis_update_call(str *callid, struct redis *r, struct call *call) { + redisReply* rr_jsonStr; + redis_call_t *redis_call = NULL; + const char *err = NULL; + JsonParser *parser = NULL; + + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET " PB, STR(callid)); + err = "could not retrieve JSON data from redis"; + if (!rr_jsonStr) + goto fail; + + rlog(LOG_INFO, "Received call '" STR_FORMAT_M "' data from redis, call looks like this: %s", + STR_FMT_M(callid), rr_jsonStr->str); + + parser = json_parser_new(); + err = "could not parse JSON data"; + if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL)) + goto fail; + redis_call = redis_call_create(callid, json_parser_get_root(parser)); + err = "could not read JSON data"; + if (!redis_call) + goto fail; + + if (!call) { /* a new call published by the primary */ + /* call the old reader (for now) as it knows how to create a call from scratch */ + /* TODO: verify the new reader in call create scenarios and dump the old code */ + rlog(LOG_INFO, "Creating a new call from redis"); + json_restore_call(parser, callid, CT_FOREIGN_CALL); + goto done; + } + + rlog(LOG_INFO, "Updating call data from redis"); + redis_update_call_details(redis_call, call); + goto done; + +fail: + rlog(LOG_WARNING, "Failed to update data for call ID '" STR_FORMAT_M "' from Redis: %s", + STR_FMT_M(callid), err); +done: + if (redis_call) + obj_put(redis_call); + if (parser) + g_object_unref (parser); + if (rr_jsonStr) + freeReplyObject(rr_jsonStr); + log_info_clear(); +} + +static void json_parse_end_restore_call(struct redis *r, str *callid, enum call_type type) { + redisReply* rr_jsonStr; + const char *err = NULL; + JsonParser *parser = NULL; + + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET " PB, STR(callid)); + err = "could not retrieve JSON data from redis"; + if (!rr_jsonStr) + goto fail; + + parser = json_parser_new(); + err = "could not parse JSON data"; + if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL)) + goto fail; + json_restore_call(parser, callid, type); + goto done; +fail: + rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT_M "' from Redis: %s", + STR_FMT_M(callid), err); +done: + if (parser) + g_object_unref (parser); + if (rr_jsonStr) + freeReplyObject(rr_jsonStr); + log_info_clear(); +} + struct thread_ctx { GQueue r_q; mutex_t r_m; @@ -1705,7 +2062,7 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - json_restore_call(r, &callid, CT_OWN_CALL); + json_parse_end_restore_call(r, &callid, CT_OWN_CALL); mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); @@ -1800,7 +2157,7 @@ static int json_update_sdes_params(JsonBuilder *builder, const char *pref, const char *k, GQueue *q) { char tmp[2048]; - unsigned int iter = 0; + unsigned int iter = 1; char keybuf[32]; const char *key = k; @@ -1846,7 +2203,7 @@ static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref, * encodes the few (k,v) pairs for one call under one json structure */ -char* redis_encode_json(struct call *c) { +static char* redis_encode_json(struct call *c) { GList *l=0,*k=0, *m=0, *n=0; struct endpoint_map *ep; @@ -1858,6 +2215,7 @@ char* redis_encode_json(struct call *c) { struct call_monologue *ml, *ml2; JsonBuilder *builder = json_builder_new (); struct recording *rec = 0; + str *ptbuf; char tmp[2048]; @@ -1869,7 +2227,7 @@ char* redis_encode_json(struct call *c) { { JSON_SET_SIMPLE("created","%lli", timeval_us(&c->created)); - JSON_SET_SIMPLE("last_signal","%ld",(long int) c->last_signal); + JSON_SET_SIMPLE("last_signal","%lli",timeval_us(&c->last_signal)); JSON_SET_SIMPLE("tos","%u",(int) c->tos); JSON_SET_SIMPLE("deleted","%ld",(long int) c->deleted); JSON_SET_SIMPLE("num_sfds","%u",g_queue_get_length(&c->stream_fds)); @@ -2041,7 +2399,8 @@ char* redis_encode_json(struct call *c) { JSON_SET_SIMPLE_STR("logical_intf",&media->logical_intf->name); JSON_SET_SIMPLE("ptime","%i",media->ptime); JSON_SET_SIMPLE("media_flags","%u",media->media_flags); - + JSON_SET_SIMPLE_STR("rtpe_addr", &media->rtpe_connection_addr); + json_update_sdes_params(builder, "media", media->unique_id, "sdes_in", &media->sdes_in); json_update_sdes_params(builder, "media", media->unique_id, "sdes_out", @@ -2080,10 +2439,9 @@ char* redis_encode_json(struct call *c) { json_builder_begin_array (builder); for (m = media->codecs_prefs_recv.head; m; m = m->next) { pt = m->data; - JSON_ADD_STRING("%u/" STR_FORMAT "/%u/" STR_FORMAT "/" STR_FORMAT "/%i/%i", - pt->payload_type, STR_FMT(&pt->encoding), - pt->clock_rate, STR_FMT(&pt->encoding_parameters), - STR_FMT(&pt->format_parameters), pt->bitrate, pt->ptime); + ptbuf = codec_print_payload_type(pt); + JSON_ADD_STRING("%u/" STR_FORMAT, pt->payload_type, STR_FMT(ptbuf)); + free(ptbuf); } json_builder_end_array (builder); @@ -2092,10 +2450,9 @@ char* redis_encode_json(struct call *c) { json_builder_begin_array (builder); for (m = media->codecs_prefs_send.head; m; m = m->next) { pt = m->data; - JSON_ADD_STRING("%u/" STR_FORMAT "/%u/" STR_FORMAT "/" STR_FORMAT "/%i/%i", - pt->payload_type, STR_FMT(&pt->encoding), - pt->clock_rate, STR_FMT(&pt->encoding_parameters), - STR_FMT(&pt->format_parameters), pt->bitrate, pt->ptime); + ptbuf = codec_print_payload_type(pt); + JSON_ADD_STRING("%u/" STR_FORMAT, pt->payload_type, STR_FMT(ptbuf)); + free(ptbuf); } json_builder_end_array (builder); } diff --git a/daemon/sdp.c b/daemon/sdp.c index 00fa92e98f..acace8ac48 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -1571,7 +1571,7 @@ static int replace_network_address(struct sdp_chopper *chop, struct network_addr struct packet_stream *ps, struct sdp_ng_flags *flags, int keep_unspec) { char buf[64]; - int len; + str res = { buf, 0 }; struct packet_stream *sink = packet_stream_sink(ps); if (is_addr_unspecified(&address->parsed) @@ -1583,14 +1583,9 @@ static int replace_network_address(struct sdp_chopper *chop, struct network_addr if (flags->media_address.s && is_addr_unspecified(&flags->parsed_media_address)) __parse_address(&flags->parsed_media_address, NULL, NULL, &flags->media_address); - - if (!is_addr_unspecified(&flags->parsed_media_address)) - len = sprintf(buf, "%s %s", - flags->parsed_media_address.family->rfc_name, - sockaddr_print_buf(&flags->parsed_media_address)); - else - call_stream_address46(buf, ps, SAF_NG, &len, NULL, keep_unspec); - chopper_append(chop, buf, len); + + format_network_address(&res, ps, flags, keep_unspec); + chopper_append(chop, res.s, res.len); if (skip_over(chop, &address->address)) return -1; diff --git a/include/call.h b/include/call.h index a966744c08..d9f3efd76b 100644 --- a/include/call.h +++ b/include/call.h @@ -330,6 +330,7 @@ struct call_media { int ptime; // either from SDP or overridden volatile unsigned int media_flags; + str rtpe_connection_addr; }; /* half a dialogue */ @@ -379,14 +380,14 @@ struct call { str callid; struct timeval created; - time_t last_signal; + struct timeval last_signal; time_t deleted; time_t ml_deleted; unsigned char tos; char *created_from; sockaddr_t created_from_addr; sockaddr_t xmlrpc_callback; - + unsigned int redis_hosted_db; unsigned int foreign_call; // created_via_redis_notify call diff --git a/include/call_interfaces.h b/include/call_interfaces.h index f225e7450f..2b81797eb8 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -124,5 +124,6 @@ void ng_call_stats(struct call *call, const str *fromtag, const str *totag, benc int call_interfaces_init(void); +void format_network_address(str* o, struct packet_stream *ps, struct sdp_ng_flags *flags, int keep_unspec); #endif diff --git a/include/codec.h b/include/codec.h index d11aa9599b..6f8fc1b4eb 100644 --- a/include/codec.h +++ b/include/codec.h @@ -60,6 +60,7 @@ void codec_packet_free(void *); void codec_rtp_payload_types(struct call_media *media, struct call_media *other_media, GQueue *types, struct sdp_ng_flags *flags); +str *codec_print_payload_type(const struct rtp_payload_type* pt); // special return value `(void *) 0x1` to signal type mismatch struct rtp_payload_type *codec_make_payload_type(const str *codec_str, struct call_media *media); void codec_init_payload_type(struct rtp_payload_type *, struct call_media *); diff --git a/include/json-helpers.h b/include/json-helpers.h new file mode 100644 index 0000000000..2ddbddffa9 --- /dev/null +++ b/include/json-helpers.h @@ -0,0 +1,111 @@ +#ifndef __JSON_HELPERS_H__ +#define __JSON_HELPERS_H__ + +#include + +#include "str.h" + +/** + * Retrieve a string value from a JSON object according to a key. + * @param reader glib JsonReader that has the target object as its current node. + * @param key name of the string value to retrieve + * @return `str` string created from the string value, or `NULL` if no such value was found, + * the reader is in an error state or not pointing to a JSON object. Release using `free()`. + */ +str *json_reader_get_str(JsonReader *reader, const char *key); + +/** + * Retrieve a string value from a JSON object, using the JsonObject API, according to a key. + * @param json glib JsonObject from which to get the string + * @param key name of the string value to retrieve + * @return `str` string created from the string value, or `NULL` if no such value was found, + * the reader is in an error state or not pointing to a JSON object. Release using `free()`. + */ +str *json_object_get_str(JsonObject *json, const char *key); + +/** + * Retrieve a string value from a JSON object, using the JsonObject API, according to a key, decoding the + * URI encoded value. The resulting buffer might contain NULL values. + * @param json glib JsonObject from which to get the string + * @param key name of the string value to retrieve + * @return `str` string created from the string value, or `NULL` if no such value was found, + * the reader is in an error state or not pointing to a JSON object. Release using `free()`. + */ +str *json_object_get_str_uri_enc(JsonObject *json, const char *key); + +/** + * Retrieve a string value from a JSON list, using the JsonArray API, according to a key. + * @param json glib JsonArray from which to get the string + * @param idx index to the string value to retrieve + * @return `str` string created from the string value, or `NULL` if no such value was found, + * the reader is in an error state or not pointing to a JSON object. Release using `free()`. + */ +str *json_array_get_str(JsonArray *json, unsigned idx); + +/** + * Retrieve an integer value from a JSON object according to a key. + * @param reader glib JsonReader that has the target object as its current node. + * @param key name of the integer value to retrieve + * @return integer value, if found, -1 otherwise. + * The widest possible "native" storage is used but depending on the original content, this might still result in data loss. + */ +long long json_reader_get_ll(JsonReader *reader, const char *key); + +/** + * Retrieve an integer value from a JSON object, using the JsonObject API, according to a key. + * @param json glib JsonObject from which to get the integer + * @param key name of the integer value to retrieve + * @return integer value, if found, -1 otherwise. + * The widest possible "native" storage is used but depending on the original content, this might still result in data loss. + */ +long long json_object_get_ll(JsonObject *json, const char *key); + +/** + * Retrieve a string value from a JSON list according to an index. + * This would also work on a JSON object, by retrieving values from keys ordered by storage order (but it is just weird). + * @param reader glib JsonReader that has the target list as its current node + * @param idx index to the string value to retrieve + * @return `str` string created from the string value, or `NULL` if no such value was found, + * the reader is in an error state or not pointing to a JSON list or object. Release using `free()`. + */ +str *json_reader_get_str_element(JsonReader *reader, unsigned idx); + +/** + * Retrieve an integer value from a JSON list according to an index. + * If the value is stored as a string, this call will run `strtoll` on it and return the result. + * @param reader glib JsonReader that has the target list as its current node. + * @param idx index to the string value to retrieve + * @return integer value, if found, -1 otherwise. + * The widest possible "native" storage is used but depending on the original content, this might still result in data loss. + */ +long long json_reader_get_ll_element(JsonReader *reader, unsigned idx); + +/** + * Retrieve an integer value from a JSON list, using the JsonArray API according to an index. + * If the value is stored as a string, this call will run `strtoll` on it and return the result. + * @param json glib JsonArray from which to get the integer.. + * @param idx index to the string value to retrieve + * @return integer value, if found, -1 otherwise. + * The widest possible "native" storage is used but depending on the original content, this might still result in data loss. + */ +long long json_array_get_ll(JsonArray *json, unsigned idx); + +/** + * Retrieve the current string value from a JSON reader and decode its URI encoding. + * @param reader glib JsonReader whose current node is a string value + * @return `str` string containing URI decoded value, or `NULL` if the reader is an error state, the current node is not + * a string value or the string value is not a valid URI encoded value. Release using `free()`. + */ +str *json_reader_get_string_value_uri_enc(JsonReader *reader); + +/** + * Retrieve a JSON node from a JSON object according to a key. + * The node can be any node, but this call will be mostly useful to get an object or list to be fed into + * `json_reader_new()`. + * @param reader glib JsonReader that has the target object as its current node. + * @param key name of the object or list value to retrieve + * @return JSON node retrieved, if found, or `NULL` otherwise + */ +JsonNode *json_reader_get_node(JsonReader *reader, const char *key); + +#endif /* __JSON_HELPERS_H__ */ diff --git a/include/redis-json.h b/include/redis-json.h new file mode 100644 index 0000000000..5b1d78659e --- /dev/null +++ b/include/redis-json.h @@ -0,0 +1,161 @@ +#ifndef __REDIS_JSON_H__ +#define __REDIS_JSON_H__ + +#include +#include + +#include "obj.h" +#include "str.h" +#include "crypto.h" + +/** + * Document object model for mapping call data to storable JSON. + * Currently used by the Redis driver. + * + * There is some confusion about the correct way to map the call data structures + * to JSON and the code in redis.h/c uses a a set of "enumerated object collection" + * to store the hierarchical call data instead of a more traditional object heirarchy. + * + * The model here suggest an object heirarchy where ownership relationships are implied + * by containment. + **/ + +typedef struct redis_call_media_stream_fd { + struct obj obj; + unsigned unique_id; + unsigned stream_unique_id; + str* pref_family; + unsigned localport; + str* logical_intf; + unsigned logical_intf_uid; +} redis_call_media_stream_fd_t; + +typedef struct redis_call_media_stream { + struct obj obj; + unsigned unique_id; + unsigned media_unique_id; + unsigned selected_sfd; + int rtp_sink; + int rtcp_sink; + int rtcp_sibling; + unsigned last_packet; + unsigned ps_flags; + unsigned component; + str* endpoint; + str* advertised_endpoint; + unsigned stats_packets; + unsigned stats_bytes; + unsigned stats_errors; + GQueue* fds; /**< list of redis_call_media_stream_fd_t */ +} redis_call_media_stream_t; + +typedef struct redis_call_rtp_payload_type { + struct obj obj; + unsigned payload_type; + str* codec_str; +} redis_call_rtp_payload_type_t; + +typedef struct redis_call_media_endpoint_map { + struct obj obj; + unsigned unique_id; + int wildcard; + unsigned num_ports; + str* intf_preferred_family; + str* logical_intf; + str* endpoint; +} redis_call_media_endpoint_map_t; + +struct redis_call_media_tag; + +typedef struct redis_call_media_tag { + struct obj obj; + unsigned unique_id; + unsigned long created; + gboolean active; + gboolean deleted; + gboolean block_dtmf; + gboolean block_media; + str* tag; + str* viabranch; + str* label; + struct redis_call_media_tag* other_tag; +} redis_call_media_tag_t; + +typedef struct redis_call_media_sdes { + struct obj obj; + str* crypto_suite_name; + str* master_key; + str* master_salt; + str* mki; + struct crypto_session_params session_params; + unsigned tag; +} redis_call_media_sdes_t; + +struct redis_call_media_fingerprint { + str* hash_func_name; + str* fingerprint; +}; + +typedef struct redis_call_media { + struct obj obj; + unsigned index; + unsigned unique_id; + str* type; + str* protocol; + str* desired_family; + str* logical_intf; + unsigned ptime; + unsigned media_flags; + str* rtpe_addr; + redis_call_media_tag_t* tag; + GQueue* endpoint_maps; /**< list of redis_call_media_endpoint_map_t */ + GQueue* streams; /**< list of redis_call_media_stream_t */ + GQueue* codec_prefs_recv; /**< list of redis_call_rtp_payload_type_t */ + GQueue* codec_prefs_send; /**< list of redis_call_rtp_payload_type_t */ + GQueue* sdes_in; /**< list of redis_call_media_sdes_t */ + GQueue* sdes_out; /**< list of redis_call_media_sdes_t */ + struct redis_call_media_fingerprint fingerprint; /* not an object reference */ +} redis_call_media_t; + +typedef struct redis_call { + struct obj obj; + str* call_id; + struct timeval created; + struct timeval last_signal; + unsigned tos; + gboolean deleted; + gboolean ml_deleted; + str* created_from; + str* created_from_addr; + unsigned redis_hosted_db; + str* recording_metadata; + gboolean block_dtmf; + gboolean block_media; + GQueue* media; /**< list of redis_call_media_t */ +} redis_call_t; + +/** + * Parse the JSON node into a `redis_call_t` data structure. + * @param callid the Call's Call-ID that was used as the key for originally storing the call + * @param json the glib-json parsed JSON data + * @return loaded call object model. Release using `obj_put()` + */ +redis_call_t* redis_call_create(const str *callid, JsonNode *json); + +/** + * Retrieve a list of references to `redis_call_media_stream_t` across all media in the call. + * + * @param callref a pointer to the `redis_call_t` data + * @return list of call streams. Release using `q_queue_free_full(list, gdestroy_obj_put)` + */ +GQueue* redis_call_get_streams(redis_call_t *callref); + +/** + * Helper for using obj_put as a (*GDestroyNotify) parameter for glib. + * + * Use it to cleanup `GQueue*`s returned from redis-json calls. + * @param o gpointerdata that references a struct that extends `struct obj` + */ +void gdestroy_obj_put(void* o); + +#endif /* __REDIS_JSON_H__ */ diff --git a/lib/socket.c b/lib/socket.c index e6f464da0e..f9d2993bff 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -673,6 +673,28 @@ int open_socket(socket_t *r, int type, unsigned int port, const sockaddr_t *sa) return -1; } +int create_foreign_socket(socket_t *r, int type, unsigned int port, const sockaddr_t *sa) { + ZERO(*r); + r->is_foreign = 1; + r->family = sa->family; + r->fd = -1; // Make sure no one tries to close a real fd + + if (port > 0xffff) { + __C_DBG("create foreign socket fail, port=%d > 0xfffffd", port); + goto fail; + } + + r->local.port = port; + r->local.address = *sa; + + __C_DBG("create foreign socket success, port=%d", port); + + return 0; + + fail: + return -1; +} + int connect_socket(socket_t *r, int type, const endpoint_t *ep) { sockfamily_t *fam; diff --git a/lib/socket.h b/lib/socket.h index e33e238b90..7de07e5c11 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -96,6 +96,7 @@ struct socket { sockfamily_t *family; endpoint_t local; endpoint_t remote; + int is_foreign; }; @@ -202,6 +203,7 @@ int connect_socket(socket_t *r, int type, const endpoint_t *ep); int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); // 1 == in progress int connect_socket_retry(socket_t *r); // retries connect() while in progress int close_socket(socket_t *r); +int create_foreign_socket(socket_t *r, int type, unsigned int port, const sockaddr_t * sa); sockfamily_t *get_socket_family_rfc(const str *s); sockfamily_t *__get_socket_family_enum(enum socket_families); diff --git a/t/.gitignore b/t/.gitignore index d0321297fa..294460b608 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -48,6 +48,8 @@ const_str_hash-test.strhash tests-preload.so timerthread.c media_player.c +redis-json.c +json-helpers.c dtmflib.c test-dtmf-detect *-test diff --git a/t/Makefile b/t/Makefile index 8e77371203..247a52253f 100644 --- a/t/Makefile +++ b/t/Makefile @@ -70,7 +70,8 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \ - media_player.c jitter_buffer.c + media_player.c jitter_buffer.c \ + redis-json.c json-helpers.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c endif @@ -130,7 +131,8 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \ control_ng.strhash.o \ streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ - media_player.o jitter_buffer.o dtmflib.o + media_player.o jitter_buffer.o dtmflib.o \ + redis-json.o json-helpers.o payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \ resample.o dtmflib.o