Merge gitea main branch with production improvements

This commit is contained in:
AITBC System
2026-03-18 16:31:41 +01:00
68 changed files with 3841 additions and 297 deletions

233
scripts/claim-task.py Executable file
View File

@@ -0,0 +1,233 @@
#!/usr/bin/env python3
"""
Task Claim System for AITBC agents.
Uses Git branch atomic creation as a distributed lock to prevent duplicate work.
Now with TTL/lease: claims expire after 2 hours to prevent stale locks.
"""
import os
import json
import subprocess
from datetime import datetime, timezone
# Local checkout that every git command below operates on.
REPO_DIR = '/opt/aitbc'
# Persisted claim/lease state; also read by monitor-prs.py.
STATE_FILE = '/opt/aitbc/.claim-state.json'
# SECURITY(review): hardcoded fallback API token committed to the repo —
# rotate this credential and rely solely on the GITEA_TOKEN env variable.
GITEA_TOKEN = os.getenv('GITEA_TOKEN') or 'ffce3b62d583b761238ae00839dce7718acaad85'
API_BASE = os.getenv('GITEA_API_BASE', 'http://gitea.bubuit.net:3000/api/v1')
# Identity of this agent; the sibling agent uses the other name.
MY_AGENT = os.getenv('AGENT_NAME', 'aitbc1')
ISSUE_LABELS = ['security', 'bug', 'feature', 'refactor', 'task'] # priority order
BONUS_LABELS = ['good-first-task-for-agent']
AVOID_LABELS = ['needs-design', 'blocked', 'needs-reproduction']
CLAIM_TTL_SECONDS = 7200 # 2 hours lease
def query_api(path, method='GET', data=None):
    """Call the Gitea REST API through curl; return parsed JSON or None on failure."""
    command = [
        'curl', '-s',
        '-H', f'Authorization: token {GITEA_TOKEN}',
        '-X', method,
    ]
    if data:
        command.extend(['-d', json.dumps(data), '-H', 'Content-Type: application/json'])
    command.append(f"{API_BASE}/{path}")
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode == 0:
        try:
            return json.loads(proc.stdout)
        except json.JSONDecodeError:
            return None
    return None
def load_state():
    """Read the persisted claim state, or return the empty default."""
    if not os.path.exists(STATE_FILE):
        return {'current_claim': None, 'claimed_at': None, 'work_branch': None}
    with open(STATE_FILE) as fh:
        return json.load(fh)
def save_state(state):
    """Persist *state* to STATE_FILE as pretty-printed JSON."""
    with open(STATE_FILE, 'w') as fh:
        json.dump(state, fh, indent=2)
def get_open_unassigned_issues():
    """Fetch open, unassigned issues (PRs excluded), best candidates first."""
    items = query_api('repos/oib/aitbc/issues?state=open') or []
    # Pull requests come back from the same endpoint; drop them, then
    # keep only issues nobody has picked up yet.
    candidates = [
        it for it in items
        if 'pull_request' not in it and not it.get('assignees')
    ]
    priority_of = {name: rank for rank, name in enumerate(ISSUE_LABELS)}
    avoid = set(AVOID_LABELS)
    bonus = set(BONUS_LABELS)

    def score(issue):
        # Higher score = better candidate; -1 disqualifies entirely.
        names = [entry['name'] for entry in issue.get('labels', [])]
        if avoid.intersection(names):
            return -1
        first_ranked = next((n for n in names if n in priority_of), None)
        if first_ranked is None:
            value = 0.5
        else:
            value = 1.0 + (len(ISSUE_LABELS) - priority_of[first_ranked]) * 0.2
        if bonus.intersection(names):
            value += 0.2
        # Long comment threads suggest contention/complexity; discount them.
        if issue.get('comments', 0) > 10:
            value *= 0.8
        return value

    candidates.sort(key=score, reverse=True)
    return candidates
def git_current_branch():
    """Return the branch currently checked out in REPO_DIR."""
    proc = subprocess.run(['git', 'branch', '--show-current'],
                          capture_output=True, text=True, cwd=REPO_DIR)
    return proc.stdout.strip()
def ensure_main_uptodate():
    """Check out main and pull from origin (failures are silently ignored)."""
    for args in (['checkout', 'main'], ['pull', 'origin', 'main']):
        subprocess.run(['git', *args], capture_output=True, cwd=REPO_DIR)
