Skip to content

Commit 90d514d

Browse files
committed
more cleanup
1 parent 14320b7 commit 90d514d

File tree

3 files changed

+4
-18
lines changed

3 files changed

+4
-18
lines changed

api/handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func (s *Server) handleTxSSE(w http.ResponseWriter, r *http.Request) {
4141
return
4242

4343
case tx := <-subscriber.txC:
44+
// Note/TODO: a client with a slow connection may cause blocking other clients and cause DoS on all receivers
4445
fmt.Fprintf(w, "data: %s\n\n", tx)
4546
w.(http.Flusher).Flush() //nolint:forcetypeassert
4647

collector/clickhouse.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -235,21 +235,3 @@ func (ch *Clickhouse) sendBatchWithRetries(name string, batch driver.Batch) {
235235
metrics.IncClickhouseBatchSaveRetries()
236236
}
237237
}
238-
239-
// FlishData flushes any remaining transactions and source logs in the batch to Clickhouse.
240-
// This is useful for ensuring all data is saved before shutting down the collector.
241-
// Needs a lock to avoid concurrent access to the batches.
242-
//
243-
// func (ch *Clickhouse) FlushData() {
244-
// // Flush any remaining transactions in the batch
245-
// if len(ch.currentTxBatch) > 0 {
246-
// ch.saveTxs(slices.Clone(ch.currentTxBatch))
247-
// ch.currentTxBatch = ch.currentTxBatch[:0] // Clear the slice without reallocating
248-
// }
249-
250-
// // Flush any remaining source logs in the batch
251-
// if len(ch.currentSourcelogBatch) > 0 {
252-
// ch.saveSourcelogs(slices.Clone(ch.currentSourcelogBatch))
253-
// ch.currentSourcelogBatch = ch.currentSourcelogBatch[:0] // Clear the slice without reallocating
254-
// }
255-
// }

collector/tx_processor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ func (p *TxProcessor) Start() {
166166
func (p *TxProcessor) startTransactionReceiverLoop() {
167167
p.log.Info("Waiting for transactions...")
168168
for txIn := range p.txC {
169+
if txIn.Tx == nil {
170+
continue
171+
}
169172
p.processTx(txIn)
170173
}
171174
}

0 commit comments

Comments
 (0)