Skip to content

Commit e4300a8

Browse files
committed
session CHANGE poll sessions now have state
Mainly so that one invalid session is not returned several times, but it may also increase concurrency. Fixes CESNET/netopeer2#146
1 parent e8b3b33 commit e4300a8

File tree

4 files changed

+80
-39
lines changed

4 files changed

+80
-39
lines changed

src/session_p.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,18 @@ struct nc_session {
395395
} opts;
396396
};
397397

398+
enum nc_ps_session_state {
399+
NC_PS_STATE_NONE = 0, /**< session is not being worked with */
400+
NC_PS_STATE_BUSY, /**< session is being polled or communicated on (and locked) */
401+
NC_PS_STATE_INVALID /**< session is invalid and was already returned by another poll */
402+
};
403+
398404
/* ACCESS locked */
399405
struct nc_pollsession {
400-
struct nc_session **sessions;
406+
struct {
407+
struct nc_session *session;
408+
enum nc_ps_session_state state;
409+
} *sessions;
401410
uint16_t session_count;
402411
uint16_t last_event_session;
403412

src/session_server.c

Lines changed: 60 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,8 @@ nc_ps_add_session(struct nc_pollsession *ps, struct nc_session *session)
826826
nc_ps_unlock(ps, q_id, __func__);
827827
return -1;
828828
}
829-
ps->sessions[ps->session_count - 1] = session;
829+
ps->sessions[ps->session_count - 1].session = session;
830+
ps->sessions[ps->session_count - 1].state = NC_PS_STATE_NONE;
830831

