|
34 | 34 | import com.google.common.base.Ticker; |
35 | 35 | import com.google.common.cache.Cache; |
36 | 36 | import com.google.common.cache.CacheBuilder; |
| 37 | +import com.google.common.cache.RemovalCause; |
37 | 38 | import com.google.common.cache.RemovalListener; |
38 | 39 | import com.google.common.cache.RemovalNotification; |
39 | 40 | import com.google.common.collect.ImmutableList; |
@@ -340,30 +341,55 @@ public void close() { |
340 | 341 | } |
341 | 342 |
|
342 | 343 | private void stopTimedOutSession(RemovalNotification<SessionId, SessionSlot> notification) { |
343 | | - if (notification.getKey() != null && notification.getValue() != null) { |
344 | | - SessionSlot slot = notification.getValue(); |
345 | | - SessionId id = notification.getKey(); |
346 | | - if (notification.wasEvicted()) { |
347 | | - // Session is timing out, stopping it by sending a DELETE |
348 | | - LOG.log(Level.INFO, () -> String.format("Session id %s timed out, stopping...", id)); |
349 | | - try { |
350 | | - slot.execute(new HttpRequest(DELETE, "/session/" + id)); |
351 | | - } catch (Exception e) { |
352 | | - LOG.log(Level.WARNING, String.format("Exception while trying to stop session %s", id), e); |
| 344 | + try (Span span = tracer.getCurrentContext().createSpan("node.stop_session")) { |
| 345 | + AttributeMap attributeMap = tracer.createAttributeMap(); |
| 346 | + attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), getClass().getName()); |
| 347 | + if (notification.getKey() != null && notification.getValue() != null) { |
| 348 | + SessionSlot slot = notification.getValue(); |
| 349 | + SessionId id = notification.getKey(); |
| 350 | + attributeMap.put("node.id", getId().toString()); |
| 351 | + attributeMap.put("session.slotId", slot.getId().toString()); |
| 352 | + attributeMap.put("session.id", id.toString()); |
| 353 | + attributeMap.put("session.timeout_in_seconds", getSessionTimeout().toSeconds()); |
| 354 | + attributeMap.put("session.remove.cause", notification.getCause().name()); |
| 355 | + if (notification.wasEvicted() && notification.getCause() == RemovalCause.EXPIRED) { |
| 356 | + // Session is timing out, stopping it by sending a DELETE |
| 357 | + LOG.log(Level.INFO, () -> String.format("Session id %s timed out, stopping...", id)); |
| 358 | + span.setStatus(Status.ABORTED); |
| 359 | + span.addEvent(String.format("Stopping the the timed session %s", id), attributeMap); |
| 360 | + } else { |
| 361 | + LOG.log(Level.INFO, () -> String.format("Session id %s is stopping on demand...", id)); |
| 362 | + span.addEvent(String.format("Stopping the session %s on demand", id), attributeMap); |
353 | 363 | } |
354 | | - } |
355 | | - // Attempt to stop the session |
356 | | - slot.stop(); |
357 | | - // Decrement pending sessions if Node is draining |
358 | | - if (this.isDraining()) { |
359 | | - int done = pendingSessions.decrementAndGet(); |
360 | | - if (done <= 0) { |
361 | | - LOG.info("Node draining complete!"); |
362 | | - bus.fire(new NodeDrainComplete(this.getId())); |
| 364 | + if (notification.wasEvicted()) { |
| 365 | + try { |
| 366 | + slot.execute(new HttpRequest(DELETE, "/session/" + id)); |
| 367 | + } catch (Exception e) { |
| 368 | + LOG.log( |
| 369 | + Level.WARNING, String.format("Exception while trying to stop session %s", id), e); |
| 370 | + span.setStatus(Status.INTERNAL); |
| 371 | + span.addEvent( |
| 372 | + String.format("Exception while trying to stop session %s", id), attributeMap); |
| 373 | + } |
363 | 374 | } |
| 375 | + // Attempt to stop the session |
| 376 | + slot.stop(); |
| 377 | + // Decrement pending sessions if Node is draining |
| 378 | + if (this.isDraining()) { |
| 379 | + int done = pendingSessions.decrementAndGet(); |
| 380 | + attributeMap.put("current.session.count", done); |
| 381 | + attributeMap.put("node.drain_after_session_count", this.configuredSessionCount); |
| 382 | + if (done <= 0) { |
| 383 | + LOG.info("Node draining complete!"); |
| 384 | + bus.fire(new NodeDrainComplete(this.getId())); |
| 385 | + span.addEvent("Node draining complete!", attributeMap); |
| 386 | + } |
| 387 | + } |
| 388 | + } else { |
| 389 | + LOG.log(Debug.getDebugLogLevel(), "Received stop session notification with null values"); |
| 390 | + span.setStatus(Status.INVALID_ARGUMENT); |
| 391 | + span.addEvent("Received stop session notification with null values", attributeMap); |
364 | 392 | } |
365 | | - } else { |
366 | | - LOG.log(Debug.getDebugLogLevel(), "Received stop session notification with null values"); |
367 | 393 | } |
368 | 394 | } |
369 | 395 |
|
@@ -1022,17 +1048,28 @@ public HealthCheck getHealthCheck() { |
1022 | 1048 |
|
1023 | 1049 | @Override |
1024 | 1050 | public void drain() { |
1025 | | - bus.fire(new NodeDrainStarted(getId())); |
1026 | | - draining = true; |
1027 | | - // Ensure the pendingSessions counter will not be decremented by timed out sessions not included |
1028 | | - // in the currentSessionCount and the NodeDrainComplete will be raised to early. |
1029 | | - currentSessions.cleanUp(); |
1030 | | - int currentSessionCount = getCurrentSessionCount(); |
1031 | | - if (currentSessionCount == 0) { |
1032 | | - LOG.info("Firing node drain complete message"); |
1033 | | - bus.fire(new NodeDrainComplete(getId())); |
1034 | | - } else { |
1035 | | - pendingSessions.set(currentSessionCount); |
| 1051 | + try (Span span = tracer.getCurrentContext().createSpan("node.drain")) { |
| 1052 | + AttributeMap attributeMap = tracer.createAttributeMap(); |
| 1053 | + attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), getClass().getName()); |
| 1054 | + bus.fire(new NodeDrainStarted(getId())); |
| 1055 | + draining = true; |
| 1056 | + // Ensure the pendingSessions counter will not be decremented by timed out sessions not |
| 1057 | + // included |
| 1058 | + // in the currentSessionCount and the NodeDrainComplete will be raised to early. |
| 1059 | + currentSessions.cleanUp(); |
| 1060 | + int currentSessionCount = getCurrentSessionCount(); |
| 1061 | + attributeMap.put("current.session.count", currentSessionCount); |
| 1062 | + attributeMap.put("node.id", getId().toString()); |
| 1063 | + attributeMap.put("node.drain_after_session_count", this.configuredSessionCount); |
| 1064 | + if (currentSessionCount == 0) { |
| 1065 | + LOG.info("Firing node drain complete message"); |
| 1066 | + bus.fire(new NodeDrainComplete(getId())); |
| 1067 | + span.addEvent("Node drain complete", attributeMap); |
| 1068 | + } else { |
| 1069 | + pendingSessions.set(currentSessionCount); |
| 1070 | + span.addEvent( |
| 1071 | + String.format("%s session(s) pending before draining Node", currentSessionCount)); |
| 1072 | + } |
1036 | 1073 | } |
1037 | 1074 | } |
1038 | 1075 |
|
|
0 commit comments