From 1f9aa8afc30651850d2dc3b53bcd43f1a235229c Mon Sep 17 00:00:00 2001 From: aitbc Date: Fri, 15 May 2026 00:38:32 +0200 Subject: [PATCH] Implement knowledge graph marketplace backend with database and REST endpoints Marketplace service: - Added KnowledgeGraph model (id, name, description, owner, status, created_at, updated_at) - Added GraphNode model (id, graph_id, node_type, label, properties, created_at, updated_at) - Added GraphEdge model (id, graph_id, source_node_id, target_node_id, edge_type, properties, weight, created_at, updated_at) - Added create_graph() service method - Added add_node() service method - Added add_edge() service method - Added query_graph() service method - Added POST /v1/knowledge-graph endpoint for creating graphs - Added POST /v1/knowledge-graph/{graph_id}/nodes endpoint for adding nodes - Added POST /v1/knowledge-graph/{graph_id}/edges endpoint for adding edges - Added GET /v1/knowledge-graph/{graph_id} endpoint for querying graphs - Created knowledgegraph, graphnode, and graphedge tables in aitbc_marketplace database Implements knowledge graph operations (create graph, add node, add edge, query) referenced by CLI commands --- .../marketplace_service/domain/marketplace.py | 41 +++++++ .../src/marketplace_service/main.py | 67 ++++++++++++ .../services/marketplace_service.py | 101 ++++++++++++++++++ 3 files changed, 209 insertions(+) diff --git a/apps/marketplace-service/src/marketplace_service/domain/marketplace.py b/apps/marketplace-service/src/marketplace_service/domain/marketplace.py index 774ef580..b8265353 100644 --- a/apps/marketplace-service/src/marketplace_service/domain/marketplace.py +++ b/apps/marketplace-service/src/marketplace_service/domain/marketplace.py @@ -58,3 +58,44 @@ class Plugin(SQLModel, table=True): updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False) download_count: int = Field(default=0) rating: float = Field(default=0.0) + + +class KnowledgeGraph(SQLModel, table=True): + __tablename__ = "knowledgegraph" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True) + name: str = Field(index=True) + description: str = Field(default="") + owner: str = Field(index=True) + status: str = Field(default="active", index=True) # active, archived, deleted + created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False, index=True) + updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False) + + +class GraphNode(SQLModel, table=True): + __tablename__ = "graphnode" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True) + graph_id: str = Field(index=True) + node_type: str = Field(index=True) # entity, concept, relation, etc. + label: str = Field(index=True) + properties: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False)) + created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False) + updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False) + + +class GraphEdge(SQLModel, table=True): + __tablename__ = "graphedge" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True) + graph_id: str = Field(index=True) + source_node_id: str = Field(index=True) + target_node_id: str = Field(index=True) + edge_type: str = Field(index=True) # relates_to, depends_on, etc. + properties: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False)) + weight: float = Field(default=1.0) + created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False) + updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False) diff --git a/apps/marketplace-service/src/marketplace_service/main.py b/apps/marketplace-service/src/marketplace_service/main.py index 5e452e3e..ae129d49 100644 --- a/apps/marketplace-service/src/marketplace_service/main.py +++ b/apps/marketplace-service/src/marketplace_service/main.py @@ -278,6 +278,73 @@ async def register_plugin( raise +@app.post("/v1/knowledge-graph") +async def create_graph( + graph_data: dict, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Create a new knowledge graph""" + try: + logger.info(f"POST /v1/knowledge-graph called with data keys: {graph_data.keys()}") + result = await svc.create_graph(graph_data) + logger.info(f"POST /v1/knowledge-graph created graph with id: {result['id']}") + return result + except Exception as e: + logger.error(f"Error in POST /v1/knowledge-graph: {type(e).__name__}: {str(e)}") + raise + + +@app.post("/v1/knowledge-graph/{graph_id}/nodes") +async def add_node( + graph_id: str, + node_data: dict, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Add a node to a knowledge graph""" + try: + node_data["graph_id"] = graph_id + logger.info(f"POST /v1/knowledge-graph/{graph_id}/nodes called") + result = await svc.add_node(node_data) + logger.info(f"Added node with id: {result['id']}") + return result + except Exception as e: + logger.error(f"Error in POST /v1/knowledge-graph/{graph_id}/nodes: {type(e).__name__}: {str(e)}") + raise + + +@app.post("/v1/knowledge-graph/{graph_id}/edges") +async def add_edge( + graph_id: str, + edge_data: dict, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Add an edge to a knowledge graph""" + try: + edge_data["graph_id"] = graph_id + logger.info(f"POST /v1/knowledge-graph/{graph_id}/edges called") + result = await svc.add_edge(edge_data) + logger.info(f"Added edge with id: {result['id']}") + return result + except Exception as e: + logger.error(f"Error in POST /v1/knowledge-graph/{graph_id}/edges: {type(e).__name__}: {str(e)}") + raise + + +@app.get("/v1/knowledge-graph/{graph_id}") +async def query_graph( + graph_id: str, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Query a knowledge graph""" + try: + logger.info(f"GET /v1/knowledge-graph/{graph_id} called") + result = await svc.query_graph(graph_id) + return result + except Exception as e: + logger.error(f"Error in GET /v1/knowledge-graph/{graph_id}: {type(e).__name__}: {str(e)}") + raise + + @app.post("/v1/transactions") async def submit_transaction(transaction_data: dict, session: AsyncSession = Depends(get_session_dep)): """Submit marketplace transaction""" diff --git a/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py b/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py index 677bb336..abb1e3e4 100644 --- a/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py +++ b/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py @@ -262,3 +262,104 @@ class MarketplaceService: except Exception as e: logger.error(f"Error in register_plugin: {type(e).__name__}: {str(e)}") raise + + async def create_graph(self, graph_data: dict) -> dict: + """Create a new knowledge graph""" + from .domain.marketplace import KnowledgeGraph + + try: + graph = KnowledgeGraph(**graph_data) + self.session.add(graph) + await self.session.commit() + await self.session.refresh(graph) + logger.info(f"Created graph with id: {graph.id}") + return { + "id": graph.id, + "name": graph.name, + "status": graph.status, + } + except Exception as e: + logger.error(f"Error in create_graph: {type(e).__name__}: {str(e)}") + raise + + async def add_node(self, node_data: dict) -> dict: + """Add a node to a knowledge graph""" + from .domain.marketplace import GraphNode + + try: + node = GraphNode(**node_data) + self.session.add(node) + await self.session.commit() + await self.session.refresh(node) + logger.info(f"Added node with id: {node.id} to graph: {node.graph_id}") + return { + "id": node.id, + "graph_id": node.graph_id, + "label": node.label, + } + except Exception as e: + logger.error(f"Error in add_node: {type(e).__name__}: {str(e)}") + raise + + async def add_edge(self, edge_data: dict) -> dict: + """Add an edge to a knowledge graph""" + from .domain.marketplace import GraphEdge + + try: + edge = GraphEdge(**edge_data) + self.session.add(edge) + await self.session.commit() + await self.session.refresh(edge) + logger.info(f"Added edge with id: {edge.id} to graph: {edge.graph_id}") + return { + "id": edge.id, + "graph_id": edge.graph_id, + "source_node_id": edge.source_node_id, + "target_node_id": edge.target_node_id, + } + except Exception as e: + logger.error(f"Error in add_edge: {type(e).__name__}: {str(e)}") + raise + + async def query_graph(self, graph_id: str) -> dict: + """Query a knowledge graph (get all nodes and edges)""" + from sqlalchemy import select + from .domain.marketplace import GraphNode, GraphEdge + + try: + # Get nodes + node_stmt = select(GraphNode).where(GraphNode.graph_id == graph_id) + node_result = await self.session.execute(node_stmt) + nodes = node_result.scalars().all() + + # Get edges + edge_stmt = select(GraphEdge).where(GraphEdge.graph_id == graph_id) + edge_result = await self.session.execute(edge_stmt) + edges = edge_result.scalars().all() + + return { + "graph_id": graph_id, + "nodes": [ + { + "id": n.id, + "node_type": n.node_type, + "label": n.label, + "properties": n.properties, + } + for n in nodes + ], + "edges": [ + { + "id": e.id, + "source_node_id": e.source_node_id, + "target_node_id": e.target_node_id, + "edge_type": e.edge_type, + "weight": e.weight, + "properties": e.properties, + } + for e in edges + ], + } + except Exception as e: + logger.error(f"Error in query_graph: {type(e).__name__}: {str(e)}") + raise