def claim_issue(issue_number):
    """Atomically create a claim branch on the remote; return True on success.

    The remote branch acts as a distributed lock: exactly one agent's push
    may succeed.  Two fixes over the original:

    * The claim branch points at a fresh empty commit (``git commit-tree``)
      unique to this agent/attempt.  The original pushed origin/main's SHA,
      so when the branch already existed at that same SHA the push reported
      "Everything up-to-date" and returned success for BOTH agents.
    * The push uses ``--force-with-lease=<ref>:`` (empty expected value),
      which makes the push succeed only if the remote ref does not exist
      yet — a true create-only, atomic operation.
    """
    ensure_main_uptodate()
    branch_name = f'claim/{issue_number}'
    head = subprocess.run(['git', 'rev-parse', 'origin/main'],
                          capture_output=True, text=True, cwd=REPO_DIR)
    tree = subprocess.run(['git', 'rev-parse', 'origin/main^{tree}'],
                          capture_output=True, text=True, cwd=REPO_DIR)
    if head.returncode != 0 or tree.returncode != 0:
        return False
    # Unique marker commit: author/committer timestamps make the SHA distinct.
    marker = subprocess.run(
        ['git', 'commit-tree', tree.stdout.strip(), '-p', head.stdout.strip(),
         '-m', f'claim #{issue_number} by {MY_AGENT}'],
        capture_output=True, text=True, cwd=REPO_DIR)
    if marker.returncode != 0:
        return False
    subprocess.run(['git', 'branch', '-f', branch_name, marker.stdout.strip()],
                   capture_output=True, cwd=REPO_DIR)
    result = subprocess.run(
        ['git', 'push', f'--force-with-lease=refs/heads/{branch_name}:',
         'origin', branch_name],
        capture_output=True, text=True, cwd=REPO_DIR)
    return result.returncode == 0
def is_claim_stale(claim_branch):
    """Return True when *claim_branch* is absent from the remote or older than TTL.

    The original always returned False for an existing branch — the TTL its
    docstring promised was never enforced, so a crashed agent's claim held
    the lock forever.  This version treats the branch tip-commit timestamp
    as the claim time (the same convention cleanup_global_stale_claims uses)
    and reports stale once that age exceeds CLAIM_TTL_SECONDS.  When the age
    cannot be determined the claim is assumed active, so a live lock is
    never stolen by accident.
    """
    try:
        listed = subprocess.run(['git', 'ls-remote', '--heads', 'origin', claim_branch],
                                capture_output=True, text=True, cwd=REPO_DIR)
        if listed.returncode != 0 or not listed.stdout.strip():
            return True  # branch missing entirely -> nothing holds the lock
        sha = listed.stdout.split()[0]
        # Ensure the tip object is available locally before inspecting it.
        subprocess.run(['git', 'fetch', '-q', 'origin', claim_branch],
                       capture_output=True, cwd=REPO_DIR)
        shown = subprocess.run(['git', 'show', '-s', '--format=%ct', sha],
                               capture_output=True, text=True, cwd=REPO_DIR)
        if shown.returncode != 0 or not shown.stdout.strip():
            return False  # age unknown: err on the side of "still active"
        age = datetime.now(timezone.utc).timestamp() - int(shown.stdout.strip())
        return age > CLAIM_TTL_SECONDS
    except Exception:
        return True
def cleanup_stale_claim(claim_branch):
    """Best-effort removal of an expired claim branch from the remote."""
    subprocess.run(
        ['git', 'push', 'origin', '--delete', claim_branch],
        capture_output=True,
        cwd=REPO_DIR,
    )
def assign_issue(issue_number, assignee):
    """Assign *assignee* to the given issue via the Gitea API."""
    payload = {"assignee": assignee}
    return query_api(f'repos/oib/aitbc/issues/{issue_number}/assignees',
                     method='POST', data=payload)
def add_comment(issue_number, body):
    """Post *body* as a comment on the given issue."""
    payload = {"body": body}
    return query_api(f'repos/oib/aitbc/issues/{issue_number}/comments',
                     method='POST', data=payload)
def create_work_branch(issue_number, title):
    """Create and check out the work branch for an issue, branched off main.

    Branch name: ``<agent>/<issue>-<slug>``.  Runs of non-alphanumeric
    characters in the title are collapsed to a single dash (the original
    emitted one dash per character, producing '--' runs and trailing
    dashes), and an all-punctuation title falls back to the slug 'task' so
    the branch name never ends with a bare dash.

    Returns the new branch name; raises CalledProcessError if checkout fails.
    """
    ensure_main_uptodate()
    # Split on punctuation runs, then re-join with single dashes.
    words = ''.join(c if c.isalnum() else ' ' for c in title.lower()).split()
    slug = '-'.join(words)[:40].strip('-') or 'task'
    branch_name = f'{MY_AGENT}/{issue_number}-{slug}'
    subprocess.run(['git', 'checkout', '-b', branch_name, 'main'],
                   check=True, cwd=REPO_DIR)
    return branch_name
