|
| 1 | +# deepnote_jupyter_extension/contents.py |
| 2 | +from jupyter_server.services.contents.filemanager import FileContentsManager |
| 3 | +from typing import cast |
| 4 | +import nbformat |
| 5 | + |
| 6 | +import yaml |
| 7 | +from nbformat.v4 import new_notebook, new_code_cell, new_markdown_cell |
| 8 | + |
| 9 | + |
| 10 | +def yaml_to_ipynb(yaml_text: str): |
| 11 | + """Convert Deepnote YAML into a minimal Jupyter nbformat v4 notebook.""" |
| 12 | + try: |
| 13 | + data = yaml.safe_load(yaml_text) |
| 14 | + except Exception: |
| 15 | + return new_notebook(cells=[]) |
| 16 | + |
| 17 | + notebooks = ( |
| 18 | + data.get("project", {}).get("notebooks", []) if isinstance(data, dict) else [] |
| 19 | + ) |
| 20 | + if not notebooks: |
| 21 | + return new_notebook(cells=[]) |
| 22 | + |
| 23 | + nb0 = notebooks[0] |
| 24 | + blocks = nb0.get("blocks", []) |
| 25 | + cells = [] |
| 26 | + |
| 27 | + for block in sorted(blocks, key=lambda b: b.get("sortingKey", "")): |
| 28 | + btype = block.get("type", "code") |
| 29 | + content = block.get("content", "") |
| 30 | + |
| 31 | + if btype == "code": |
| 32 | + cells.append(new_code_cell(content)) |
| 33 | + else: |
| 34 | + cells.append(new_markdown_cell(content)) |
| 35 | + |
| 36 | + return new_notebook(cells=cells, metadata={}) |
| 37 | + |
| 38 | + |
| 39 | +def yaml_to_ipynb_dummy(yaml_text: str) -> dict: |
| 40 | + return {"nbformat": 4, "nbformat_minor": 5, "metadata": {}, "cells": []} |
| 41 | + |
| 42 | + |
| 43 | +class DeepnoteContentsManager(FileContentsManager): |
| 44 | + def get(self, path, content=True, type=None, format=None, require_hash=False): |
| 45 | + if path.endswith(".deepnote") and (content == 1): |
| 46 | + self.log.info( |
| 47 | + "\n\n\n🌴🌴🌴 path %s, content: %s, type: %s", path, content, type |
| 48 | + ) |
| 49 | + os_path = self._get_os_path(path) |
| 50 | + |
| 51 | + # _read_file may return 2- or 3-tuple depending on raw flag in implementation hints |
| 52 | + _content, _fmt, *_ = self._read_file(os_path, "text") # type: ignore[misc] |
| 53 | + # Coerce to str for converter |
| 54 | + if isinstance(_content, bytes): |
| 55 | + yaml_text = _content.decode("utf-8", errors="replace") |
| 56 | + else: |
| 57 | + yaml_text = cast(str, _content) |
| 58 | + |
| 59 | + nb_dict = yaml_to_ipynb(yaml_text) |
| 60 | + nb_node = nbformat.from_dict(nb_dict) |
| 61 | + |
| 62 | + model = self._base_model(path) |
| 63 | + model["type"] = "notebook" |
| 64 | + model["format"] = "json" |
| 65 | + model["content"] = nb_node |
| 66 | + self.mark_trusted_cells(nb_node, path) |
| 67 | + self.validate_notebook_model(model, validation_error={}) |
| 68 | + |
| 69 | + if require_hash: |
| 70 | + # Accept 2- or 3-tuple; we only need the bytes |
| 71 | + bytes_content, *_ = self._read_file(os_path, "byte") # type: ignore[misc] |
| 72 | + if isinstance(bytes_content, str): |
| 73 | + bytes_content = bytes_content.encode("utf-8", errors="replace") |
| 74 | + model.update(**self._get_hash(bytes_content)) # type: ignore[arg-type] |
| 75 | + |
| 76 | + return model |
| 77 | + |
| 78 | + return super().get( |
| 79 | + path, content=content, type=type, format=format, require_hash=require_hash |
| 80 | + ) |
0 commit comments