Source code for fairgraph.utility

"""


"""

# Copyright 2019-2020 CNRS

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from copy import deepcopy
import hashlib
import logging
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, TYPE_CHECKING
import warnings

from openminds.registry import lookup_type

from .base import OPENMINDS_VERSION

if TYPE_CHECKING:
    from .client import KGClient
    from .kgobject import KGObject

logger = logging.getLogger("fairgraph")

JSONdict = Dict[str, Any]  # see https://github.com/python/typing/issues/182 for some possible improvements
ATTACHMENT_SIZE_LIMIT = 1024 * 1024  # 1 MB


def as_list(obj: Union[None, KGObject, dict, str, list, tuple]) -> list:
    """
    Converts the input `obj` into a list.

    Args:
        obj: The input object to be converted to a list.

    Returns:
        list: A list - see Notes below.

    Notes:
        - If `obj` is None, it returns an empty list.
        - If `obj` is a dict or a str, it returns a list containing `obj`
          (these are iterable, but are treated as single values).
        - If `obj` is any other iterable (list, tuple, generator, ...), it returns
          a new list with the same elements as `obj`.
        - If `obj` is not iterable, it returns a list containing `obj`.
          No exception is raised in this case.
    """
    if obj is None:
        return []
    if isinstance(obj, (dict, str)):
        # iterating over these would split them into keys / characters
        return [obj]
    try:
        return list(obj)
    except TypeError:
        # not iterable: treat it as a single item
        return [obj]
def invert_dict(D):
    """
    Return a new dict mapping each value of `D` to its key.

    If several keys share the same value, the key that comes last in
    iteration order wins. Values must be hashable.
    """
    return {value: key for key, value in D.items()}
def expand_uri(uri_list: Union[str, List[str]], context: Dict[str, Any]) -> Union[str, Tuple[str, ...]]:
    """
    Expands a URI or a list of URIs using a given context.

    Args:
        uri_list (Union[str, List[str]]): A URI or a list of URIs to be expanded.
        context (Dict[str, Any]): A dictionary containing a mapping of prefixes to base URLs.

    Returns:
        Union[str, Tuple[str, ...]]: An expanded URI (if a single string was given)
            or a tuple of expanded URIs.

    Raises:
        ValueError: If a prefix in the URI is not found in the context.

    Notes:
        - URIs starting with "http" or "@" are returned unchanged.
        - A URI without any ":" is expanded using the "@vocab" entry of the context.
        - Only the first ":" separates the prefix from the identifier, so identifiers
          may themselves contain colons.
        - A "/" is appended to the base URL if it does not already end with one.

    Examples:
        >>> context = {'foaf': 'http://xmlns.com/foaf/0.1/'}
        >>> uri_list = 'foaf:Person'
        >>> expand_uri(uri_list, context)
        'http://xmlns.com/foaf/0.1/Person'
    """
    single = isinstance(uri_list, str)
    uris = [uri_list] if single else as_list(uri_list)
    expanded_uris = []
    for uri in uris:
        if uri.startswith(("http", "@")):
            # already expanded, or a JSON-LD keyword
            expanded_uris.append(uri)
            continue
        # split on the first colon only; the identifier may contain further colons
        prefix, separator, identifier = uri.partition(":")
        if not separator:
            prefix, identifier = "@vocab", uri
        if prefix not in context:
            raise ValueError(f"prefix {prefix} not found in context")
        base_url = context[prefix]
        if not base_url.endswith("/"):
            base_url += "/"
        expanded_uris.append(f"{base_url}{identifier}")
    if single:
        return expanded_uris[0]
    return tuple(expanded_uris)