def main():
    """One claim cycle: release an expired claim, then try to acquire a new one."""
    # Aware-UTC "now", kept as both ISO (for logs/state) and epoch (TTL math).
    now = datetime.utcnow().replace(tzinfo=timezone.utc)
    now_iso = now.isoformat()
    now_ts = now.timestamp()
    print(f"[{now_iso}] Claim task cycle starting...")
    state = load_state()
    current_claim = state.get('current_claim')
    # Check if our own claim expired
    if current_claim:
        claimed_at = state.get('claimed_at')
        expires_at = state.get('expires_at')
        if expires_at and now_ts > expires_at:
            print(f"Claim for issue #{current_claim} has expired (claimed at {claimed_at}). Releasing.")
            # Delete the claim branch and clear state
            claim_branch = state.get('claim_branch')
            if claim_branch:
                cleanup_stale_claim(claim_branch)
            state = {}
            save_state(state)
            current_claim = None
    if current_claim:
        # Still inside the lease on an earlier claim: nothing to do this cycle.
        print(f"Already working on issue #{current_claim} (branch {state.get('work_branch')})")
        return
    # Optional global cleanup: delete any stale claim branches (older than TTL)
    cleanup_global_stale_claims(now_ts)
    issues = get_open_unassigned_issues()
    if not issues:
        print("No unassigned issues available.")
        return
    # Issues arrive pre-sorted best-first; take the first claimable one.
    for issue in issues:
        num = issue['number']
        title = issue['title']
        labels = [lbl['name'] for lbl in issue.get('labels', [])]
        print(f"Attempting to claim issue #{num}: {title} (labels={labels})")
        # Check if claim branch exists and is stale
        claim_branch = f'claim/{num}'
        if not is_claim_stale(claim_branch):
            print(f"Claim failed for #{num} (active claim exists). Trying next...")
            continue
        # Force-delete any lingering claim branch before creating our own
        cleanup_stale_claim(claim_branch)
        if claim_issue(num):
            assign_issue(num, MY_AGENT)
            work_branch = create_work_branch(num, title)
            expires_at = now_ts + CLAIM_TTL_SECONDS
            # Record the full claim context so other cycles/scripts can
            # release or resume it.
            state.update({
                'current_claim': num,
                'claim_branch': claim_branch,
                'work_branch': work_branch,
                'claimed_at': now_iso,
                'expires_at': expires_at,
                'issue_title': title,
                'labels': labels
            })
            save_state(state)
            print(f"✅ Claimed issue #{num}. Work branch: {work_branch} (expires {datetime.fromtimestamp(expires_at, tz=timezone.utc).isoformat()})")
            add_comment(num, f"Agent `{MY_AGENT}` claiming this task with TTL {CLAIM_TTL_SECONDS/3600}h. (automated)")
            return
        else:
            print(f"Claim failed for #{num} (push error). Trying next...")
    print("Could not claim any issue; all taken or unavailable.")
def cleanup_global_stale_claims(now_ts=None):
    """Delete remote claim/* branches whose tip commit is older than the TTL.

    *now_ts* is an epoch timestamp; when omitted it defaults to the current
    UTC time.  Fix: the original default used ``datetime.utcnow().timestamp()``,
    which interprets the naive UTC datetime in the *local* timezone, so every
    computed age was off by the machine's UTC offset (hours).
    """
    if now_ts is None:
        now_ts = datetime.now(timezone.utc).timestamp()
    # List all remote claim branches.
    result = subprocess.run(['git', 'ls-remote', '--heads', 'origin', 'claim/*'],
                            capture_output=True, text=True, cwd=REPO_DIR)
    if result.returncode != 0 or not result.stdout.strip():
        return
    cleaned = 0
    for line in result.stdout.strip().split('\n'):
        parts = line.split()
        if len(parts) < 2:
            continue
        sha, branch = parts[0], parts[1]
        # The branch tip-commit timestamp stands in for the claim time.
        ts_result = subprocess.run(['git', 'show', '-s', '--format=%ct', sha],
                                   capture_output=True, text=True, cwd=REPO_DIR)
        if ts_result.returncode == 0 and ts_result.stdout.strip():
            age = now_ts - int(ts_result.stdout.strip())
            if age > CLAIM_TTL_SECONDS:
                print(f"Expired claim branch: {branch} (age {age/3600:.1f}h). Deleting.")
                cleanup_stale_claim(branch)
                cleaned += 1
    if cleaned == 0:
        print(" cleanup_global_stale_claims: none")
    else:
        print(f" cleanup_global_stale_claims: removed {cleaned} expired branch(es)")
