# NOTE: file-viewer header carried over from the original paste
# ("50 lines, 1.6 KiB, Python"); it is not part of the program.
class GraphDB:
    """Minimal in-memory undirected graph with per-node properties.

    Nodes are identified by any hashable id and carry a dict of keyword
    properties. Edges are unweighted and undirected: each edge is stored in
    the adjacency set of both of its endpoints.
    """

    def __init__(self):
        self.nodes = {}  # node_id -> properties dict
        self.edges = {}  # node_id -> set of adjacent node_ids

    def add_node(self, node_id, **properties):
        """Insert *node_id*, or replace its properties if it already exists.

        Re-adding an existing node keeps the edges it already has.
        """
        self.nodes[node_id] = properties
        self.edges.setdefault(node_id, set())

    def add_edge(self, from_node, to_node) -> bool:
        """Connect two previously added nodes with an undirected edge.

        Returns:
            True if the edge was recorded; False if either endpoint has not
            been added, in which case the graph is left untouched.
        """
        if from_node not in self.nodes or to_node not in self.nodes:
            return False
        # Undirected graph: record the edge on both endpoints.
        self.edges[from_node].add(to_node)
        self.edges[to_node].add(from_node)
        return True

    def get_node(self, node_id):
        """Return the properties dict of *node_id*, or None if it is unknown."""
        return self.nodes.get(node_id)

    def get_neighbors(self, node_id) -> set:
        """Return the ids adjacent to *node_id* (empty set if it is unknown).

        A copy is returned so that callers mutating the result cannot corrupt
        the internal adjacency sets.
        """
        return set(self.edges.get(node_id, ()))

    def find_nodes_by_property(self, key, value) -> list:
        """Return the ids of all nodes whose property *key* equals *value*.

        A node that lacks *key* is treated as having the value None, so
        searching for ``value=None`` also matches nodes without that property.
        """
        return [nid for nid, props in self.nodes.items() if props.get(key) == value]

    def shortest_path(self, start, end):
        """Return a shortest path from *start* to *end* as a list of node ids.

        Uses breadth-first search, so the result has the fewest possible
        edges. When several shortest paths exist, which one is returned
        depends on set iteration order.

        Returns:
            The path including both endpoints (``[start]`` when
            ``start == end``), or None if either node is unknown or no path
            connects them.
        """
        from collections import deque

        if start not in self.nodes or end not in self.nodes:
            return None
        if start == end:
            return [start]

        # Predecessor map doubles as the visited set. Nodes are recorded when
        # first discovered (not when dequeued), so each is enqueued only once.
        came_from = {start: start}
        queue = deque([start])

        while queue:
            current = queue.popleft()
            for neighbor in self.edges.get(current, ()):
                if neighbor in came_from:
                    continue
                came_from[neighbor] = current
                if neighbor == end:
                    # Walk the predecessor chain back to the start.
                    path = [end]
                    while path[-1] != start:
                        path.append(came_from[path[-1]])
                    path.reverse()
                    return path
                queue.append(neighbor)
        return None


# Demo: build a small three-node graph and exercise the query helpers.
g = GraphDB()
g.add_node("A", type="Person", name="Alice")
g.add_node("B", type="Person", name="Bob")
g.add_node("C", type="City", name="Phoenix")
g.add_edge("A", "B")
g.add_edge("A", "C")
g.add_edge("B", "C")

print(g.get_neighbors("A"))  # {'B', 'C'} (set display order may vary)
print(g.find_nodes_by_property("type", "Person"))  # ['A', 'B']
# B and C are directly connected, so the shortest path is the single edge.
print(g.shortest_path("B", "C"))  # ['B', 'C']