Skip to content

Commit b60e27a

Browse files
authored
Fix timestamp replacement logic and offsets (#893)
1 parent 243fb38 commit b60e27a

File tree

4 files changed

+328
-280
lines changed

4 files changed

+328
-280
lines changed

elastic/shared/parameter_sources/processed.py

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class MagicNumbers:
5656
RALLYTSDATA_LEN = 3 # len(NNN) (always 3 digits)
5757
RALLYTSDATA_LEN_END = 11 # position of character before <
5858
RALLYTS_FORMAT_BEGIN = 12 # position of first character after <
59+
RALLYTS_CLOSING_CHAR_LEN = 1 # length of closing '>' character
5960

6061

6162
class ProcessedCorpusParamSource:
@@ -162,9 +163,9 @@ def partition(self, partition_index, total_partitions):
162163
return ProcessedCorpusParamSource(self._orig_args[0], new_params, **self._orig_args[2])
163164

164165
def _json_processor(self, doc: bytes, line_num: int, _: str) -> Tuple[str, int]:
165-
doc = doc.decode("utf-8").strip()
166+
decoded_doc = doc.decode("utf-8").strip()
166167
if line_num % 2 == 0:
167-
return doc, 0
168+
return decoded_doc, 0
168169
# adds the timestamp to docs not metadata lines which will be in generated files
169170
timestamp = self._ts_generator.next_timestamp()
170171
# we assume date order - maybe speed this up for a boolean check on first request?
@@ -173,43 +174,60 @@ def _json_processor(self, doc: bytes, line_num: int, _: str) -> Tuple[str, int]:
173174
self.max_timestamp = timestamp
174175

175176
# see ProcessedCorpusParamSource for more details
176-
rallyts_start_pos = int(doc[MagicNumbers.RALLYTS_BEGIN_IDX : MagicNumbers.TS_BEGIN_IDX], 16)
177-
msglen_value_start_pos = int(doc[MagicNumbers.MSGLEN_BEGIN_IDX : MagicNumbers.MSGLEN_END_IDX], 16)
178-
msglen_value_end_pos = int(doc[MagicNumbers.MSGLEN_END_IDX : MagicNumbers.MSGLEN_END_IDX + 10], 16)
177+
rallyts_start_pos = int(decoded_doc[MagicNumbers.RALLYTS_BEGIN_IDX : MagicNumbers.TS_BEGIN_IDX], 16)
178+
ts_value_start_pos = int(decoded_doc[MagicNumbers.TS_BEGIN_IDX : MagicNumbers.TS_END_IDX], 16)
179+
ts_value_end_pos = int(decoded_doc[MagicNumbers.TS_END_IDX : MagicNumbers.MSGLEN_BEGIN_IDX], 16)
180+
msglen_value_start_pos = int(decoded_doc[MagicNumbers.MSGLEN_BEGIN_IDX : MagicNumbers.MSGLEN_END_IDX], 16)
181+
msglen_value_end_pos = int(decoded_doc[MagicNumbers.MSGLEN_END_IDX : MagicNumbers.MSGLEN_END_IDX + 10], 16)
179182

180-
msgsize = int(doc[msglen_value_start_pos:msglen_value_end_pos], 10)
183+
msgsize = int(decoded_doc[msglen_value_start_pos:msglen_value_end_pos], 10)
181184

185+
# Track the offset adjustment for subsequent replacements
186+
offset_adjustment = 0
187+
188+
# First, handle rally timestamp marker replacement if present
182189
if rallyts_start_pos != -1:
183190
# doc["message"] contains _RALLYTS with timestamp format specification (most of integrations)
184191

185-
rallyts_len = int(doc[rallyts_start_pos + MagicNumbers.RALLYTS_LEN : rallyts_start_pos + MagicNumbers.RALLYTSDATA_LEN_END], 10)
192+
rallyts_len = int(
193+
decoded_doc[rallyts_start_pos + MagicNumbers.RALLYTS_LEN : rallyts_start_pos + MagicNumbers.RALLYTSDATA_LEN_END], 10
194+
)
186195

187-
ts_format = doc[
196+
ts_format = decoded_doc[
188197
rallyts_start_pos + MagicNumbers.RALLYTS_FORMAT_BEGIN : rallyts_start_pos + MagicNumbers.RALLYTS_FORMAT_BEGIN + rallyts_len
189198
]
190199

191200
# %s is spuriously supported/implemented, depending on the platform's implementation of strftime. Here we specifically implement handling to mean
192201
# timezone-less interpretation of the epoch
193202
if ts_format == "%s":
194203
# turns out float.__trunc__ is faster than builtins.int(<float>) per microbenchmark
195-
formatted_ts = timestamp.timestamp().__trunc__()
204+
formatted_rallyts = str(timestamp.timestamp().__trunc__())
196205
else:
197-
formatted_ts = time.strftime(ts_format, timestamp.timetuple())
206+
formatted_rallyts = time.strftime(ts_format, timestamp.timetuple())
207+
208+
# Calculate the original placeholder length
209+
rallyts_placeholder_len = (
210+
MagicNumbers.RALLYTS_FORMAT_BEGIN + rallyts_len + MagicNumbers.RALLYTS_CLOSING_CHAR_LEN
211+
) # +1 for closing >
198212

199213
# replace _RALLYTSNNN<...> with generated timestamp in the right format
200-
# and omit the "markers" key
201-
doc = (
202-
f"{doc[:rallyts_start_pos]}"
203-
f"{formatted_ts}"
204-
f"{doc[rallyts_start_pos + MagicNumbers.RALLYTS_FORMAT_BEGIN + rallyts_len + 1: MagicNumbers.MARKER_IDX]}"
205-
f"}}}}"
214+
decoded_doc = (
215+
f"{decoded_doc[:rallyts_start_pos]}" f"{formatted_rallyts}" f"{decoded_doc[rallyts_start_pos + rallyts_placeholder_len:]}"
206216
)
207-
else:
208-
# no timestamp in message field e.g. application-logs, redis-slowlog-log
217+
218+
# Calculate the offset adjustment for subsequent replacements
219+
offset_adjustment = len(formatted_rallyts) - rallyts_placeholder_len
220+
221+
# Second, handle @timestamp field replacement if present
222+
if ts_value_start_pos != -1:
209223
# directly copy timestamp in a format compatible with the `date` ES field (`strict_date_optional_time`)
210224

211-
ts_value_start_pos = int(doc[MagicNumbers.TS_BEGIN_IDX : MagicNumbers.TS_END_IDX], 16)
212-
ts_value_end_pos = int(doc[MagicNumbers.TS_END_IDX : MagicNumbers.MSGLEN_BEGIN_IDX], 16)
225+
# Adjust positions if rally marker replacement occurred before this position
226+
adjusted_ts_value_start_pos = ts_value_start_pos
227+
adjusted_ts_value_end_pos = ts_value_end_pos
228+
if rallyts_start_pos != -1 and rallyts_start_pos < ts_value_start_pos:
229+
adjusted_ts_value_start_pos += offset_adjustment
230+
adjusted_ts_value_end_pos += offset_adjustment
213231

214232
formatted_ts = "%04d-%02d-%02dT%02d:%02d:%02d" % (
215233
timestamp.year,
@@ -225,15 +243,17 @@ def _json_processor(self, doc: bytes, line_num: int, _: str) -> Tuple[str, int]:
225243
LARGEINT = 123132434 + line_num
226244

227245
# replace @timestamp value with generated timestamp
228-
# and omit the "markers" key
229-
doc = (
230-
f"{doc[:ts_value_start_pos]}"
246+
decoded_doc = (
247+
f"{decoded_doc[:adjusted_ts_value_start_pos]}"
231248
f"""{formatted_ts}.{LARGEINT % 1000}Z"""
232-
f"{doc[ts_value_end_pos: MagicNumbers.MARKER_IDX]}"
233-
f"}}}}"
249+
f"{decoded_doc[adjusted_ts_value_end_pos:]}"
234250
)
235251

236-
return doc, msgsize
252+
# Finally, remove the "markers" key from the end of the document
253+
# The marker index is from the end, so it's not affected by earlier replacements
254+
decoded_doc = decoded_doc[: MagicNumbers.MARKER_IDX] + "}}"
255+
256+
return decoded_doc, msgsize
237257

238258
def create_bulk_corpus_reader(self, corpus, bulk_size, processor, num_clients, client_index):
239259
readers = []

elastic/shared/track_processors/data_generator.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,13 @@ def _append_doc_markers(doc):
320320
_rallyts_start_idx = raw_string.find(_rallyts_token)
321321

322322
_ts_token = '@timestamp": "'
323-
_ts_start_idx = raw_string.find(_ts_token) + len(_ts_token)
324-
_ts_end_idx = _ts_start_idx + raw_string[_ts_start_idx:].find('"')
323+
_ts_find_result = raw_string.find(_ts_token)
324+
if _ts_find_result != -1:
325+
_ts_start_idx = _ts_find_result + len(_ts_token)
326+
_ts_end_idx = _ts_start_idx + raw_string[_ts_start_idx:].find('"')
327+
else:
328+
_ts_start_idx = -1
329+
_ts_end_idx = -1
325330

326331
_msgsize_token = '"rally": {"message_size": '
327332
_msgsize_start_idx = raw_string.find(_msgsize_token) + len(_msgsize_token)

0 commit comments

Comments
 (0)