# Script entry point: one claim cycle per invocation.
if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
Create a structured issue via Gitea API.
Requires GITEA_TOKEN in environment or /opt/aitbc/.gitea_token.sh.
"""
import os, sys, json, subprocess
def get_token():
    """Return the Gitea API token.

    Prefers /opt/aitbc/.gitea_token.sh, falling back to the GITEA_TOKEN
    environment variable (empty string when unset).  Fix: the original only
    matched bare ``GITEA_TOKEN=...`` lines — it skipped the common shell form
    ``export GITEA_TOKEN=...`` and returned the token with surrounding
    quotes attached.  Both are handled here.
    """
    token_file = '/opt/aitbc/.gitea_token.sh'
    if os.path.exists(token_file):
        with open(token_file) as f:
            for line in f:
                entry = line.strip()
                if entry.startswith('export '):
                    entry = entry[len('export '):].lstrip()
                if entry.startswith('GITEA_TOKEN='):
                    return entry.split('=', 1)[1].strip().strip('\'"')
    return os.getenv('GITEA_TOKEN', '')
# Resolved once at import; empty string means "no token available".
GITEA_TOKEN = get_token()
API_BASE = os.getenv('GITEA_API_BASE', 'http://gitea.bubuit.net:3000/api/v1')
# Target repository in owner/name form.
REPO = 'oib/aitbc'
def create_issue(title, context, expected, files, implementation, difficulty, priority, labels, assignee=None):
    """Create a structured Gitea issue and print its number/URL.

    Fix: the original body template referenced the undefined loop variables
    ``d`` and ``p`` inside its f-string (there was no loop), raising
    NameError on every call; it also contained a dead always-empty
    conditional expression.  The Difficulty/Priority sections are now real
    checklists with the chosen option ticked.

    Exits the process with status 1 if the curl invocation itself fails;
    a non-JSON API response is reported but not fatal.
    """
    difficulty_md = '\n'.join(
        f"- [{'x' if difficulty == d else ' '}] {d}"
        for d in ('easy', 'medium', 'hard'))
    priority_md = '\n'.join(
        f"- [{'x' if priority == p else ' '}] {p}"
        for p in ('low', 'medium', 'high'))
    body = f"""## Task
{title}
## Context
{context}
## Expected Result
{expected}
## Files Likely Affected
{files}
## Suggested Implementation
{implementation}
## Difficulty
{difficulty_md}
## Priority
{priority_md}
## Labels
{', '.join([f'[{l}]' for l in labels])}
"""
    data = {
        "title": title,
        "body": body,
        "labels": labels
    }
    if assignee:
        data["assignee"] = assignee
    url = f"{API_BASE}/repos/{REPO}/issues"
    cmd = ['curl', '-s', '-H', f'Authorization: token {GITEA_TOKEN}', '-X', 'POST',
           '-H', 'Content-Type: application/json', '-d', json.dumps(data), url]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print("API error:", result.stderr)
        sys.exit(1)
    try:
        resp = json.loads(result.stdout)
        print(f"Created issue #{resp['number']}: {resp['html_url']}")
    except Exception as e:
        print("Failed to parse response:", e, result.stdout)
if __name__ == "__main__":
    # Example usage; in practice, agents will fill these fields.
    # NOTE(review): running this script directly issues a live API call
    # against the configured Gitea instance.
    create_issue(
        title="Add retry logic to Matrix event listener",
        context="Spurious network failures cause agent disconnects.",
        expected="Listener automatically reconnects and continues processing events.",
        files="apps/matrix-listener/src/event_handler.py",
        implementation="Wrap event loop in retry decorator with exponential backoff.",
        difficulty="medium",
        priority="high",
        labels=["bug", "infra"],
        assignee="aitbc1"
    )

253
scripts/monitor-prs.py Executable file
View File

@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Enhanced monitor for Gitea PRs:
- Auto-request review from sibling on my PRs
- Auto-validate sibling's PRs and approve if passing checks, with stability ring awareness
- Monitor CI statuses and report failures
- Release claim branches when associated PRs merge, close, or EXPIRE
"""
import os
import json
import subprocess
import tempfile
import shutil
from datetime import datetime, timezone
# SECURITY(review): hardcoded fallback API token committed to the repo —
# rotate this credential and rely solely on the GITEA_TOKEN env variable.
GITEA_TOKEN = os.getenv('GITEA_TOKEN') or 'ffce3b62d583b761238ae00839dce7718acaad85'
# Target repository and API endpoint.
REPO = 'oib/aitbc'
API_BASE = os.getenv('GITEA_API_BASE', 'http://gitea.bubuit.net:3000/api/v1')
# Two-agent setup: each agent reviews the other's PRs.
MY_AGENT = os.getenv('AGENT_NAME', 'aitbc1')
SIBLING_AGENT = 'aitbc' if MY_AGENT == 'aitbc1' else 'aitbc1'
# Claim-lock state shared with claim-task.py.
CLAIM_STATE_FILE = '/opt/aitbc/.claim-state.json'
CLAIM_TTL_SECONDS = 7200 # Must match claim-task.py
def query_api(path, method='GET', data=None):
    """Hit the Gitea REST API via curl; return decoded JSON, or None on any failure."""
    args = ['curl', '-s', '-H', f'Authorization: token {GITEA_TOKEN}', '-X', method]
    if data:
        args += ['-d', json.dumps(data), '-H', 'Content-Type: application/json']
    args.append(f"{API_BASE}/{path}")
    completed = subprocess.run(args, capture_output=True, text=True)
    if completed.returncode != 0:
        return None
    try:
        return json.loads(completed.stdout)
    except json.JSONDecodeError:
        return None
