|
34 | 34 | import com.google.common.base.Ticker; |
35 | 35 | import com.google.common.cache.Cache; |
36 | 36 | import com.google.common.cache.CacheBuilder; |
| 37 | +import com.google.common.cache.RemovalCause; |
37 | 38 | import com.google.common.cache.RemovalListener; |
38 | 39 | import com.google.common.cache.RemovalNotification; |
39 | 40 | import com.google.common.collect.ImmutableList; |
@@ -313,30 +314,55 @@ public void close() { |
313 | 314 | } |
314 | 315 |
|
315 | 316 | private void stopTimedOutSession(RemovalNotification<SessionId, SessionSlot> notification) { |
316 | | - if (notification.getKey() != null && notification.getValue() != null) { |
317 | | - SessionSlot slot = notification.getValue(); |
318 | | - SessionId id = notification.getKey(); |
319 | | - if (notification.wasEvicted()) { |
320 | | - // Session is timing out, stopping it by sending a DELETE |
321 | | - LOG.log(Level.INFO, () -> String.format("Session id %s timed out, stopping...", id)); |
322 | | - try { |
323 | | - slot.execute(new HttpRequest(DELETE, "/session/" + id)); |
324 | | - } catch (Exception e) { |
325 | | - LOG.log(Level.WARNING, String.format("Exception while trying to stop session %s", id), e); |
| 317 | + try (Span span = tracer.getCurrentContext().createSpan("node.stop_session")) { |
| 318 | + AttributeMap attributeMap = tracer.createAttributeMap(); |
| 319 | + attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), getClass().getName()); |
| 320 | + if (notification.getKey() != null && notification.getValue() != null) { |
| 321 | + SessionSlot slot = notification.getValue(); |
| 322 | + SessionId id = notification.getKey(); |
| 323 | + attributeMap.put("node.id", getId().toString()); |
| 324 | + attributeMap.put("session.slotId", slot.getId().toString()); |
| 325 | + attributeMap.put("session.id", id.toString()); |
| 326 | + attributeMap.put("session.timeout_in_seconds", getSessionTimeout().toSeconds()); |
| 327 | + attributeMap.put("session.remove.cause", notification.getCause().name()); |
| 328 | + if (notification.wasEvicted() && notification.getCause() == RemovalCause.EXPIRED) { |
| 329 | + // Session is timing out, stopping it by sending a DELETE |
| 330 | + LOG.log(Level.INFO, () -> String.format("Session id %s timed out, stopping...", id)); |
| 331 | + span.setStatus(Status.CANCELLED); |
| 332 | + span.addEvent(String.format("Stopping the the timed session %s", id), attributeMap); |
| 333 | + } else { |
| 334 | + LOG.log(Level.INFO, () -> String.format("Session id %s is stopping on demand...", id)); |
| 335 | + span.addEvent(String.format("Stopping the session %s on demand", id), attributeMap); |
326 | 336 | } |
327 | | - } |
328 | | - // Attempt to stop the session |
329 | | - slot.stop(); |
330 | | - // Decrement pending sessions if Node is draining |
331 | | - if (this.isDraining()) { |
332 | | - int done = pendingSessions.decrementAndGet(); |
333 | | - if (done <= 0) { |
334 | | - LOG.info("Node draining complete!"); |
335 | | - bus.fire(new NodeDrainComplete(this.getId())); |
| 337 | + if (notification.wasEvicted()) { |
| 338 | + try { |
| 339 | + slot.execute(new HttpRequest(DELETE, "/session/" + id)); |
| 340 | + } catch (Exception e) { |
| 341 | + LOG.log( |
| 342 | + Level.WARNING, String.format("Exception while trying to stop session %s", id), e); |
| 343 | + span.setStatus(Status.INTERNAL); |
| 344 | + span.addEvent( |
| 345 | + String.format("Exception while trying to stop session %s", id), attributeMap); |
| 346 | + } |
336 | 347 | } |
| 348 | + // Attempt to stop the session |
| 349 | + slot.stop(); |
| 350 | + // Decrement pending sessions if Node is draining |
| 351 | + if (this.isDraining()) { |
| 352 | + int done = pendingSessions.decrementAndGet(); |
| 353 | + attributeMap.put("current.session.count", done); |
| 354 | + attributeMap.put("node.drain_after_session_count", this.configuredSessionCount); |
| 355 | + if (done <= 0) { |
| 356 | + LOG.info("Node draining complete!"); |
| 357 | + bus.fire(new NodeDrainComplete(this.getId())); |
| 358 | + span.addEvent("Node draining complete!", attributeMap); |
| 359 | + } |
| 360 | + } |
| 361 | + } else { |
| 362 | + LOG.log(Debug.getDebugLogLevel(), "Received stop session notification with null values"); |
| 363 | + span.setStatus(Status.INVALID_ARGUMENT); |
| 364 | + span.addEvent("Received stop session notification with null values", attributeMap); |
337 | 365 | } |
338 | | - } else { |
339 | | - LOG.log(Debug.getDebugLogLevel(), "Received stop session notification with null values"); |
340 | 366 | } |
341 | 367 | } |
342 | 368 |
|
@@ -985,17 +1011,27 @@ public HealthCheck getHealthCheck() { |
985 | 1011 |
|
986 | 1012 | @Override |
987 | 1013 | public void drain() { |
988 | | - bus.fire(new NodeDrainStarted(getId())); |
989 | | - draining = true; |
990 | | - // Ensure the pendingSessions counter will not be decremented by timed out sessions not included |
991 | | - // in the currentSessionCount and the NodeDrainComplete will be raised to early. |
992 | | - currentSessions.cleanUp(); |
993 | | - int currentSessionCount = getCurrentSessionCount(); |
994 | | - if (currentSessionCount == 0) { |
995 | | - LOG.info("Firing node drain complete message"); |
996 | | - bus.fire(new NodeDrainComplete(getId())); |
997 | | - } else { |
998 | | - pendingSessions.set(currentSessionCount); |
| 1014 | + try (Span span = tracer.getCurrentContext().createSpan("node.drain")) { |
| 1015 | + AttributeMap attributeMap = tracer.createAttributeMap(); |
| 1016 | + attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), getClass().getName()); |
| 1017 | + bus.fire(new NodeDrainStarted(getId())); |
| 1018 | + draining = true; |
| 1019 | + // Ensure the pendingSessions counter will not be decremented by timed out sessions not |
| 1020 | + // included |
| 1021 | + // in the currentSessionCount and the NodeDrainComplete will be raised to early. |
| 1022 | + currentSessions.cleanUp(); |
| 1023 | + int currentSessionCount = getCurrentSessionCount(); |
| 1024 | + attributeMap.put("current.session.count", currentSessionCount); |
| 1025 | + attributeMap.put("node.id", getId().toString()); |
| 1026 | + attributeMap.put("node.drain_after_session_count", this.configuredSessionCount); |
| 1027 | + if (currentSessionCount == 0) { |
| 1028 | + LOG.info("Firing node drain complete message"); |
| 1029 | + bus.fire(new NodeDrainComplete(getId())); |
| 1030 | + span.addEvent("Node drain complete", attributeMap); |
| 1031 | + } else { |
| 1032 | + pendingSessions.set(currentSessionCount); |
| 1033 | + span.addEvent(String.format("%s session(s) pending before draining Node", attributeMap)); |
| 1034 | + } |
999 | 1035 | } |
1000 | 1036 | } |
1001 | 1037 |
|
|
0 commit comments