831832
/* UNLOCK */
832833
return nc_ps_unlock(ps, q_id, __func__);
@@ -842,7 +843,7 @@ _nc_ps_del_session(struct nc_pollsession *ps, struct nc_session *session, int in
842843
goto remove;
843844
}
844845
for (i = 0; i < ps->session_count; ++i) {
845-
if (ps->sessions[i] == session) {
846+
if (ps->sessions[i].session == session) {
846847
remove:
847848
--ps->session_count;
848849
if (i < ps->session_count) {
@@ -904,8 +905,8 @@ nc_ps_get_session_by_sid(const struct nc_pollsession *ps, uint32_t sid)
904905
}
905906

906907
for (i = 0; i < ps->session_count; ++i) {
907-
if (ps->sessions[i]->id == sid) {
908-
ret = ps->sessions[i];
908+
if (ps->sessions[i].session->id == sid) {
909+
ret = ps->sessions[i].session;
909910
break;
910911
}
911912
}
@@ -1150,30 +1151,31 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
11501151
/* check timeout of all the sessions */
11511152
nc_gettimespec(&ts_cur);
11521153
for (i = 0; i < ps->session_count; ++i) {
1153-
if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
1154+
cur_session = ps->sessions[i].session;
1155+
if ((cur_session->status != NC_STATUS_RUNNING) && (ps->sessions[i].state != NC_PS_STATE_INVALID)) {
11541156
/* when the status change occurred an error was printed, no need to print another */
1155-
if (!(ps->sessions[i]->flags & NC_SESSION_CALLHOME)) {
1156-
/* ... so the application should have handled it and removed this session before unless
1157-
* this is a Call Home session, when it could not */
1158-
WRN("Session %u: polling an invalid session.", ps->sessions[i]->id);
1159-
}
11601157
ret = NC_PSPOLL_SESSION_TERM;
1161-
if (ps->sessions[i]->term_reason != NC_SESSION_TERM_CLOSED) {
1158+
if (cur_session->term_reason != NC_SESSION_TERM_CLOSED) {
11621159
ret |= NC_PSPOLL_SESSION_ERROR;
11631160
}
1161+
ps->sessions[i].state = NC_PS_STATE_INVALID;
1162+
11641163
if (session) {
1165-
*session = ps->sessions[i];
1164+
*session = cur_session;
11661165
}
11671166
goto ps_unlock_finish;
1168-
} else if (!(ps->sessions[i]->flags & NC_SESSION_CALLHOME) && !ps->sessions[i]->opts.server.ntf_status
1167+
} else if (!(cur_session->flags & NC_SESSION_CALLHOME) && !cur_session->opts.server.ntf_status
11691168
&& server_opts.idle_timeout
1170-
&& (ts_cur.tv_sec >= ps->sessions[i]->opts.server.last_rpc + server_opts.idle_timeout)) {
1171-
ERR("Session %u: session idle timeout elapsed.", ps->sessions[i]->id);
1172-
ps->sessions[i]->status = NC_STATUS_INVALID;
1173-
ps->sessions[i]->term_reason = NC_SESSION_TERM_TIMEOUT;
1169+
&& (ts_cur.tv_sec >= cur_session->opts.server.last_rpc + server_opts.idle_timeout)) {
1170+
ERR("Session %u: session idle timeout elapsed.", cur_session->id);
1171+
cur_session->status = NC_STATUS_INVALID;
1172+
cur_session->term_reason = NC_SESSION_TERM_TIMEOUT;
1173+
11741174
ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
1175+
ps->sessions[i].state = NC_PS_STATE_INVALID;
1176+
11751177
if (session) {
1176-
*session = ps->sessions[i];
1178+
*session = cur_session;
11771179
}
11781180
goto ps_unlock_finish;
11791181
}
@@ -1193,23 +1195,26 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
11931195
i = j = ps->last_event_session + 1;
11941196
}
11951197
do {
1196-
cur_session = ps->sessions[i];
1198+
cur_session = ps->sessions[i].session;
11971199

11981200
/* SESSION LOCK */
1199-
if ((cur_session->status == NC_STATUS_RUNNING)
1201+
if ((cur_session->status == NC_STATUS_RUNNING) && (ps->sessions[i].state == NC_PS_STATE_NONE)
12001202
&& !*cur_session->ti_inuse && ((r = nc_session_lock(cur_session, 0, __func__)))) {
12011203
/* we go here if we successfully lock the session or there was an error, on timeout we simply skip it */
12021204
if (r == -1) {
12031205
ret = NC_PSPOLL_ERROR;
12041206
goto ps_unlock_finish;
12051207
}
12061208
/* damn race condition */
1207-
if (cur_session->status != NC_STATUS_RUNNING) {
1209+
if ((cur_session->status != NC_STATUS_RUNNING) || (ps->sessions[i].state != NC_PS_STATE_NONE)) {
12081210
/* SESSION UNLOCK */
12091211
nc_session_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
12101212
goto next_iteration;
12111213
}
12121214

1215+
/* it is being polled */
1216+
ps->sessions[i].state = NC_PS_STATE_BUSY;
1217+
12131218
switch (cur_session->ti_type) {
12141219
#ifdef NC_ENABLED_SSH
12151220
case NC_TI_LIBSSH:
@@ -1243,20 +1248,24 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
12431248
if ((new->status == NC_STATUS_STARTING) && new->ti.libssh.channel
12441249
&& (new->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
12451250
/* new NETCONF SSH channel */
1251+
ret = NC_PSPOLL_SSH_CHANNEL;
1252+
ps->sessions[i].state = NC_PS_STATE_NONE;
1253+
12461254
if (session) {
12471255
*session = cur_session;
12481256
}
1249-
ret = NC_PSPOLL_SSH_CHANNEL;
12501257
goto session_ps_unlock_finish;
12511258
}
12521259
}
12531260
}
12541261

12551262
/* just some SSH message */
1263+
ret = NC_PSPOLL_SSH_MSG;
1264+
ps->sessions[i].state = NC_PS_STATE_NONE;
1265+
12561266
if (session) {
12571267
*session = cur_session;
12581268
}
1259-
ret = NC_PSPOLL_SSH_MSG;
12601269
goto session_ps_unlock_finish;
12611270
} else {
12621271
/* we have some application data */
@@ -1273,6 +1282,11 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
12731282
if (pfd.fd < 0) {
12741283
ERRINT;
12751284
ret = NC_PSPOLL_ERROR;
1285+
ps->sessions[i].state = NC_PS_STATE_NONE;
1286+
1287+
if (session) {
1288+
*session = cur_session;
1289+
}
12761290
goto session_ps_unlock_finish;
12771291
}
12781292
pfd.events = POLLIN;
@@ -1330,6 +1344,11 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
13301344
case NC_TI_NONE:
13311345
ERRINT;
13321346
ret = NC_PSPOLL_ERROR;
1347+
ps->sessions[i].state = NC_PS_STATE_NONE;
1348+
1349+
if (session) {
1350+
*session = cur_session;
1351+
}
13331352
goto session_ps_unlock_finish;
13341353
}
13351354
/* we have some data, but it may be just an SSH message */
@@ -1342,14 +1361,23 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
13421361
ERR("Session %u: %s.", cur_session->id, msg);
13431362
cur_session->status = NC_STATUS_INVALID;
13441363
cur_session->term_reason = term_reason;
1364+
1365+
ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
1366+
ps->sessions[i].state = NC_PS_STATE_INVALID;
1367+
13451368
if (session) {
13461369
*session = cur_session;
13471370
}
1348-
ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
13491371
goto session_ps_unlock_finish;
13501372
} else if (poll_ret == -1) {
13511373
ERR("Session %u: %s.", cur_session->id, msg);
1374+
13521375
ret = NC_PSPOLL_ERROR;
1376+
ps->sessions[i].state = NC_PS_STATE_INVALID;
1377+
1378+
if (session) {
1379+
*session = cur_session;
1380+
}
13531381
goto session_ps_unlock_finish;
13541382
} else if (poll_ret > 0) {
13551383
break;
@@ -1363,6 +1391,7 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
13631391
}
13641392

13651393
next_iteration:
1394+
ps->sessions[i].state = NC_PS_STATE_NONE;
13661395
if (i == ps->session_count - 1) {
13671396
i = 0;
13681397
} else {
@@ -1396,6 +1425,7 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
13961425
if (ret & (NC_PSPOLL_ERROR | NC_PSPOLL_BAD_RPC)) {
13971426
if (cur_session->status != NC_STATUS_RUNNING) {
13981427
ret |= NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
1428+
ps->sessions[i].state = NC_PS_STATE_INVALID;
13991429
}
14001430
goto session_unlock_finish;
14011431
}
@@ -1411,6 +1441,9 @@ nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
14111441
if (!(cur_session->term_reason & (NC_SESSION_TERM_CLOSED | NC_SESSION_TERM_KILLED))) {
14121442
ret |= NC_PSPOLL_SESSION_ERROR;
14131443
}
1444+
ps->sessions[i].state = NC_PS_STATE_INVALID;
1445+
} else {
1446+
ps->sessions[i].state = NC_PS_STATE_NONE;
14141447
}
14151448

14161449
session_unlock_finish:
@@ -1447,16 +1480,16 @@ nc_ps_clear(struct nc_pollsession *ps, int all, void (*data_free)(void *))
14471480

14481481
if (all) {
14491482
for (i = 0; i < ps->session_count; i++) {
1450-
nc_session_free(ps->sessions[i], data_free);
1483+
nc_session_free(ps->sessions[i].session, data_free);
14511484
}
14521485
free(ps->sessions);
14531486
ps->sessions = NULL;
14541487
ps->session_count = 0;
14551488
ps->last_event_session = 0;
14561489
} else {
14571490
for (i = 0; i < ps->session_count; ) {
1458-
if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
1459-
session = ps->sessions[i];
1491+
if (ps->sessions[i].session->status != NC_STATUS_RUNNING) {
1492+
session = ps->sessions[i].session;
14601493
_nc_ps_del_session(ps, NULL, i);
14611494
nc_session_free(session, data_free);
14621495
continue;

src/session_server.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,9 @@ uint16_t nc_ps_session_count(struct nc_pollsession *ps);
252252
/**
253253
* @brief Poll sessions and process any received RPCs.
254254
*
255-
* All the sessions must be running. Only one event on one session
256-
* is handled in one function call. If this event is a session termination
257-
* (#NC_PSPOLL_SESSION_TERM returned), the session must be removed from \p ps
258-
* before another call. Otherwise no session will be polled and the same
259-
* return value and \p session will be returned.
255+
* Only one event on one session is handled in one function call. If this event
256+
* is a session termination (#NC_PSPOLL_SESSION_TERM returned), the session
257+
* should be removed from \p ps.
260258
*
261259
* @param[in] ps Pollsession structure to use.
262260
* @param[in] timeout Poll timeout in milliseconds. 0 for non-blocking call, -1 for

src/session_server_ssh.c

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,7 +1520,7 @@ nc_ps_accept_ssh_channel(struct nc_pollsession *ps, struct nc_session **session)
15201520
{
15211521
uint8_t q_id;
15221522
NC_MSG_TYPE msgtype;
1523-
struct nc_session *new_session = NULL;
1523+
struct nc_session *new_session = NULL, *cur_session;
15241524
uint16_t i;
15251525

15261526
if (!ps) {
@@ -1537,19 +1537,20 @@ nc_ps_accept_ssh_channel(struct nc_pollsession *ps, struct nc_session **session)
15371537
}
15381538

15391539
for (i = 0; i < ps->session_count; ++i) {
1540-
if ((ps->sessions[i]->status == NC_STATUS_RUNNING) && (ps->sessions[i]->ti_type == NC_TI_LIBSSH)
1541-
&& ps->sessions[i]->ti.libssh.next) {
1540+
cur_session = ps->sessions[i].session;
1541+
if ((cur_session->status == NC_STATUS_RUNNING) && (cur_session->ti_type == NC_TI_LIBSSH)
1542+
&& cur_session->ti.libssh.next) {
15421543
/* an SSH session with more channels */
1543-
for (new_session = ps->sessions[i]->ti.libssh.next;
1544-
new_session != ps->sessions[i];
1544+
for (new_session = cur_session->ti.libssh.next;
1545+
new_session != cur_session;
15451546
new_session = new_session->ti.libssh.next) {
15461547
if ((new_session->status == NC_STATUS_STARTING) && new_session->ti.libssh.channel
15471548
&& (new_session->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
15481549
/* we found our session */
15491550
break;
15501551
}
15511552
}
1552-
if (new_session != ps->sessions[i]) {
1553+
if (new_session != cur_session) {
15531554
break;
15541555
}
15551556

0 commit comments

Comments
 (0)