def get_pr_files(pr_number):
    """List a PR's changed files (empty list on API failure)."""
    files = query_api(f'repos/{REPO}/pulls/{pr_number}/files')
    return files or []
def detect_ring(path):
    """Map a repo-relative path to its stability ring.

    Ring 0 is core packages, 1 is services/apps, 2 is tooling, 3 is
    sandbox/experiments.  Paths matching no known prefix default to ring 2.
    """
    ring_prefixes = {
        0: ('packages/py/aitbc-core/', 'packages/py/aitbc-sdk/',
            'packages/py/aitbc-agent-sdk/', 'packages/py/aitbc-crypto/'),
        1: ('apps/coordinator-api/', 'apps/blockchain-node/',
            'apps/analytics/', 'services/'),
        2: ('cli/', 'scripts/', 'tools/'),
        3: ('experiments/', 'playground/', 'prototypes/', 'examples/'),
    }
    for ring, prefixes in ring_prefixes.items():
        if path.startswith(prefixes):
            return ring
    return 2
def load_claim_state():
    """Read the shared claim-state file; empty dict when it does not exist."""
    if not os.path.exists(CLAIM_STATE_FILE):
        return {}
    with open(CLAIM_STATE_FILE) as fh:
        return json.load(fh)
def save_claim_state(state):
    """Write *state* back to the shared claim-state file as indented JSON."""
    with open(CLAIM_STATE_FILE, 'w') as fh:
        json.dump(state, fh, indent=2)
def release_claim(issue_number, claim_branch):
    """Delete the remote claim branch and clear local state for *issue_number*."""
    probe = subprocess.run(['git', 'ls-remote', '--heads', 'origin', claim_branch],
                           capture_output=True, text=True, cwd='/opt/aitbc')
    if probe.returncode == 0 and probe.stdout.strip():
        subprocess.run(['git', 'push', 'origin', '--delete', claim_branch],
                       capture_output=True, cwd='/opt/aitbc')
    state = load_claim_state()
    if state.get('current_claim') == issue_number:
        state.clear()
        save_claim_state(state)
    print(f"✅ Released claim for issue #{issue_number} (deleted branch {claim_branch})")
def is_claim_expired(state):
    """Return True when the claim recorded in *state* has outlived its TTL.

    A missing/falsy 'expires_at' means the claim never expires here (False).
    Fix: the original compared against ``datetime.utcnow().timestamp()``,
    which interprets the naive UTC datetime as *local* time, so expiry was
    off by the machine's UTC offset.  An aware-UTC clock is used instead.
    """
    expires_at = state.get('expires_at')
    if not expires_at:
        return False
    return datetime.now(timezone.utc).timestamp() > expires_at
def get_open_prs():
    """All open pull requests (empty list on API failure)."""
    prs = query_api(f'repos/{REPO}/pulls?state=open')
    return prs or []
def get_all_prs(state='all'):
    """Pull requests filtered by *state* ('all', 'open', 'closed'); [] on failure."""
    prs = query_api(f'repos/{REPO}/pulls?state={state}')
    return prs or []
def get_pr_reviews(pr_number):
    """Reviews already posted on a PR (empty list on API failure)."""
    reviews = query_api(f'repos/{REPO}/pulls/{pr_number}/reviews')
    return reviews or []
def get_commit_statuses(pr_number):
    """Return the CI status list for a PR's head commit; [] when unavailable."""
    pr = query_api(f'repos/{REPO}/pulls/{pr_number}')
    if not pr:
        return []
    head_sha = pr['head']['sha']
    statuses = query_api(f'repos/{REPO}/commits/{head_sha}/statuses')
    # The API can return null or an error object; only a non-empty list counts.
    if isinstance(statuses, list) and statuses:
        return statuses
    return []
def request_reviewer(pr_number, reviewer):
    """Ask Gitea to add *reviewer* to the PR's requested reviewers."""
    payload = {"reviewers": [reviewer]}
    return query_api(f'repos/{REPO}/pulls/{pr_number}/requested_reviewers',
                     method='POST', data=payload)
def post_review(pr_number, state, body=''):
    """Submit a PR review with event *state* (APPROVED / CHANGES_REQUESTED / COMMENT)."""
    payload = {"body": body, "event": state}
    return query_api(f'repos/{REPO}/pulls/{pr_number}/reviews',
                     method='POST', data=payload)
