Skip to content

Commit b3d184e

Browse files
committed
Added ability to have multiple master servers
We can basically aggregate all streams from any number of master servers
1 parent 788bd94 commit b3d184e

File tree

4 files changed

+133
-28
lines changed

4 files changed

+133
-28
lines changed

src/cfgfile.c

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ static void _parse_http_headers(xmlDocPtr doc,
133133
xmlNodePtr node,
134134
ice_config_http_header_t **http_headers);
135135

136+
static void _parse_master(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c);
136137
static void _parse_relay(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c);
137138
static void _parse_mount(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c);
138139

@@ -487,6 +488,7 @@ void config_clear(ice_config_t *c)
487488
{
488489
ice_config_dir_t *dirnode,
489490
*nextdirnode;
491+
master_server *master;
490492
relay_server *relay,
491493
*nextrelay;
492494
mount_proxy *mount,
@@ -529,6 +531,11 @@ void config_clear(ice_config_t *c)
529531

530532
while ((c->listen_sock = config_clear_listener(c->listen_sock)));
531533

534+
master = c->master;
535+
while (master) {
536+
master = master_free(master);
537+
}
538+
532539
thread_mutex_lock(&(_locks.relay_lock));
533540
relay = c->relay;
534541
while (relay) {
@@ -934,6 +941,8 @@ static void _parse_root(xmlDocPtr doc,
934941
_parse_limits(doc, node->xmlChildrenNode, configuration);
935942
} else if (xmlStrcmp(node->name, XMLSTR("http-headers")) == 0) {
936943
_parse_http_headers(doc, node->xmlChildrenNode, &(configuration->http_headers));
944+
} else if (xmlStrcmp(node->name, XMLSTR("master")) == 0) {
945+
_parse_master(doc, node->xmlChildrenNode, configuration);
937946
} else if (xmlStrcmp(node->name, XMLSTR("relay")) == 0) {
938947
_parse_relay(doc, node->xmlChildrenNode, configuration);
939948
} else if (xmlStrcmp(node->name, XMLSTR("mount")) == 0) {
@@ -1579,6 +1588,70 @@ static void _parse_http_headers(xmlDocPtr doc,
15791588
xmlFree(value);
15801589
}
15811590

1591+
static void _parse_master(xmlDocPtr doc,
1592+
xmlNodePtr node,
1593+
ice_config_t *configuration)
1594+
{
1595+
char *tmp;
1596+
master_server *master = calloc(1, sizeof(master_server));
1597+
master_server *current = configuration->master;
1598+
master_server *last = NULL;
1599+
1600+
while(current) {
1601+
last = current;
1602+
current = current->next;
1603+
}
1604+
1605+
if (last) {
1606+
last->next = master;
1607+
} else {
1608+
configuration->master = master;
1609+
}
1610+
1611+
master->next = NULL;
1612+
master->on_demand = configuration->on_demand;
1613+
master->server = (char *) xmlCharStrdup("127.0.0.1");
1614+
master->username = (char *) xmlCharStrdup(configuration->master_username);
1615+
master->password = (char *) xmlCharStrdup(configuration->master_password);
1616+
1617+
do {
1618+
if (node == NULL)
1619+
break;
1620+
if (xmlIsBlankNode(node))
1621+
continue;
1622+
1623+
if (xmlStrcmp(node->name, XMLSTR("server")) == 0) {
1624+
if (master->server)
1625+
xmlFree(master->server);
1626+
master->server = (char *)xmlNodeListGetString(doc,
1627+
node->xmlChildrenNode, 1);
1628+
} else if (xmlStrcmp(node->name, XMLSTR("port")) == 0) {
1629+
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
1630+
if (tmp) {
1631+
master->port = atoi(tmp);
1632+
xmlFree(tmp);
1633+
} else {
1634+
ICECAST_LOG_WARN("<port> setting must not be empty.");
1635+
}
1636+
} else if (xmlStrcmp(node->name, XMLSTR("username")) == 0) {
1637+
if (master->username)
1638+
xmlFree(master->username);
1639+
master->username = (char *)xmlNodeListGetString(doc,
1640+
node->xmlChildrenNode, 1);
1641+
} else if (xmlStrcmp(node->name, XMLSTR("password")) == 0) {
1642+
if (master->password)
1643+
xmlFree(master->password);
1644+
master->password = (char *)xmlNodeListGetString(doc,
1645+
node->xmlChildrenNode, 1);
1646+
} else if (xmlStrcmp(node->name, XMLSTR("on-demand")) == 0) {
1647+
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
1648+
master->on_demand = util_str_to_bool(tmp);
1649+
if (tmp)
1650+
xmlFree(tmp);
1651+
}
1652+
} while ((node = node->next));
1653+
}
1654+
15821655
static void _parse_relay(xmlDocPtr doc,
15831656
xmlNodePtr node,
15841657
ice_config_t *configuration)

src/cfgfile.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ typedef struct ice_config_tag {
218218
/* is TLS supported by the server? */
219219
int tls_ok;
220220

221+
master_server *master;
221222
relay_server *relay;
222223

223224
mount_proxy *mounts;

src/slave.c

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,19 @@ static volatile int update_all_mounts = 0;
6363
static volatile unsigned int max_interval = 0;
6464
static mutex_t _slave_mutex; // protects update_settings, update_all_mounts, max_interval
6565

66+
master_server *master_free (master_server *master)
67+
{
68+
master_server *next = master->next;
69+
ICECAST_LOG_DEBUG("freeing master %s:%d", master->server, master->port);
70+
xmlFree (master->server);
71+
if (master->username)
72+
xmlFree (master->username);
73+
if (master->password)
74+
xmlFree (master->password);
75+
free (master);
76+
return next;
77+
}
78+
6679
relay_server *relay_free (relay_server *relay)
6780
{
6881
relay_server *next = relay->next;
@@ -598,10 +611,8 @@ static void relay_check_streams (relay_server *to_start,
598611
}
599612

600613

601-
static int update_from_master(ice_config_t *config)
614+
static int update_from_master(master_server *master)
602615
{
603-
char *master = NULL, *password = NULL, *username= NULL;
604-
int port;
605616
sock_t mastersock;
606617
int ret = 0;
607618
char buf[256];
@@ -610,33 +621,22 @@ static int update_from_master(ice_config_t *config)
610621
char *authheader, *data;
611622
relay_server *new_relays = NULL, *cleanup_relays;
612623
int len, count = 1;
613-
int on_demand;
614-
615-
username = strdup(config->master_username);
616-
if (config->master_password)
617-
password = strdup(config->master_password);
618-
619-
if (config->master_server)
620-
master = strdup(config->master_server);
621624

622-
port = config->master_server_port;
623-
624-
if (password == NULL || master == NULL || port == 0)
625+
if (master->password == NULL || master->server == NULL || master->port == 0)
625626
break;
626-
on_demand = config->on_demand;
627627
ret = 1;
628628
config_release_config();
629-
mastersock = sock_connect_wto(master, port, 10);
629+
mastersock = sock_connect_wto(master->server, master->port, 10);
630630

631631
if (mastersock == SOCK_ERROR)
632632
{
633633
ICECAST_LOG_WARN("Relay slave failed to contact master server to fetch stream list");
634634
break;
635635
}
636636

637-
len = strlen(username) + strlen(password) + 2;
637+
len = strlen(master->username) + strlen(master->password) + 2;
638638
authheader = malloc(len);
639-
snprintf (authheader, len, "%s:%s", username, password);
639+
snprintf (authheader, len, "%s:%s", master->username, master->password);
640640
data = util_base64_encode(authheader, len);
641641
sock_write (mastersock,
642642
"GET /admin/streamlist.txt HTTP/1.0\r\n"
@@ -682,14 +682,14 @@ static int update_from_master(ice_config_t *config)
682682
}
683683
else
684684
{
685-
r->server = (char *)xmlCharStrdup (master);
686-
r->port = port;
685+
r->server = (char *)xmlCharStrdup (master->server);
686+
r->port = master->port;
687687
}
688688

689689
r->mount = strdup(parsed_uri->path);
690690
r->localmount = strdup(parsed_uri->path);
691691
r->mp3metadata = 1;
692-
r->on_demand = on_demand;
692+
r->on_demand = master->on_demand;
693693
r->next = new_relays;
694694
ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", r->server, r->port, r->mount);
695695
new_relays = r;
@@ -708,12 +708,24 @@ static int update_from_master(ice_config_t *config)
708708

709709
} while(0);
710710

711-
if (master)
712-
free (master);
713-
if (username)
714-
free (username);
715-
if (password)
716-
free (password);
711+
return ret;
712+
}
713+
714+
static int update_from_master_legacy(ice_config_t *config)
715+
{
716+
master_server *master = calloc (1, sizeof (master_server));
717+
int ret = 0;
718+
719+
if (master) {
720+
master->username = strdup(config->master_username);
721+
if (config->master_password)
722+
master->password = strdup(config->master_password);
723+
if (config->master_server)
724+
master->server = strdup(config->master_server);
725+
master->port = config->master_server_port;
726+
ret = update_from_master(master);
727+
master_free(master);
728+
}
717729

718730
return ret;
719731
}
@@ -759,6 +771,7 @@ static void *_slave_thread(void *arg)
759771
thread_mutex_lock(&_slave_mutex);
760772
if (max_interval <= interval)
761773
{
774+
master_server *master;
762775
ICECAST_LOG_DEBUG("checking master stream list");
763776
config = config_get_config();
764777

@@ -767,9 +780,16 @@ static void *_slave_thread(void *arg)
767780
interval = 0;
768781
max_interval = config->master_update_interval;
769782
thread_mutex_unlock(&_slave_mutex);
783+
784+
/* update all non-legacy master servers */
785+
master = config->master;
786+
while (master) {
787+
update_from_master(master);
788+
master = master->next;
789+
}
770790

771791
/* the connection could take some time, so the lock can drop */
772-
if (update_from_master (config))
792+
if (update_from_master_legacy (config))
773793
config = config_get_config();
774794

775795
thread_mutex_lock (&(config_locks()->relay_lock));

src/slave.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@
1515

1616
#include "common/thread/thread.h"
1717

18+
typedef struct _master_server {
19+
char *server;
20+
int port;
21+
char *username;
22+
char *password;
23+
int on_demand;
24+
struct _master_server *next;
25+
} master_server;
26+
1827
typedef struct _relay_server {
1928
char *server;
2029
int port;
@@ -34,6 +43,8 @@ typedef struct _relay_server {
3443
} relay_server;
3544

3645

46+
master_server *master_free (master_server *master);
47+
3748
void slave_initialize(void);
3849
void slave_shutdown(void);
3950
void slave_update_all_mounts (void);

0 commit comments

Comments
 (0)