17
17
from typing import List
18
18
from typing import Optional
19
19
from typing import Type
20
+ from typing import cast
20
21
21
22
from pydantic import Field
22
23
@@ -74,6 +75,7 @@ class ElasticSearchConfig(ForwardConfigBase):
74
75
username : Optional [str ] = None
75
76
password : Optional [str ] = None
76
77
timeout : int = 10
78
+ index : Optional [str ] = None
77
79
78
80
79
81
def get_config_schema () -> Type [ElasticSearchConfig ]:
@@ -86,7 +88,7 @@ def get_config_schema() -> Type[ElasticSearchConfig]:
86
88
async def forward (
87
89
* ,
88
90
ctx : PipelineRunContext [ElasticSearchConfig ],
89
- event : ElasticSearchEvent ,
91
+ event : ElasticSearchEvent | CollectedEvent ,
90
92
) -> None :
91
93
"""
92
94
Method called to forward the event.
@@ -106,8 +108,31 @@ async def forward(
106
108
connection_info = await client .info ()
107
109
log .warning ("ES Connection Info:\n %s" , pprint .pformat (connection_info ))
108
110
client = ctx .cache ["client" ]
109
- data = event .data .copy ()
111
+ if isinstance (event , ElasticSearchEvent ):
112
+ es_id = event .id
113
+ es_index = event .index
114
+ else :
115
+ es_id = str (uuid .uuid4 ())
116
+ if ctx .config .index is None :
117
+ msg = (
118
+ "Event to forward is not an instance of ElasticSearchEvent and the "
119
+ "elasticsearch plugin index configuration is not defined."
120
+ )
121
+ raise RuntimeError (msg )
122
+ es_index = ctx .config .index
123
+ data = cast (Dict [str , Any ], event .data ).copy ()
110
124
if "@timestamp" not in data :
111
125
data ["@timestamp" ] = event .timestamp
112
- ret = await client .index (index = event .index , id = event .id , body = data )
113
- log .warning ("ES SEND:\n %s" , pprint .pformat (ret ))
126
+ log .debug (
127
+ "ElasticSearch Forward Details:\n index: %s\n id: %s\n data:\n %s" ,
128
+ es_index ,
129
+ es_id ,
130
+ pprint .pformat (data ),
131
+ )
132
+ ret = await client .index (index = es_index , id = es_id , body = data )
133
+ log .debug (
134
+ "ElasticSearch Forward Response(index: %s; id: %s):\n %s" ,
135
+ es_index ,
136
+ es_id ,
137
+ pprint .pformat (ret ),
138
+ )
0 commit comments