Files
aitbc/docs/scenarios/37_distributed_ai_training.md
aitbc 852f2e5a8a
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Documentation Validation / validate-docs (push) Has been cancelled
Documentation Validation / validate-policies-strict (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
CLI Tests / test-cli (push) Has been cancelled
Blockchain Synchronization Verification / sync-verification (push) Successful in 11s
Contract Performance Benchmarks / benchmark-gas-usage (push) Successful in 1m36s
Contract Performance Benchmarks / benchmark-execution-time (push) Successful in 1m24s
Contract Performance Benchmarks / benchmark-throughput (push) Successful in 1m25s
Cross-Chain Functionality Tests / test-cross-chain-sync (push) Successful in 2s
Cross-Chain Functionality Tests / test-cross-chain-transactions (push) Successful in 5s
Cross-Chain Functionality Tests / test-cross-chain-bridge (push) Has been skipped
Cross-Chain Functionality Tests / test-multi-chain-consensus (push) Successful in 3s
Cross-Chain Functionality Tests / aggregate-results (push) Has been skipped
Multi-Chain Island Architecture Tests / test-multi-chain-island (push) Successful in 2s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 3s
P2P Network Verification / p2p-verification (push) Successful in 2s
Smart Contract Tests / test-solidity (map[name:aitbc-contracts path:contracts]) (push) Failing after 1m28s
Smart Contract Tests / test-solidity (map[name:aitbc-token path:packages/solidity/aitbc-token]) (push) Successful in 21s
Smart Contract Tests / test-foundry (push) Failing after 20s
Smart Contract Tests / lint-solidity (push) Successful in 30s
Smart Contract Tests / deploy-contracts (push) Successful in 1m40s
Systemd Sync / sync-systemd (push) Successful in 26s
Contract Performance Benchmarks / compare-benchmarks (push) Successful in 4s
Rename openclaw to hermes across documentation and workflows
- Update workflow paths from docs/openclaw to docs/hermes
- Rename skill prefixes from openclaw-* to hermes-*
- Update agent skill references in refactoring and analysis docs
- Rename OPENCLAW_AITBC_MASTERY_PLAN.md to reflect hermes branding
- Update CLI examples and command references throughout documentation
2026-05-07 11:42:06 +02:00

20 KiB

Distributed AI Training for hermes Agents

Level: Advanced
Prerequisites: All intermediate scenarios recommended
Estimated Time: 60 minutes
Last Updated: 2026-05-02
Version: 1.0

🧭 Navigation Path:

🏠 Documentation Home🎭 Agent ScenariosYou are here

breadcrumb: Home → Scenarios → Distributed AI Training


🎯 See Also:


📚 Scenario Overview

This scenario demonstrates how hermes agents coordinate distributed AI training across multiple GPU providers, managing job distribution, data synchronization, model aggregation, and payment processing in a decentralized training environment.

Use Case

An hermes agent coordinates distributed AI training to:

  • Distribute training jobs across multiple GPUs
  • Synchronize training data and gradients
  • Aggregate model updates from workers
  • Manage payments to GPU providers
  • Monitor training progress and quality
  • Handle fault tolerance and recovery

What You'll Learn

  • Coordinate distributed AI training
  • Manage multi-GPU job distribution
  • Synchronize training across nodes
  • Aggregate model updates
  • Handle distributed training payments
  • Implement fault tolerance

Features Combined

  • AI Job Submission (Scenario 07)
  • GPU Marketplace (Scenario 09)
  • Messaging (Scenario 04)
  • IPFS Storage (Scenario 11)
  • Transaction Sending (Scenario 02)
  • Monitoring (Scenario 15)

📋 Prerequisites

Knowledge Required

  • Completed all intermediate scenarios (recommended)
  • Understanding of distributed AI training
  • GPU computing concepts

Tools Required

  • AITBC CLI installed
  • Python 3.13+
  • Wallet for payment operations
  • Access to AI engine and marketplace

Setup Required

  • AI engine accessible
  • Marketplace with GPU providers
  • IPFS service running

🔧 Step-by-Step Workflow

Step 1: Initialize Distributed Training

Set up distributed training coordinator.

aitbc distributed-training init \
  --wallet my-agent-wallet \
  --model-type transformer \
  --dataset-size 100GB \
  --target-workers 10

Output:

Distributed training initialized
Training ID: training_abc123...
Model Type: transformer
Dataset Size: 100GB
Target Workers: 10
Status: initializing

Step 2: Upload Training Data

Upload dataset to IPFS for distributed access.

aitbc distributed-training upload-data \
  --training-id training_abc123... \
  --data-path /path/to/dataset

Step 3: Discover and Select Workers

Find available GPU workers for training.

aitbc distributed-training discover-workers \
  --training-id training_abc123... \
  --min-gpu RTX3090 \
  --required-workers 10

Step 4: Distribute Training Jobs

Assign training tasks to selected workers.

aitbc distributed-training distribute \
  --training-id training_abc123... \
  --strategy data-parallel

Step 5: Monitor Training Progress

Track distributed training metrics.

aitbc distributed-training monitor --training-id training_abc123...

💻 Code Examples Using Agent SDK

Example 1: Initialize Distributed Training

from aitbc_agent_sdk import Agent, AgentConfig

config = AgentConfig(
    name="distributed-training",
    blockchain_network="mainnet",
    wallet_name="training-wallet"
)

agent = Agent(config)
agent.start()

# Initialize distributed training
training = agent.initialize_distributed_training(
    model_type="transformer",
    dataset_size=100,
    target_workers=10
)

print(f"Distributed training: {training['training_id']}")

# Upload data to IPFS
data_cid = agent.upload_to_ipfs(
    path="/path/to/dataset",
    pin=True
)

print(f"Dataset IPFS CID: {data_cid}")

Example 2: Distributed Training Coordinator

from aitbc_agent_sdk import Agent, AgentConfig
import asyncio

class DistributedTrainingCoordinator:
    def __init__(self, config):
        self.agent = Agent(config)
        self.training_id = None
        self.workers = {}
    
    async def start(self):
        await self.agent.start()
        await self.initialize_training()
        await self.run_training_coordinator()
    
    async def initialize_training(self):
        """Initialize distributed training"""
        training = await self.agent.initialize_distributed_training(
            model_type="transformer",
            dataset_size=100,
            target_workers=10
        )
        self.training_id = training['training_id']
        
        # Upload dataset to IPFS
        data_cid = await self.agent.upload_to_ipfs(
            path="/path/to/dataset",
            pin=True
        )
        
        print(f"Training initialized: {self.training_id}")
        print(f"Dataset CID: {data_cid}")
    
    async def run_training_coordinator(self):
        """Run distributed training coordination"""
        while True:
            # Check training status
            status = await self.agent.get_training_status(self.training_id)
            
            if status['phase'] == 'initializing':
                await self.discover_and_select_workers()
            elif status['phase'] == 'distributing':
                await self.distribute_training_jobs()
            elif status['phase'] == 'training':
                await self.monitor_training_progress()
            elif status['phase'] == 'aggregating':
                await self.aggregate_model_updates()
            elif status['phase'] == 'complete':
                await self.finalize_training()
                break
            elif status['phase'] == 'failed':
                await self.handle_failure()
                break
            
            await asyncio.sleep(30)
    
    async def discover_and_select_workers(self):
        """Discover and select GPU workers"""
        # Get available GPU workers
        workers = await self.agent.discover_gpu_workers(
            min_gpu="RTX3090",
            required_workers=10
        )
        
        print(f"Discovered {len(workers)} workers")
        
        # Select best workers based on criteria
        selected = await self.select_best_workers(workers, count=10)
        
        # Negotiate contracts with workers
        for worker in selected:
            contract = await self.agent.negotiate_worker_contract(
                worker_id=worker['worker_id'],
                training_id=self.training_id,
                price=worker['price']
            )
            
            self.workers[worker['worker_id']] = {
                'contract': contract,
                'status': 'ready'
            }
        
        print(f"Selected {len(selected)} workers")
        
        # Update training phase
        await self.agent.update_training_phase(
            training_id=self.training_id,
            phase='distributing'
        )
    
    async def select_best_workers(self, workers, count):
        """Select best workers based on criteria"""
        # Score workers based on multiple factors
        scored = []
        
        for worker in workers:
            score = 0
            
            # GPU performance score
            gpu_score = self.get_gpu_score(worker['gpu_type'])
            score += gpu_score * 0.4
            
            # Price score (lower is better)
            price_score = 1 / (worker['price'] + 1)
            score += price_score * 0.3
            
            # Reputation score
            rep_score = worker['reputation'] / 5.0
            score += rep_score * 0.2
            
            # Availability score
            avail_score = worker['availability']
            score += avail_score * 0.1
            
            scored.append({
                'worker': worker,
                'score': score
            })
        
        # Sort by score and select top count
        scored.sort(key=lambda x: x['score'], reverse=True)
        return [s['worker'] for s in scored[:count]]
    
    def get_gpu_score(self, gpu_type):
        """Get GPU performance score"""
        scores = {
            'RTX4090': 1.0,
            'RTX3090': 0.85,
            'RTX3080': 0.75,
            'RTX3070': 0.65
        }
        return scores.get(gpu_type, 0.5)
    
    async def distribute_training_jobs(self):
        """Distribute training jobs to workers"""
        # Divide dataset into shards
        shards = await self.agent.create_data_shards(
            training_id=self.training_id,
            num_shards=len(self.workers)
        )
        
        # Assign shards to workers
        for i, (worker_id, worker_data) in enumerate(self.workers.items()):
            # Send training job to worker
            job = await self.agent.send_training_job(
                worker_id=worker_id,
                training_id=self.training_id,
                shard_id=shards[i]['shard_id'],
                config={
                    'batch_size': 32,
                    'learning_rate': 0.001,
                    'epochs': 10
                }
            )
            
            worker_data['job_id'] = job['job_id']
            worker_data['status'] = 'training'
            
            print(f"Sent job to worker {worker_id}: {job['job_id']}")
        
        # Update training phase
        await self.agent.update_training_phase(
            training_id=self.training_id,
            phase='training'
        )
    
    async def monitor_training_progress(self):
        """Monitor training progress across workers"""
        total_loss = 0
        completed_workers = 0
        
        for worker_id, worker_data in self.workers.items():
            # Get worker progress
            progress = await self.agent.get_worker_progress(
                job_id=worker_data['job_id']
            )
            
            print(f"Worker {worker_id}: {progress['progress']}% complete")
            print(f"  Loss: {progress['loss']}")
            print(f"  Epoch: {progress['epoch']}")
            
            total_loss += progress['loss']
            
            if progress['status'] == 'complete':
                completed_workers += 1
                worker_data['status'] = 'complete'
                
                # Collect model checkpoint
                checkpoint = await self.agent.collect_checkpoint(
                    worker_id=worker_id,
                    job_id=worker_data['job_id']
                )
                worker_data['checkpoint'] = checkpoint
        
        # Check if all workers complete
        if completed_workers == len(self.workers):
            await self.agent.update_training_phase(
                training_id=self.training_id,
                phase='aggregating'
            )
    
    async def aggregate_model_updates(self):
        """Aggregate model updates from all workers"""
        # Collect all checkpoints
        checkpoints = [
            w['checkpoint'] for w in self.workers.values()
            if 'checkpoint' in w
        ]
        
        # Perform federated averaging
        aggregated_model = await self.agent.federated_average(
            checkpoints=checkpoints,
            weights=[1.0 / len(checkpoints)] * len(checkpoints)
        )
        
        print(f"Aggregated model: {aggregated_model['model_id']}")
        
        # Upload aggregated model to IPFS
        model_cid = await self.agent.upload_to_ipfs(
            data=aggregated_model['model'],
            pin=True
        )
        
        # Save training results
        await self.agent.save_training_results(
            training_id=self.training_id,
            model_cid=model_cid,
            metrics={
                'final_loss': aggregated_model['loss'],
                'workers': len(self.workers),
                'epochs': 10
            }
        )
        
        # Update training phase
        await self.agent.update_training_phase(
            training_id=self.training_id,
            phase='complete'
        )
    
    async def finalize_training(self):
        """Finalize training and process payments"""
        # Pay workers
        for worker_id, worker_data in self.workers.items():
            await self.agent.process_payment(
                worker_id=worker_id,
                contract_id=worker_data['contract']['contract_id'],
                amount=worker_data['contract']['price']
            )
            
            print(f"Paid worker {worker_id}")
        
        print("Training complete and payments processed")
    
    async def handle_failure(self):
        """Handle training failure"""
        # Identify failed workers
        failed = [
            wid for wid, wdata in self.workers.items()
            if wdata['status'] == 'failed'
        ]
        
        print(f"Failed workers: {failed}")
        
        # Attempt recovery or reassignment
        if len(failed) < len(self.workers) / 2:
            # Can recover with remaining workers
            await self.reassign_failed_jobs(failed)
        else:
            # Training failed, refund payments
            await self.refund_payments()

async def main():
    config = AgentConfig(
        name="distributed-training",
        blockchain_network="mainnet",
        wallet_name="training-wallet"
    )
    
    coordinator = DistributedTrainingCoordinator(config)
    await coordinator.start()

asyncio.run(main())

Example 3: Fault-Tolerant Training

from aitbc_agent_sdk import Agent, AgentConfig
import asyncio

class FaultTolerantTraining:
    def __init__(self, config):
        self.agent = Agent(config)
    
    async def start(self):
        await self.agent.start()
        await self.run_fault_tolerant_training()
    
    async def run_fault_tolerant_training(self):
        """Run fault-tolerant distributed training"""
        while True:
            # Monitor worker health
            await self.monitor_worker_health()
            
            # Handle stragglers
            await self.handle_stragglers()
            
            # Implement checkpoint recovery
            await self.checkpoint_recovery()
            
            # Dynamic worker scaling
            await self.dynamic_scaling()
            
            await asyncio.sleep(60)
    
    async def monitor_worker_health(self):
        """Monitor health of all workers"""
        workers = await self.agent.get_all_workers()
        
        for worker in workers:
            # Check if worker is responsive
            if not await self.agent.ping_worker(worker['worker_id']):
                print(f"Worker {worker['worker_id']} unresponsive")
                
                # Mark as failed
                await self.agent.mark_worker_failed(worker['worker_id'])
                
                # Reassign work
                await self.reassign_worker_work(worker['worker_id'])
    
    async def reassign_worker_work(self, failed_worker_id):
        """Reassign work from failed worker"""
        # Get failed worker's current job
        job = await self.agent.get_worker_job(failed_worker_id)
        
        if job:
            # Find replacement worker
            replacement = await self.agent.find_replacement_worker(
                gpu_type=job['gpu_type']
            )
            
            if replacement:
                # Reassign job to replacement
                await self.agent.reassign_job(
                    job_id=job['job_id'],
                    new_worker_id=replacement['worker_id']
                )
                
                print(f"Reassigned job to {replacement['worker_id']}")
    
    async def handle_stragglers(self):
        """Handle slow workers (stragglers)"""
        workers = await self.agent.get_all_workers()
        
        # Calculate average progress
        progress = []
        for worker in workers:
            p = await self.agent.get_worker_progress(worker['job_id'])
            progress.append(p['progress'])
        
        avg_progress = sum(progress) / len(progress)
        
        # Identify stragglers (progress < avg - 20%)
        stragglers = []
        for worker, prog in zip(workers, progress):
            if prog < avg_progress - 20:
                stragglers.append(worker)
        
        # Handle stragglers
        for straggler in stragglers:
            print(f"Straggler detected: {straggler['worker_id']}")
            
            # Option 1: Reduce workload
            await self.agent.reduce_worker_workload(straggler['worker_id'])
            
            # Option 2: Reassign partial work
            await self.agent.reassign_partial_work(straggler['worker_id'])
    
    async def checkpoint_recovery(self):
        """Implement checkpoint-based recovery"""
        # Get latest checkpoints from all workers
        checkpoints = await self.agent.get_all_checkpoints()
        
        # Verify checkpoint integrity
        for checkpoint in checkpoints:
            if not await self.agent.verify_checkpoint(checkpoint):
                print(f"Checkpoint corrupted: {checkpoint['checkpoint_id']}")
                
                # Request re-upload from worker
                await self.agent.request_checkpoint_resend(
                    worker_id=checkpoint['worker_id']
                )
    
    async def dynamic_scaling(self):
        """Dynamically scale worker count based on needs"""
        # Get training progress
        progress = await self.agent.get_training_progress()
        
        # Get available workers
        available = await self.agent.get_available_workers()
        
        # If progress is slow and workers available, add workers
        if progress['speed'] < progress['target_speed'] * 0.7:
            if len(available) > 0:
                # Add worker
                new_worker = available[0]
                await self.agent.add_worker(
                    worker_id=new_worker['worker_id']
                )
                print(f"Added worker: {new_worker['worker_id']}")
        
        # If progress is fast, can reduce workers
        elif progress['speed'] > progress['target_speed'] * 1.3:
            workers = await self.agent.get_active_workers()
            if len(workers) > 5:  # Keep minimum 5
                # Remove slowest worker
                slowest = min(workers, key=lambda x: x['speed'])
                await self.agent.remove_worker(slowest['worker_id'])
                print(f"Removed worker: {slowest['worker_id']}")

async def main():
    config = AgentConfig(
        name="fault-tolerant",
        blockchain_network="mainnet",
        wallet_name="ft-wallet"
    )
    
    ft = FaultTolerantTraining(config)
    await ft.start()

asyncio.run(main())

🎯 Expected Outcomes

After completing this scenario, you should be able to:

  • Coordinate distributed AI training
  • Manage multi-GPU job distribution
  • Synchronize training across nodes
  • Aggregate model updates
  • Handle distributed training payments
  • Implement fault tolerance

🧪 Validation

Validate this scenario with the shared 3-node harness:

bash scripts/workflow/44_comprehensive_multi_node_scenario.sh

Node coverage:

  • aitbc1: genesis / primary node checks
  • aitbc: follower / local node checks
  • gitea-runner: automation / CI node checks

Validation guide:

Expected result:

  • Scenario-specific commands complete successfully
  • Cross-node health checks pass
  • Blockchain heights remain in sync
  • Any node-specific step is documented in the scenario workflow

AITBC Documentation

External Resources

Next Scenarios


📊 Quality Metrics

  • Structure: 10/10 - Clear distributed training workflow
  • Content: 10/10 - Comprehensive distributed operations
  • Code Examples: 10/10 - Working Agent SDK examples
  • Status: Active scenario

Last updated: 2026-05-02
Version: 1.0
Status: Active scenario document