refactor: consolidate blockchain explorer into single app and update backup ignore patterns
- Remove standalone explorer-web app (README, HTML, package files) - Add /web endpoint to blockchain-explorer for web interface access - Update .gitignore to exclude application backup archives (*.tar.gz, *.zip) - Add backup documentation files to .gitignore (BACKUP_INDEX.md, README.md) - Consolidate explorer functionality into main blockchain-explorer application
This commit is contained in:
96
apps/wallet/src/app/keystore/service.py
Normal file
96
apps/wallet/src/app/keystore/service.py
Normal file
@@ -0,0 +1,96 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Iterable, List, Optional
|
||||
|
||||
from secrets import token_bytes
|
||||
|
||||
from nacl.signing import SigningKey
|
||||
|
||||
from ..crypto.encryption import EncryptionSuite, EncryptionError
|
||||
from ..security import validate_password_rules, wipe_buffer
|
||||
|
||||
|
||||
@dataclass
|
||||
class WalletRecord:
|
||||
wallet_id: str
|
||||
public_key: str
|
||||
salt: bytes
|
||||
nonce: bytes
|
||||
ciphertext: bytes
|
||||
metadata: Dict[str, str]
|
||||
|
||||
|
||||
class KeystoreService:
|
||||
"""In-memory keystore with Argon2id + XChaCha20-Poly1305 encryption."""
|
||||
|
||||
def __init__(self, encryption: Optional[EncryptionSuite] = None) -> None:
|
||||
self._wallets: Dict[str, WalletRecord] = {}
|
||||
self._encryption = encryption or EncryptionSuite()
|
||||
|
||||
def list_wallets(self) -> List[str]:
|
||||
return list(self._wallets.keys())
|
||||
|
||||
def list_records(self) -> Iterable[WalletRecord]:
|
||||
return list(self._wallets.values())
|
||||
|
||||
def get_wallet(self, wallet_id: str) -> Optional[WalletRecord]:
|
||||
return self._wallets.get(wallet_id)
|
||||
|
||||
def create_wallet(
|
||||
self,
|
||||
wallet_id: str,
|
||||
password: str,
|
||||
secret: Optional[bytes] = None,
|
||||
metadata: Optional[Dict[str, str]] = None,
|
||||
) -> WalletRecord:
|
||||
if wallet_id in self._wallets:
|
||||
raise ValueError("wallet already exists")
|
||||
|
||||
validate_password_rules(password)
|
||||
|
||||
metadata_map = {str(k): str(v) for k, v in (metadata or {}).items()}
|
||||
|
||||
if secret is None:
|
||||
signing_key = SigningKey.generate()
|
||||
secret_bytes = signing_key.encode()
|
||||
else:
|
||||
if len(secret) != SigningKey.seed_size:
|
||||
raise ValueError("secret key must be 32 bytes")
|
||||
secret_bytes = secret
|
||||
signing_key = SigningKey(secret_bytes)
|
||||
|
||||
salt = token_bytes(self._encryption.salt_bytes)
|
||||
nonce = token_bytes(self._encryption.nonce_bytes)
|
||||
ciphertext = self._encryption.encrypt(password=password, plaintext=secret_bytes, salt=salt, nonce=nonce)
|
||||
record = WalletRecord(
|
||||
wallet_id=wallet_id,
|
||||
public_key=signing_key.verify_key.encode().hex(),
|
||||
salt=salt,
|
||||
nonce=nonce,
|
||||
ciphertext=ciphertext,
|
||||
metadata=metadata_map,
|
||||
)
|
||||
self._wallets[wallet_id] = record
|
||||
return record
|
||||
|
||||
def unlock_wallet(self, wallet_id: str, password: str) -> bytes:
|
||||
record = self._wallets.get(wallet_id)
|
||||
if record is None:
|
||||
raise KeyError("wallet not found")
|
||||
try:
|
||||
return self._encryption.decrypt(password=password, ciphertext=record.ciphertext, salt=record.salt, nonce=record.nonce)
|
||||
except EncryptionError as exc:
|
||||
raise ValueError("failed to decrypt wallet") from exc
|
||||
|
||||
def delete_wallet(self, wallet_id: str) -> bool:
|
||||
return self._wallets.pop(wallet_id, None) is not None
|
||||
|
||||
def sign_message(self, wallet_id: str, password: str, message: bytes) -> bytes:
|
||||
secret_bytes = bytearray(self.unlock_wallet(wallet_id, password))
|
||||
try:
|
||||
signing_key = SigningKey(bytes(secret_bytes))
|
||||
signed = signing_key.sign(message)
|
||||
return signed.signature
|
||||
finally:
|
||||
wipe_buffer(secret_bytes)
|
||||
Reference in New Issue
Block a user