#!/usr/bin/env python3
# Weekly automated code-review script: finds files changed recently in the
# repository and asks the OpenClaw agent CLI to review them, writing a
# consolidated markdown report.
#!/usr/bin/env python3
|
|
import datetime
import itertools
import json
import os
import subprocess
from pathlib import Path
|
|
|
|
# Configuration
|
|
REPO_DIR = "/opt/aitbc"
|
|
OUTPUT_DIR = "/opt/aitbc"
|
|
DAYS_BACK = 7
|
|
MAX_FILES_PER_BATCH = 15
|
|
MAX_LINES_PER_FILE = 100
|
|
# File extensions to review
|
|
ALLOWED_EXT = {'.py', '.js', '.ts', '.jsx', '.tsx', '.json', '.yaml', '.yml', '.md', '.sh', '.rs', '.go', '.sol'}
|
|
|
|
def get_changed_files():
|
|
"""Get list of files changed in the last N days."""
|
|
os.chdir(REPO_DIR)
|
|
# Using git diff with --name-only to get changed files
|
|
result = subprocess.run(
|
|
["git", "diff", "--name-only", f"HEAD~{DAYS_BACK}"],
|
|
capture_output=True, text=True
|
|
)
|
|
if result.returncode != 0:
|
|
print(f"Git diff error: {result.stderr}")
|
|
return []
|
|
files = [f.strip() for f in result.stdout.strip().split('\n') if f.strip()]
|
|
# Filter by allowed extensions and existence
|
|
filtered = []
|
|
for f in files:
|
|
p = Path(f)
|
|
if p.suffix in ALLOWED_EXT and (Path(REPO_DIR) / f).exists():
|
|
filtered.append(f)
|
|
return filtered
|
|
|
|
def read_file_content(filepath, max_lines):
|
|
"""Read file content with size limit."""
|
|
full_path = os.path.join(REPO_DIR, filepath)
|
|
try:
|
|
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
lines = f.readlines()
|
|
if len(lines) > max_lines:
|
|
lines = lines[:max_lines]
|
|
note = f"\n[TRUNCATED at {max_lines} lines]\n"
|
|
else:
|
|
note = ""
|
|
return ''.join(lines) + note
|
|
except Exception as e:
|
|
return f"[Error reading {filepath}: {e}]"
|
|
|
|
def review_batch(filepaths):
|
|
"""Ask agent to review a batch of files."""
|
|
prompt = (
|
|
"You are a senior code reviewer. Review the following files for general code quality and best practices.\n"
|
|
"For each file, provide concise bullet-point feedback on:\n"
|
|
"- Code style and consistency\n"
|
|
"- Potential bugs or issues\n"
|
|
"- Security concerns\n"
|
|
"- Performance considerations\n"
|
|
"- Suggestions for improvement\n"
|
|
"- Documentation / test coverage gaps\n\n"
|
|
"Focus on actionable insights. If multiple files have related issues, group them.\n\n"
|
|
)
|
|
for fp in filepaths:
|
|
content = read_file_content(fp, MAX_LINES_PER_FILE)
|
|
prompt += f"=== File: {fp} ===\n{content}\n\n"
|
|
# Call OpenClaw agent via CLI
|
|
try:
|
|
result = subprocess.run(
|
|
["openclaw", "agent", "--agent", "main", "--message", prompt, "--thinking", "medium"],
|
|
capture_output=True, text=True, timeout=300
|
|
)
|
|
if result.returncode != 0:
|
|
return f"[Agent error: {result.stderr}]"
|
|
return result.stdout.strip()
|
|
except subprocess.TimeoutExpired:
|
|
return "[Review timed out after 3 minutes]"
|
|
except Exception as e:
|
|
return f"[Exception: {e}]"
|
|
|
|
def main():
|
|
changed = get_changed_files()
|
|
if not changed:
|
|
print(f"No changed files found in the last {DAYS_BACK} days with allowed extensions.")
|
|
return
|
|
|
|
print(f"Found {len(changed)} changed files to review.")
|
|
|
|
# Sort by file size (largest first) to prioritize bigger files
|
|
files_with_size = []
|
|
for f in changed:
|
|
try:
|
|
size = os.path.getsize(os.path.join(REPO_DIR, f))
|
|
except:
|
|
size = 0
|
|
files_with_size.append((f, size))
|
|
files_with_size.sort(key=lambda x: x[1], reverse=True)
|
|
# For initial run, limit to top 2 largest files to avoid long processing
|
|
sorted_files = [f for f, _ in files_with_size[:2]]
|
|
|
|
# Batch processing
|
|
batches = []
|
|
for i in range(0, len(sorted_files), MAX_FILES_PER_BATCH):
|
|
batches.append(sorted_files[i:i+MAX_FILES_PER_BATCH])
|
|
|
|
all_reviews = []
|
|
for idx, batch in enumerate(batches, 1):
|
|
print(f"Reviewing batch {idx}/{len(batches)}: {len(batch)} files")
|
|
review = review_batch(batch)
|
|
all_reviews.append(review)
|
|
|
|
# Consolidate report
|
|
today = datetime.date.today().isoformat()
|
|
out_path = Path(OUTPUT_DIR) / f"code_review_{today}.md"
|
|
|
|
with open(out_path, 'w', encoding='utf-8') as f:
|
|
f.write(f"# Code Review Report — {today}\n\n")
|
|
f.write(f"**Repository:** `{REPO_DIR}`\n\n")
|
|
f.write(f"**Scope:** Files changed in the last {DAYS_BACK} days\n")
|
|
f.write(f"**Total files reviewed:** {len(changed)}\n\n")
|
|
f.write("## Files Reviewed\n\n")
|
|
for file in sorted_files:
|
|
f.write(f"- `{file}`\n")
|
|
f.write("\n---\n\n")
|
|
f.write("## Review Findings\n\n")
|
|
for i, review in enumerate(all_reviews, 1):
|
|
if len(batches) > 1:
|
|
f.write(f"### Batch {i}\n\n")
|
|
f.write(review.strip() + "\n\n")
|
|
|
|
print(f"Report generated: {out_path}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|