|
| 1 | +import cocoindex |
| 2 | +import io |
| 3 | +import tempfile |
| 4 | +import dataclasses |
| 5 | +import datetime |
| 6 | + |
| 7 | +from marker.config.parser import ConfigParser |
| 8 | +from marker.converters.pdf import PdfConverter |
| 9 | +from marker.models import create_model_dict |
| 10 | +from marker.output import text_from_rendered |
| 11 | +from functools import cache |
| 12 | +from pypdf import PdfReader, PdfWriter |
| 13 | + |
| 14 | + |
| 15 | +@cache |
| 16 | +def get_marker_converter() -> PdfConverter: |
| 17 | + config_parser = ConfigParser({}) |
| 18 | + return PdfConverter( |
| 19 | + create_model_dict(), config=config_parser.generate_config_dict() |
| 20 | + ) |
| 21 | + |
| 22 | + |
| 23 | +@dataclasses.dataclass |
| 24 | +class PaperBasicInfo: |
| 25 | + num_pages: int |
| 26 | + first_page: bytes |
| 27 | + |
| 28 | + |
| 29 | +@cocoindex.op.function() |
| 30 | +def extract_basic_info(content: bytes) -> PaperBasicInfo: |
| 31 | + """Extract the first pages of a PDF.""" |
| 32 | + reader = PdfReader(io.BytesIO(content)) |
| 33 | + |
| 34 | + output = io.BytesIO() |
| 35 | + writer = PdfWriter() |
| 36 | + writer.add_page(reader.pages[0]) |
| 37 | + writer.write(output) |
| 38 | + |
| 39 | + return PaperBasicInfo(num_pages=len(reader.pages), first_page=output.getvalue()) |
| 40 | + |
| 41 | + |
| 42 | +@dataclasses.dataclass |
| 43 | +class Author: |
| 44 | + """One author of the paper.""" |
| 45 | + |
| 46 | + name: str |
| 47 | + email: str | None |
| 48 | + affiliation: str | None |
| 49 | + |
| 50 | + |
| 51 | +@dataclasses.dataclass |
| 52 | +class PaperMetadata: |
| 53 | + """ |
| 54 | + Metadata for a paper. |
| 55 | + """ |
| 56 | + |
| 57 | + title: str |
| 58 | + authors: list[Author] |
| 59 | + abstract: str |
| 60 | + |
| 61 | + |
| 62 | +@cocoindex.op.function(gpu=True, cache=True, behavior_version=2) |
| 63 | +def pdf_to_markdown(content: bytes) -> str: |
| 64 | + """Convert to Markdown.""" |
| 65 | + |
| 66 | + with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as temp_file: |
| 67 | + temp_file.write(content) |
| 68 | + temp_file.flush() |
| 69 | + text, _, _ = text_from_rendered(get_marker_converter()(temp_file.name)) |
| 70 | + return text |
| 71 | + |
| 72 | + |
| 73 | +@cocoindex.transform_flow() |
| 74 | +def text_to_embedding( |
| 75 | + text: cocoindex.DataSlice[str], |
| 76 | +) -> cocoindex.DataSlice[list[float]]: |
| 77 | + """ |
| 78 | + Embed the text using a SentenceTransformer model. |
| 79 | + This is a shared logic between indexing and querying, so extract it as a function. |
| 80 | + """ |
| 81 | + return text.transform( |
| 82 | + cocoindex.functions.SentenceTransformerEmbed( |
| 83 | + model="sentence-transformers/all-MiniLM-L6-v2" |
| 84 | + ) |
| 85 | + ) |
| 86 | + |
| 87 | + |
| 88 | +@cocoindex.flow_def(name="PaperMetadata") |
| 89 | +def paper_metadata_flow( |
| 90 | + flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope |
| 91 | +) -> None: |
| 92 | + """ |
| 93 | + Define an example flow that embeds files into a vector database. |
| 94 | + """ |
| 95 | + data_scope["documents"] = flow_builder.add_source( |
| 96 | + cocoindex.sources.LocalFile(path="papers", binary=True), |
| 97 | + refresh_interval=datetime.timedelta(seconds=10), |
| 98 | + ) |
| 99 | + |
| 100 | + paper_metadata = data_scope.add_collector() |
| 101 | + metadata_embeddings = data_scope.add_collector() |
| 102 | + author_papers = data_scope.add_collector() |
| 103 | + |
| 104 | + with data_scope["documents"].row() as doc: |
| 105 | + doc["basic_info"] = doc["content"].transform(extract_basic_info) |
| 106 | + doc["first_page_md"] = doc["basic_info"]["first_page"].transform( |
| 107 | + pdf_to_markdown |
| 108 | + ) |
| 109 | + doc["metadata"] = doc["first_page_md"].transform( |
| 110 | + cocoindex.functions.ExtractByLlm( |
| 111 | + llm_spec=cocoindex.LlmSpec( |
| 112 | + api_type=cocoindex.LlmApiType.OPENAI, model="gpt-4o" |
| 113 | + ), |
| 114 | + output_type=PaperMetadata, |
| 115 | + instruction="Please extract the metadata from the first page of the paper.", |
| 116 | + ) |
| 117 | + ) |
| 118 | + doc["title_embedding"] = text_to_embedding(doc["metadata"]["title"]) |
| 119 | + doc["abstract_chunks"] = doc["metadata"]["abstract"].transform( |
| 120 | + cocoindex.functions.SplitRecursively( |
| 121 | + custom_languages=[ |
| 122 | + cocoindex.functions.CustomLanguageSpec( |
| 123 | + language_name="abstract", |
| 124 | + separators_regex=[r"[.?!]+\s+", r"[:;]\s+", r",\s+", r"\s+"], |
| 125 | + ) |
| 126 | + ] |
| 127 | + ), |
| 128 | + language="abstract", |
| 129 | + chunk_size=500, |
| 130 | + min_chunk_size=200, |
| 131 | + chunk_overlap=150, |
| 132 | + ) |
| 133 | + |
| 134 | + paper_metadata.collect( |
| 135 | + filename=doc["filename"], |
| 136 | + title=doc["metadata"]["title"], |
| 137 | + authors=doc["metadata"]["authors"], |
| 138 | + abstract=doc["metadata"]["abstract"], |
| 139 | + ) |
| 140 | + metadata_embeddings.collect( |
| 141 | + id=cocoindex.GeneratedField.UUID, |
| 142 | + filename=doc["filename"], |
| 143 | + location="title", |
| 144 | + text=doc["metadata"]["title"], |
| 145 | + embedding=doc["title_embedding"], |
| 146 | + ) |
| 147 | + with doc["metadata"]["authors"].row() as author: |
| 148 | + author_papers.collect( |
| 149 | + author_name=author["name"], |
| 150 | + filename=doc["filename"], |
| 151 | + ) |
| 152 | + |
| 153 | + with doc["abstract_chunks"].row() as chunk: |
| 154 | + chunk["embedding"] = text_to_embedding(chunk["text"]) |
| 155 | + metadata_embeddings.collect( |
| 156 | + id=cocoindex.GeneratedField.UUID, |
| 157 | + filename=doc["filename"], |
| 158 | + location="abstract", |
| 159 | + text=chunk["text"], |
| 160 | + embedding=chunk["embedding"], |
| 161 | + ) |
| 162 | + |
| 163 | + paper_metadata.export( |
| 164 | + "paper_metadata", |
| 165 | + cocoindex.targets.Postgres(), |
| 166 | + primary_key_fields=["filename"], |
| 167 | + ) |
| 168 | + metadata_embeddings.export( |
| 169 | + "metadata_embeddings", |
| 170 | + cocoindex.targets.Postgres(), |
| 171 | + primary_key_fields=["id"], |
| 172 | + vector_indexes=[ |
| 173 | + cocoindex.VectorIndexDef( |
| 174 | + field_name="embedding", |
| 175 | + metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY, |
| 176 | + ) |
| 177 | + ], |
| 178 | + ) |
| 179 | + author_papers.export( |
| 180 | + "author_papers", |
| 181 | + cocoindex.targets.Postgres(), |
| 182 | + primary_key_fields=["author_name", "filename"], |
| 183 | + ) |
0 commit comments