diff --git a/scripts/claim-task.py b/scripts/claim-task.py new file mode 100755 index 00000000..21097ea8 --- /dev/null +++ b/scripts/claim-task.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +""" +Task Claim System for AITBC agents. +Uses Git branch atomic creation as a distributed lock to prevent duplicate work. +""" +import os +import json +import subprocess +from datetime import datetime + +REPO_DIR = '/opt/aitbc' +STATE_FILE = '/opt/aitbc/.claim-state.json' +GITEA_TOKEN = os.getenv('GITEA_TOKEN') or 'ffce3b62d583b761238ae00839dce7718acaad85' +API_BASE = os.getenv('GITEA_API_BASE', 'http://gitea.bubuit.net:3000/api/v1') +MY_AGENT = os.getenv('AGENT_NAME', 'aitbc1') +ISSUE_LABELS = ['security', 'bug', 'feature', 'refactor', 'task'] # priority order +BONUS_LABELS = ['good-first-task-for-agent'] +AVOID_LABELS = ['needs-design', 'blocked', 'needs-reproduction'] + +def query_api(path, method='GET', data=None): + url = f"{API_BASE}/{path}" + cmd = ['curl', '-s', '-H', f'Authorization: token {GITEA_TOKEN}', '-X', method] + if data: + cmd += ['-d', json.dumps(data), '-H', 'Content-Type: application/json'] + cmd.append(url) + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + return None + try: + return json.loads(result.stdout) + except json.JSONDecodeError: + return None + +def load_state(): + if os.path.exists(STATE_FILE): + with open(STATE_FILE) as f: + return json.load(f) + return {'current_claim': None, 'claimed_at': None, 'work_branch': None} + +def save_state(state): + with open(STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + +def get_open_unassigned_issues(): + """Fetch open issues (excluding PRs) with no assignee, sorted by utility.""" + all_items = query_api('repos/oib/aitbc/issues?state=open') or [] + # Exclude pull requests + issues = [i for i in all_items if 'pull_request' not in i] + unassigned = [i for i in issues if not i.get('assignees')] + + label_priority = {lbl: idx for idx, lbl in enumerate(ISSUE_LABELS)} + avoid_set = set(AVOID_LABELS) + bonus_set = set(BONUS_LABELS) + + def utility(issue): + labels = [lbl['name'] for lbl in issue.get('labels', [])] + if any(lbl in avoid_set for lbl in labels): + return -1 + base = 1.0 + for lbl in labels: + if lbl in label_priority: + base += (len(ISSUE_LABELS) - label_priority[lbl]) * 0.2 + break + else: + base = 0.5 + if any(lbl in bonus_set for lbl in labels): + base += 0.2 + if issue.get('comments', 0) > 10: + base *= 0.8 + return base + + unassigned.sort(key=utility, reverse=True) + return unassigned + +def git_current_branch(): + result = subprocess.run(['git', 'branch', '--show-current'], capture_output=True, text=True, cwd=REPO_DIR) + return result.stdout.strip() + +def ensure_main_uptodate(): + subprocess.run(['git', 'checkout', 'main'], capture_output=True, cwd=REPO_DIR) + subprocess.run(['git', 'pull', 'origin', 'main'], capture_output=True, cwd=REPO_DIR) + +def claim_issue(issue_number): + """Atomically create a claim branch on the remote.""" + ensure_main_uptodate() + branch_name = f'claim/{issue_number}' + subprocess.run(['git', 'branch', '-f', branch_name, 'origin/main'], capture_output=True, cwd=REPO_DIR) + result = subprocess.run(['git', 'push', 'origin', branch_name], capture_output=True, text=True, cwd=REPO_DIR) + return result.returncode == 0 + +def assign_issue(issue_number, assignee): + data = {"assignee": assignee} + return query_api(f'repos/oib/aitbc/issues/{issue_number}/assignees', method='POST', data=data) + +def add_comment(issue_number, body): + data = {"body": body} + return query_api(f'repos/oib/aitbc/issues/{issue_number}/comments', method='POST', data=data) + +def create_work_branch(issue_number, title): + """Create the actual work branch from main.""" + ensure_main_uptodate() + slug = ''.join(c if c.isalnum() else '-' for c in title.lower())[:40].strip('-') + branch_name = f'{MY_AGENT}/{issue_number}-{slug}' + subprocess.run(['git', 'checkout', '-b', branch_name, 'main'], check=True, cwd=REPO_DIR) + return branch_name + +def main(): + now = datetime.utcnow().isoformat() + 'Z' + print(f"[{now}] Claim task cycle starting...") + + state = load_state() + current_claim = state.get('current_claim') + + if current_claim: + print(f"Already working on issue #{current_claim} (branch {state.get('work_branch')})") + # Optional: could check if that PR has been merged/closed and release claim here + return + + issues = get_open_unassigned_issues() + if not issues: + print("No unassigned issues available.") + return + + for issue in issues: + num = issue['number'] + title = issue['title'] + labels = [lbl['name'] for lbl in issue.get('labels', [])] + print(f"Attempting to claim issue #{num}: {title} (labels={labels})") + if claim_issue(num): + assign_issue(num, MY_AGENT) + work_branch = create_work_branch(num, title) + state.update({ + 'current_claim': num, + 'claim_branch': f'claim/{num}', + 'work_branch': work_branch, + 'claimed_at': datetime.utcnow().isoformat() + 'Z', + 'issue_title': title, + 'labels': labels + }) + save_state(state) + print(f"āœ… Claimed issue #{num}. Work branch: {work_branch}") + add_comment(num, f"Agent `{MY_AGENT}` claiming this task. (automated)") + return + else: + print(f"Claim failed for #{num} (branch exists). Trying next...") + + print("Could not claim any issue; all taken or unavailable.") + +if __name__ == '__main__': + main() diff --git a/scripts/monitor-prs.py b/scripts/monitor-prs.py new file mode 100755 index 00000000..49d0ab38 --- /dev/null +++ b/scripts/monitor-prs.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +""" +Enhanced monitor for Gitea PRs: +- Auto-request review from sibling on my PRs +- Auto-validate sibling's PRs and approve if passing checks, with stability ring awareness +- Monitor CI statuses and report failures +- Release claim branches when associated PRs merge or close +""" +import os +import json +import subprocess +import tempfile +import shutil +from datetime import datetime + +GITEA_TOKEN = os.getenv('GITEA_TOKEN') or 'ffce3b62d583b761238ae00839dce7718acaad85' +REPO = 'oib/aitbc' +API_BASE = os.getenv('GITEA_API_BASE', 'http://gitea.bubuit.net:3000/api/v1') +MY_AGENT = os.getenv('AGENT_NAME', 'aitbc1') +SIBLING_AGENT = 'aitbc' if MY_AGENT == 'aitbc1' else 'aitbc1' +CLAIM_STATE_FILE = '/opt/aitbc/.claim-state.json' + +def query_api(path, method='GET', data=None): + url = f"{API_BASE}/{path}" + cmd = ['curl', '-s', '-H', f'Authorization: token {GITEA_TOKEN}', '-X', method] + if data: + cmd += ['-d', json.dumps(data), '-H', 'Content-Type: application/json'] + cmd.append(url) + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + return None + try: + return json.loads(result.stdout) + except json.JSONDecodeError: + return None + +def get_pr_files(pr_number): + return query_api(f'repos/{REPO}/pulls/{pr_number}/files') or [] + +def detect_ring(path): + ring0 = ['packages/py/aitbc-core/', 'packages/py/aitbc-sdk/', 'packages/py/aitbc-agent-sdk/', 'packages/py/aitbc-crypto/'] + ring1 = ['apps/coordinator-api/', 'apps/blockchain-node/', 'apps/analytics/', 'services/'] + ring2 = ['cli/', 'scripts/', 'tools/'] + ring3 = ['experiments/', 'playground/', 'prototypes/', 'examples/'] + if any(path.startswith(p) for p in ring0): + return 0 + if any(path.startswith(p) for p in ring1): + return 1 + if any(path.startswith(p) for p in ring2): + return 2 + if any(path.startswith(p) for p in ring3): + return 3 + return 2 + +def load_claim_state(): + if os.path.exists(CLAIM_STATE_FILE): + with open(CLAIM_STATE_FILE) as f: + return json.load(f) + return {} + +def save_claim_state(state): + with open(CLAIM_STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + +def release_claim(issue_number, claim_branch): + check = subprocess.run(['git', 'ls-remote', '--heads', 'origin', claim_branch], + capture_output=True, text=True, cwd='/opt/aitbc') + if check.returncode == 0 and check.stdout.strip(): + subprocess.run(['git', 'push', 'origin', '--delete', claim_branch], + capture_output=True, cwd='/opt/aitbc') + state = load_claim_state() + if state.get('current_claim') == issue_number: + state.clear() + save_claim_state(state) + print(f"āœ… Released claim for issue #{issue_number} (deleted branch {claim_branch})") + +def get_open_prs(): + return query_api(f'repos/{REPO}/pulls?state=open') or [] + +def get_all_prs(state='all'): + return query_api(f'repos/{REPO}/pulls?state={state}') or [] + +def get_pr_reviews(pr_number): + return query_api(f'repos/{REPO}/pulls/{pr_number}/reviews') or [] + +def get_commit_statuses(pr_number): + pr = query_api(f'repos/{REPO}/pulls/{pr_number}') + if not pr: + return [] + sha = pr['head']['sha'] + statuses = query_api(f'repos/{REPO}/commits/{sha}/statuses') + if not statuses or not isinstance(statuses, list): + return [] + return statuses + +def request_reviewer(pr_number, reviewer): + data = {"reviewers": [reviewer]} + return query_api(f'repos/{REPO}/pulls/{pr_number}/requested_reviewers', method='POST', data=data) + +def post_review(pr_number, state, body=''): + data = {"body": body, "event": state} + return query_api(f'repos/{REPO}/pulls/{pr_number}/reviews', method='POST', data=data) + +def validate_pr_branch(pr): + head = pr['head'] + ref = head['ref'] + repo = head.get('repo', {}).get('full_name', REPO) + tmpdir = tempfile.mkdtemp(prefix='aitbc-pr-') + try: + clone_url = f"git@gitea.bubuit.net:{repo}.git" + result = subprocess.run(['git', 'clone', '-b', ref, '--depth', '1', clone_url, tmpdir], + capture_output=True, text=True, timeout=60) + if result.returncode != 0: + return False, f"Clone failed: {result.stderr.strip()}" + py_files = subprocess.run(['find', tmpdir, '-name', '*.py'], capture_output=True, text=True) + if py_files.returncode == 0 and py_files.stdout.strip(): + for f in py_files.stdout.strip().split('\n')[:20]: + res = subprocess.run(['python3', '-m', 'py_compile', f], + capture_output=True, text=True, cwd=tmpdir) + if res.returncode != 0: + return False, f"Syntax error in `{f}`: {res.stderr.strip()}" + return True, "Automated validation passed." + except Exception as e: + return False, f"Validation error: {str(e)}" + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + +def main(): + now = datetime.utcnow().isoformat() + 'Z' + print(f"[{now}] Monitoring PRs and claim locks...") + + # 0. Check claim state: if we have a current claim, see if corresponding PR merged + state = load_claim_state() + if state.get('current_claim'): + issue_num = state['current_claim'] + work_branch = state.get('work_branch') + claim_branch = state.get('claim_branch') + all_prs = get_all_prs(state='all') + matched_pr = None + for pr in all_prs: + if pr['head']['ref'] == work_branch: + matched_pr = pr + break + if matched_pr: + if matched_pr['state'] == 'closed': + release_claim(issue_num, claim_branch) + + # 1. Process open PRs + open_prs = get_open_prs() + notifications = [] + + for pr in open_prs: + number = pr['number'] + title = pr['title'] + author = pr['user']['login'] + head_ref = pr['head']['ref'] + + # A. If PR from sibling, consider for review + if author == SIBLING_AGENT: + reviews = get_pr_reviews(number) + my_reviews = [r for r in reviews if r['user']['login'] == MY_AGENT] + if not my_reviews: + files = get_pr_files(number) + rings = [detect_ring(f['filename']) for f in files if f.get('status') != 'removed'] + max_ring = max(rings) if rings else 2 + if max_ring == 0: + body = "Automated analysis: This PR modifies core (Ring 0) components. Manual review and a design specification are required before merge. No auto-approval." + post_review(number, 'COMMENT', body=body) + notifications.append(f"PR #{number} (Ring 0) flagged for manual review") + else: + passed, msg = validate_pr_branch(pr) + if passed: + post_review(number, 'APPROVED', body=f"Automated peer review: branch validated.\n\nāœ… Syntax checks passed.\nRing {max_ring} change — auto-approved. CI must still pass.") + notifications.append(f"Auto-approved PR #{number} from @{author} (Ring {max_ring})") + else: + post_review(number, 'CHANGES_REQUESTED', body=f"Automated peer review detected issues:\n\n{msg}\n\nPlease fix and push.") + notifications.append(f"Requested changes on PR #{number} from @{author}: {msg[:100]}") + + # B. If PR from me, ensure sibling is requested as reviewer + if author == MY_AGENT: + pr_full = query_api(f'repos/{REPO}/pulls/{number}') + requested = pr_full.get('requested_reviewers', []) if pr_full else [] + if not any(r.get('login') == SIBLING_AGENT for r in requested): + request_reviewer(number, SIBLING_AGENT) + notifications.append(f"Requested review from @{SIBLING_AGENT} for my PR #{number}") + + # C. Check CI statuses for any PR + statuses = get_commit_statuses(number) + failing = [s for s in statuses if s.get('status') not in ('success', 'pending')] + if failing: + for s in failing: + notifications.append(f"PR #{number} status check failure: {s.get('context','unknown')} - {s.get('status','unknown')}") + + if notifications: + print("\n".join(notifications)) + else: + print("No new alerts.") + +if __name__ == '__main__': + main()