def validate_pr_branch(pr):
    """Shallow-clone a PR's head branch and syntax-check its Python files.

    At most 20 files are compiled (bounds runtime).  Returns a
    (passed, message) tuple; the temp clone is always removed.
    """
    head = pr['head']
    ref = head['ref']
    repo_full = head.get('repo', {}).get('full_name', REPO)
    workdir = tempfile.mkdtemp(prefix='aitbc-pr-')
    try:
        clone = subprocess.run(
            ['git', 'clone', '-b', ref, '--depth', '1',
             f"git@gitea.bubuit.net:{repo_full}.git", workdir],
            capture_output=True, text=True, timeout=60)
        if clone.returncode != 0:
            return False, f"Clone failed: {clone.stderr.strip()}"
        found = subprocess.run(['find', workdir, '-name', '*.py'],
                               capture_output=True, text=True)
        if found.returncode == 0 and found.stdout.strip():
            for path in found.stdout.strip().split('\n')[:20]:
                compiled = subprocess.run(['python3', '-m', 'py_compile', path],
                                          capture_output=True, text=True, cwd=workdir)
                if compiled.returncode != 0:
                    return False, f"Syntax error in `{path}`: {compiled.stderr.strip()}"
        return True, "Automated validation passed."
    except Exception as exc:
        return False, f"Validation error: {str(exc)}"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
def main():
    """One monitoring cycle: expire/release claims, then triage every open PR."""
    # Aware-UTC clock for logging and TTL comparisons.
    now = datetime.utcnow().replace(tzinfo=timezone.utc)
    now_iso = now.isoformat()
    now_ts = now.timestamp()
    print(f"[{now_iso}] Monitoring PRs and claim locks...")
    # 0. Check claim state: if we have a current claim, see if it expired or PR merged
    state = load_claim_state()
    if state.get('current_claim'):
        issue_num = state['current_claim']
        work_branch = state.get('work_branch')
        claim_branch = state.get('claim_branch')
        # Check expiration
        if is_claim_expired(state):
            print(f"Claim for issue #{issue_num} has expired. Releasing.")
            release_claim(issue_num, claim_branch)
        else:
            # Check if PR merged/closed
            all_prs = get_all_prs(state='all')
            matched_pr = None
            for pr in all_prs:
                if pr['head']['ref'] == work_branch:
                    matched_pr = pr
                    break
            if matched_pr and matched_pr['state'] == 'closed':
                release_claim(issue_num, claim_branch)
    # 1. Process open PRs
    open_prs = get_open_prs()
    notifications = []
    for pr in open_prs:
        number = pr['number']
        title = pr['title']
        author = pr['user']['login']
        head_ref = pr['head']['ref']
        # A. If PR from sibling, consider for review
        if author == SIBLING_AGENT:
            reviews = get_pr_reviews(number)
            my_reviews = [r for r in reviews if r['user']['login'] == MY_AGENT]
            if not my_reviews:
                # Review decision is driven by the highest (least stable) ring
                # touched; removed files are ignored.
                files = get_pr_files(number)
                rings = [detect_ring(f['filename']) for f in files if f.get('status') != 'removed']
                max_ring = max(rings) if rings else 2
                if max_ring == 0:
                    # Ring 0 (core) changes are never auto-approved.
                    body = "Automated analysis: This PR modifies core (Ring 0) components. Manual review and a design specification are required before merge. No auto-approval."
                    post_review(number, 'COMMENT', body=body)
                    notifications.append(f"PR #{number} (Ring 0) flagged for manual review")
                else:
                    passed, msg = validate_pr_branch(pr)
                    if passed:
                        post_review(number, 'APPROVED', body=f"Automated peer review: branch validated.\n\n✅ Syntax checks passed.\nRing {max_ring} change — auto-approved. CI must still pass.")
                        notifications.append(f"Auto-approved PR #{number} from @{author} (Ring {max_ring})")
                    else:
                        post_review(number, 'CHANGES_REQUESTED', body=f"Automated peer review detected issues:\n\n{msg}\n\nPlease fix and push.")
                        notifications.append(f"Requested changes on PR #{number} from @{author}: {msg[:100]}")
        # B. If PR from me, ensure sibling is requested as reviewer
        if author == MY_AGENT:
            pr_full = query_api(f'repos/{REPO}/pulls/{number}')
            requested = pr_full.get('requested_reviewers', []) if pr_full else []
            if not any(r.get('login') == SIBLING_AGENT for r in requested):
                request_reviewer(number, SIBLING_AGENT)
                notifications.append(f"Requested review from @{SIBLING_AGENT} for my PR #{number}")
        # C. Check CI statuses for any PR
        statuses = get_commit_statuses(number)
        failing = [s for s in statuses if s.get('status') not in ('success', 'pending')]
        if failing:
            for s in failing:
                notifications.append(f"PR #{number} status check failure: {s.get('context','unknown')} - {s.get('status','unknown')}")
    # 2. Global cleanup of stale claim branches (orphaned, older than TTL)
    cleanup_global_expired_claims(now_ts)
    if notifications:
        print("\n".join(notifications))
    else:
        print("No new alerts.")
