diff --git a/README.md b/README.md index 1f2fb078..1bbf62f5 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ A production-ready MCP server that integrates all major Google Workspace service - **📅 Google Calendar**: Full calendar management with event CRUD operations - **📁 Google Drive**: File operations with native Microsoft Office format support (.docx, .xlsx) - **📧 Gmail**: Complete email management with search, send, and draft capabilities -- **📄 Google Docs**: Document operations including content extraction, creation, and comment management +- **📄 Google Docs**: Document operations including content extraction, creation, line-based editing, find/replace, versioned updates, and comment management - **📊 Google Sheets**: Comprehensive spreadsheet management with flexible cell operations and comment management - **🖼️ Google Slides**: Presentation management with slide creation, updates, content manipulation, and comment management - **📝 Google Forms**: Form creation, retrieval, publish settings, and response management diff --git a/gdocs/docs_tools.py b/gdocs/docs_tools.py index f868f72b..ab80d6db 100644 --- a/gdocs/docs_tools.py +++ b/gdocs/docs_tools.py @@ -257,6 +257,12 @@ async def create_doc( """ Creates a new Google Doc and optionally inserts initial content. + Args: + service: Google Docs service + user_google_email: User's email + title: Document title + content: Initial content (optional) + Returns: str: Confirmation message with document ID and link. """ @@ -273,6 +279,558 @@ async def create_doc( return msg +@server.tool() +@require_google_service("docs", "docs_write") +@handle_http_errors("insert_text_at_line") +async def insert_text_at_line( + service, + user_google_email: str, + document_id: str, + line_number: int, + text: str, +) -> str: + """ + Insert text at a specific line number in a Google Doc. + + Args: + service: Google Docs service + user_google_email: User's email + document_id: Document ID to modify + line_number: Line number to insert at (1-based) + text: Text to insert + + Returns: + str: Confirmation message + """ + logger.info(f"[insert_text_at_line] Invoked. Document ID: '{document_id}', Line: {line_number}, User: '{user_google_email}'") + + # Validate line number + if line_number < 1: + raise ValueError("Line number must be 1 or greater") + + # Get document to calculate insertion index + doc = await asyncio.to_thread(service.documents().get(documentId=document_id).execute) + body_elements = doc.get('body', {}).get('content', []) + + # Find the insertion index based on line number + current_line = 1 + insertion_index = 1 # Default to beginning if line not found + + for element in body_elements: + if 'paragraph' in element: + if current_line == line_number: + insertion_index = element.get('startIndex', 1) + break + current_line += 1 + + # If line number is beyond document length, append at end + if line_number > current_line: + # Find the last element's end index + last_index = 1 + for element in body_elements: + if 'endIndex' in element: + last_index = max(last_index, element.get('endIndex', 1)) + insertion_index = last_index + + # Insert text at the calculated index + requests = [{ + 'insertText': { + 'location': {'index': insertion_index}, + 'text': text + } + }] + + await asyncio.to_thread( + service.documents().batchUpdate( + documentId=document_id, + body={'requests': requests} + ).execute + ) + + logger.info(f"[insert_text_at_line] Successfully inserted text at line {line_number} in document {document_id} at index {insertion_index}") + return f"Inserted text at line {line_number} in document {document_id} for {user_google_email}" + + +@server.tool() +@require_google_service("docs", "docs_write") +@handle_http_errors("find_and_replace_text") +async def find_and_replace_text( + service, + user_google_email: str, + document_id: str, + find_text: str, + replace_text: str, + match_case: bool = False, +) -> str: + """ + Find and replace text in a Google Doc. + + Args: + service: Google Docs service + user_google_email: User's email + document_id: Document ID to modify + find_text: Text to find + replace_text: Text to replace with + match_case: Whether to match case + + Returns: + str: Confirmation message with replacement count + """ + logger.info(f"[find_and_replace_text] Invoked. Document ID: '{document_id}', Find: '{find_text}', Replace: '{replace_text}', User: '{user_google_email}'") + + # Validate input parameters + if not find_text: + raise ValueError("Find text cannot be empty") + + requests = [{ + 'replaceAllText': { + 'containsText': { + 'text': find_text, + 'matchCase': match_case + }, + 'replaceText': replace_text + } + }] + + response = await asyncio.to_thread( + service.documents().batchUpdate( + documentId=document_id, + body={'requests': requests} + ).execute + ) + + # Count replacements from response + replacements = 0 + for reply in response.get('replies', []): + if 'replaceAllText' in reply: + replacements += reply['replaceAllText'].get('occurrencesChanged', 0) + + logger.info(f"[find_and_replace_text] Successfully replaced {replacements} occurrences in document {document_id}") + return f"Replaced {replacements} occurrences of '{find_text}' with '{replace_text}' in document {document_id} for {user_google_email}" + + +@server.tool() +@require_multiple_services([ + {"service_type": "docs", "scopes": "docs_write", "param_name": "docs_service"}, + {"service_type": "drive", "scopes": "drive_write", "param_name": "drive_service"} +]) +@handle_http_errors("create_versioned_document") +async def create_versioned_document( + docs_service, + drive_service, + user_google_email: str, + document_id: str, + new_content: str, + version_comment: str = "Document updated via MCP", +) -> str: + """ + Create a new version of a document by overwriting content while preserving version history. + Creates a backup copy before modification for rollback capability. + + Args: + docs_service: Google Docs service + drive_service: Google Drive service + user_google_email: User's email + document_id: Document ID to update + new_content: New content to replace existing content + version_comment: Comment for the version + + Returns: + str: Confirmation message with backup document ID + """ + logger.info(f"[create_versioned_document] Invoked. Document ID: '{document_id}', User: '{user_google_email}', Version: '{version_comment}'") + + # Validate input parameters + if not new_content: + raise ValueError("New content cannot be empty") + if not version_comment.strip(): + raise ValueError("Version comment cannot be empty") + + # Get original document metadata + doc_metadata = await asyncio.to_thread( + drive_service.files().get(fileId=document_id, fields="name").execute + ) + original_name = doc_metadata.get('name', 'Untitled Document') + + # Create backup copy + backup_name = f"{original_name} - Backup {version_comment}" + backup_response = await asyncio.to_thread( + drive_service.files().copy( + fileId=document_id, + body={'name': backup_name} + ).execute + ) + backup_id = backup_response.get('id') + if not backup_id: + raise Exception("Failed to create backup copy of document") + + # Get current document to calculate content range + doc = await asyncio.to_thread(docs_service.documents().get(documentId=document_id).execute) + body_content = doc.get('body', {}).get('content', []) + + # Calculate the range to delete (everything except the first empty paragraph) + end_index = 1 + for element in body_content: + if 'endIndex' in element: + end_index = max(end_index, element['endIndex']) + + # Clear document and insert new content + requests = [] + + # Delete existing content (but leave the first character to maintain structure) + if end_index > 1: + requests.append({ + 'deleteContentRange': { + 'range': { + 'startIndex': 1, + 'endIndex': end_index - 1 + } + } + }) + + # Insert new content + requests.append({ + 'insertText': { + 'location': {'index': 1}, + 'text': new_content + } + }) + + # Execute batch update + await asyncio.to_thread( + docs_service.documents().batchUpdate( + documentId=document_id, + body={'requests': requests} + ).execute + ) + + backup_link = f"https://docs.google.com/document/d/{backup_id}/edit" + logger.info(f"[create_versioned_document] Successfully updated document {document_id}. Backup created: {backup_name} (ID: {backup_id})") + return f"Document {document_id} updated successfully for {user_google_email}. Backup created: {backup_name} (ID: {backup_id}, Link: {backup_link})" + + +@server.tool() +@require_google_service("docs", "docs_write") +@handle_http_errors("format_text_style") +async def format_text_style( + service, + user_google_email: str, + document_id: str, + start_index: int, + end_index: int, + bold: bool = None, + italic: bool = None, + underline: bool = None, + font_size: int = None, + font_family: str = None, + text_color: str = None, + background_color: str = None, +) -> str: + """ + Apply text formatting to a range of text in a Google Doc. + + Args: + service: Google Docs service + user_google_email: User's email + document_id: Document ID to modify + start_index: Start position (0-based) + end_index: End position (0-based, exclusive) + bold: Set bold formatting (True/False) + italic: Set italic formatting (True/False) + underline: Set underline formatting (True/False) + font_size: Font size in points + font_family: Font family name (e.g., 'Arial', 'Times New Roman') + text_color: Text color in hex format (e.g., '#FF0000' for red) + background_color: Background color in hex format + + Returns: + str: Confirmation message + """ + logger.info(f"[format_text_style] Invoked. Document ID: '{document_id}', Range: {start_index}-{end_index}, User: '{user_google_email}'") + + if start_index < 0 or end_index <= start_index: + raise ValueError("Invalid range: start_index must be >= 0 and end_index must be > start_index") + + # Build text style object + text_style = {} + if bold is not None: + text_style['bold'] = bold + if italic is not None: + text_style['italic'] = italic + if underline is not None: + text_style['underline'] = underline + if font_size is not None: + text_style['fontSize'] = {'magnitude': font_size, 'unit': 'PT'} + if font_family is not None: + text_style['fontFamily'] = font_family + if text_color is not None: + text_style['foregroundColor'] = {'color': {'rgbColor': _hex_to_rgb(text_color)}} + if background_color is not None: + text_style['backgroundColor'] = {'color': {'rgbColor': _hex_to_rgb(background_color)}} + + if not text_style: + return f"No formatting changes specified for document {document_id}" + + # Create batch update request + requests = [{ + 'updateTextStyle': { + 'range': { + 'startIndex': start_index, + 'endIndex': end_index + }, + 'textStyle': text_style, + 'fields': ','.join(text_style.keys()) + } + }] + + await asyncio.to_thread( + service.documents().batchUpdate( + documentId=document_id, + body={'requests': requests} + ).execute + ) + + logger.info(f"[format_text_style] Successfully applied text formatting to document {document_id} for {user_google_email}") + return f"Text formatting applied to range {start_index}-{end_index} in document {document_id} for {user_google_email}" + + +@server.tool() +@require_google_service("docs", "docs_write") +@handle_http_errors("format_paragraph_style") +async def format_paragraph_style( + service, + user_google_email: str, + document_id: str, + start_index: int, + end_index: int, + alignment: str = None, + line_spacing: float = None, + indent_first_line: float = None, + indent_start: float = None, + indent_end: float = None, + space_above: float = None, + space_below: float = None, +) -> str: + """ + Apply paragraph formatting to a range of text in a Google Doc. + + Args: + service: Google Docs service + user_google_email: User's email + document_id: Document ID to modify + start_index: Start position (0-based) + end_index: End position (0-based, exclusive) + alignment: Text alignment ('START', 'CENTER', 'END', 'JUSTIFY') + line_spacing: Line spacing multiplier (e.g., 1.0 for single, 1.5 for 1.5x, 2.0 for double) + indent_first_line: First line indent in points + indent_start: Left indent in points + indent_end: Right indent in points + space_above: Space above paragraph in points + space_below: Space below paragraph in points + + Returns: + str: Confirmation message + """ + logger.info(f"[format_paragraph_style] Invoked. Document ID: '{document_id}', Range: {start_index}-{end_index}, User: '{user_google_email}'") + + if start_index < 0 or end_index <= start_index: + raise ValueError("Invalid range: start_index must be >= 0 and end_index must be > start_index") + + # Build paragraph style object + paragraph_style = {} + if alignment is not None: + valid_alignments = ['START', 'CENTER', 'END', 'JUSTIFY'] + if alignment.upper() not in valid_alignments: + raise ValueError(f"Invalid alignment. Must be one of: {valid_alignments}") + paragraph_style['alignment'] = alignment.upper() + if line_spacing is not None: + paragraph_style['lineSpacing'] = line_spacing + if indent_first_line is not None: + paragraph_style['indentFirstLine'] = {'magnitude': indent_first_line, 'unit': 'PT'} + if indent_start is not None: + paragraph_style['indentStart'] = {'magnitude': indent_start, 'unit': 'PT'} + if indent_end is not None: + paragraph_style['indentEnd'] = {'magnitude': indent_end, 'unit': 'PT'} + if space_above is not None: + paragraph_style['spaceAbove'] = {'magnitude': space_above, 'unit': 'PT'} + if space_below is not None: + paragraph_style['spaceBelow'] = {'magnitude': space_below, 'unit': 'PT'} + + if not paragraph_style: + return f"No paragraph formatting changes specified for document {document_id}" + + # Create batch update request + requests = [{ + 'updateParagraphStyle': { + 'range': { + 'startIndex': start_index, + 'endIndex': end_index + }, + 'paragraphStyle': paragraph_style, + 'fields': ','.join(paragraph_style.keys()) + } + }] + + await asyncio.to_thread( + service.documents().batchUpdate( + documentId=document_id, + body={'requests': requests} + ).execute + ) + + logger.info(f"[format_paragraph_style] Successfully applied paragraph formatting to document {document_id} for {user_google_email}") + return f"Paragraph formatting applied to range {start_index}-{end_index} in document {document_id} for {user_google_email}" + + +@server.tool() +@require_google_service("docs", "docs_write") +@handle_http_errors("apply_heading_style") +async def apply_heading_style( + service, + user_google_email: str, + document_id: str, + start_index: int, + end_index: int, + heading_level: int, +) -> str: + """ + Apply heading style to a range of text in a Google Doc. + + Args: + service: Google Docs service + user_google_email: User's email + document_id: Document ID to modify + start_index: Start position (0-based) + end_index: End position (0-based, exclusive) + heading_level: Heading level (1-6, where 1 is H1, 2 is H2, etc.) + + Returns: + str: Confirmation message + """ + logger.info(f"[apply_heading_style] Invoked. Document ID: '{document_id}', Range: {start_index}-{end_index}, Heading Level: {heading_level}, User: '{user_google_email}'") + + if start_index < 0 or end_index <= start_index: + raise ValueError("Invalid range: start_index must be >= 0 and end_index must be > start_index") + + if heading_level < 1 or heading_level > 6: + raise ValueError("Heading level must be between 1 and 6") + + # Map heading level to Google Docs named style + heading_style = f"HEADING_{heading_level}" + + # Create batch update request + requests = [{ + 'updateParagraphStyle': { + 'range': { + 'startIndex': start_index, + 'endIndex': end_index + }, + 'paragraphStyle': { + 'namedStyleType': heading_style + }, + 'fields': 'namedStyleType' + } + }] + + await asyncio.to_thread( + service.documents().batchUpdate( + documentId=document_id, + body={'requests': requests} + ).execute + ) + + logger.info(f"[apply_heading_style] Successfully applied {heading_style} to document {document_id} for {user_google_email}") + return f"Applied {heading_style} to range {start_index}-{end_index} in document {document_id} for {user_google_email}" + + +@server.tool() +@require_google_service("docs", "docs_write") +@handle_http_errors("create_list") +async def create_list( + service, + user_google_email: str, + document_id: str, + start_index: int, + end_index: int, + list_type: str = "BULLET", + nesting_level: int = 0, +) -> str: + """ + Convert paragraphs to a bulleted or numbered list in a Google Doc. + + Args: + service: Google Docs service + user_google_email: User's email + document_id: Document ID to modify + start_index: Start position (0-based) + end_index: End position (0-based, exclusive) + list_type: Type of list ('BULLET' or 'NUMBERED') + nesting_level: Nesting level (0-8, where 0 is top level) + + Returns: + str: Confirmation message + """ + logger.info(f"[create_list] Invoked. Document ID: '{document_id}', Range: {start_index}-{end_index}, List Type: {list_type}, User: '{user_google_email}'") + + if start_index < 0 or end_index <= start_index: + raise ValueError("Invalid range: start_index must be >= 0 and end_index must be > start_index") + + if list_type not in ['BULLET', 'NUMBERED']: + raise ValueError("List type must be 'BULLET' or 'NUMBERED'") + + if nesting_level < 0 or nesting_level > 8: + raise ValueError("Nesting level must be between 0 and 8") + + # Create batch update request + requests = [{ + 'createParagraphBullets': { + 'range': { + 'startIndex': start_index, + 'endIndex': end_index + }, + 'bulletPreset': f"{list_type}_DISC_CIRCLE_SQUARE" if list_type == "BULLET" else "NUMBERED_DECIMAL_ALPHA_ROMAN" + } + }] + + # Add nesting if specified + if nesting_level > 0: + requests.append({ + 'updateParagraphStyle': { + 'range': { + 'startIndex': start_index, + 'endIndex': end_index + }, + 'paragraphStyle': { + 'indentStart': {'magnitude': nesting_level * 18, 'unit': 'PT'} + }, + 'fields': 'indentStart' + } + }) + + await asyncio.to_thread( + service.documents().batchUpdate( + documentId=document_id, + body={'requests': requests} + ).execute + ) + + logger.info(f"[create_list] Successfully created {list_type} list in document {document_id} for {user_google_email}") + return f"Created {list_type} list at range {start_index}-{end_index} in document {document_id} for {user_google_email}" + + +def _hex_to_rgb(hex_color: str) -> dict: + """Convert hex color to RGB dict for Google Docs API.""" + hex_color = hex_color.lstrip('#') + if len(hex_color) != 6: + raise ValueError("Invalid hex color format. Use #RRGGBB format") + + return { + 'red': int(hex_color[0:2], 16) / 255.0, + 'green': int(hex_color[2:4], 16) / 255.0, + 'blue': int(hex_color[4:6], 16) / 255.0 + } + + # Create comment management tools for documents _comment_tools = create_comment_tools("document", "document_id") diff --git a/gsheets/sheets_tools.py b/gsheets/sheets_tools.py index 9a4ed36c..e775fd2f 100644 --- a/gsheets/sheets_tools.py +++ b/gsheets/sheets_tools.py @@ -337,6 +337,393 @@ async def create_sheet( return text_output +@server.tool() +@require_google_service("sheets", "sheets_write") +@handle_http_errors("format_cell_style") +async def format_cell_style( + service, + user_google_email: str, + spreadsheet_id: str, + range_name: str, + bold: bool = None, + italic: bool = None, + underline: bool = None, + strikethrough: bool = None, + font_size: int = None, + font_family: str = None, + text_color: str = None, + background_color: str = None, + horizontal_alignment: str = None, + vertical_alignment: str = None, +) -> str: + """ + Apply formatting to a range of cells in a Google Sheet. + + Args: + service: Google Sheets service + user_google_email: User's email + spreadsheet_id: Spreadsheet ID to modify + range_name: Range to format (e.g., "A1:C3", "Sheet1!A1:C3") + bold: Set bold formatting (True/False) + italic: Set italic formatting (True/False) + underline: Set underline formatting (True/False) + strikethrough: Set strikethrough formatting (True/False) + font_size: Font size in points + font_family: Font family name (e.g., 'Arial', 'Calibri') + text_color: Text color in hex format (e.g., '#FF0000' for red) + background_color: Background color in hex format + horizontal_alignment: Horizontal alignment ('LEFT', 'CENTER', 'RIGHT') + vertical_alignment: Vertical alignment ('TOP', 'MIDDLE', 'BOTTOM') + + Returns: + str: Confirmation message + """ + logger.info(f"[format_cell_style] Invoked. Spreadsheet ID: '{spreadsheet_id}', Range: '{range_name}', User: '{user_google_email}'") + + # Parse range to get sheet ID if needed + sheet_id = 0 # Default to first sheet + if '!' in range_name: + sheet_name, cell_range = range_name.split('!', 1) + # Get sheet ID from sheet name + spreadsheet = await asyncio.to_thread( + service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute + ) + for sheet in spreadsheet.get('sheets', []): + if sheet['properties']['title'] == sheet_name: + sheet_id = sheet['properties']['sheetId'] + break + else: + cell_range = range_name + + # Build cell format object + cell_format = {} + text_format = {} + + if bold is not None: + text_format['bold'] = bold + if italic is not None: + text_format['italic'] = italic + if underline is not None: + text_format['underline'] = underline + if strikethrough is not None: + text_format['strikethrough'] = strikethrough + if font_size is not None: + text_format['fontSize'] = font_size + if font_family is not None: + text_format['fontFamily'] = font_family + if text_color is not None: + text_format['foregroundColor'] = _hex_to_rgb_sheets(text_color) + + if text_format: + cell_format['textFormat'] = text_format + + if background_color is not None: + cell_format['backgroundColor'] = _hex_to_rgb_sheets(background_color) + + if horizontal_alignment is not None or vertical_alignment is not None: + cell_format['horizontalAlignment'] = horizontal_alignment + cell_format['verticalAlignment'] = vertical_alignment + + if not cell_format: + return f"No formatting changes specified for range {range_name}" + + # Create batch update request + requests = [{ + 'repeatCell': { + 'range': _parse_range_to_grid_range(cell_range, sheet_id), + 'cell': { + 'userEnteredFormat': cell_format + }, + 'fields': 'userEnteredFormat(' + ','.join(_get_format_fields(cell_format)) + ')' + } + }] + + await asyncio.to_thread( + service.spreadsheets().batchUpdate( + spreadsheetId=spreadsheet_id, + body={'requests': requests} + ).execute + ) + + logger.info(f"[format_cell_style] Successfully applied cell formatting to range {range_name} in spreadsheet {spreadsheet_id}") + return f"Cell formatting applied to range {range_name} in spreadsheet {spreadsheet_id} for {user_google_email}" + + +@server.tool() +@require_google_service("sheets", "sheets_write") +@handle_http_errors("format_cell_borders") +async def format_cell_borders( + service, + user_google_email: str, + spreadsheet_id: str, + range_name: str, + border_style: str = "SOLID", + border_color: str = "#000000", + border_width: int = 1, + top: bool = True, + bottom: bool = True, + left: bool = True, + right: bool = True, + inner_horizontal: bool = False, + inner_vertical: bool = False, +) -> str: + """ + Apply borders to a range of cells in a Google Sheet. + + Args: + service: Google Sheets service + user_google_email: User's email + spreadsheet_id: Spreadsheet ID to modify + range_name: Range to format (e.g., "A1:C3", "Sheet1!A1:C3") + border_style: Border style ('SOLID', 'DOTTED', 'DASHED', 'SOLID_MEDIUM', 'SOLID_THICK', 'DOUBLE') + border_color: Border color in hex format (e.g., '#000000' for black) + border_width: Border width in pixels + top: Apply top border + bottom: Apply bottom border + left: Apply left border + right: Apply right border + inner_horizontal: Apply inner horizontal borders + inner_vertical: Apply inner vertical borders + + Returns: + str: Confirmation message + """ + logger.info(f"[format_cell_borders] Invoked. Spreadsheet ID: '{spreadsheet_id}', Range: '{range_name}', User: '{user_google_email}'") + + # Parse range to get sheet ID + sheet_id = 0 + if '!' in range_name: + sheet_name, cell_range = range_name.split('!', 1) + spreadsheet = await asyncio.to_thread( + service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute + ) + for sheet in spreadsheet.get('sheets', []): + if sheet['properties']['title'] == sheet_name: + sheet_id = sheet['properties']['sheetId'] + break + else: + cell_range = range_name + + # Create border object + border = { + 'style': border_style, + 'color': _hex_to_rgb_sheets(border_color), + 'width': border_width + } + + # Build requests for each border position + requests = [] + grid_range = _parse_range_to_grid_range(cell_range, sheet_id) + + if top: + requests.append({ + 'updateBorders': { + 'range': grid_range, + 'top': border + } + }) + + if bottom: + requests.append({ + 'updateBorders': { + 'range': grid_range, + 'bottom': border + } + }) + + if left: + requests.append({ + 'updateBorders': { + 'range': grid_range, + 'left': border + } + }) + + if right: + requests.append({ + 'updateBorders': { + 'range': grid_range, + 'right': border + } + }) + + if inner_horizontal: + requests.append({ + 'updateBorders': { + 'range': grid_range, + 'innerHorizontal': border + } + }) + + if inner_vertical: + requests.append({ + 'updateBorders': { + 'range': grid_range, + 'innerVertical': border + } + }) + + if not requests: + return f"No border changes specified for range {range_name}" + + await asyncio.to_thread( + service.spreadsheets().batchUpdate( + spreadsheetId=spreadsheet_id, + body={'requests': requests} + ).execute + ) + + logger.info(f"[format_cell_borders] Successfully applied borders to range {range_name} in spreadsheet {spreadsheet_id}") + return f"Borders applied to range {range_name} in spreadsheet {spreadsheet_id} for {user_google_email}" + + +@server.tool() +@require_google_service("sheets", "sheets_write") +@handle_http_errors("format_number_display") +async def format_number_display( + service, + user_google_email: str, + spreadsheet_id: str, + range_name: str, + number_format: str, +) -> str: + """ + Apply number formatting to a range of cells in a Google Sheet. + + Args: + service: Google Sheets service + user_google_email: User's email + spreadsheet_id: Spreadsheet ID to modify + range_name: Range to format (e.g., "A1:C3", "Sheet1!A1:C3") + number_format: Number format pattern: + - 'CURRENCY': '$#,##0.00' + - 'PERCENT': '0.00%' + - 'DATE': 'M/d/yyyy' + - 'TIME': 'h:mm:ss AM/PM' + - 'SCIENTIFIC': '0.00E+00' + - Custom pattern like '#,##0.00', '0.00%', 'M/d/yyyy', etc. + + Returns: + str: Confirmation message + """ + logger.info(f"[format_number_display] Invoked. Spreadsheet ID: '{spreadsheet_id}', Range: '{range_name}', Format: '{number_format}', User: '{user_google_email}'") + + # Parse range to get sheet ID + sheet_id = 0 + if '!' in range_name: + sheet_name, cell_range = range_name.split('!', 1) + spreadsheet = await asyncio.to_thread( + service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute + ) + for sheet in spreadsheet.get('sheets', []): + if sheet['properties']['title'] == sheet_name: + sheet_id = sheet['properties']['sheetId'] + break + else: + cell_range = range_name + + # Convert common format names to patterns + format_patterns = { + 'CURRENCY': '$#,##0.00', + 'PERCENT': '0.00%', + 'DATE': 'M/d/yyyy', + 'TIME': 'h:mm:ss AM/PM', + 'SCIENTIFIC': '0.00E+00', + 'NUMBER': '#,##0.00', + 'INTEGER': '#,##0' + } + + pattern = format_patterns.get(number_format.upper(), number_format) + + # Create batch update request + requests = [{ + 'repeatCell': { + 'range': _parse_range_to_grid_range(cell_range, sheet_id), + 'cell': { + 'userEnteredFormat': { + 'numberFormat': { + 'type': 'NUMBER', + 'pattern': pattern + } + } + }, + 'fields': 'userEnteredFormat.numberFormat' + } + }] + + await asyncio.to_thread( + service.spreadsheets().batchUpdate( + spreadsheetId=spreadsheet_id, + body={'requests': requests} + ).execute + ) + + logger.info(f"[format_number_display] Successfully applied number formatting to range {range_name} in spreadsheet {spreadsheet_id}") + return f"Number formatting '{pattern}' applied to range {range_name} in spreadsheet {spreadsheet_id} for {user_google_email}" + + +def _hex_to_rgb_sheets(hex_color: str) -> dict: + """Convert hex color to RGB dict for Google Sheets API.""" + hex_color = hex_color.lstrip('#') + if len(hex_color) != 6: + raise ValueError("Invalid hex color format. Use #RRGGBB format") + + return { + 'red': int(hex_color[0:2], 16) / 255.0, + 'green': int(hex_color[2:4], 16) / 255.0, + 'blue': int(hex_color[4:6], 16) / 255.0 + } + + +def _parse_range_to_grid_range(range_name: str, sheet_id: int) -> dict: + """Parse A1 notation to GridRange object.""" + # Simple A1 notation parser - handles cases like "A1:C3" + if ':' in range_name: + start_cell, end_cell = range_name.split(':') + else: + start_cell = end_cell = range_name + + def _a1_to_coords(cell: str): + """Convert A1 notation to row/col coordinates.""" + col = 0 + row = 0 + i = 0 + + # Extract column letters + while i < len(cell) and cell[i].isalpha(): + col = col * 26 + (ord(cell[i].upper()) - ord('A') + 1) + i += 1 + + # Extract row numbers + if i < len(cell): + row = int(cell[i:]) + + return row - 1, col - 1 # Convert to 0-based + + start_row, start_col = _a1_to_coords(start_cell) + end_row, end_col = _a1_to_coords(end_cell) + + return { + 'sheetId': sheet_id, + 'startRowIndex': start_row, + 'endRowIndex': end_row + 1, + 'startColumnIndex': start_col, + 'endColumnIndex': end_col + 1 + } + + +def _get_format_fields(cell_format: dict) -> List[str]: + """Get list of format fields for API request.""" + fields = [] + for key in cell_format.keys(): + if key == 'textFormat': + for text_key in cell_format[key].keys(): + fields.append(f'textFormat.{text_key}') + else: + fields.append(key) + return fields + + # Create comment management tools for sheets _comment_tools = create_comment_tools("spreadsheet", "spreadsheet_id") diff --git a/uv.lock b/uv.lock index 8ae62bab..879cf268 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,4 @@ version = 1 -revision = 2 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.13'",