Implement knowledge graph marketplace backend with database and REST endpoints
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled

Marketplace service:
- Added KnowledgeGraph model (id, name, description, owner, status, created_at, updated_at)
- Added GraphNode model (id, graph_id, node_type, label, properties, created_at, updated_at)
- Added GraphEdge model (id, graph_id, source_node_id, target_node_id, edge_type, properties, weight, created_at, updated_at)
- Added create_graph() service method
- Added add_node() service method
- Added add_edge() service method
- Added query_graph() service method
- Added POST /v1/knowledge-graph endpoint for creating graphs
- Added POST /v1/knowledge-graph/{graph_id}/nodes endpoint for adding nodes
- Added POST /v1/knowledge-graph/{graph_id}/edges endpoint for adding edges
- Added GET /v1/knowledge-graph/{graph_id} endpoint for querying graphs
- Created knowledgegraph, graphnode, and graphedge tables in aitbc_marketplace database

Implements knowledge graph operations (create graph, add node, add edge, query) referenced by CLI commands
This commit is contained in:
aitbc
2026-05-15 00:38:32 +02:00
parent b27e29c3e4
commit 1f9aa8afc3
3 changed files with 209 additions and 0 deletions

View File

@@ -58,3 +58,44 @@ class Plugin(SQLModel, table=True):
updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)
download_count: int = Field(default=0)
rating: float = Field(default=0.0)
class KnowledgeGraph(SQLModel, table=True):
__tablename__ = "knowledgegraph"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
name: str = Field(index=True)
description: str = Field(default="")
owner: str = Field(index=True)
status: str = Field(default="active", index=True) # active, archived, deleted
created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False, index=True)
updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)
class GraphNode(SQLModel, table=True):
__tablename__ = "graphnode"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
graph_id: str = Field(index=True)
node_type: str = Field(index=True) # entity, concept, relation, etc.
label: str = Field(index=True)
properties: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False))
created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)
updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)
class GraphEdge(SQLModel, table=True):
__tablename__ = "graphedge"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
graph_id: str = Field(index=True)
source_node_id: str = Field(index=True)
target_node_id: str = Field(index=True)
edge_type: str = Field(index=True) # relates_to, depends_on, etc.
properties: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False))
weight: float = Field(default=1.0)
created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)
updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)

View File

@@ -278,6 +278,73 @@ async def register_plugin(
raise
@app.post("/v1/knowledge-graph")
async def create_graph(
graph_data: dict,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Create a new knowledge graph"""
try:
logger.info(f"POST /v1/knowledge-graph called with data keys: {graph_data.keys()}")
result = await svc.create_graph(graph_data)
logger.info(f"POST /v1/knowledge-graph created graph with id: {result['id']}")
return result
except Exception as e:
logger.error(f"Error in POST /v1/knowledge-graph: {type(e).__name__}: {str(e)}")
raise
@app.post("/v1/knowledge-graph/{graph_id}/nodes")
async def add_node(
graph_id: str,
node_data: dict,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Add a node to a knowledge graph"""
try:
node_data["graph_id"] = graph_id
logger.info(f"POST /v1/knowledge-graph/{graph_id}/nodes called")
result = await svc.add_node(node_data)
logger.info(f"Added node with id: {result['id']}")
return result
except Exception as e:
logger.error(f"Error in POST /v1/knowledge-graph/{graph_id}/nodes: {type(e).__name__}: {str(e)}")
raise
@app.post("/v1/knowledge-graph/{graph_id}/edges")
async def add_edge(
graph_id: str,
edge_data: dict,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Add an edge to a knowledge graph"""
try:
edge_data["graph_id"] = graph_id
logger.info(f"POST /v1/knowledge-graph/{graph_id}/edges called")
result = await svc.add_edge(edge_data)
logger.info(f"Added edge with id: {result['id']}")
return result
except Exception as e:
logger.error(f"Error in POST /v1/knowledge-graph/{graph_id}/edges: {type(e).__name__}: {str(e)}")
raise
@app.get("/v1/knowledge-graph/{graph_id}")
async def query_graph(
graph_id: str,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Query a knowledge graph"""
try:
logger.info(f"GET /v1/knowledge-graph/{graph_id} called")
result = await svc.query_graph(graph_id)
return result
except Exception as e:
logger.error(f"Error in GET /v1/knowledge-graph/{graph_id}: {type(e).__name__}: {str(e)}")
raise
@app.post("/v1/transactions")
async def submit_transaction(transaction_data: dict, session: AsyncSession = Depends(get_session_dep)):
"""Submit marketplace transaction"""

View File

@@ -262,3 +262,104 @@ class MarketplaceService:
except Exception as e:
logger.error(f"Error in register_plugin: {type(e).__name__}: {str(e)}")
raise
async def create_graph(self, graph_data: dict) -> dict:
"""Create a new knowledge graph"""
from .domain.marketplace import KnowledgeGraph
try:
graph = KnowledgeGraph(**graph_data)
self.session.add(graph)
await self.session.commit()
await self.session.refresh(graph)
logger.info(f"Created graph with id: {graph.id}")
return {
"id": graph.id,
"name": graph.name,
"status": graph.status,
}
except Exception as e:
logger.error(f"Error in create_graph: {type(e).__name__}: {str(e)}")
raise
async def add_node(self, node_data: dict) -> dict:
"""Add a node to a knowledge graph"""
from .domain.marketplace import GraphNode
try:
node = GraphNode(**node_data)
self.session.add(node)
await self.session.commit()
await self.session.refresh(node)
logger.info(f"Added node with id: {node.id} to graph: {node.graph_id}")
return {
"id": node.id,
"graph_id": node.graph_id,
"label": node.label,
}
except Exception as e:
logger.error(f"Error in add_node: {type(e).__name__}: {str(e)}")
raise
async def add_edge(self, edge_data: dict) -> dict:
"""Add an edge to a knowledge graph"""
from .domain.marketplace import GraphEdge
try:
edge = GraphEdge(**edge_data)
self.session.add(edge)
await self.session.commit()
await self.session.refresh(edge)
logger.info(f"Added edge with id: {edge.id} to graph: {edge.graph_id}")
return {
"id": edge.id,
"graph_id": edge.graph_id,
"source_node_id": edge.source_node_id,
"target_node_id": edge.target_node_id,
}
except Exception as e:
logger.error(f"Error in add_edge: {type(e).__name__}: {str(e)}")
raise
async def query_graph(self, graph_id: str) -> dict:
"""Query a knowledge graph (get all nodes and edges)"""
from sqlalchemy import select
from .domain.marketplace import GraphNode, GraphEdge
try:
# Get nodes
node_stmt = select(GraphNode).where(GraphNode.graph_id == graph_id)
node_result = await self.session.execute(node_stmt)
nodes = node_result.scalars().all()
# Get edges
edge_stmt = select(GraphEdge).where(GraphEdge.graph_id == graph_id)
edge_result = await self.session.execute(edge_stmt)
edges = edge_result.scalars().all()
return {
"graph_id": graph_id,
"nodes": [
{
"id": n.id,
"node_type": n.node_type,
"label": n.label,
"properties": n.properties,
}
for n in nodes
],
"edges": [
{
"id": e.id,
"source_node_id": e.source_node_id,
"target_node_id": e.target_node_id,
"edge_type": e.edge_type,
"weight": e.weight,
"properties": e.properties,
}
for e in edges
],
}
except Exception as e:
logger.error(f"Error in query_graph: {type(e).__name__}: {str(e)}")
raise