11package com .layor .tinyflow .service ;
22
33import com .layor .tinyflow .config .RabbitMQConfig ;
4+ import com .layor .tinyflow .entity .ClickEvent ;
45import com .layor .tinyflow .entity .ClickMessage ;
56import com .layor .tinyflow .entity .DailyClick ;
67import com .layor .tinyflow .repository .DailyClickRepository ;
1718import org .springframework .beans .factory .annotation .Value ;
1819
1920import java .time .LocalDate ;
21+ import java .time .LocalDateTime ;
22+ import java .util .ArrayList ;
23+ import java .util .List ;
24+ import java .util .concurrent .ConcurrentHashMap ;
25+ import java .util .concurrent .ConcurrentLinkedQueue ;
26+ import java .util .concurrent .ThreadLocalRandom ;
27+ import java .util .concurrent .atomic .AtomicLong ;
2028
2129@ Service
2230@ Slf4j
@@ -32,15 +40,21 @@ public class ClickRecorderService {
3240 private org .springframework .data .redis .core .StringRedisTemplate redisTemplate ;
3341 @ Autowired
3442 private RabbitTemplate rabbitTemplate ;
43+ @ Autowired (required = false )
44+ private IpLocationService ipLocationService ;
3545
3646 @ Value ("${clicks.mode:mq}" )
3747 private String counterMode ;
3848
3949 @ Value ("${events.sampleRate:0.0}" )
4050 private double sampleRate ;
4151
42- private final java .util .concurrent .ConcurrentHashMap <String , java .util .concurrent .atomic .AtomicLong > localTotal = new java .util .concurrent .ConcurrentHashMap <>();
43- private final java .util .concurrent .ConcurrentHashMap <String , java .util .concurrent .atomic .AtomicLong > localDay = new java .util .concurrent .ConcurrentHashMap <>();
52+ private final ConcurrentHashMap <String , AtomicLong > localTotal = new ConcurrentHashMap <>();
53+ private final ConcurrentHashMap <String , AtomicLong > localDay = new ConcurrentHashMap <>();
54+
55+ // 详细事件批量缓冲队列
56+ private final ConcurrentLinkedQueue <ClickEvent > eventBuffer = new ConcurrentLinkedQueue <>();
57+ private static final int EVENT_BATCH_SIZE = 100 ;
4458
4559 /**
4660 * 记录点击事件
@@ -53,36 +67,94 @@ public class ClickRecorderService {
5367 @ Async
5468 public void recordClick (String shortCode ) {
5569 // 所有模式都先写本地内存(最快)
56- localTotal .computeIfAbsent (shortCode , k -> new java . util . concurrent . atomic . AtomicLong ())
70+ localTotal .computeIfAbsent (shortCode , k -> new AtomicLong ())
5771 .incrementAndGet ();
58- localDay .computeIfAbsent (shortCode , k -> new java . util . concurrent . atomic . AtomicLong ())
72+ localDay .computeIfAbsent (shortCode , k -> new AtomicLong ())
5973 .incrementAndGet ();
6074
6175 log .debug ("[LOCAL] Click recorded: {}" , shortCode );
6276 }
6377
78+ /**
79+ * 记录详细点击事件(包含 IP 地理位置解析)
80+ * 使用批量缓冲队列,定时批量写入数据库
81+ */
6482 @ Async
6583 public void recordClickEvent (String shortCode , String referer , String ua , String ip , String host , String device ) {
6684 if (sampleRate <= 0.0 ) return ;
6785 if (sampleRate < 1.0 ) {
68- if (java .util .concurrent .ThreadLocalRandom .current ().nextDouble () >= sampleRate ) return ;
86+ if (ThreadLocalRandom .current ().nextDouble () >= sampleRate ) return ;
87+ }
88+
89+ // IP 地理位置解析
90+ String city = "" ;
91+ String country = "" ;
92+ if (ipLocationService != null && ip != null && !ip .isEmpty ()) {
93+ try {
94+ city = ipLocationService .getCity (ip );
95+ country = ipLocationService .getCountry (ip );
96+ } catch (Exception e ) {
97+ log .debug ("[EVENT] IP location parse failed for {}: {}" , ip , e .getMessage ());
98+ }
6999 }
70- // 封装
71- com .layor .tinyflow .entity .ClickEvent ev = com .layor .tinyflow .entity .ClickEvent .builder ()
100+
101+ // 构建事件对象
102+ ClickEvent ev = ClickEvent .builder ()
72103 .shortCode (shortCode )
73- .ts (java . time . LocalDateTime .now ())
104+ .ts (LocalDateTime .now ())
74105 .referer (referer )
75106 .ua (ua )
76107 .ip (ip )
77108 .sourceHost (host )
78109 .deviceType (device )
79- .city ("" )
80- .country ("" )
110+ .city (city )
111+ .country (country )
81112 .build ();
82- // 保存
83- clickEventRepository .save (ev );
113+
114+ // 加入批量缓冲队列
115+ eventBuffer .offer (ev );
116+
117+ // 缓冲区达到阈值时触发批量写入
118+ if (eventBuffer .size () >= EVENT_BATCH_SIZE ) {
119+ flushEventBuffer ();
120+ }
121+
122+ log .debug ("[EVENT] Buffered click event: code={}, city={}, country={}" , shortCode , city , country );
84123 }
85124
125+ /**
126+ * 定时刷新事件缓冲区(5秒一次)
127+ */
128+ @ Scheduled (fixedDelay = 5000 )
129+ @ Transactional
130+ public void flushEventBuffer () {
131+ if (eventBuffer .isEmpty ()) return ;
132+
133+ long startTime = System .currentTimeMillis ();
134+ List <ClickEvent > batch = new ArrayList <>();
135+
136+ // 每次最多处理 500 条
137+ int count = 0 ;
138+ while (!eventBuffer .isEmpty () && count < 500 ) {
139+ ClickEvent ev = eventBuffer .poll ();
140+ if (ev != null ) {
141+ batch .add (ev );
142+ count ++;
143+ }
144+ }
145+
146+ if (!batch .isEmpty ()) {
147+ try {
148+ clickEventRepository .saveAll (batch );
149+ long duration = System .currentTimeMillis () - startTime ;
150+ log .info ("[EVENT FLUSH] Saved {} click events to DB, duration={}ms" , batch .size (), duration );
151+ } catch (Exception e ) {
152+ log .error ("[EVENT FLUSH ERROR] Failed to save click events: {}" , e .getMessage (), e );
153+ // 失败时重新放回队列
154+ eventBuffer .addAll (batch );
155+ }
156+ }
157+ }
86158
87159 /**
88160 * 第一阶段:定期快照到 Redis(10 秒一次)
@@ -178,4 +250,4 @@ private long toLong(Object value) {
178250 if (value instanceof Number ) return ((Number ) value ).longValue ();
179251 try { return Long .parseLong (String .valueOf (value )); } catch (Exception ex ) { return 0L ; }
180252 }
181- }
253+ }
0 commit comments