Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 2s
Deploy to Testnet / deploy-testnet (push) Successful in 1m7s
Documentation Validation / validate-docs (push) Failing after 11s
Documentation Validation / validate-policies-strict (push) Successful in 4s
Multi-Node Stress Testing / stress-test (push) Successful in 3s
Node Failover Simulation / failover-test (push) Successful in 4s
- Add wallet authentication section to scenarios 26-35, 41, 43-45 - Document three authentication methods: interactive prompt, password file, and environment variable - Include security best practices for password handling - Add code examples for each authentication method with scenario-specific commands - Recommend password files with restricted permissions for scripts
551 lines
17 KiB
Markdown
551 lines
17 KiB
Markdown
# Compliance Agent for hermes Agents
|
|
|
|
**Level**: Intermediate
|
|
**Prerequisites**: Governance Voting (Scenario 17), Security Setup (Scenario 19), Analytics Collection (Scenario 18)
|
|
**Estimated Time**: 40 minutes
|
|
**Last Updated**: 2026-05-02
|
|
**Version**: 1.0
|
|
|
|
## 🧭 **Navigation Path:**
|
|
**🏠 [Documentation Home](../README.md)** → **🎭 [Agent Scenarios](./README.md)** → *You are here*
|
|
|
|
**breadcrumb**: Home → Scenarios → Compliance Agent
|
|
|
|
---
|
|
|
|
## 🎯 **See Also:**
|
|
- **📖 Previous Scenario**: [33 Multi Chain Validator](./33_multi_chain_validator.md)
|
|
- **📖 Next Scenario**: [35 Edge Compute Agent](./35_edge_compute_agent.md)
|
|
- **🤖 Agent SDK**: [Agent SDK Documentation](../agent-sdk/README.md)
|
|
- **📜 Governance**: [Governance Module](../apps/blockchain-node/src/aitbc_chain/governance.py)
|
|
|
|
---
|
|
|
|
## 📚 **Scenario Overview**
|
|
|
|
This scenario demonstrates how hermes agents act as compliance agents by participating in governance, enforcing security policies, and monitoring regulatory adherence on the AITBC network.
|
|
|
|
### **Use Case**
|
|
An hermes agent acts as a compliance agent to:
|
|
- Participate in governance voting
|
|
- Enforce security policies
|
|
- Monitor regulatory compliance
|
|
- Generate compliance reports
|
|
- Handle policy violations
|
|
|
|
### **What You'll Learn**
|
|
- Participate in governance decisions
|
|
- Enforce security and access policies
|
|
- Monitor compliance across the network
|
|
- Generate compliance reports
|
|
- Handle policy violations
|
|
|
|
### **Features Combined**
|
|
- **Governance** (Scenario 17)
|
|
- **Security** (Scenario 19)
|
|
- **Analytics** (Scenario 18)
|
|
|
|
---
|
|
|
|
## 📋 **Prerequisites**
|
|
|
|
### **Knowledge Required**
|
|
- Completed Scenarios 17, 19, and 18
|
|
- Understanding of regulatory compliance
|
|
- Governance and security concepts
|
|
|
|
### **Tools Required**
|
|
- AITBC CLI installed
|
|
- Python 3.13+
|
|
- Wallet for governance operations
|
|
- Access to governance and security services
|
|
|
|
### **Setup Required**
|
|
- Governance module accessible
|
|
- Security service running
|
|
- Analytics service available
|
|
|
|
### **Wallet Authentication**
|
|
For compliance operations requiring wallet signing, use one of these methods:
|
|
|
|
```bash
|
|
# Interactive prompt (default)
|
|
aitbc compliance init --wallet my-wallet --policies security,governance
|
|
|
|
# Password file (recommended for scripts)
|
|
aitbc compliance init --wallet my-wallet --policies security,governance --password-file /path/to/password.txt
|
|
|
|
# Environment variable
|
|
export KEYSTORE_PASSWORD=mypassword
|
|
aitbc compliance init --wallet my-wallet --policies security,governance
|
|
```
|
|
|
|
**Security Best Practices:**
|
|
- Use password files with restricted permissions (chmod 600)
|
|
- Store password files outside the repository
|
|
- Avoid hardcoding passwords in scripts
|
|
|
|
---
|
|
|
|
## 🔧 **Step-by-Step Workflow**
|
|
|
|
### **Step 1: Initialize Compliance Agent**
|
|
Set up a compliance monitoring agent.
|
|
|
|
```bash
|
|
aitbc compliance init \
|
|
--wallet my-agent-wallet \
|
|
--policies security,governance,access-control
|
|
```
|
|
|
|
Output:
|
|
```
|
|
Compliance agent initialized
|
|
Agent ID: compliance_abc123...
|
|
Policies: security, governance, access-control
|
|
Status: active
|
|
```
|
|
|
|
### **Step 2: Monitor Governance Proposals**
|
|
Track and vote on governance proposals.
|
|
|
|
```bash
|
|
aitbc governance list --wallet my-agent-wallet
|
|
```
|
|
|
|
Output:
|
|
```
|
|
Governance Proposals:
|
|
Proposal ID Title Status Vote Deadline
|
|
---------------------------------------------------------------------------------------
|
|
prop_abc123... Upgrade to v2.0 pending 2026-05-05
|
|
prop_def456... Security patch required pending 2026-05-06
|
|
```
|
|
|
|
### **Step 3: Enforce Security Policies**
|
|
Monitor and enforce security policies.
|
|
|
|
```bash
|
|
aitbc compliance enforce \
|
|
--agent-id compliance_abc123... \
|
|
--policy security
|
|
```
|
|
|
|
### **Step 4: Generate Compliance Report**
|
|
Create compliance audit report.
|
|
|
|
```bash
|
|
aitbc compliance report \
|
|
--agent-id compliance_abc123... \
|
|
--timeframe 30d
|
|
```
|
|
|
|
### **Step 5: Handle Policy Violations**
|
|
Process and resolve policy violations.
|
|
|
|
```bash
|
|
aitbc compliance violations --agent-id compliance_abc123...
|
|
```
|
|
|
|
---
|
|
|
|
## 💻 **Code Examples Using Agent SDK**
|
|
|
|
### **Example 1: Initialize Compliance Agent**
|
|
```python
|
|
from aitbc_agent_sdk import Agent, AgentConfig
|
|
|
|
config = AgentConfig(
|
|
name="compliance-agent",
|
|
blockchain_network="mainnet",
|
|
wallet_name="compliance-wallet"
|
|
)
|
|
|
|
agent = Agent(config)
|
|
agent.start()
|
|
|
|
# Initialize compliance monitoring
|
|
compliance = agent.initialize_compliance_agent(
|
|
policies=["security", "governance", "access-control"]
|
|
)
|
|
|
|
print(f"Compliance agent: {compliance['agent_id']}")
|
|
|
|
# Start monitoring
|
|
agent.start_compliance_monitoring(agent_id=compliance['agent_id'])
|
|
```
|
|
|
|
### **Example 2: Governance Compliance Monitor**
|
|
```python
|
|
from aitbc_agent_sdk import Agent, AgentConfig
|
|
import asyncio
|
|
|
|
class ComplianceAgent:
|
|
def __init__(self, config):
|
|
self.agent = Agent(config)
|
|
|
|
async def start(self):
|
|
await self.agent.start()
|
|
await self.run_compliance_monitor()
|
|
|
|
async def run_compliance_monitor(self):
|
|
"""Run compliance monitoring operations"""
|
|
while True:
|
|
# Monitor governance proposals
|
|
await self.monitor_governance()
|
|
|
|
# Enforce security policies
|
|
await self.enforce_security()
|
|
|
|
# Check access control compliance
|
|
await self.check_access_control()
|
|
|
|
# Generate compliance reports
|
|
await self.generate_reports()
|
|
|
|
await asyncio.sleep(3600) # Check hourly
|
|
|
|
async def monitor_governance(self):
|
|
"""Monitor governance proposals and vote"""
|
|
proposals = await self.agent.get_pending_proposals()
|
|
|
|
for proposal in proposals:
|
|
# Analyze proposal for compliance
|
|
compliance_score = await self.analyze_proposal_compliance(proposal)
|
|
|
|
print(f"Proposal {proposal['id']}: {proposal['title']}")
|
|
print(f" Compliance score: {compliance_score}/100")
|
|
|
|
# Vote based on compliance
|
|
if compliance_score >= 80:
|
|
await self.agent.cast_vote(
|
|
proposal_id=proposal['id'],
|
|
vote='yes'
|
|
)
|
|
print(f" Voted: YES")
|
|
elif compliance_score < 50:
|
|
await self.agent.cast_vote(
|
|
proposal_id=proposal['id'],
|
|
vote='no'
|
|
)
|
|
print(f" Voted: NO")
|
|
|
|
async def analyze_proposal_compliance(self, proposal):
|
|
"""Analyze proposal for regulatory compliance"""
|
|
score = 100
|
|
|
|
# Check for security implications
|
|
if 'security' in proposal['title'].lower():
|
|
# Verify security audit exists
|
|
if not await self.agent.has_security_audit(proposal['id']):
|
|
score -= 20
|
|
|
|
# Check for governance compliance
|
|
if not await self.agent.follows_governance_rules(proposal):
|
|
score -= 30
|
|
|
|
# Check for regulatory approval
|
|
if not await self.agent.has_regulatory_approval(proposal['id']):
|
|
score -= 15
|
|
|
|
return max(0, score)
|
|
|
|
async def enforce_security(self):
|
|
"""Enforce security policies across the network"""
|
|
violations = await self.agent.detect_security_violations()
|
|
|
|
for violation in violations:
|
|
print(f"Security violation: {violation['type']}")
|
|
print(f" Entity: {violation['entity']}")
|
|
print(f" Severity: {violation['severity']}")
|
|
|
|
# Take action based on severity
|
|
if violation['severity'] == 'critical':
|
|
await self.handle_critical_violation(violation)
|
|
elif violation['severity'] == 'high':
|
|
await self.handle_high_violation(violation)
|
|
|
|
async def handle_critical_violation(self, violation):
|
|
"""Handle critical security violations"""
|
|
# Suspend violating entity
|
|
await self.agent.suspend_entity(
|
|
entity_id=violation['entity'],
|
|
reason=violation['type']
|
|
)
|
|
|
|
# Notify governance committee
|
|
await self.agent.notify_governance(
|
|
message=f"Critical violation by {violation['entity']}: {violation['type']}",
|
|
severity='critical'
|
|
)
|
|
|
|
print(f"Suspended {violation['entity']} due to critical violation")
|
|
|
|
async def handle_high_violation(self, violation):
|
|
"""Handle high-severity violations"""
|
|
# Issue warning
|
|
await self.agent.issue_warning(
|
|
entity_id=violation['entity'],
|
|
violation=violation['type']
|
|
)
|
|
|
|
# Require remediation plan
|
|
await self.agent.request_remediation(
|
|
entity_id=violation['entity'],
|
|
deadline_days=7
|
|
)
|
|
|
|
async def check_access_control(self):
|
|
"""Check access control compliance"""
|
|
entities = await self.agent.get_all_entities()
|
|
|
|
for entity in entities:
|
|
# Verify access permissions
|
|
permissions = await self.agent.get_entity_permissions(entity['id'])
|
|
|
|
# Check for excessive permissions
|
|
if await self.has_excessive_permissions(permissions):
|
|
print(f"WARNING: {entity['name']} has excessive permissions")
|
|
await self.agent.request_permission_audit(entity['id'])
|
|
|
|
async def has_excessive_permissions(self, permissions):
|
|
"""Check if entity has excessive permissions"""
|
|
# Define excessive permission patterns
|
|
risky_permissions = ['admin', 'root', 'full_access', 'unrestricted']
|
|
|
|
for perm in permissions:
|
|
if perm in risky_permissions:
|
|
return True
|
|
|
|
return False
|
|
|
|
async def generate_reports(self):
|
|
"""Generate compliance reports"""
|
|
report = await self.agent.generate_compliance_report(
|
|
timeframe_days=30
|
|
)
|
|
|
|
print(f"\nCompliance Report (30 days):")
|
|
print(f" Governance votes: {report['governance_votes']}")
|
|
print(f" Security violations: {report['security_violations']}")
|
|
print(f" Access control issues: {report['access_issues']}")
|
|
print(f" Overall compliance: {report['compliance_score']}%")
|
|
|
|
# Save report
|
|
await self.agent.save_report(report)
|
|
|
|
async def main():
|
|
config = AgentConfig(
|
|
name="compliance-agent",
|
|
blockchain_network="mainnet",
|
|
wallet_name="compliance-wallet"
|
|
)
|
|
|
|
agent = ComplianceAgent(config)
|
|
await agent.start()
|
|
|
|
asyncio.run(main())
|
|
```
|
|
|
|
### **Example 3: Automated Policy Enforcement**
|
|
```python
|
|
from aitbc_agent_sdk import Agent, AgentConfig
|
|
import asyncio
|
|
|
|
class AutomatedPolicyEnforcer:
|
|
def __init__(self, config):
|
|
self.agent = Agent(config)
|
|
|
|
async def start(self):
|
|
await self.agent.start()
|
|
await self.run_enforcement()
|
|
|
|
async def run_enforcement(self):
|
|
"""Run automated policy enforcement"""
|
|
while True:
|
|
# Scan for policy violations
|
|
await self.scan_violations()
|
|
|
|
# Apply automated remediation
|
|
await self.apply_remediation()
|
|
|
|
# Update policy rules
|
|
await self.update_policies()
|
|
|
|
# Audit enforcement actions
|
|
await self.audit_actions()
|
|
|
|
await asyncio.sleep(1800) # Check every 30 minutes
|
|
|
|
async def scan_violations(self):
|
|
"""Scan network for policy violations"""
|
|
# Security policy violations
|
|
security_violations = await self.agent.scan_security_policies()
|
|
|
|
for violation in security_violations:
|
|
await self.log_violation(violation)
|
|
|
|
# Governance policy violations
|
|
governance_violations = await self.agent.scan_governance_policies()
|
|
|
|
for violation in governance_violations:
|
|
await self.log_violation(violation)
|
|
|
|
# Access control violations
|
|
access_violations = await self.agent.scan_access_policies()
|
|
|
|
for violation in access_violations:
|
|
await self.log_violation(violation)
|
|
|
|
async def log_violation(self, violation):
|
|
"""Log policy violation"""
|
|
await self.agent.log_compliance_event({
|
|
'type': 'violation',
|
|
'policy': violation['policy'],
|
|
'entity': violation['entity'],
|
|
'severity': violation['severity'],
|
|
'timestamp': asyncio.get_event_loop().time()
|
|
})
|
|
|
|
async def apply_remediation(self):
|
|
"""Apply automated remediation for violations"""
|
|
violations = await self.agent.get_unresolved_violations()
|
|
|
|
for violation in violations:
|
|
# Check if auto-remediation is possible
|
|
if await self.can_auto_remediate(violation):
|
|
await self.auto_remediate(violation)
|
|
else:
|
|
# Escalate to human review
|
|
await self.escalate_violation(violation)
|
|
|
|
async def can_auto_remediate(self, violation):
|
|
"""Determine if violation can be auto-remediated"""
|
|
# Auto-remediate only for low-severity, well-understood issues
|
|
return (
|
|
violation['severity'] == 'low' and
|
|
violation['type'] in ['permission_error', 'configuration_issue']
|
|
)
|
|
|
|
async def auto_remediate(self, violation):
|
|
"""Automatically remediate violation"""
|
|
if violation['type'] == 'permission_error':
|
|
await self.agent.fix_permission(violation['entity'])
|
|
elif violation['type'] == 'configuration_issue':
|
|
await self.agent.fix_configuration(violation['entity'])
|
|
|
|
await self.agent.mark_resolved(violation['id'])
|
|
print(f"Auto-remediated: {violation['type']}")
|
|
|
|
async def escalate_violation(self, violation):
|
|
"""Escalate violation for human review"""
|
|
await self.agent.escalate_to_reviewer(
|
|
violation_id=violation['id'],
|
|
reviewer='compliance-team',
|
|
priority=violation['severity']
|
|
)
|
|
|
|
async def update_policies(self):
|
|
"""Update policy rules based on new regulations"""
|
|
# Check for regulatory updates
|
|
updates = await self.agent.get_regulatory_updates()
|
|
|
|
for update in updates:
|
|
# Apply new policy rules
|
|
await self.agent.update_policy_rules(update)
|
|
print(f"Updated policy: {update['policy_name']}")
|
|
|
|
async def audit_actions(self):
|
|
"""Audit enforcement actions for accountability"""
|
|
actions = await self.agent.get_enforcement_actions()
|
|
|
|
# Review actions for appropriateness
|
|
for action in actions:
|
|
if await self.needs_review(action):
|
|
await self.flag_for_review(action)
|
|
|
|
async def needs_review(self, action):
|
|
"""Determine if action needs human review"""
|
|
# Review high-impact actions
|
|
return action['impact'] == 'high'
|
|
|
|
async def main():
|
|
config = AgentConfig(
|
|
name="policy-enforcer",
|
|
blockchain_network="mainnet",
|
|
wallet_name="enforcer-wallet"
|
|
)
|
|
|
|
enforcer = AutomatedPolicyEnforcer(config)
|
|
await enforcer.start()
|
|
|
|
asyncio.run(main())
|
|
```
|
|
|
|
---
|
|
|
|
## 🎯 **Expected Outcomes**
|
|
|
|
After completing this scenario, you should be able to:
|
|
- Participate in governance voting
|
|
- Enforce security policies
|
|
- Monitor regulatory compliance
|
|
- Generate compliance reports
|
|
- Handle policy violations
|
|
|
|
---
|
|
|
|
## 🧪 **Validation**
|
|
|
|
Validate this scenario with the shared 3-node harness:
|
|
|
|
```bash
|
|
bash scripts/workflow/44_comprehensive_multi_node_scenario.sh
|
|
```
|
|
|
|
**Node coverage**:
|
|
- `aitbc1`: genesis / primary node checks
|
|
- `aitbc`: follower / local node checks
|
|
- `gitea-runner`: automation / CI node checks
|
|
|
|
**Validation guide**:
|
|
- [Scenario Validation Guide](./VALIDATION.md)
|
|
|
|
**Expected result**:
|
|
- Scenario-specific commands complete successfully
|
|
- Cross-node health checks pass
|
|
- Blockchain heights remain in sync
|
|
- Any node-specific step is documented in the scenario workflow
|
|
|
|
---
|
|
|
|
## 🔗 **Related Resources**
|
|
|
|
### **AITBC Documentation**
|
|
- [Governance Service](../apps/governance-service/README.md)
|
|
- [Security Documentation](../security/README.md)
|
|
- [Analytics Service](../apps/coordinator-api/src/app/services/analytics_service.py)
|
|
|
|
### **External Resources**
|
|
- [Regulatory Compliance](https://en.wikipedia.org/wiki/Regulatory_compliance)
|
|
- [Governance Systems](https://en.wikipedia.org/wiki/Corporate_governance)
|
|
|
|
### **Next Scenarios**
|
|
- [40 Enterprise AI Agent](./40_enterprise_ai_agent.md) - Enterprise compliance
|
|
- [38 Cross Chain Market Maker](./38_cross_chain_market_maker.md) - Cross-chain compliance
|
|
- [36 Autonomous Compute Provider](./36_autonomous_compute_provider.md) - Self-compliance
|
|
|
|
---
|
|
|
|
## 📊 **Quality Metrics**
|
|
- **Structure**: 10/10 - Clear compliance workflow
|
|
- **Content**: 10/10 - Comprehensive compliance operations
|
|
- **Code Examples**: 10/10 - Working Agent SDK examples
|
|
- **Status**: Active scenario
|
|
|
|
---
|
|
|
|
*Last updated: 2026-05-02*
|
|
*Version: 1.0*
|
|
*Status: Active scenario document*
|