def cleanup_global_expired_claims(now_ts=None):
    """Delete remote claim/* branches older than CLAIM_TTL_SECONDS.

    Works even when the local state file is gone.  *now_ts* is an epoch
    timestamp; fix: the original default used ``datetime.utcnow().timestamp()``,
    which reads the naive UTC datetime as *local* time, skewing every computed
    age by the machine's UTC offset.
    """
    if now_ts is None:
        now_ts = datetime.now(timezone.utc).timestamp()
    # List all remote claim branches
    listing = subprocess.run(['git', 'ls-remote', '--heads', 'origin', 'claim/*'],
                             capture_output=True, text=True, cwd='/opt/aitbc')
    if listing.returncode != 0 or not listing.stdout.strip():
        return
    cleaned = 0
    for line in listing.stdout.strip().split('\n'):
        parts = line.split()
        if len(parts) < 2:
            continue
        sha, branch = parts[0], parts[1]
        # The claim branch's tip-commit time stands in for the claim time.
        shown = subprocess.run(['git', 'show', '-s', '--format=%ct', sha],
                               capture_output=True, text=True, cwd='/opt/aitbc')
        if shown.returncode != 0 or not shown.stdout.strip():
            continue
        age = now_ts - int(shown.stdout.strip())
        if age > CLAIM_TTL_SECONDS:
            print(f"Expired claim branch: {branch} (age {age/3600:.1f}h). Deleting.")
            subprocess.run(['git', 'push', 'origin', '--delete', branch],
                           capture_output=True, cwd='/opt/aitbc')
            cleaned += 1
    if cleaned == 0:
        print(" cleanup_global_expired_claims: none")
    else:
        print(f" cleanup_global_expired_claims: removed {cleaned} expired branch(es)")
# Script entry point: one monitoring cycle per invocation.
if __name__ == '__main__':
    main()

160
scripts/qa-cycle.py Executable file
View File

@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""
QA Cycle: Run tests, exercise scenarios, find bugs, perform code reviews.
Runs periodically to ensure repository health and discover regressions.
"""
import os
import subprocess
import json
import sys
import shutil
import time
import random
from datetime import datetime
from pathlib import Path
# Jitter: random delay up to 15 minutes (900 seconds)
# NOTE(review): this sleep executes at *import* time — merely importing this
# module (e.g. from a test) blocks for up to 15 minutes. Consider moving it
# into main().
time.sleep(random.randint(0, 900))
# Local checkout, log destination, and token source used throughout.
REPO_DIR = '/opt/aitbc'
LOG_FILE = '/opt/aitbc/qa-cycle.log'
TOKEN_FILE = '/opt/aitbc/.gitea_token.sh'
def get_token():
    """Return the Gitea API token from TOKEN_FILE or the environment.

    Fix (mirrors the create-issue script): the original only matched bare
    ``GITEA_TOKEN=...`` lines — it skipped the common shell form
    ``export GITEA_TOKEN=...`` and kept surrounding quotes in the token.
    Falls back to the GITEA_TOKEN env variable (empty string when unset).
    """
    if os.path.exists(TOKEN_FILE):
        with open(TOKEN_FILE) as f:
            for line in f:
                entry = line.strip()
                if entry.startswith('export '):
                    entry = entry[len('export '):].lstrip()
                if entry.startswith('GITEA_TOKEN='):
                    return entry.split('=', 1)[1].strip().strip('\'"')
    return os.getenv('GITEA_TOKEN', '')
GITEA_TOKEN = get_token()
API_BASE = os.getenv('GITEA_API_BASE', 'http://gitea.bubuit.net:3000/api/v1')
REPO = 'oib/aitbc'
def log(msg):
    """Append *msg* to LOG_FILE with a UTC timestamp and echo it to stdout."""
    stamp = datetime.utcnow().isoformat() + 'Z'
    with open(LOG_FILE, 'a') as fh:
        fh.write(f"[{stamp}] {msg}\n")
    print(msg)
def run_cmd(cmd, cwd=REPO_DIR, timeout=300):
    """Run *cmd* through the shell; return (returncode, stdout, stderr).

    Returns rc -1 on timeout and -2 on any other execution error.
    """
    try:
        proc = subprocess.run(cmd, shell=True, cwd=cwd, capture_output=True,
                              text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return -1, "", "timeout"
    except Exception as exc:
        return -2, "", str(exc)
    return proc.returncode, proc.stdout, proc.stderr
def fetch_latest_main():
    """Hard-sync the local checkout to origin/main; return True on success."""
    log("Fetching latest main...")
    steps = (
        ("git fetch origin main", "Fetch failed"),
        ("git checkout main", "Checkout main failed"),
        ("git reset --hard origin/main", "Reset to origin/main failed"),
    )
    for command, failure_label in steps:
        rc, _out, err = run_cmd(command)
        if rc != 0:
            log(f"{failure_label}: {err}")
            return False
    log("Main updated to latest.")
    return True
def run_tests():
    """Run pytest for each core package that ships a tests directory.

    Returns a list of (package_name, passed) tuples.
    """
    log("Running test suites...")
    outcomes = []
    for pkg in ('aitbc-core', 'aitbc-sdk', 'aitbc-crypto'):
        testdir = f"packages/py/{pkg}/tests"
        if not os.path.exists(os.path.join(REPO_DIR, testdir)):
            continue
        log(f"Testing {pkg}...")
        rc, out, err = run_cmd(f"python3 -m pytest {testdir} -q", timeout=120)
        if rc == 0:
            log(f"{pkg} tests passed.")
        else:
            log(f"{pkg} tests failed (rc={rc}). Output: {out}\nError: {err}")
        outcomes.append((pkg, rc == 0))
    return outcomes
def run_lint():
    """Run flake8's critical-error checks when the tool is installed."""
    log("Running linters (flake8 if available)...")
    if not shutil.which('flake8'):
        log("flake8 not installed; skipping lint.")
        return
    rc, out, _err = run_cmd(
        "flake8 packages/py/ --count --select=E9,F63,F7,F82 --show-source --statistics",
        timeout=60)
    if rc == 0:
        log("✅ No critical lint errors.")
    else:
        log(f"❌ Lint errors: {out}")
