import os
from collections import deque
from pathlib import Path
from typing import Generator, List

import tree_sitter_python
from tree_sitter import Language, Parser, Tree, Node
6+
def extract_paths_from_markdown(markdown_file: str, sdk_name: str) -> dict:
    """Extract the example paths listed in a markdown file, grouped by section.

    Lines starting with ``/`` that appear under a ``## basics:``,
    ``## feature-scenario:`` or ``## complex-feature-scenario:`` heading are
    collected. Any other ``##`` heading closes the current section, and path
    lines outside a recognised section are ignored.

    Args:
        markdown_file: Path of the markdown file to read.
        sdk_name: SDK directory name; every collected line is returned as
            ``./<sdk_name><line>``.

    Returns:
        A dict with the keys ``"basics"``, ``"feature-scenario"`` and
        ``"complex-feature-scenario"``, each mapping to the list of paths
        found under that heading, in file order.

    Raises:
        OSError: If ``markdown_file`` cannot be opened.
    """
    sections = ("basics", "feature-scenario", "complex-feature-scenario")
    paths = {section: [] for section in sections}

    current_section = None
    # Explicit encoding so the result does not depend on the platform locale.
    with open(markdown_file, "r", encoding="utf-8") as f:
        # Iterating the file splits on real line endings only.
        for raw_line in f:
            line = raw_line.strip()
            if line.startswith("##"):
                # A recognised heading opens its section; any other level-2
                # heading closes whatever section was open.
                current_section = next(
                    (s for s in sections if line.startswith(f"## {s}:")),
                    None,
                )
            elif current_section and line.startswith("/"):
                # ``line`` already starts with "/", so no separator is added.
                paths[current_section].append(f"./{sdk_name}{line}")

    return paths
29+
def get_python_files(directory: str) -> List[str]:
    """Collect every ``*.py`` file found beneath ``directory``.

    The tree is traversed with ``os.walk``; each match is returned as the
    walk root joined with the file name, in traversal order.
    """
    return [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(directory)
        for name in names
        if name.endswith('.py')
    ]
38+
def extract_subtrees(tree: "Tree") -> "List[Node]":
    """Collect every node of a chunk-worthy type from a parsed tree.

    The tree is walked breadth-first from ``tree.root_node``, and every node
    whose ``type`` is one of the definition, compound-statement or import
    types listed below is returned exactly once. The walk continues into
    matched nodes, so nested matches (for example an ``if`` inside a
    function) are returned in addition to their enclosing node.

    Args:
        tree: Parsed tree; only ``root_node`` and each node's ``type`` and
            ``children`` are read.

    Returns:
        The matching nodes in breadth-first order.
    """
    terminal = frozenset((
        'function_definition', 'async_function_definition', 'class_definition',
        'if_statement', 'while_statement', 'for_statement', 'try_statement',
        'with_statement', 'import_statement', 'import_from_statement',
    ))

    matches = []
    # One pass over the tree: each node is visited once, so no match can be
    # reported more than once.
    queue = deque([tree.root_node])
    while queue:
        node = queue.popleft()
        if str(node.type) in terminal:
            matches.append(node)
        queue.extend(node.children)
    return matches
72+
def process_python_file(file_path: str, s3_client, bucket_name: str, level: str):
    """Split one Python file into chunks and upload each chunk to S3.

    The file is parsed with tree-sitter, the chunk nodes are selected by
    ``extract_subtrees`` and the source text of every node is stored through
    ``s3_client.put_object`` under the key
    ``<level>/<file stem>_chunk_<index>_<node type>.py``.

    Any exception raised while reading, parsing or uploading is caught and
    reported with ``print``, so the function itself never raises.

    Args:
        file_path: Path of the Python file to process.
        s3_client: Object exposing ``put_object(Bucket=, Key=, Body=,
            ContentType=)``.
        bucket_name: Bucket passed as ``Bucket`` on every upload.
        level: Key prefix under which the chunks are stored.
    """
    try:
        parser = Parser()
        parser.language = Language(tree_sitter_python.language())

        code = Path(file_path).read_text(encoding="utf-8")
        # Keep the encoded form around: the parser reports byte offsets,
        # which stop matching ``str`` indices as soon as the source
        # contains a non-ASCII character.
        source = code.encode("utf8")
        tree = parser.parse(source)

        # NOTE(review): the key uses only the file stem, so two files with
        # the same name under one level write to the same keys.
        stem = Path(file_path).stem

        for i, subtree in enumerate(extract_subtrees(tree)):
            chunk_text = source[subtree.start_byte:subtree.end_byte].decode("utf8")
            s3_client.put_object(
                Bucket=bucket_name,
                Key=f"{level}/{stem}_chunk_{i}_{subtree.type}.py",
                Body=chunk_text,
                ContentType='text/plain',
            )

    except Exception as e:
        # Best-effort: report the failure and let the caller move on.
        print(f"Error processing {file_path}: {e}")
98+
def main(s3_client=None, bucket_name=None):
    """Upload chunks for every Python file referenced by the SDK's markdown index.

    The SDK name is read from the ``sdk_name`` environment variable
    (default ``"python"``). ``./<sdk_name>/premium-ex.md`` is parsed with
    ``extract_paths_from_markdown``, and every ``.py`` file found under the
    listed directories is handed to ``process_python_file`` with its section
    name as the level.

    Args:
        s3_client: Client forwarded to ``process_python_file``.
        bucket_name: Destination bucket forwarded to ``process_python_file``.

    Raises:
        OSError: If the markdown index cannot be opened.
        TypeError: If there are files to upload but ``s3_client`` or
            ``bucket_name`` was not supplied.
    """
    sdk_name = os.environ.get('sdk_name', 'python')
    markdown_file = f"./{sdk_name}/premium-ex.md"
    paths = extract_paths_from_markdown(markdown_file, sdk_name)

    # get_python_files() returns an empty list for a missing directory, so
    # no separate existence check is needed.
    pending = []
    for level, directories in paths.items():
        for directory in directories:
            for py_file in get_python_files(directory):
                pending.append((level, py_file))

    if not pending:
        return

    # process_python_file() needs both values; fail with an explicit message
    # instead of a missing-argument error in the middle of the run.
    if s3_client is None or not bucket_name:
        raise TypeError(
            "main() requires s3_client and bucket_name to upload "
            f"{len(pending)} Python file(s)"
        )

    for level, py_file in pending:
        process_python_file(py_file, s3_client, bucket_name, level)


# NOTE(review): no S3 client is constructed in this file, so running it as a
# script only succeeds when there is nothing to upload; pass a client and
# bucket to main() from the real entry point.
if __name__ == "__main__":
    main()
0 commit comments