Source code for data2neo.neo4j.graph_elements

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Copyright 2011-2023, Nigel Small and Julian Minder
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Adapted from the now EOL py2neo: https://github.com/py2neo-org/py2neo

Elements that represents any entity in a Neo4j graph.

Inheritance Structure:

    GraphElement
        |- Attribute
        |- Subgraph
            |- Node
            |- Relationship

TODO: If you want to change the underlying driver, be sure to update the classes Subgraph, Node, Relation and NodeMatcher.

authors: Julian Minder and Nigel Small
"""

from abc import ABC
from typing import Any, List, Union
from datetime import datetime,date
import numbers
from itertools import chain
from .cypher import unwind_create_nodes_query, \
                    unwind_merge_nodes_query, \
                    unwind_create_relationships_query, \
                    unwind_merge_relationships_query, \
                    cypher_join, cypher_escape
from .encoder import encode_value, xstr, is_safe_key
from collections import OrderedDict

class GraphElement(ABC):
    """Abstract GraphElementType
    
    Represents attributes, subgraphs, nodes and relations. 
    """
    pass

[docs]class Attribute: """Represents an attribute in an Node or in an Relation. Attributes: key: String signifying the key of the attribute value: Can be any value that is allowed in the graph """
[docs] def __init__(self, key: str, value: Union[str, int, float, bool, datetime]) -> None: """Inits an attribute with a key and a value Args: key: String signifying the key of the attribute value: Can be any value that is allowed in the graph (String, Int, Float, Bool) """ super().__init__() self._key = key self._value = value
@property def key(self): """String signifying the key of the attribute""" return self._key @property def value(self): """Any value that is allowed in the graph (String, Int, Float, Bool)""" # If the value is not allowed in neo4j it is converted to strings if not self._value is None and not isinstance(self._value, (numbers.Number, str, bool, date, datetime)): return str(self._value) return self._value
class _GhostPrimaryKey: """ A class that represents the presence of a primary key without any value, this is used to merge relations that have no primary key. """ pass
[docs]class Subgraph(GraphElement): """ A :class:`.Subgraph` is an arbitrary collection of nodes and relationships. It is also the base class for :class:`.Node`, :class:`.Relationship` and :class:`.Path`. By definition, a subgraph must contain at least one node; `null subgraphs <http://mathworld.wolfram.com/NullGraph.html>`_ should be represented by :const:`None`. To test for `emptiness <http://mathworld.wolfram.com/EmptyGraph.html>`_ the built-in :func:`bool` function can be used. The simplest way to construct a subgraph is by combining nodes and relationships using standard set operations. For example:: >>> s = ab | ac >>> s {(alice:Person {name:"Alice"}), (bob:Person {name:"Bob"}), (carol:Person {name:"Carol"}), (Alice)-[:KNOWS]->(Bob), (Alice)-[:WORKS_WITH]->(Carol)} >>> s.nodes() frozenset({(alice:Person {name:"Alice"}), (bob:Person {name:"Bob"}), (carol:Person {name:"Carol"})}) >>> s.relationships() frozenset({(Alice)-[:KNOWS]->(Bob), (Alice)-[:WORKS_WITH]->(Carol)}) .. describe:: subgraph | other | ... Union. Return a new subgraph containing all nodes and relationships from *subgraph* as well as all those from *other*. Any entities common to both will only be included once. .. describe:: subgraph & other & ... Intersection. Return a new subgraph containing all nodes and relationships common to both *subgraph* and *other*. .. describe:: subgraph - other - ... Difference. Return a new subgraph containing all nodes and relationships that exist in *subgraph* but do not exist in *other*, as well as all nodes that are connected by the relationships in *subgraph* regardless of whether or not they exist in *other*. .. describe:: subgraph ^ other ^ ... Symmetric difference. Return a new subgraph containing all nodes and relationships that exist in *subgraph* or *other*, but not in both, as well as all nodes that are connected by those relationships regardless of whether or not they are common to *subgraph* and *other*. """
[docs] def __init__(self, nodes=None, relationships=None): self.__nodes = frozenset(nodes or []) self.__relationships = frozenset(relationships or []) self.__nodes |= frozenset(chain.from_iterable(r.nodes for r in self.__relationships))
#if not self.__nodes: # raise ValueError("Subgraphs must contain at least one node") def __repr__(self): return "Subgraph({%s}, {%s})" % (", ".join(map(repr, self.nodes)), ", ".join(map(repr, self.relationships))) def __db_create__(self, tx): """ Create new data in a remote :class:`.Graph` from this :class:`.Subgraph`. :param tx: """ # Convert nodes into a dictionary of # {frozenset(labels): [Node, Node, ...]} node_dict = {} for node in self.nodes: if node.identity is None: key = frozenset(node.labels) node_dict.setdefault(key, []).append(node) # Convert relationships into a dictionary of # {rel_type: [Rel, Rel, ...]} rel_dict = {} for relationship in self.relationships: if relationship.identity is None: key = relationship.type rel_dict.setdefault(key, []).append(relationship) for labels, nodes in node_dict.items(): pq = unwind_create_nodes_query(list(map(dict, nodes)), labels=labels) # TODO: id() is deprecated, in the future we need to move to something else pq = cypher_join(pq, "RETURN id(_)") records = tx.run(*pq) for i, record in enumerate(records): node = nodes[i] node.identity = record[0] node._remote_labels = frozenset(labels) for r_type, relationships in rel_dict.items(): data = map(lambda r: [r.start_node.identity, dict(r), r.end_node.identity], relationships) pq = unwind_create_relationships_query(data, r_type) # TODO: id() is deprecated, in the future we need to move to something else pq = cypher_join(pq, "RETURN id(_)") for i, record in enumerate(tx.run(*pq)): relationship = relationships[i] relationship.identity = record[0] def __db_merge__(self, tx, primary_label=None, primary_key=None): """ Merge data into a remote :class:`.Graph` from this :class:`.Subgraph`. :param tx: :param primary_label: :param primary_key: """ # Convert nodes into a dictionary of # {(p_label, p_key, frozenset(labels)): [Node, Node, ...]} node_dict = {} for node in self.nodes: if node.identity is None: # Determine primary label if node.__primarylabel__ is not None: p_label = node.__primarylabel__ elif node.__model__ is not None: p_label = node.__model__.__primarylabel__ or primary_label else: p_label = primary_label # Determine primary key if node.__primarykey__ is not None: p_key = node.__primarykey__ else: p_key = primary_key # Add node to the node dictionary key = (p_label, p_key, frozenset(node.labels)) node_dict.setdefault(key, []).append(node) # Convert relationships into a dictionary of # {rel_type: [Rel, Rel, ...]} rel_dict = {} for relationship in self.relationships: if relationship.identity is None: # Determine primary key if getattr(relationship, "__primarykey__", None) is not None: p_key = relationship.__primarykey__ else: p_key = primary_key key = (p_key, relationship.type) rel_dict.setdefault(key, []).append(relationship) for (pl, pk, labels), nodes in node_dict.items(): if pl is None or pk is None: raise ValueError("Primary label and primary key are required for node MERGE operation") pq = unwind_merge_nodes_query(map(dict, nodes), (pl, pk), labels) # TODO: id() is deprecated, in the future we need to move to something else pq = cypher_join(pq, "RETURN id(_)") identities = [record[0] for record in tx.run(*pq)] if len(identities) > len(nodes): raise ValueError("Found %d matching nodes for primary label %r and primary " "key %r with labels %r but merging requires no more than " "one" % (len(identities), pl, pk, set(labels))) for i, identity in enumerate(identities): node = nodes[i] node.identity = identity node._remote_labels = frozenset(labels) for (pk, r_type), relationships in rel_dict.items(): if pk is None: raise ValueError("Primary key are required for relationship MERGE operation") data = map(lambda r: [r.start_node.identity, dict(r), r.end_node.identity], relationships) if isinstance(pk, _GhostPrimaryKey): pq = unwind_merge_relationships_query(data, r_type) else: pq = unwind_merge_relationships_query(data, (r_type, pk)) # TODO: id() is deprecated, in the future we need to move to something else pq = cypher_join(pq, "RETURN id(_)") identities = [record[0] for record in tx.run(*pq)] if len(identities) > len(relationships): raise ValueError("Found %d matching relations for primary " "key %r with type %r but merging requires no more than " "one" % (len(identities), pk, r_type)) for i, identity in enumerate(identities): relationship = relationships[i] relationship.identity = identity def __db_pull__(self, tx): """ Copy data from a remote :class:`.Graph` into this :class:`.Subgraph`. :param tx: """ # Pull nodes nodes = {} for node in self.nodes: if node.identity is not None: nodes[node.identity] = node query = tx.run("MATCH (_) WHERE id(_) in $x " "RETURN id(_), labels(_), properties(_)", x=list(nodes.keys())) for identity, new_labels, new_properties in query: node = nodes[identity] node.labels = set(new_labels) node.properties = new_properties # Pull relationships relationships = {} for relationship in self.relationships: if relationship.identity is not None: relationships[relationship.identity] = relationship query = tx.run("MATCH ()-[_]->() WHERE id(_) in $x " "RETURN id(_), properties(_)", x=list(relationships.keys())) for identity, new_properties in query: relationship = relationships[identity] relationship.properties = new_properties def __db_push__(self, tx): """ Copy data into a remote :class:`.Graph` from this :class:`.Subgraph`. :param tx: """ for node in self.nodes: if node.identity is not None: clauses = ["MATCH (_) WHERE id(_) = $x", "SET _ = $y"] parameters = {"x": node.identity, "y": dict(node)} old_labels = node._remote_labels - node.labels if old_labels: clauses.append("REMOVE _:%s" % ":".join(map(cypher_escape, old_labels))) new_labels = node.labels - node._remote_labels if new_labels: clauses.append("SET _:%s" % ":".join(map(cypher_escape, new_labels))) tx.run("\n".join(clauses), parameters) node._remote_labels = node.labels for relationship in self.relationships: if relationship.identity is not None: clauses = ["MATCH ()-[_]->() WHERE id(_) = $x", "SET _ = $y"] parameters = {"x": relationship.identity, "y": dict(relationship)} tx.run("\n".join(clauses), parameters) @property def nodes(self): """ The set of all nodes in this subgraph. """ return tuple(self.__nodes) @property def relationships(self): """ The set of all relationships in this subgraph. """ return tuple(self.__relationships) def __iter__(self): return iter(self.__relationships) def __bool__(self): return bool(self.__relationships) def __nonzero__(self): return bool(self.__relationships) def __or__(self, other): return Subgraph(set(self.nodes) | set(other.nodes), set(self.relationships) | set(other.relationships)) def __and__(self, other): return Subgraph(set(self.nodes) & set(other.nodes), set(self.relationships) & set(other.relationships)) def __sub__(self, other): r = set(self.relationships) - set(other.relationships) n = (set(self.nodes) - set(other.nodes)) | set().union(*(set(rel.nodes) for rel in r)) return Subgraph(n, r) def __xor__(self, other): r = set(self.relationships) ^ set(other.relationships) n = (set(self.nodes) ^ set(other.nodes)) | set().union(*(set(rel.nodes) for rel in r)) return Subgraph(n, r)
class PropertyDict: """Abstract PropertyDict class that represents a dictionary of properties.""" def __init__(self, **properties): self.properties = properties self._identity = None self.__primarykey__ = None @property def identity(self): """Identity of element""" return self._identity @identity.setter def identity(self, value): self._identity = value def keys(self): """Returns properties keys""" return self.properties.keys() def update(self, properties): """Updates the properties of the element""" self.properties.update(properties) def __getitem__(self, key): if key not in self.properties: return None return self.properties[key] def __setitem__(self, key, value): self.properties[key] = value def __delitem__(self, key): del self.properties[key] def __contains__(self, key): return key in self.properties def __iter__(self): return iter(self.properties) def __len__(self): return len(self.properties) def keys(self): """Returns properties keys""" return self.properties.keys() def set_primary_key(self, key): """Sets the primary key of the element""" if key not in self.properties.keys() and not isinstance(key, _GhostPrimaryKey): raise ValueError("Primary key must be one of the node properties") self.__primarykey__ = key def __getstate__(self): return self.__dict__ def __hash__(self): try: if self._identity is not None: return hash(self._identity) except: pass return hash(id(self))
[docs]class Node(PropertyDict, Subgraph): """ A node is a fundamental unit of data storage within a property graph that may optionally be connected, via relationships, to other nodes. It possible to combine nodes (along with relationships and other graph data objects) into :class:`.Subgraph` objects using set operations. For more details, look at the documentation for the :class:`.Subgraph` class. All positional arguments passed to the constructor are interpreted as labels and all keyword arguments as properties:: >>> from data2neo.neo4j import Node >>> a = Node("Person", name="Alice") """
[docs] @staticmethod def from_attributes(labels: List[Attribute], attributes: List[Attribute] = [], primary_key: str = None, primary_label: str = None): """Creates a Node from a list of attributes and labels Args: labels: List of static attributes specifying the labels of the Node (first label will be the primary label) attributes: List of attributes (only one can be primary) primary_key: Optional key of the primary attribute. Used to merge the Node with existing nodes in the graph (default: None) primary_label: Optional label of the primary attribute. Used to merge the Node with existing nodes in the graph (default: None) """ labels = [label.value for label in labels] properties = dict((attr.key, attr.value) for attr in attributes) node = Node(*labels, **properties) if primary_key: node.set_primary_key(primary_key) if primary_label: node.set_primary_label(primary_label) return node
[docs] @staticmethod def from_dict(labels: List[str], properties: dict, primary_key: str = None, primary_label: str = None, identity: str = None): """Creates a Node from a list of attributes and labels Args: labels: List of static attributes specifying the labels of the Node (first label will be the primary label) properties: Dictionary of attributes (only one can be primary) primary_key: Optional key of the primary attribute. Used to merge the Node with existing nodes in the graph (default: None) primary_label: Optional label of the primary attribute. Used to merge the Node with existing nodes in the graph (default: None) """ node = Node(*labels, **properties) if primary_key: node.set_primary_key(primary_key) if primary_label: node.set_primary_label(primary_label) if identity is not None: node.identity = identity return node
[docs] def __init__(self, *labels: str, **attributes: str) -> None: """Inits a Node with labels and attributes Args: labels: List of static attributes specifying the labels of the Node (first label will be the primary label) attributes: Key value pairs of attributes for the Node """ self.labels = set(labels) self._remote_labels = frozenset() self.__primarylabel__ = labels[0] PropertyDict.__init__(self, **attributes) Subgraph.__init__(self, nodes=[self])
[docs] def set_primary_label(self, label): """Sets the primary label of the node Args: label: Label of the primary attribute. Used to merge the Node with existing nodes in the graph """ if label not in self.labels: raise ValueError("Primary label must be one of the node labels") self.__primarylabel__ = label
def __repr__(self): args = list(self.labels) kwargs = OrderedDict() d = dict(self) if self.identity is None: d["identity"] = self.identity for key in sorted(d): if is_safe_key(key): args.append("%s=%r" % (key, d[key])) else: kwargs[key] = d[key] if kwargs: args.append("**{%s}" % ", ".join("%r: %r" % (k, kwargs[k]) for k in kwargs)) return "Node(%s)" % ", ".join(args) def __str__(self): return repr(self) def __eq__(self, other): if self is other: return True try: if any(x is None for x in [self.identity, other.identity]): return False return (issubclass(type(self), Node) and issubclass(type(other), Node) and self.identity == other.identity) except (AttributeError, TypeError): return False def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return super().__hash__()
[docs]class Relationship(PropertyDict, Subgraph):
[docs] @staticmethod def from_attributes(start_node: Node, relationship_type: Attribute, end_node: Node, attributes: List[Attribute] = [], primary_key: str = None): """Creates a Relationship from a list of attributes and labels Args: start_node: Origin of the relationship relationship_type: Type of the relationship end_node: Destination of the relationship attributes: List of attributes for the relationship primary_key: Optional key of the primary attribute. Used to merge the Relationship with existing relationships in the graph (default: None) """ properties = dict((attr.key, attr.value) for attr in attributes) relationship = Relationship(start_node, relationship_type.value, end_node, **properties) if primary_key: relationship.set_primary_key(primary_key) return relationship
[docs] @staticmethod def from_dict(start_node: Node, end_node: Node, relationship_type: str, properties: dict, primary_key: str = None, identity: str = None): """Creates a Relationship from a list of attributes and labels Args: start_node: Origin of the relationship end_node: Destination of the relationship relationship_type: Type of the relationship properties: Dictionary of attributes for the relationship primary_key: Optional key of the primary attribute. Used to merge the relationship with existing relations in the graph (default: None) """ relationship = Relationship(start_node, relationship_type, end_node, **properties) if primary_key: relationship.set_primary_key(primary_key) if identity is not None: relationship.identity = identity return relationship
[docs] def __init__(self, start_node: Node, relationship_type: str, end_node: Node, **attributes) -> None: """ A relationship represents a typed connection between a pair of nodes. The positional arguments passed to the constructor identify the nodes to relate and the type of the relationship. Keyword arguments describe the properties of the relationship:: >>> from py2neo import Node, Relationship >>> a = Node("Person", name="Alice") >>> b = Node("Person", name="Bob") >>> a_knows_b = Relationship(a, "KNOWS", b, since=1999) Args: start_node: Origin of the relationship end_node: Destination of the relationship relationship_type: Type of the relationship attributes: Key value pairs of attributes for the Relationship """ self._type = relationship_type self._start_node = start_node self._end_node = end_node PropertyDict.__init__(self, **attributes) Subgraph.__init__(self, nodes=[start_node, end_node], relationships=[self])
@property def start_node(self): """Start node of the relationship""" return self._start_node @property def end_node(self): """End node of the relationship""" return self._end_node @property def type(self): """Type of the relationship""" return self._type def __repr__(self): args = [repr(self.start_node), self.type, repr(self.end_node)] kwargs = OrderedDict() d = self.properties for key in sorted(d): if is_safe_key(key): args.append("%s=%r" % (key, d[key])) else: kwargs[key] = d[key] if kwargs: args.append("**{%s}" % ", ".join("%r: %r" % (k, kwargs[k]) for k in kwargs)) return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) def __str__(self): return repr(self) def __eq__(self, other): if self is other: return True try: if any(x is None for x in [self.identity, other.identity]): try: return self.type == other.type and list(self.nodes) == list(other.nodes) and dict(self) == dict(other) and id(self) == id(other) except (AttributeError, TypeError): return False return issubclass(type(self), Relationship) and issubclass(type(other), Relationship) and self.identity == other.identity except (AttributeError, TypeError): return False def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return super().__hash__()