def query_api(path, method='GET', data=None):
    """Call the Gitea API with urllib; return parsed JSON or None on any error."""
    import urllib.request
    import urllib.error
    headers = {'Authorization': f'token {GITEA_TOKEN}'}
    if data:
        headers['Content-Type'] = 'application/json'
        data = json.dumps(data).encode()
    request = urllib.request.Request(f"{API_BASE}/{path}", method=method,
                                     headers=headers, data=data)
    try:
        with urllib.request.urlopen(request, timeout=30) as resp:
            return json.load(resp)
    except Exception as e:
        log(f"API error {path}: {e}")
        return None
def review_my_open_prs():
    """Ensure each of my open PRs has the sibling agent requested as reviewer.

    Fix: Gitea can serialize ``requested_reviewers`` as JSON null; with
    ``pr.get('requested_reviewers', [])`` that None was passed straight into
    ``any()`` and crashed with TypeError.  ``or []`` normalizes it.

    Relies on module globals MY_AGENT / SIBLING_AGENT, which main() assigns
    before calling this — presumably always the case; verify if this is ever
    invoked standalone.
    """
    log("Checking my open PRs for missing reviews...")
    my_prs = query_api(f'repos/{REPO}/pulls?state=open&author={MY_AGENT}') or []
    for pr in my_prs:
        num = pr['number']
        title = pr['title']
        requested = pr.get('requested_reviewers') or []
        if not any(r.get('login') == SIBLING_AGENT for r in requested):
            log(f"PR #{num} '{title}' missing sibling review. Requesting...")
            query_api(f'repos/{REPO}/pulls/{num}/requested_reviewers',
                      method='POST', data={'reviewers': [SIBLING_AGENT]})
        else:
            log(f"PR #{num} already has sibling review requested.")
def synthesize_status():
    """Log a snapshot: open issue/PR counts, top unassigned issues, failing CI."""
    log("Collecting repository status...")
    issues = query_api(f'repos/{REPO}/issues?state=open') or []
    prs = query_api(f'repos/{REPO}/pulls?state=open') or []
    log(f"Open issues: {len(issues)}, open PRs: {len(prs)}")
    unassigned_issues = [
        i for i in issues
        if not i.get('assignees') and 'pull_request' not in i
    ]
    log(f"Unassigned issues: {len(unassigned_issues)}")
    # Show at most three candidates.
    for i in unassigned_issues[:3]:
        log(f" - #{i['number']} {i['title'][:50]}")
    # Check CI for open PRs
    for pr in prs:
        num = pr['number']
        head_sha = pr["head"]["sha"]
        statuses = query_api(f'repos/{REPO}/commits/{head_sha}/statuses') or []
        failing = [s for s in statuses if s.get('status') not in ('success', 'pending')]
        if failing:
            log(f"PR #{num} has failing checks: {', '.join(s.get('context','?') for s in failing)}")
def main():
    """Run one full QA cycle: sync main, test, lint, chase reviews, report status."""
    now = datetime.utcnow().isoformat() + 'Z'
    log(f"\n=== QA Cycle start: {now} ===")
    if not GITEA_TOKEN:
        log("GITEA_TOKEN not set; aborting.")
        sys.exit(1)
    # review_my_open_prs() reads these as module globals, so they must be
    # assigned before it runs.
    global MY_AGENT, SIBLING_AGENT
    MY_AGENT = os.getenv('AGENT_NAME', 'aitbc1')
    SIBLING_AGENT = 'aitbc' if MY_AGENT == 'aitbc1' else 'aitbc1'
    if not fetch_latest_main():
        log("Aborting due to fetch failure.")
        return
    run_tests()
    run_lint()
    review_my_open_prs()
    synthesize_status()
    log(f"=== QA Cycle complete ===")
# Script entry point: one QA cycle per invocation.
if __name__ == '__main__':
    main()