def compact_uri(
    uri_list: Union[str, List[str]], context: Dict[str, Any], strict: bool = False
) -> Union[str, Tuple[str, ...]]:
    """
    Compacts a URI or a list of URIs using a given context.

    Args:
        uri_list (Union[str, List[str]]): A URI or a list of URIs to be compacted.
        context (Dict[str, Any]): A dictionary containing a mapping of prefixes to base URLs.
        strict (bool, optional): Whether to raise an error if a URI cannot be compacted.
            Defaults to False.

    Returns:
        Union[str, Tuple[str, ...]]: A compacted URI (if a single string was given)
            or a tuple of compacted URIs.

    Raises:
        ValueError: If strict is True and a URI cannot be compacted.

    Notes:
        - URIs that do not start with "http" are passed through unchanged.
        - The first context entry (in iteration order) whose base URL is a prefix
          of the URI is used; for "@vocab" the bare identifier is returned.

    Examples:
        >>> context = {'foaf': 'http://xmlns.com/foaf/0.1/'}
        >>> uri_list = 'http://xmlns.com/foaf/0.1/Person'
        >>> compact_uri(uri_list, context)
        'foaf:Person'
    """
    is_single = isinstance(uri_list, str)
    candidates = [uri_list] if is_single else as_list(uri_list)
    compacted_uris = []
    for uri in candidates:
        if not uri.startswith("http"):
            compacted_uris.append(uri)
            continue
        for prefix, base_url in context.items():
            if not uri.startswith(base_url):
                continue
            identifier = uri[len(base_url):].strip("/")
            compacted_uris.append(identifier if prefix == "@vocab" else f"{prefix}:{identifier}")
            break
        else:
            # no context entry matched this URI
            if strict:
                raise ValueError(f"Unable to compact {uri} with the provided context")
            compacted_uris.append(uri)
    return compacted_uris[0] if is_single else tuple(compacted_uris)
def normalize_data(data: Union[None, JSONdict], context: Dict[str, Any]) -> Union[None, JSONdict]:
    """
    Normalizes JSON-LD data using a given context.

    Keys are expanded to full URIs, the "@context" entry is dropped, empty
    values (anything with a length of zero) are dropped, "@id" values are kept
    only if they start with "http", and nested dicts (directly, or as items of
    a list/tuple) are normalized recursively. Lists and tuples are both
    returned as lists.

    Args:
        data (Union[None, JSONdict]): A JSON-LD data dict to be normalized.
        context (Dict[str, Any]): A dictionary containing a mapping of prefixes to base URLs.

    Returns:
        Union[None, JSONdict]: A normalized JSON-LD data dict, or None if `data` is None.

    Examples:
        >>> context = {'foaf': 'http://xmlns.com/foaf/0.1/'}
        >>> data = {
        ...     "foaf:name": "John Smith",
        ...     "foaf:age": 35,
        ...     "foaf:knows": {
        ...         "foaf:name": "Jane Doe",
        ...         "foaf:age": 25
        ...     }
        ... }
        >>> normalize_data(data, context)
        {
            "http://xmlns.com/foaf/0.1/name": "John Smith",
            "http://xmlns.com/foaf/0.1/age": 35,
            "http://xmlns.com/foaf/0.1/knows": {
                "http://xmlns.com/foaf/0.1/name": "Jane Doe",
                "http://xmlns.com/foaf/0.1/age": 25
            }
        }
    """
    if data is None:
        return data

    normalized: JSONdict = {}
    for key, value in data.items():
        assert isinstance(key, str)
        if key == "@context":
            continue

        if key.startswith("Q"):
            # keys starting with "Q" are kept verbatim, not expanded
            expanded_key = key
        else:
            result = expand_uri(key, context)
            assert isinstance(result, str)  # for type checking
            expanded_key = result
        assert expanded_key.startswith(("http", "@", "Q"))

        if hasattr(value, "__len__") and len(value) == 0:
            # empty strings / lists / dicts are left out of the result
            continue

        if expanded_key == "@id":
            if value.startswith("http"):
                # do not take local ids, e.g., those starting with "_"
                normalized[expanded_key] = value
        elif expanded_key == "@type":
            normalized[expanded_key] = value
        elif isinstance(value, (list, tuple)):
            normalized[expanded_key] = [
                normalize_data(item, context) if isinstance(item, dict) else item for item in value
            ]
        elif isinstance(value, dict):
            normalized[expanded_key] = normalize_data(value, context)
        else:
            normalized[expanded_key] = value
    return normalized
