-
Notifications
You must be signed in to change notification settings - Fork 509
Logcap #668
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov ReportAttention: Patch coverage is
📢 Thoughts on this report? Let us know! |
Signed-off-by: Teo <[email protected]>
Rich formatted text with various styles (color, bold, italic) Direct ANSI codes with proper newlines Mixed color text in both stdout and stderr All ANSI codes are preserved in the logged output Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Added a private _make_request method that handles all the common request logicSimplified the public HTTP methods (GET/POST/PUT/DELETE) to use the common handlerEach method only specifies its unique parameters and passes them to _make_requestAdded DELETE method for completenessMaintained all existing error handling and response processingKept the connection pooling and header preparation logic unchanged Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
…it from `Session` Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
…ed for this session Signed-off-by: Teo <[email protected]> logger -> agentops_logger Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
…ssion Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
…tion Signed-off-by: Teo <[email protected]>
…_sesssion" Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
…tegers or slices, not str Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
|
The current implementation has a potential issue: it doesn't guarantee that logs from one session won't leak into another when running concurrently. This needs to be addressed with callstack lookup or other methods. @areibman: otherwise, how does agentops actually know which session to assign llm events to? would the same mechanism work? |
…uests) Signed-off-by: Teo <[email protected]>
…d url Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
| def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]: | ||
| """Set up OpenTelemetry logging components for a new session. | ||
| This function creates the necessary components to capture and export logs for a specific session: | ||
| - A LoggerProvider with session-specific resource attributes | ||
| - A BatchLogRecordProcessor to batch and export logs | ||
| - A LoggingHandler to capture logs and forward them to the processor | ||
| Args: | ||
| session_id: Unique identifier for the session, used to tag telemetry data | ||
| log_exporter: SessionLogExporter instance that handles sending logs to AgentOps backend | ||
| Returns: | ||
| Tuple containing: | ||
| - LoggingHandler: Handler that should be added to the logger | ||
| - BatchLogRecordProcessor: Processor that batches and exports logs | ||
| """ | ||
| # Create logging components | ||
| resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"}) | ||
| logger_provider = LoggerProvider(resource=resource) | ||
|
|
||
| # Create processor and handler | ||
| log_processor = BatchLogRecordProcessor(log_exporter) | ||
| logger_provider.add_log_record_processor(log_processor) # Add processor to provider | ||
|
|
||
| from agentops.log_capture import LogCapture | ||
|
|
||
| logcap = LogCapture( | ||
| session, | ||
| ) | ||
|
|
||
| logcap.start() | ||
|
|
||
| # log_handler = LoggingHandler( | ||
| # level=logging.INFO, | ||
| # logger_provider=logger_provider, | ||
| # ) | ||
| # | ||
| # # Register handler with session | ||
| # set_session_handler(session_id, log_handler) | ||
| # | ||
| # return log_handler, log_processor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function setup_session_telemetry no longer returns the promised tuple[LoggingHandler, BatchLogRecordProcessor] as specified in its return type annotation, causing type checking failures and potential runtime errors.
📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]: | |
| """Set up OpenTelemetry logging components for a new session. | |
| This function creates the necessary components to capture and export logs for a specific session: | |
| - A LoggerProvider with session-specific resource attributes | |
| - A BatchLogRecordProcessor to batch and export logs | |
| - A LoggingHandler to capture logs and forward them to the processor | |
| Args: | |
| session_id: Unique identifier for the session, used to tag telemetry data | |
| log_exporter: SessionLogExporter instance that handles sending logs to AgentOps backend | |
| Returns: | |
| Tuple containing: | |
| - LoggingHandler: Handler that should be added to the logger | |
| - BatchLogRecordProcessor: Processor that batches and exports logs | |
| """ | |
| # Create logging components | |
| resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"}) | |
| logger_provider = LoggerProvider(resource=resource) | |
| # Create processor and handler | |
| log_processor = BatchLogRecordProcessor(log_exporter) | |
| logger_provider.add_log_record_processor(log_processor) # Add processor to provider | |
| from agentops.log_capture import LogCapture | |
| logcap = LogCapture( | |
| session, | |
| ) | |
| logcap.start() | |
| # log_handler = LoggingHandler( | |
| # level=logging.INFO, | |
| # logger_provider=logger_provider, | |
| # ) | |
| # | |
| # # Register handler with session | |
| # set_session_handler(session_id, log_handler) | |
| # | |
| # return log_handler, log_processor | |
| def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]: | |
| # ... | |
| logcap.start() | |
| return log_handler, log_processor |
| resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"}) | ||
| logger_provider = LoggerProvider(resource=resource) | ||
|
|
||
| # Create processor and handler | ||
| log_processor = BatchLogRecordProcessor(log_exporter) | ||
| logger_provider.add_log_record_processor(log_processor) # Add processor to provider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log_processor and logger_provider are created but never used since the code is commented out, leading to resource leaks and non-functional logging setup.
📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"}) | |
| logger_provider = LoggerProvider(resource=resource) | |
| # Create processor and handler | |
| log_processor = BatchLogRecordProcessor(log_exporter) | |
| logger_provider.add_log_record_processor(log_processor) # Add processor to provider | |
| resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"}) | |
| logger_provider = LoggerProvider(resource=resource) | |
| # Create processor and handler | |
| log_processor = BatchLogRecordProcessor(log_exporter) | |
| logger_provider.add_log_record_processor(log_processor) |
| # Setup logger provider with console exporter | ||
| resource = Resource.create(resource_attrs) | ||
| self._logger_provider = LoggerProvider(resource=resource) | ||
| self._logger_provider.add_log_record_processor(BatchLogRecordProcessor(self.session._log_exporter)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing self.session._log_exporter directly breaks encapsulation and may fail if the internal implementation changes. Should use a public accessor method instead.
📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| self._logger_provider.add_log_record_processor(BatchLogRecordProcessor(self.session._log_exporter)) | |
| self._logger_provider.add_log_record_processor(BatchLogRecordProcessor(self.session.get_log_exporter())) |
| def _reauthorize_jwt(self) -> Union[str, None]: | ||
| with self._lock: | ||
| payload = {"session_id": self.session_id} | ||
| serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") | ||
| res = HttpClient.post( | ||
| f"{self.config.endpoint}/v2/reauthorize_jwt", | ||
| serialized_payload, | ||
| self.config.api_key, | ||
| ) | ||
|
|
||
| logger.debug(res.body) | ||
|
|
||
| if res.code != 200: | ||
| try: | ||
| serialized_payload = safe_serialize(payload).encode("utf-8") | ||
| res = HttpClient.post( | ||
| f"{self.config.endpoint}/v2/reauthorize_jwt", | ||
| serialized_payload, | ||
| self.config.api_key, | ||
| ) | ||
| if not res: | ||
| return None | ||
| jwt = res.body.get("jwt") | ||
| self.jwt = jwt | ||
| return jwt | ||
| except Exception as e: | ||
| logger.error(f"Failed to reauthorize JWT: {e}") | ||
| return None | ||
|
|
||
| jwt = res.body.get("jwt", None) | ||
| self.jwt = jwt | ||
| return jwt | ||
|
|
||
| def _start_session(self): | ||
| with self._lock: | ||
| payload = {"session": self.__dict__} | ||
| serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") | ||
|
|
||
| try: | ||
| serialized_payload = safe_serialize(payload).encode("utf-8") | ||
| res = HttpClient.post( | ||
| f"{self.config.endpoint}/v2/create_session", | ||
| serialized_payload, | ||
| api_key=self.config.api_key, | ||
| parent_key=self.config.parent_key, | ||
| ) | ||
| except ApiServerException as e: | ||
| return logger.error(f"Could not start session - {e}") | ||
| if not res: | ||
| return False | ||
| jwt = res.body.get("jwt") | ||
| self.jwt = jwt | ||
| if jwt is None: | ||
| return False | ||
|
|
||
| logger.debug(res.body) | ||
|
|
||
| if res.code != 200: | ||
| return False | ||
|
|
||
| jwt = res.body.get("jwt", None) | ||
| self.jwt = jwt | ||
| if jwt is None: | ||
| return False | ||
|
|
||
| logger.info( | ||
| colored( | ||
| f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", | ||
| "blue", | ||
| logger.info( | ||
| colored( | ||
| f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", | ||
| "blue", | ||
| ) | ||
| ) | ||
| ) | ||
|
|
||
| return True | ||
| add_session(self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Race condition risk: add_session() is called after session setup completes, while remove_session() happens first in cleanup. Should be reversed to prevent tracking partially initialized sessions.
📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| def _reauthorize_jwt(self) -> Union[str, None]: | |
| with self._lock: | |
| payload = {"session_id": self.session_id} | |
| serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") | |
| res = HttpClient.post( | |
| f"{self.config.endpoint}/v2/reauthorize_jwt", | |
| serialized_payload, | |
| self.config.api_key, | |
| ) | |
| logger.debug(res.body) | |
| if res.code != 200: | |
| try: | |
| serialized_payload = safe_serialize(payload).encode("utf-8") | |
| res = HttpClient.post( | |
| f"{self.config.endpoint}/v2/reauthorize_jwt", | |
| serialized_payload, | |
| self.config.api_key, | |
| ) | |
| if not res: | |
| return None | |
| jwt = res.body.get("jwt") | |
| self.jwt = jwt | |
| return jwt | |
| except Exception as e: | |
| logger.error(f"Failed to reauthorize JWT: {e}") | |
| return None | |
| jwt = res.body.get("jwt", None) | |
| self.jwt = jwt | |
| return jwt | |
| def _start_session(self): | |
| with self._lock: | |
| payload = {"session": self.__dict__} | |
| serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") | |
| try: | |
| serialized_payload = safe_serialize(payload).encode("utf-8") | |
| res = HttpClient.post( | |
| f"{self.config.endpoint}/v2/create_session", | |
| serialized_payload, | |
| api_key=self.config.api_key, | |
| parent_key=self.config.parent_key, | |
| ) | |
| except ApiServerException as e: | |
| return logger.error(f"Could not start session - {e}") | |
| if not res: | |
| return False | |
| jwt = res.body.get("jwt") | |
| self.jwt = jwt | |
| if jwt is None: | |
| return False | |
| logger.debug(res.body) | |
| if res.code != 200: | |
| return False | |
| jwt = res.body.get("jwt", None) | |
| self.jwt = jwt | |
| if jwt is None: | |
| return False | |
| logger.info( | |
| colored( | |
| f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", | |
| "blue", | |
| logger.info( | |
| colored( | |
| f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", | |
| "blue", | |
| ) | |
| ) | |
| ) | |
| return True | |
| add_session(self) | |
| finally: | |
| add_session(self) | |
| ... | |
| remove_session(self) |
|
This pull request has been automatically marked as stale because it has not had any activity in the last 14 days. If no updates are made within 7 days, this PR will be automatically closed. |
|
This pull request has been automatically closed because it has been stale for 7 days with no activity. Feel free to reopen this PR if you'd like to continue working on it. |
Capture terminal output to be displayed in the agentops dashboard. Session-based capture functionality is currently under development. Captures will be displayed in the AgentOps dashboard under "Terminal".
At the moment we capture:
logging.info(),logging.error(), etc.print()Key changes in this PR:
_make_requestmethodagentops_sessionandagentops_initwhich promotes auto setup/teardown.