Files
aitbc/apps/wallet/src/app/keystore/service.py
oib 15427c96c0 chore: update file permissions to executable across repository
- Change file mode from 644 to 755 for all project files
- Add chain_id parameter to get_balance RPC endpoint with default "ait-devnet"
- Rename Miner.extra_meta_data to extra_metadata for consistency
2026-03-06 22:17:54 +01:00

97 lines
3.2 KiB
Python
Executable File

from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional
from secrets import token_bytes
from nacl.signing import SigningKey
from ..crypto.encryption import EncryptionSuite, EncryptionError
from ..security import validate_password_rules, wipe_buffer
@dataclass
class WalletRecord:
wallet_id: str
public_key: str
salt: bytes
nonce: bytes
ciphertext: bytes
metadata: Dict[str, str]
class KeystoreService:
"""In-memory keystore with Argon2id + XChaCha20-Poly1305 encryption."""
def __init__(self, encryption: Optional[EncryptionSuite] = None) -> None:
self._wallets: Dict[str, WalletRecord] = {}
self._encryption = encryption or EncryptionSuite()
def list_wallets(self) -> List[str]:
return list(self._wallets.keys())
def list_records(self) -> Iterable[WalletRecord]:
return list(self._wallets.values())
def get_wallet(self, wallet_id: str) -> Optional[WalletRecord]:
return self._wallets.get(wallet_id)
def create_wallet(
self,
wallet_id: str,
password: str,
secret: Optional[bytes] = None,
metadata: Optional[Dict[str, str]] = None,
) -> WalletRecord:
if wallet_id in self._wallets:
raise ValueError("wallet already exists")
validate_password_rules(password)
metadata_map = {str(k): str(v) for k, v in (metadata or {}).items()}
if secret is None:
signing_key = SigningKey.generate()
secret_bytes = signing_key.encode()
else:
if len(secret) != SigningKey.seed_size:
raise ValueError("secret key must be 32 bytes")
secret_bytes = secret
signing_key = SigningKey(secret_bytes)
salt = token_bytes(self._encryption.salt_bytes)
nonce = token_bytes(self._encryption.nonce_bytes)
ciphertext = self._encryption.encrypt(password=password, plaintext=secret_bytes, salt=salt, nonce=nonce)
record = WalletRecord(
wallet_id=wallet_id,
public_key=signing_key.verify_key.encode().hex(),
salt=salt,
nonce=nonce,
ciphertext=ciphertext,
metadata=metadata_map,
)
self._wallets[wallet_id] = record
return record
def unlock_wallet(self, wallet_id: str, password: str) -> bytes:
record = self._wallets.get(wallet_id)
if record is None:
raise KeyError("wallet not found")
try:
return self._encryption.decrypt(password=password, ciphertext=record.ciphertext, salt=record.salt, nonce=record.nonce)
except EncryptionError as exc:
raise ValueError("failed to decrypt wallet") from exc
def delete_wallet(self, wallet_id: str) -> bool:
return self._wallets.pop(wallet_id, None) is not None
def sign_message(self, wallet_id: str, password: str, message: bytes) -> bytes:
secret_bytes = bytearray(self.unlock_wallet(wallet_id, password))
try:
signing_key = SigningKey(bytes(secret_bytes))
signed = signing_key.sign(message)
return signed.signature
finally:
wipe_buffer(secret_bytes)