Skip to content

Commit e7a324e

Browse files
authored
Merge pull request #47 from turtacn/feat-round6-phase6-update-semantic-snapshot
feat: align snapshot create with semantic scripts and add docs
2 parents 2d80aed + 7f5ca6f commit e7a324e

File tree

11 files changed

+224
-319
lines changed

11 files changed

+224
-319
lines changed

codesage/cli/commands/snapshot.py

Lines changed: 48 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def _create_snapshot_data(path, project_name):
9898
@snapshot.command('create')
9999
@click.argument('path', type=click.Path(exists=True, dir_okay=True))
100100
@click.option('--project', '-p', 'project_name_override', help='Override the project name.')
101-
@click.option('--format', '-f', type=click.Choice(['json', 'python-semantic-digest', 'go-semantic-digest']), default='json', help='Snapshot format.')
101+
@click.option('--format', '-f', type=click.Choice(['yaml', 'json', 'md']), default='yaml', help='Snapshot format.')
102102
@click.option('--output', '-o', type=click.Path(), default=None, help='Output file path.')
103103
@click.option('--compress', is_flag=True, help='Enable compression.')
104104
@click.option('--language', '-l', type=click.Choice(['python', 'go', 'shell', 'java', 'auto']), default='auto', help='Language to analyze.')
@@ -110,82 +110,69 @@ def create(ctx, path, project_name_override, format, output, compress, language)
110110
try:
111111
root_path = Path(path)
112112

113-
if format in ['python-semantic-digest', 'go-semantic-digest']:
114-
if output is None:
115-
output = f"{root_path.name}_{language}_semantic_digest.yaml"
113+
if language == 'auto':
114+
if list(root_path.rglob("*.py")):
115+
language = "python"
116+
elif list(root_path.rglob("*.go")):
117+
language = "go"
118+
elif list(root_path.rglob("*.java")):
119+
language = "java"
120+
elif list(root_path.rglob("*.sh")):
121+
language = "shell"
122+
else:
123+
click.echo("Could not auto-detect language.", err=True)
124+
return
116125

126+
if language in ['python', 'go']:
117127
config = SnapshotConfig()
118128
builder = None
119-
120-
if language == 'auto':
121-
if format == 'python-semantic-digest':
122-
language = 'python'
123-
elif format == 'go-semantic-digest':
124-
language = 'go'
125-
else:
126-
# Fallback for auto-detection if format doesn't imply language
127-
if list(root_path.rglob("*.py")):
128-
language = "python"
129-
elif list(root_path.rglob("*.go")):
130-
language = "go"
131-
elif list(root_path.rglob("*.java")):
132-
language = "java"
133-
elif list(root_path.rglob("*.sh")):
134-
language = "shell"
135-
else:
136-
click.echo("Could not auto-detect language.", err=True)
137-
return
138-
139-
if language == 'python' and format == 'python-semantic-digest':
129+
if language == 'python':
140130
builder = PythonSemanticSnapshotBuilder(root_path, config)
141-
elif language == 'go' and format == 'go-semantic-digest':
131+
else: # language == 'go'
142132
builder = GoSemanticSnapshotBuilder(root_path, config)
143-
# Preserve other language builders for future use, but they won't be triggered
144-
# by the current format options.
145-
elif language == 'shell':
146-
builder = ShellSemanticSnapshotBuilder(root_path, config)
147-
elif language == 'java':
148-
builder = JavaSemanticSnapshotBuilder(root_path, config)
149-
else:
150-
click.echo(f"Unsupported language/format combination: {language}/{format}", err=True)
151-
return
152133

153134
project_snapshot = builder.build()
154135

155-
generator = YAMLGenerator()
156-
generator.export(project_snapshot, Path(output))
157-
158-
click.echo(f"{language.capitalize()} semantic digest created at {output}")
159-
return
160-
161-
snapshot_data = _create_snapshot_data(path, project_name)
136+
if output is None:
137+
output = f"{root_path.name}_snapshot.{format}"
162138

163-
if output:
164139
output_path = Path(output)
165140
output_path.parent.mkdir(parents=True, exist_ok=True)
166-
# Use model_dump_json for consistency
167-
with open(output_path, 'w', encoding='utf-8') as f:
168-
f.write(snapshot_data.model_dump_json(indent=2))
169141

