forked from vectordotdev/vector
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpulsar.rs
More file actions
138 lines (124 loc) · 4.22 KB
/
pulsar.rs
File metadata and controls
138 lines (124 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#![allow(dead_code)] // TODO requires optional feature compilation
#[cfg(feature = "sources-pulsar")]
use metrics::Counter;
use metrics::counter;
use vector_lib::NamedInternalEvent;
use vector_lib::internal_event::{
ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
};
/// Internal event describing a send failure reported by the Pulsar sink.
///
/// When emitted (see the `InternalEvent` impl in this file) it logs the error,
/// increments `component_errors_total`, and reports `count` events as
/// unintentionally dropped.
#[derive(Debug, NamedInternalEvent)]
pub struct PulsarSendingError {
/// Number of events reported as dropped when this event is emitted.
pub count: usize,
/// Underlying error; rendered with `Display` into the `error` log field.
pub error: vector_lib::Error,
}
impl InternalEvent for PulsarSendingError {
    /// Logs the send failure, bumps `component_errors_total` for the sending
    /// stage, and then reports `count` events as unintentionally dropped.
    fn emit(self) {
        // Shared by the log line and the dropped-events report so both carry the same text.
        const REASON: &str = "A Pulsar sink generated an error.";

        let Self { count, error } = self;

        error!(
            message = REASON,
            error = %error,
            error_type = error_type::REQUEST_FAILED,
            stage = error_stage::SENDING,
        );

        let errors_total = counter!(
            "component_errors_total",
            "error_type" => error_type::REQUEST_FAILED,
            "stage" => error_stage::SENDING,
        );
        errors_total.increment(1);

        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
            count,
            reason: REASON,
        });
    }
}
/// Internal event describing a failure to extract Pulsar message properties.
///
/// `F` only needs to implement `Display`; it is rendered into the
/// `property_field` log field when the event is emitted.
#[derive(NamedInternalEvent)]
pub struct PulsarPropertyExtractionError<F: std::fmt::Display> {
/// Identifies the field the properties were read from.
/// NOTE(review): presumably the configured field path/name — confirm with callers.
pub property_field: F,
}
impl<F> InternalEvent for PulsarPropertyExtractionError<F>
where
    F: std::fmt::Display,
{
    /// Logs the extraction failure (including the offending field) and bumps
    /// `component_errors_total` for the processing stage.
    fn emit(self) {
        let Self { property_field } = self;

        error!(
            message = "Failed to extract properties. Value should be a map of String -> Bytes.",
            error_code = "extracting_property",
            error_type = error_type::PARSER_FAILED,
            stage = error_stage::PROCESSING,
            property_field = %property_field,
        );

        let errors_total = counter!(
            "component_errors_total",
            "error_code" => "extracting_property",
            "error_type" => error_type::PARSER_FAILED,
            "stage" => error_stage::PROCESSING,
        );
        errors_total.increment(1);
    }
}
#[cfg(feature = "sources-pulsar")]
/// The Pulsar operation that failed; selects the log message and error
/// counter used when a `PulsarErrorEvent` is emitted.
///
/// Derives the common value traits so the kind can be logged, compared and
/// copied freely; as a field-less enum these impls carry no runtime cost.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PulsarErrorEventType {
    /// Reading a message failed.
    Read,
    /// Acknowledging a message failed.
    Ack,
    /// Negatively acknowledging a message failed.
    NAck,
}
/// Payload handed to `PulsarErrorEvent`'s `emit` for each reported error.
#[cfg(feature = "sources-pulsar")]
pub struct PulsarErrorEventData {
/// Error text; logged as the `error` field.
pub msg: String,
/// Which operation failed; selects the log message and the counter that is incremented.
pub error_type: PulsarErrorEventType,
}
// Registered internal event for read / ack / nack failures.
//
// The three `component_errors_total` counters are created once in the field
// list below; `emit` then logs the failure and increments the counter that
// matches the reported `PulsarErrorEventType`.
#[cfg(feature = "sources-pulsar")]
registered_event!(
    PulsarErrorEvent => {
        // Failed acknowledgements.
        ack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),
        // Failed negative acknowledgements.
        nack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "negative_acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),
        // Failed reads.
        read_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "reading_message",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::RECEIVING,
        ),
    }

    fn emit(&self, error: PulsarErrorEventData) {
        let PulsarErrorEventData { msg, error_type: kind } = error;

        // Each arm keeps its own `error!` call so every failure kind remains a
        // distinct log callsite with its own static message.
        match kind {
            PulsarErrorEventType::Ack => {
                error!(
                    message = "Failed to acknowledge message.",
                    error = msg,
                    error_code = "acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                );
                self.ack_errors.increment(1);
            }
            PulsarErrorEventType::NAck => {
                error!(
                    message = "Failed to negatively acknowledge message.",
                    error = msg,
                    error_code = "negative_acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                );
                self.nack_errors.increment(1);
            }
            PulsarErrorEventType::Read => {
                error!(
                    message = "Failed to read message.",
                    error = msg,
                    error_code = "reading_message",
                    error_type = error_type::READER_FAILED,
                    stage = error_stage::RECEIVING,
                );
                self.read_errors.increment(1);
            }
        }
    }
);