This repository was archived by the owner on Dec 16, 2024. It is now read-only.
File tree Expand file tree Collapse file tree 3 files changed +18
-34
lines changed Expand file tree Collapse file tree 3 files changed +18
-34
lines changed Original file line number Diff line number Diff line change 33
44from messaging import Consumer
55from db import DataBase
6- from threading import Lock , Timer
6+ from threading import Thread , Condition
77import json
88import time
99import os
1010
1111kafka_topic = "seg_analytics_data"
1212kafka_group = "kafka_to_db_converter"
13- ingest_batch = int (os .environ ["INGEST_BATCH" ])
14- ingest_duration = float (os .environ ["INGEST_DURATION" ])
15-
16- class IntervalTimer (Timer ):
17- def run (self ):
18- while not self .finished .is_set ():
19- self .finished .wait (self .interval )
20- self .function (* self .args , ** self .kwargs )
21- self .finished .set ()
2213
2314class KafkaToDB (object ):
2415 def __init__ (self ):
2516 super (KafkaToDB ,self ).__init__ ()
2617 self ._db = DataBase ()
2718 self ._cache = []
28- self ._lock = Lock ()
29- self ._timer = IntervalTimer (ingest_duration , self ._ingest )
30- self ._timer .start ()
19+ self ._cond = Condition ()
20+ Thread (target = self ._ingest ).start ()
3121
3222 def _ingest (self ):
33- if not len (self ._cache ): return
34- self ._lock .acquire ()
35- bulk = self ._cache
36- self ._cache = []
37- self ._lock .release ()
38- if not len (bulk ): return
39- try :
40- self ._db .save (bulk )
41- print ("SaveToDB #" + str (len (bulk )), flush = True )
42- except Exception as e :
43- print ("Exception: " + str (e ), flush = True )
23+ while True :
24+ self ._cond .acquire ()
25+ self ._cond .wait ()
26+ bulk = self ._cache
27+ self ._cache = []
28+ self ._cond .release ()
29+
30+ try :
31+ self ._db .save (bulk )
32+ print ("SaveToDB #" + str (len (bulk )), flush = True )
33+ except Exception as e :
34+ print ("Exception: " + str (e ), flush = True )
4435
4536 def _send (self , data ):
46- self ._lock .acquire ()
37+ self ._cond .acquire ()
4738 self ._cache .append (data )
48- self ._lock .release ()
49- if len (self ._cache )> ingest_batch :
50- self ._ingest ()
39+ self ._cond .notify ()
40+ self ._cond .release ()
5141
5242 def listen (self ):
5343 while True :
Original file line number Diff line number Diff line change 22 kafka2db:
33 image: ssai_kafka2db:latest
44 environment:
5- INGEST_DURATION: "0.1"
6- INGEST_BATCH: "50"
75 NO_PROXY: "*"
86 no_proxy: "*"
97 networks:
Original file line number Diff line number Diff line change 2222 image: ssai_kafka2db:latest
2323 imagePullPolicy: IfNotPresent
2424 env:
25- - name: INGEST_DURATION
26- value: "0.1"
27- - name: INGEST_BATCH
28- value: "50"
2925 - name: NO_PROXY
3026 value: "*"
3127 - name: no_proxy
You can’t perform that action at this time.
0 commit comments