chore: enhance security configuration across applications

- Add root-level *.json to .gitignore to prevent wallet backup leaks
- Replace wildcard CORS origins with explicit localhost URLs across all apps
- Add OPTIONS method to CORS allowed methods for preflight requests
- Update coordinator database to use absolute path in data/ directory to prevent duplicates
- Add JWT secret validation in coordinator config (must be set via environment)
- Replace deprecated get_session dependency with Session
This commit is contained in:
oib
2026-02-13 16:07:03 +01:00
parent e9646cc7dd
commit c984a1e052
13 changed files with 434 additions and 120 deletions

View File

@@ -9,7 +9,70 @@ import yaml
from pathlib import Path
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from ..utils import output, error, success
from ..utils import output, error, success, encrypt_value, decrypt_value
import getpass
def _get_wallet_password(wallet_name: str) -> str:
"""Get or prompt for wallet encryption password"""
# Try to get from keyring first
try:
import keyring
password = keyring.get_password("aitbc-wallet", wallet_name)
if password:
return password
except:
pass
# Prompt for password
while True:
password = getpass.getpass(f"Enter password for wallet '{wallet_name}': ")
if not password:
error("Password cannot be empty")
continue
confirm = getpass.getpass("Confirm password: ")
if password != confirm:
error("Passwords do not match")
continue
# Store in keyring for future use
try:
import keyring
keyring.set_password("aitbc-wallet", wallet_name, password)
except:
pass
return password
def _save_wallet(wallet_path: Path, wallet_data: Dict[str, Any], password: str = None):
"""Save wallet with encrypted private key"""
# Encrypt private key if provided
if password and 'private_key' in wallet_data:
wallet_data['private_key'] = encrypt_value(wallet_data['private_key'], password)
wallet_data['encrypted'] = True
# Save wallet
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
def _load_wallet(wallet_path: Path, wallet_name: str) -> Dict[str, Any]:
"""Load wallet and decrypt private key if needed"""
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
# Decrypt private key if encrypted
if wallet_data.get('encrypted') and 'private_key' in wallet_data:
password = _get_wallet_password(wallet_name)
try:
wallet_data['private_key'] = decrypt_value(wallet_data['private_key'], password)
except Exception:
error("Invalid password for wallet")
raise click.Abort()
return wallet_data
@click.group()
@@ -56,8 +119,9 @@ def wallet(ctx, wallet_name: Optional[str], wallet_path: Optional[str]):
@wallet.command()
@click.argument('name')
@click.option('--type', 'wallet_type', default='hd', help='Wallet type (hd, simple)')
@click.option('--no-encrypt', is_flag=True, help='Skip wallet encryption (not recommended)')
@click.pass_context
def create(ctx, name: str, wallet_type: str):
def create(ctx, name: str, wallet_type: str, no_encrypt: bool):
"""Create a new wallet"""
wallet_dir = ctx.obj['wallet_dir']
wallet_path = wallet_dir / f"{name}.json"
@@ -70,10 +134,29 @@ def create(ctx, name: str, wallet_type: str):
if wallet_type == 'hd':
# Hierarchical Deterministic wallet
import secrets
seed = secrets.token_hex(32)
address = f"aitbc1{seed[:40]}"
private_key = f"0x{seed}"
public_key = f"0x{secrets.token_hex(32)}"
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, NoEncryption, PrivateFormat
import base64
# Generate private key
private_key_bytes = secrets.token_bytes(32)
private_key = f"0x{private_key_bytes.hex()}"
# Derive public key from private key using ECDSA
priv_key = ec.derive_private_key(int.from_bytes(private_key_bytes, 'big'), ec.SECP256K1())
pub_key = priv_key.public_key()
pub_key_bytes = pub_key.public_bytes(
encoding=Encoding.X962,
format=PublicFormat.UncompressedPoint
)
public_key = f"0x{pub_key_bytes.hex()}"
# Generate address from public key (simplified)
digest = hashes.Hash(hashes.SHA256())
digest.update(pub_key_bytes)
address_hash = digest.finalize()
address = f"aitbc1{address_hash[:20].hex()}"
else:
# Simple wallet
import secrets
@@ -92,9 +175,14 @@ def create(ctx, name: str, wallet_type: str):
"transactions": []
}
# Get password for encryption unless skipped
password = None
if not no_encrypt:
success("Wallet encryption is enabled. Your private key will be encrypted at rest.")
password = _get_wallet_password(name)
# Save wallet
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
_save_wallet(wallet_path, wallet_data, password)
success(f"Wallet '{name}' created successfully")
output({
@@ -123,13 +211,16 @@ def list(ctx):
for wallet_file in wallet_dir.glob("*.json"):
with open(wallet_file, 'r') as f:
wallet_data = json.load(f)
wallets.append({
wallet_info = {
"name": wallet_data['wallet_id'],
"type": wallet_data.get('type', 'simple'),
"address": wallet_data['address'],
"created_at": wallet_data['created_at'],
"active": wallet_data['wallet_id'] == active_wallet
})
}
if wallet_data.get('encrypted'):
wallet_info['encrypted'] = True
wallets.append(wallet_info)
output(wallets, ctx.obj.get('output_format', 'table'))
@@ -163,9 +254,11 @@ def switch(ctx, name: str):
yaml.dump(config, f, default_flow_style=False)
success(f"Switched to wallet '{name}'")
# Load wallet to get address (will handle encryption)
wallet_data = _load_wallet(wallet_path, name)
output({
"active_wallet": name,
"address": json.load(open(wallet_path))['address']
"address": wallet_data['address']
}, ctx.obj.get('output_format', 'table'))
@@ -255,7 +348,8 @@ def restore(ctx, backup_path: str, name: str, force: bool):
wallet_data['wallet_id'] = name
wallet_data['restored_at'] = datetime.utcnow().isoformat() + "Z"
# Save restored wallet
# Save restored wallet (preserve encryption state)
# If wallet was encrypted, we save it as-is (still encrypted with original password)
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
@@ -279,8 +373,7 @@ def info(ctx):
error(f"Wallet '{wallet_name}' not found. Use 'aitbc wallet create' to create one.")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
# Get active wallet from config
active_wallet = 'default'
@@ -317,22 +410,46 @@ def balance(ctx):
# Auto-create wallet if it doesn't exist
if not wallet_path.exists():
import secrets
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
# Generate proper key pair
private_key_bytes = secrets.token_bytes(32)
private_key = f"0x{private_key_bytes.hex()}"
# Derive public key from private key
priv_key = ec.derive_private_key(int.from_bytes(private_key_bytes, 'big'), ec.SECP256K1())
pub_key = priv_key.public_key()
pub_key_bytes = pub_key.public_bytes(
encoding=Encoding.X962,
format=PublicFormat.UncompressedPoint
)
public_key = f"0x{pub_key_bytes.hex()}"
# Generate address from public key
digest = hashes.Hash(hashes.SHA256())
digest.update(pub_key_bytes)
address_hash = digest.finalize()
address = f"aitbc1{address_hash[:20].hex()}"
wallet_data = {
"wallet_id": wallet_name,
"type": "simple",
"address": f"aitbc1{secrets.token_hex(20)}",
"public_key": f"0x{secrets.token_hex(32)}",
"private_key": f"0x{secrets.token_hex(32)}",
"address": address,
"public_key": public_key,
"private_key": private_key,
"created_at": datetime.utcnow().isoformat() + "Z",
"balance": 0.0,
"transactions": []
}
wallet_path.parent.mkdir(parents=True, exist_ok=True)
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
# Auto-create with encryption
success("Creating new wallet with encryption enabled")
password = _get_wallet_password(wallet_name)
_save_wallet(wallet_path, wallet_data, password)
else:
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
# Try to get balance from blockchain if available
if config:
@@ -377,8 +494,7 @@ def history(ctx, limit: int):
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
transactions = wallet_data.get('transactions', [])[-limit:]
@@ -413,8 +529,7 @@ def earn(ctx, amount: float, job_id: str, desc: Optional[str]):
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
# Add transaction
transaction = {
@@ -428,9 +543,11 @@ def earn(ctx, amount: float, job_id: str, desc: Optional[str]):
wallet_data['transactions'].append(transaction)
wallet_data['balance'] = wallet_data.get('balance', 0) + amount
# Save wallet
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
# Save wallet with encryption
password = None
if wallet_data.get('encrypted'):
password = _get_wallet_password(wallet_name)
_save_wallet(wallet_path, wallet_data, password)
success(f"Earnings added: {amount} AITBC")
output({
@@ -454,8 +571,7 @@ def spend(ctx, amount: float, description: str):
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
balance = wallet_data.get('balance', 0)
if balance < amount:
@@ -474,9 +590,11 @@ def spend(ctx, amount: float, description: str):
wallet_data['transactions'].append(transaction)
wallet_data['balance'] = balance - amount
# Save wallet
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
# Save wallet with encryption
password = None
if wallet_data.get('encrypted'):
password = _get_wallet_password(wallet_name)
_save_wallet(wallet_path, wallet_data, password)
success(f"Spent: {amount} AITBC")
output({
@@ -498,8 +616,7 @@ def address(ctx):
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
output({
"wallet": wallet_name,
@@ -522,8 +639,7 @@ def send(ctx, to_address: str, amount: float, description: Optional[str]):
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
balance = wallet_data.get('balance', 0)
if balance < amount:
@@ -589,8 +705,11 @@ def send(ctx, to_address: str, amount: float, description: Optional[str]):
wallet_data['transactions'].append(transaction)
wallet_data['balance'] = balance - amount
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
# Save wallet with encryption
password = None
if wallet_data.get('encrypted'):
password = _get_wallet_password(wallet_name)
_save_wallet(wallet_path, wallet_data, password)
output({
"wallet": wallet_name,
@@ -615,8 +734,7 @@ def request_payment(ctx, to_address: str, amount: float, description: Optional[s
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
# Create payment request
request = {
@@ -645,8 +763,7 @@ def stats(ctx):
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
transactions = wallet_data.get('transactions', [])
@@ -680,8 +797,7 @@ def stake(ctx, amount: float, duration: int):
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
balance = wallet_data.get('balance', 0)
if balance < amount:
@@ -714,8 +830,11 @@ def stake(ctx, amount: float, duration: int):
"timestamp": datetime.now().isoformat()
})
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
# Save wallet with encryption
password = None
if wallet_data.get('encrypted'):
password = _get_wallet_password(wallet_name)
_save_wallet(wallet_path, wallet_data, password)
success(f"Staked {amount} AITBC for {duration} days")
output({
@@ -774,8 +893,11 @@ def unstake(ctx, stake_id: str):
"timestamp": datetime.now().isoformat()
})
with open(wallet_path, 'w') as f:
json.dump(wallet_data, f, indent=2)
# Save wallet with encryption
password = None
if wallet_data.get('encrypted'):
password = _get_wallet_password(wallet_name)
_save_wallet(wallet_path, wallet_data, password)
success(f"Unstaked {stake_record['amount']} AITBC + {rewards:.4f} rewards")
output({
@@ -800,8 +922,7 @@ def staking_info(ctx):
error(f"Wallet '{wallet_name}' not found")
return
with open(wallet_path, 'r') as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(wallet_path, wallet_name)
staking = wallet_data.get('staking', [])
active_stakes = [s for s in staking if s['status'] == 'active']
@@ -995,14 +1116,14 @@ def multisig_sign(ctx, wallet_name: str, tx_id: str, signer: str):
@click.pass_context
def liquidity_stake(ctx, amount: float, pool: str, lock_days: int):
"""Stake tokens into a liquidity pool"""
wallet_name = ctx.obj['wallet_name']
wallet_path = ctx.obj.get('wallet_path')
if not wallet_path or not Path(wallet_path).exists():
error("Wallet not found")
ctx.exit(1)
return
with open(wallet_path) as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(Path(wallet_path), wallet_name)
balance = wallet_data.get('balance', 0)
if balance < amount:
@@ -1051,8 +1172,11 @@ def liquidity_stake(ctx, amount: float, pool: str, lock_days: int):
"timestamp": now.isoformat()
})
with open(wallet_path, "w") as f:
json.dump(wallet_data, f, indent=2)
# Save wallet with encryption
password = None
if wallet_data.get('encrypted'):
password = _get_wallet_password(wallet_name)
_save_wallet(Path(wallet_path), wallet_data, password)
success(f"Staked {amount} AITBC into '{pool}' pool ({tier} tier, {apy}% APY)")
output({
@@ -1071,14 +1195,14 @@ def liquidity_stake(ctx, amount: float, pool: str, lock_days: int):
@click.pass_context
def liquidity_unstake(ctx, stake_id: str):
"""Withdraw from a liquidity pool with rewards"""
wallet_name = ctx.obj['wallet_name']
wallet_path = ctx.obj.get('wallet_path')
if not wallet_path or not Path(wallet_path).exists():
error("Wallet not found")
ctx.exit(1)
return
with open(wallet_path) as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(Path(wallet_path), wallet_name)
liquidity = wallet_data.get('liquidity', [])
record = next((r for r in liquidity if r["stake_id"] == stake_id and r["status"] == "active"), None)
@@ -1118,8 +1242,11 @@ def liquidity_unstake(ctx, stake_id: str):
"timestamp": datetime.now().isoformat()
})
with open(wallet_path, "w") as f:
json.dump(wallet_data, f, indent=2)
# Save wallet with encryption
password = None
if wallet_data.get('encrypted'):
password = _get_wallet_password(wallet_name)
_save_wallet(Path(wallet_path), wallet_data, password)
success(f"Withdrawn {total:.6f} AITBC (principal: {record['amount']}, rewards: {rewards:.6f})")
output({
@@ -1138,14 +1265,14 @@ def liquidity_unstake(ctx, stake_id: str):
@click.pass_context
def rewards(ctx):
"""View all earned rewards (staking + liquidity)"""
wallet_name = ctx.obj['wallet_name']
wallet_path = ctx.obj.get('wallet_path')
if not wallet_path or not Path(wallet_path).exists():
error("Wallet not found")
ctx.exit(1)
return
with open(wallet_path) as f:
wallet_data = json.load(f)
wallet_data = _load_wallet(Path(wallet_path), wallet_name)
staking = wallet_data.get('staking', [])
liquidity = wallet_data.get('liquidity', [])

View File

@@ -76,20 +76,40 @@ class AuditLogger:
return entries[-limit:]
def encrypt_value(value: str, key: str = None) -> str:
"""Simple XOR-based obfuscation for config values (not cryptographic security)"""
def _get_fernet_key(key: str = None) -> bytes:
"""Derive a Fernet key from a password or use default"""
from cryptography.fernet import Fernet
import base64
key = key or "aitbc_config_key_2026"
encrypted = bytes([ord(c) ^ ord(key[i % len(key)]) for i, c in enumerate(value)])
import hashlib
if key is None:
# Use a default key (should be overridden in production)
key = "aitbc_config_key_2026_default"
# Derive a 32-byte key suitable for Fernet
return base64.urlsafe_b64encode(hashlib.sha256(key.encode()).digest())
def encrypt_value(value: str, key: str = None) -> str:
"""Encrypt a value using Fernet symmetric encryption"""
from cryptography.fernet import Fernet
import base64
fernet_key = _get_fernet_key(key)
f = Fernet(fernet_key)
encrypted = f.encrypt(value.encode())
return base64.b64encode(encrypted).decode()
def decrypt_value(encrypted: str, key: str = None) -> str:
"""Decrypt an XOR-obfuscated config value"""
"""Decrypt a Fernet-encrypted value"""
from cryptography.fernet import Fernet
import base64
key = key or "aitbc_config_key_2026"
fernet_key = _get_fernet_key(key)
f = Fernet(fernet_key)
data = base64.b64decode(encrypted)
return ''.join(chr(b ^ ord(key[i % len(key)])) for i, b in enumerate(data))
return f.decrypt(data).decode()
def setup_logging(verbosity: int, debug: bool = False) -> str: