@@ -15,39 +15,68 @@ def is_github_url(url: str) -> bool:
     except:
         return False
 
+def extract_repo_name(repo_path: str) -> str:
+    """Extract repository name from path or URL"""
+    if is_github_url(repo_path):
+        # For GitHub URLs, extract owner/repo format
+        parts = repo_path.rstrip('/').split('/')
+        if len(parts) >= 5:
+            return f"{parts[3]}/{parts[4]}"  # owner/repo format
+
+    # For local paths, use the last directory name
+    return Path(repo_path).name
+
 class RepoProcessor:
-    def __init__(self):
-        """Initialize repository processor"""
-        pass
+    def __init__(self, chunk_size: int = 500):
+        """Initialize repository processor with chunk size"""
+        self.chunk_size = chunk_size
 
-    def _extract_metadata(self, summary: Dict[str, Any], tree: Dict[str, Any]) -> Dict[str, Any]:
+    def _extract_metadata(self, summary: Dict[str, Any], tree: Dict[str, Any], repo_path: str) -> Dict[str, Any]:
         """Extract metadata from repository summary and tree"""
+        # Extract repo name from path or URL
+        repo_name = extract_repo_name(repo_path)
+
         # Handle case where summary might be a string
         if isinstance(summary, str):
             return {
-                "repo_name": "Unknown",
-                "description": "",
-                "language": "",
-                "topics": [],
-                "stars": 0,
-                "forks": 0,
-                "last_updated": "",
+                "repo_name": repo_name,
                 "file_count": len(tree) if tree else 0
             }
 
         return {
-            "repo_name": summary.get("name", ""),
-            "description": summary.get("description", ""),
-            "language": summary.get("language", ""),
-            "topics": summary.get("topics", []),
-            "stars": summary.get("stars", 0),
-            "forks": summary.get("forks", 0),
-            "last_updated": summary.get("updated_at", ""),
+            "repo_name": repo_name,  # Use extracted name instead of summary
             "file_count": len(tree) if tree else 0
         }
 
+    def _chunk_text(self, text: str) -> List[str]:
+        """Split text into chunks of roughly equal size"""
+        # Split into sentences (roughly)
+        sentences = [s.strip() for s in text.split('.') if s.strip()]
+
+        chunks = []
+        current_chunk = []
+        current_length = 0
+
+        for sentence in sentences:
+            # Add period back
+            sentence = sentence + '.'
+            # If adding this sentence would exceed chunk size, save current chunk
+            if current_length + len(sentence) > self.chunk_size and current_chunk:
+                chunks.append(' '.join(current_chunk))
+                current_chunk = []
+                current_length = 0
+
+            current_chunk.append(sentence)
+            current_length += len(sentence)
+
+        # Add any remaining text
+        if current_chunk:
+            chunks.append(' '.join(current_chunk))
+
+        return chunks
+
     def process_repo(self, repo_path: str | Path) -> Tuple[List[Dict[str, Any]], str]:
-        """Process a repository and return chunks of content with metadata"""
+        """Process a repository and return chunks of text with metadata"""
         try:
             # Generate a unique document ID
             document_id = str(uuid.uuid4())
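
A quick sanity check of the helpers added above (a minimal sketch, not part of the commit; it assumes the file is importable as repo_processor, a module name this diff doesn't show):

from repo_processor import RepoProcessor, extract_repo_name

# GitHub URLs reduce to "owner/repo"; local paths reduce to the last directory name
print(extract_repo_name("https://github.com/owner/repo"))   # owner/repo
print(extract_repo_name("/home/user/projects/my-project"))  # my-project

# _chunk_text keeps each chunk near the character budget; a single sentence
# longer than chunk_size still becomes its own oversized chunk
processor = RepoProcessor(chunk_size=80)
chunks = processor._chunk_text("First sentence. Second one. Third one here. " * 4)
print([len(c) for c in chunks])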
@@ -61,66 +90,48 @@ def process_repo(self, repo_path: str | Path) -> Tuple[List[Dict[str, Any]], str]:
             # Ingest repository
             summary, tree, content = ingest(str(repo_path))
 
-            # Calculate token count based on content type
-            def estimate_tokens(content: Any) -> int:
-                if isinstance(content, dict):
-                    # If content is a dictionary of file contents
-                    return int(sum(len(str(c).split()) for c in content.values()) * 1.3)
-                elif isinstance(content, str):
-                    # If content is a single string
-                    return int(len(content.split()) * 1.3)
-                else:
-                    # If content is in another format, return 0
-                    return 0
-
-            # Print formatted repository information
-            if isinstance(summary, dict):
-                repo_name = summary.get("name", "Unknown")
-                file_count = len(tree) if tree else 0
-            else:
-                repo_name = str(repo_path).split('/')[-1]
-                file_count = len(tree) if tree else 0
-
-            token_count = estimate_tokens(content)
-
-            print("\nRepository Information:")
-            print("-" * 50)
-            print(f"📦 Repository: {repo_name}")
-            print(f"📄 Files analyzed: {file_count}")
-            print(f"🔤 Estimated tokens: {token_count:,}")
-
             # Extract metadata
-            metadata = self._extract_metadata(summary, tree)
+            metadata = self._extract_metadata(summary, tree, str(repo_path))
 
             # Process content into chunks
             processed_chunks = []
+            chunk_id = 0
 
             if isinstance(content, dict):
                 # Handle dictionary of file contents
                 for file_path, file_content in content.items():
-                    if isinstance(file_content, str):
-                        chunk = {
-                            "text": file_content,
-                            "metadata": {
-                                **metadata,
-                                "file_path": file_path,
-                                "source": str(repo_path),
-                                "document_id": document_id
+                    if isinstance(file_content, str) and file_content.strip():  # Only process non-empty content
+                        # Split content into chunks
+                        text_chunks = self._chunk_text(file_content)
+
+                        for text_chunk in text_chunks:
+                            chunk = {
+                                "text": text_chunk,
+                                "metadata": {
+                                    **metadata,
+                                    "source": str(repo_path),
+                                    "document_id": document_id,
+                                    "chunk_id": chunk_id
+                                }
                             }
-                        }
-                        processed_chunks.append(chunk)
+                            processed_chunks.append(chunk)
+                            chunk_id += 1
             elif isinstance(content, str):
                 # Handle single string content
-                chunk = {
-                    "text": content,
-                    "metadata": {
-                        **metadata,
-                        "file_path": "repository_content.txt",
-                        "source": str(repo_path),
-                        "document_id": document_id
+                text_chunks = self._chunk_text(content)
+
+                for text_chunk in text_chunks:
+                    chunk = {
+                        "text": text_chunk,
+                        "metadata": {
+                            **metadata,
+                            "source": str(repo_path),
+                            "document_id": document_id,
+                            "chunk_id": chunk_id
+                        }
                     }
-                }
-                processed_chunks.append(chunk)
+                    processed_chunks.append(chunk)
+                    chunk_id += 1
 
             return processed_chunks, document_id
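
After this hunk, every element of processed_chunks shares the repository-level metadata and carries a running chunk_id. Illustratively (placeholder values, not output from the commit):

{
    "text": "...one chunk of file or repository text...",
    "metadata": {
        "repo_name": "owner/repo",                   # from extract_repo_name
        "file_count": 42,                            # placeholder
        "source": "https://github.com/owner/repo",
        "document_id": "6f1c...-uuid4",              # placeholder
        "chunk_id": 0                                # increments across all chunks
    }
}

Note that file_path is no longer part of the metadata, so provenance is tracked at the repository level via source and document_id rather than per file.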
@@ -132,9 +143,11 @@ def main():
     parser.add_argument("--input", required=True,
                         help="Input repository path or GitHub URL")
     parser.add_argument("--output", required=True, help="Output JSON file for chunks")
+    parser.add_argument("--chunk-size", type=int, default=500,
+                        help="Maximum size of text chunks")
 
     args = parser.parse_args()
-    processor = RepoProcessor()
+    processor = RepoProcessor(chunk_size=args.chunk_size)
 
     try:
         # Create output directory if it doesn't exist
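
Taken together with the new flag, a run might look like this (the script name repo_processor.py is hypothetical; --chunk-size falls back to 500 when omitted):

python repo_processor.py --input https://github.com/owner/repo --output chunks.json --chunk-size 800
python repo_processor.py --input ./local-checkout --output chunks.json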