def in_notebook() -> bool:
    """
    Report whether the code is running inside a Jupyter-style (ZMQ) shell.

    Returns:
        bool: True only if `get_ipython()` exists and its shell class is named
            "ZMQInteractiveShell"; False for a terminal IPython shell, any other
            shell, or when `get_ipython` is not defined.
    """
    try:
        shell = get_ipython().__class__.__name__  # type: ignore
    except NameError:
        # get_ipython is not defined: not running under IPython
        return False
    return shell == "ZMQInteractiveShell"


def expand_filter(filter_dict: Dict[str, Any]):
    """
    Expand single-level filter specification (provided by user) into
    a multi-level dict as required by the query-generation machinery.

    Keys are split on "__"; each part except the last becomes a nested dict.
    Keys that share a common prefix are merged into the same nested dict.

    Args:
        filter_dict (Dict[str, Any]): A flat mapping from "__"-separated paths to values.

    Returns:
        dict: The nested filter specification.

    Raises:
        TypeError: If any value has an `items` attribute (i.e., is itself dict-like).

    Example:
        >>> filter = {
        ...     "developers__affiliations__member_of__alias": "CNRS",
        ...     "digital_identifier__identifier": "https://doi.org/some-doi"
        ... }
        >>> expand_filter(filter)
        {
            "developers": {
                "affiliations": {
                    "member_of": {
                        "alias": "CNRS"
                    }
                }
            },
            "digital_identifier": {
                "identifier": "https://doi.org/some-doi"
            }
        }
    """
    expanded: Dict[str, Any] = {}
    for key, value in filter_dict.items():
        if hasattr(value, "items"):
            raise TypeError("Filter specifications should be a single-level dict, without nesting")
        *branches, leaf = key.split("__")
        node = expanded
        for part in branches:
            child = node.get(part)
            if not isinstance(child, dict):
                # nothing there yet (or a plain value from a shorter key): start a new level.
                # An existing dict is reused so that sibling filters are not lost.
                child = node[part] = {}
            node = child
        node[leaf] = value
    return expanded


def sha1sum(filename):
    """
    Compute the SHA-1 digest of a file, reading it in 128 KiB chunks.

    Args:
        filename: Path of the file to read (opened in binary mode).

    Returns:
        str: The hexadecimal SHA-1 digest of the file contents.
    """
    BUFFER_SIZE = 128 * 1024
    h = hashlib.sha1()
    with open(filename, "rb") as fp:
        # read() returns b"" at end of file, which stops the iteration
        for data in iter(lambda: fp.read(BUFFER_SIZE), b""):
            h.update(data)
    return h.hexdigest()
class LogEntry:
    """
    Represents an entry in an activity log.

    Attributes:
        cls (str): The name of the class of the Knowledge Graph object.
        id (Optional[str]): The identifier of the object being logged.
        delta (Optional[JSONdict]): A dictionary containing the changes made to the object.
        space (Optional[str]): The Knowledge Graph space containing the object.
        type (str): The type of the log entry (passed to the constructor as `type_`).
    """

    def __init__(
        self,
        cls: str,
        id: Optional[str],
        delta: Optional[JSONdict],
        space: Optional[str],
        type_: str,
    ):
        self.cls, self.id = cls, id
        self.delta, self.space = delta, space
        self.type = type_

    def __repr__(self):
        return "{}: {}({}) in '{}'".format(self.type, self.cls, self.id, self.space)

    def as_dict(self):
        """Return the entry as a dict keyed by the constructor argument names."""
        return dict(
            cls=self.cls,
            id=self.id,
            delta=self.delta,
            space=self.space,
            type_=self.type,
        )
class ActivityLog:
    """
    Represents a log of activities performed on Knowledge Graph objects.

    Attributes:
        entries (List[LogEntry]): A list of LogEntry objects representing the
            activities performed, in the order they were added.
    """

    def __init__(self):
        self.entries = []

    def update(self, item: KGObject, delta: Optional[JSONdict], space: Optional[str], entry_type: str):
        """
        Adds a new log entry to the activity log.

        The entry records the class name of `item` and its `uuid` attribute.

        Args:
            item (KGObject): The object being logged.
            delta (Optional[JSONdict]): A dictionary containing the changes made to the object.
            space (Optional[str]): The Knowledge Graph space containing the object.
            entry_type (str): The type of the log entry.
        """
        class_name = item.__class__.__name__
        new_entry = LogEntry(class_name, item.uuid, delta, space, entry_type)
        self.entries.append(new_entry)

    def __repr__(self):
        # one line per entry, in insertion order
        return "\n".join(map(str, self.entries))