170-
click.echo(f"Snapshot created at {output}")
171-
else:
172-
manager = SnapshotVersionManager(SNAPSHOT_DIR, project_name, DEFAULT_SNAPSHOT_CONFIG['snapshot'])
142+
if format == 'yaml':
143+
generator = YAMLGenerator()
144+
generator.export(project_snapshot, output_path)
145+
elif format == 'json':
146+
with open(output_path, 'w', encoding='utf-8') as f:
147+
json.dump(project_snapshot, f, indent=2)
148+
elif format == 'md':
149+
click.echo("Markdown format is not yet implemented.", err=True)
150+
return
173151

174-
# The format for saving via manager is 'json', not the input format for semantic digests
175-
save_format = 'json'
152+
click.echo(f"Snapshot created at {output}")
176153

177-
if compress:
178-
snapshot_path = manager.save_snapshot(snapshot_data, save_format)
154+
else: # Fallback to original snapshot logic for other languages
155+
snapshot_data = _create_snapshot_data(path, project_name)
179156

180-
# Compress the file
181-
with open(snapshot_path, 'rb') as f_in:
182-
with gzip.open(f"{snapshot_path}.gz", 'wb') as f_out:
183-
f_out.writelines(f_in)
184-
os.remove(snapshot_path)
185-
click.echo(f"Compressed snapshot created at {snapshot_path}.gz")
157+
if output:
158+
output_path = Path(output)
159+
output_path.parent.mkdir(parents=True, exist_ok=True)
160+
with open(output_path, 'w', encoding='utf-8') as f:
161+
f.write(snapshot_data.model_dump_json(indent=2))
162+
click.echo(f"Snapshot created at {output}")
186163
else:
187-
snapshot_path = manager.save_snapshot(snapshot_data, save_format)
188-
click.echo(f"Snapshot created at {snapshot_path}")
164+
manager = SnapshotVersionManager(SNAPSHOT_DIR, project_name, DEFAULT_SNAPSHOT_CONFIG['snapshot'])
165+
save_format = 'json'
166+
if compress:
167+
snapshot_path = manager.save_snapshot(snapshot_data, save_format)
168+
with open(snapshot_path, 'rb') as f_in:
169+
with gzip.open(f"{snapshot_path}.gz", 'wb') as f_out:
170+
f_out.writelines(f_in)
171+
os.remove(snapshot_path)
172+
click.echo(f"Compressed snapshot created at {snapshot_path}.gz")
173+
else:
174+
snapshot_path = manager.save_snapshot(snapshot_data, save_format)
175+
click.echo(f"Snapshot created at {snapshot_path}")
189176
finally:
190177
audit_logger.log(
191178
AuditEvent(

codesage/semantic_digest/go_snapshot_builder.py

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,8 @@
233233
if len(f.Names) == 0 {
234234
ps = append(ps, typeStr)
235235
} else {
236-
for _, name := range f.Names {
237-
ps = append(ps, name.Name+" "+typeStr)
236+
for range f.Names {
237+
ps = append(ps, typeStr) // 简化:只存类型,省 token
238238
}
239239
}
240240
}
@@ -264,25 +264,41 @@
264264
"""
265265

266266
class GoSemanticSnapshotBuilder(BaseLanguageSnapshotBuilder):
267-
def build(self) -> Dict[str, Any]:
268-
has_go = False
267+
_parser_bin_path = None
268+
_temp_dir = None
269+
270+
def __init__(self, root_path: Path, config: SnapshotConfig):
    """Create a Go snapshot builder for ``root_path``.

    Args:
        root_path: Directory handed to the base builder.
        config: Snapshot configuration handed to the base builder.
    """
    # Base-class initialisation runs first, using the same two arguments.
    super().__init__(root_path, config)

    # Then prepare the Go parser; _setup_parser() returns immediately when
    # a parser binary path has already been cached on the class.
    self._setup_parser()
273+
274+
def _setup_parser(self):
    """Build the embedded Go AST parser once and cache its path on the class.

    The parser source (``GO_AST_PARSER_SRC``) is written to a temporary
    directory and compiled with ``go build``. On success the directory and
    the binary path are stored in the class attributes ``_temp_dir`` and
    ``_parser_bin_path`` so later instances reuse them. On any failure
    (``go`` missing or not executable, temp dir not writable, build error)
    the temporary directory is removed and ``_parser_bin_path`` is left as
    ``None``; callers use that to select the regex fallback.
    """
    cls = GoSemanticSnapshotBuilder
    if cls._parser_bin_path:
        # A parser binary was already built by an earlier instance.
        return

    build_dir = None
    try:
        # Probe for a usable Go toolchain before doing any work.
        subprocess.check_call(["go", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        build_dir = tempfile.TemporaryDirectory()
        parser_src_path = os.path.join(build_dir.name, "parser.go")
        with open(parser_src_path, "w", encoding="utf-8") as f:
            f.write(GO_AST_PARSER_SRC)

        parser_bin_path = os.path.join(build_dir.name, "parser")
        subprocess.run(["go", "build", "-o", parser_bin_path, parser_src_path], capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers FileNotFoundError (no `go` on PATH) as well as
        # permission and disk errors. Do not keep a half-populated temp
        # directory referenced from the class after a failed build.
        if build_dir is not None:
            build_dir.cleanup()
        cls._temp_dir = None
        cls._parser_bin_path = None
        return

    # Publish only after the build succeeded, so the cached path always
    # points at an existing binary inside a live temporary directory.
    cls._temp_dir = build_dir
    cls._parser_bin_path = parser_bin_path
274290

291+
def build(self) -> Dict[str, Any]:
275292
digest = {
276293
"root": self.root_path.name, "pkgs": {}, "graph": {}, "meta": {}
277294
}
278295

279296
pkg_map = defaultdict(list)
280297
all_files = self._collect_files()
281-
total_cx = 0
282298
total_err_checks = 0
283299

284300
for fpath in all_files:
285-
data = self._extract_semantics(fpath, has_go)
301+
data = self._extract_semantics(fpath)
286302
pkg_name = data.get("pk", "unknown")
287303
clean_data = {k: v for k, v in data.items() if v}
288304
clean_data["f"] = str(fpath.relative_to(self.root_path))
@@ -295,9 +311,6 @@ def build(self) -> Dict[str, Any]:
295311
"er": data["stat"].get("er", 0),
296312
}
297313

298-
if "fn" in data:
299-
total_cx += sum(fn.get("cx", 1) for fn in data["fn"])
300-
301314
pkg_map[pkg_name].append(clean_data)
302315

303316
deps = {imp for imp in data.get("im", []) if "." in imp}
@@ -315,34 +328,27 @@ def build(self) -> Dict[str, Any]:
315328

316329
digest["meta"] = {
317330
"files": len(all_files), "pkgs": len(pkg_map),
318-
"total_complexity": total_cx, "error_hotspots": total_err_checks,
319-
"strategy": "AST" if has_go else "Regex"
331+
"error_hotspots": total_err_checks,
332+
"strategy": "AST" if GoSemanticSnapshotBuilder._parser_bin_path else "Regex"
320333
}
321334

322335
return digest
323336

324337
def _collect_files(self) -> List[Path]:
325338
return list(self.root_path.rglob("*.go"))
326339

327-
def _extract_semantics(self, file_path: Path, has_go: bool) -> Dict[str, Any]:
328-
if has_go:
329-
with tempfile.TemporaryDirectory() as temp_dir:
330-
parser_src_path = os.path.join(temp_dir, "parser.go")
331-
with open(parser_src_path, "w", encoding="utf-8") as f:
332-
f.write(GO_AST_PARSER_SRC)
333-
334-
parser_bin_path = os.path.join(temp_dir, "parser")
335-
try:
336-
build_result = subprocess.run(["go", "build", "-o", parser_bin_path, parser_src_path], capture_output=True, text=True, check=True)
337-
cmd = [parser_bin_path, str(file_path)]
338-
output = subprocess.check_output(cmd, stderr=subprocess.PIPE, timeout=15)
339-
return json.loads(output.decode('utf-8'))
340-
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, json.JSONDecodeError) as e:
341-
print(f"AST parsing failed for {file_path}: {e}")
342-
if isinstance(e, subprocess.CalledProcessError):
343-
print(f"Stderr: {e.stderr}")
344-
if hasattr(e, 'stdout'):
345-
print(f"Stdout: {e.stdout}")
340+
def _extract_semantics(self, file_path: Path) -> Dict[str, Any]:
341+
if GoSemanticSnapshotBuilder._parser_bin_path:
342+
try:
343+
cmd = [GoSemanticSnapshotBuilder._parser_bin_path, str(file_path)]
344+
output = subprocess.check_output(cmd, stderr=subprocess.PIPE, timeout=15)
345+
return json.loads(output.decode('utf-8'))
346+
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, json.JSONDecodeError) as e:
347+
print(f"AST parsing failed for {file_path}: {e}")
348+
if isinstance(e, subprocess.CalledProcessError):
349+
print(f"Stderr: {e.stderr}")
350+
if hasattr(e, 'stdout'):
351+
print(f"Stdout: {e.stdout}")
346352

347353
# Fallback to regex
348354
content = file_path.read_text(encoding="utf-8", errors="ignore")

codesage/semantic_digest/python_snapshot_builder.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,17 @@ def build(self) -> Dict[str, Any]:
102102
digest["deps"][module_name].add(module)
103103

104104
self._finalize_digest(digest, total_ccn, all_imports)
105+
106+
# Convert defaultdicts to dicts for clean output
107+
final_modules = {}
108+
for name, data in digest["modules"].items():
109+
data["fim"] = dict(data["fim"])
110+
data["dc"] = sorted(list(data["dc"]))
111+
final_modules[name] = data
112+
digest["modules"] = final_modules
113+
digest["deps"] = {mod: sorted(list(deps)) for mod, deps in digest["deps"].items()}
114+
115+
105116
return digest
106117

107118
def _collect_files(self) -> List[Path]:

codesage/snapshot/versioning.py

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -83,31 +83,26 @@ def _update_index(self, snapshot_path: str, metadata: SnapshotMetadata):
8383
self._save_index(index)
8484

8585
def _get_expired_snapshots(self, index: List[Dict[str, Any]], now: datetime) -> List[Dict[str, Any]]:
86-
"""Identifies expired snapshots based on retention policies."""
87-
88-
def parse_timestamp(ts_str):
89-
ts = datetime.fromisoformat(ts_str)
90-
if ts.tzinfo is None:
91-
return ts.replace(tzinfo=timezone.utc)
92-
return ts
93-
94-
try:
95-
sorted_snapshots = sorted(
96-
index,
97-
key=lambda s: parse_timestamp(s["timestamp"]),
98-
reverse=True
99-
)
100-
except (ValueError, TypeError):
101-
return []
102-
103-
kept_snapshots = sorted_snapshots[:self.max_versions]
104-
105-
kept_by_date = {
106-
s['version'] for s in kept_snapshots
107-
if (now - parse_timestamp(s["timestamp"])) <= timedelta(days=self.retention_days)
108-
}
109-
110-
return [s for s in index if s["version"] not in kept_by_date]
86+
"""Identifies expired snapshots."""
87+
valid_snapshots = []
88+
for s in index:
89+
try:
90+
ts = datetime.fromisoformat(s["timestamp"])
91+
if ts.tzinfo is None:
92+
ts = ts.replace(tzinfo=timezone.utc)
93+
if now - ts <= timedelta(days=self.retention_days):
94+
valid_snapshots.append(s)
95+
except ValueError:
96+
# Skip malformed timestamps
97+
continue
98+
99+
if len(valid_snapshots) > self.max_versions:
100+
valid_snapshots = sorted(
101+
valid_snapshots, key=lambda s: s["timestamp"], reverse=True
102+
)[:self.max_versions]
103+
104+
valid_versions = {s["version"] for s in valid_snapshots}
105+
return [s for s in index if s["version"] not in valid_versions]
111106

112107
def cleanup_expired_snapshots(self) -> int:
113108
"""Removes expired snapshots and returns the count of deleted files."""

go_test_codesage.yaml

Lines changed: 0 additions & 8 deletions
This file was deleted.

go_test_script.yaml

Lines changed: 0 additions & 8 deletions
This file was deleted.

0 commit comments

Comments
 (0)