TERMS_OF_USE = """ # EBRAINS Knowledge Graph Data Platform Citation Requirements This text is provided to describe the requirements for citing datasets, models and software found via EBRAINS Knowledge Graph Data Platform (KG): https://kg.ebrains.eu/search. It is meant to provide a more human-readable form of key parts of the KG Terms of Service, but in the event of disagreement between the KG Terms of Service and these Citation Requirements, the former is to be taken as authoritative. ## Dataset, model and software licensing Datasets, models and software in the KG have explicit licensing conditions attached. The license is typically one of the Creative Commons licenses. You must follow the licensing conditions attached to the dataset, model or software, including all restrictions on commercial use, requirements for attribution or requirements to share-alike. ## EBRAINS Knowledge Graph citation policy If you use content or services from the EBRAINS Knowledge Graph (Search or API) to advance a scientific publication you must follow the following citation policy: 1. For a dataset or model which is released under a Creative Commons license which includes "Attribution": 1. Cite the dataset / model as defined in the provided citation instructions ("Cite dataset / model") and - if available - also cite the primary publication listed or 2. in cases where neither citation instructions nor a primary publication are provided, and only in such cases, the names of the contributors should be cited (Data / model provided by Contributor 1, Contributor 2, …, and Contributor N) . 2. For software, please cite as defined in the software's respective citation policy. If you can't identify a clear citation policy for the software in question, use the open source repository as the citation link. 3. 
For EBRAINS services which were key in attaining your results, please consider citing the corresponding software which the service relies on, including but not limited to: EBRAINS Knowledge Graph, "https://kg.ebrains.eu" Failure to cite datasets, models, or software used in another publication or presentation would constitute scientific misconduct. Failure to cite datasets, models, or software used in a scientific publication must be corrected by an Erratum and correction of the given article if it was discovered post-publication. ## Final thoughts Citations are essential for encouraging researchers to release their datasets, models and software through the KG or other scientific sharing platforms. Your citation may help them to get their next job or next grant and will ultimately encourage researchers to produce and release more useful open data and open source. Make science more reproducible and more efficient. """ def accepted_terms_of_use(client: KGClient, accept_terms_of_use: bool = False) -> bool: if accept_terms_of_use or client.accepted_terms_of_use: return True else: if in_notebook(): from IPython.display import display, Markdown # type: ignore display(Markdown(TERMS_OF_USE)) else: print(TERMS_OF_USE) user_response = input("Do you accept the EBRAINS KG Terms of Service? 
") if user_response in ("y", "Y", "yes", "YES"): client.accepted_terms_of_use = True return True else: warnings.warn("Please accept the terms of use before downloading the dataset") return False def types_match(a, b): # temporarily, during the openMINDS transition v3-v4, we allow different namespaces for the types assert isinstance(a, str), a assert isinstance(b, str), b if a == b: return True elif a.split("/")[-1] == b.split("/")[-1]: logger.warning(f"Assuming {a} matches {b} in types_match()") return True else: return False def _adapt_namespaces(data, adapt_keys, adapt_type, adapt_instance_uri): if isinstance(data, list): for item in data: _adapt_namespaces(item, adapt_keys, adapt_type, adapt_instance_uri) elif isinstance(data, dict): # adapt property URIs old_keys = tuple(data.keys()) new_keys = adapt_keys(old_keys) for old_key, new_key in zip(old_keys, new_keys): data[new_key] = data.pop(old_key) for key, value in data.items(): if key == "@id": data[key] = adapt_instance_uri(value) elif isinstance(value, (list, dict)): _adapt_namespaces(value, adapt_keys, adapt_type, adapt_instance_uri) # adapt @type URIs if "@type" in data: data["@type"] = adapt_type(data["@type"]) else: pass def adapt_namespaces_3to4(data): def adapt_keys_3to4(uri_list): replacement = ("openminds.ebrains.eu/vocab", "openminds.om-i.org/props") return (uri.replace(*replacement) for uri in uri_list) def adapt_type_3to4(uri): if isinstance(uri, list): assert len(uri) == 1 uri = uri[0] return f"https://openminds.om-i.org/types/{uri.split('/')[-1]}" def adapt_instance_uri_3to4(uri): if uri.startswith("https://openminds"): return uri.replace("ebrains.eu", "om-i.org") else: return uri return _adapt_namespaces(data, adapt_keys_3to4, adapt_type_3to4, adapt_instance_uri_3to4) def adapt_type_4to3(uri): if isinstance(uri, list): assert len(uri) == 1 uri = uri[0] cls = lookup_type(uri, OPENMINDS_VERSION) if cls.__module__ == "test.test_client": return cls.type_ module_name = cls.__module__.split(".")[2] # 
e.g., 'fairgraph.openminds.core.actors.person' -> "core" module_name = {"controlled_terms": "controlledTerms", "specimen_prep": "specimenPrep"}.get( module_name, module_name ) return f"https://openminds.ebrains.eu/{module_name}/{cls.__name__}" def adapt_namespaces_4to3(data): def adapt_keys_4to3(uri_list): replacement = ("openminds.om-i.org/props", "openminds.ebrains.eu/vocab") return (uri.replace(*replacement) for uri in uri_list) def adapt_instance_uri_4to3(uri): if uri.startswith("https://openminds"): return uri.replace("om-i.org", "ebrains.eu") else: return uri return _adapt_namespaces(data, adapt_keys_4to3, adapt_type_4to3, adapt_instance_uri_4to3) def adapt_namespaces_for_query(query): """Map from v4+ to v3 openMINDS namespace""" def adapt_path(item_path, replacement): if isinstance(item_path, str): return item_path.replace(*replacement) elif isinstance(item_path, list): return [adapt_path(part, replacement) for part in item_path] else: assert isinstance(item_path, dict) new_item_path = item_path.copy() new_item_path["@id"] = item_path["@id"].replace(*replacement) if "typeFilter" in item_path: if isinstance(item_path["typeFilter"], list): new_item_path["typeFilter"] = [ {"@id": adapt_type_4to3(subitem["@id"])} for subitem in item_path["typeFilter"] ] else: new_item_path["typeFilter"]["@id"] = adapt_type_4to3(item_path["typeFilter"]["@id"]) return new_item_path def adapt_structure(structure, replacement): for item in structure: item["path"] = adapt_path(item["path"], replacement) if "structure" in item: adapt_structure(item["structure"], replacement) def adapt_filters(structure, replacement): for item in structure: if "filter" in item and "value" in item["filter"]: item["filter"]["value"] = item["filter"]["value"].replace(*replacement) if "structure" in item: adapt_filters(item["structure"], replacement) migrated_query = deepcopy(query) migrated_query["meta"]["type"] = adapt_type_4to3(migrated_query["meta"]["type"]) adapt_structure(migrated_query["structure"], 
("openminds.om-i.org/props", "openminds.ebrains.eu/vocab")) adapt_filters(migrated_query["structure"], ("openminds.om-i.org/instances", "openminds.ebrains.eu/instances")) return migrated_query def initialise_instances(class_list): """Cast openMINDS instances to their fairgraph subclass""" for cls in class_list: cls.set_error_handling(None) # find parent openMINDS class for parent_cls in cls.__mro__[1:]: if parent_cls.__name__ == cls.__name__: # could also do this by looking for issubclass(parent_cls, openminds.Node) break for key, value in parent_cls.__dict__.items(): if isinstance(value, parent_cls): fg_instance = cls.from_jsonld(value.to_jsonld()) fg_instance._space = cls.default_space setattr(cls, key, fg_instance) cls.set_error_handling("log") def handle_scope_keyword(scope, release_status): """ The keyword 'scope' has been renamed 'release_status', use of 'scope' is deprecated but still accepted. """ if scope in ("released", "in progress", "any"): warnings.warn( "The keyword 'scope' is deprecated, and will be removed in version 1.0; it has been renamed to 'release_status'", DeprecationWarning, stacklevel=2, ) return scope else: return release_status