# Source code for sdgx.data_models.combiner

from __future__ import annotations

from collections.abc import Iterable
from itertools import chain
from pathlib import Path
from typing import Dict, List

import pandas as pd
from pydantic import BaseModel

from sdgx.data_loader import DataLoader
from sdgx.data_models.inspectors.base import Inspector
from sdgx.data_models.inspectors.manager import InspectorManager
from sdgx.data_models.metadata import Metadata
from sdgx.data_models.relationship import Relationship
from sdgx.exceptions import MetadataCombinerInitError, MetadataCombinerInvalidError
from sdgx.utils import logger


class MetadataCombiner(BaseModel):
    """
    Combine different tables with relationship, used for describing the relationship between tables.

    Args:
        version (str): version
        named_metadata (Dict[str, Any]): pairs of table name and metadata
        relationships (List[Any]): list of relationships
    """

    version: str = "1.0"

    named_metadata: Dict[str, Metadata] = {}

    relationships: List[Relationship] = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def check(self):
        """Do necessary checks:

        - Every contained metadata passes its own ``check()``;
        - Every parent/child table named by a relationship has metadata;
        - Every table with metadata appears in at least one relationship.

        Raises:
            MetadataCombinerInvalidError: if table names and relationships do not match.
        """
        for m in self.named_metadata.values():
            m.check()

        table_names = set(self.named_metadata)
        relationship_parents = {r.parent_table for r in self.relationships}
        relationship_children = {r.child_table for r in self.relationships}
        related_tables = relationship_parents | relationship_children

        # each relationship's table must have metadata
        if not table_names.issuperset(relationship_parents):
            raise MetadataCombinerInvalidError(
                f"Relationships' parent table {relationship_parents - table_names} is missing."
            )
        if not table_names.issuperset(relationship_children):
            raise MetadataCombinerInvalidError(
                f"Relationships' child table {relationship_children - table_names} is missing."
            )

        # each table in metadata must in a relationship
        # (sets are combined with ``|``; ``+`` is not defined for sets and
        # would raise TypeError instead of the intended error)
        if not related_tables.issuperset(table_names):
            raise MetadataCombinerInvalidError(
                f"Table {table_names - related_tables} is missing in relationships."
            )

        logger.info("MultiTableCombiner check finished.")

    @classmethod
    def from_dataloader(
        cls,
        dataloaders: list[DataLoader],
        metadata_from_dataloader_kwargs: None | dict = None,
        relationshipe_inspector: None | str | type[Inspector] = "SubsetRelationshipInspector",
        relationships_inspector_kwargs: None | dict = None,
        relationships: None | list[Relationship] = None,
    ) -> "MetadataCombiner":
        """
        Combine multiple dataloaders with relationship.

        Metadata is built for each dataloader and keyed by ``dataloader.identity``.
        When ``relationships`` is not given and an inspector is configured, every
        chunk of every dataloader is fed to the inspector and its inspected
        ``"relationships"`` are used.

        Args:
            dataloaders (list[DataLoader]): list of dataloaders (a single dataloader is also accepted)
            metadata_from_dataloader_kwargs (dict): kwargs for :func:`Metadata.from_dataloader`
            relationshipe_inspector (str | type[Inspector]): relationship inspector
            relationships_inspector_kwargs (dict): kwargs for :func:`InspectorManager.init`
            relationships (list[Relationship]): list of relationships
        """
        if not isinstance(dataloaders, list):
            dataloaders = [dataloaders]

        metadata_from_dataloader_kwargs = metadata_from_dataloader_kwargs or {}
        named_metadata = {
            d.identity: Metadata.from_dataloader(d, **metadata_from_dataloader_kwargs)
            for d in dataloaders
        }

        if relationships is None and relationshipe_inspector is not None:
            if relationships_inspector_kwargs is None:
                relationships_inspector_kwargs = {}

            inspector = InspectorManager().init(
                relationshipe_inspector, **relationships_inspector_kwargs
            )
            for d in dataloaders:
                for chunk in d.iter():
                    inspector.fit(
                        chunk,
                        name=d.identity,
                        metadata=named_metadata[d.identity],
                    )
            relationships = inspector.inspect()["relationships"]

        # No explicit relationships and no inspector: fall back to an empty list,
        # since ``None`` is not a valid value for the ``relationships`` field.
        if relationships is None:
            relationships = []

        return cls(named_metadata=named_metadata, relationships=relationships)

    @classmethod
    def from_dataframe(
        cls,
        dataframes: list[pd.DataFrame],
        names: list[str],
        metadata_from_dataloader_kwargs: None | dict = None,
        relationshipe_inspector: None | str | type[Inspector] = "SubsetRelationshipInspector",
        relationships_inspector_kwargs: None | dict = None,
        relationships: None | list[Relationship] = None,
    ) -> "MetadataCombiner":
        """
        Combine multiple dataframes with relationship.

        Metadata is built for each dataframe and keyed by the name at the same
        position in ``names``. When ``relationships`` is not given and an
        inspector is configured, each dataframe is fed to the inspector and its
        inspected ``"relationships"`` are used.

        Args:
            dataframes (list[pd.DataFrame]): list of dataframes (a single dataframe is also accepted)
            names (list[str]): list of names (a single name is also accepted)
            metadata_from_dataloader_kwargs (dict): kwargs passed to ``Metadata.from_dataframe``
            relationshipe_inspector (str | type[Inspector]): relationship inspector
            relationships_inspector_kwargs (dict): kwargs for :func:`InspectorManager.init`
            relationships (list[Relationship]): list of relationships

        Raises:
            MetadataCombinerInitError: if ``dataframes`` and ``names`` differ in length.
        """
        if not isinstance(dataframes, list):
            dataframes = [dataframes]
        if not isinstance(names, list):
            names = [names]

        metadata_from_dataloader_kwargs = metadata_from_dataloader_kwargs or {}

        if len(dataframes) != len(names):
            raise MetadataCombinerInitError("dataframes and names should have same length.")

        named_metadata = {
            n: Metadata.from_dataframe(d, **metadata_from_dataloader_kwargs)
            for n, d in zip(names, dataframes)
        }

        if relationships is None and relationshipe_inspector is not None:
            if relationships_inspector_kwargs is None:
                relationships_inspector_kwargs = {}

            inspector = InspectorManager().init(
                relationshipe_inspector, **relationships_inspector_kwargs
            )
            for n, d in zip(names, dataframes):
                inspector.fit(
                    d,
                    name=n,
                    metadata=named_metadata[n],
                )
            relationships = inspector.inspect()["relationships"]

        # No explicit relationships and no inspector: fall back to an empty list,
        # since ``None`` is not a valid value for the ``relationships`` field.
        if relationships is None:
            relationships = []

        return cls(named_metadata=named_metadata, relationships=relationships)

    def _dump_json(self):
        """Return this combiner serialized by ``model_dump_json()``."""
        return self.model_dump_json()

    def save(
        self,
        save_dir: str | Path,
        metadata_subdir: str = "metadata",
        relationship_subdir: str = "relationship",
    ):
        """
        Save metadata to json file.

        This will create several subdirectories for metadata and relationship.
        A ``version`` file is written to ``save_dir``, each metadata is saved as
        ``<table name>.json`` and each relationship as
        ``<parent_table>_<child_table>.json``.

        Args:
            save_dir (str | Path): directory to save
            metadata_subdir (str): subdirectory for metadata, default is "metadata"
            relationship_subdir (str): subdirectory for relationship, default is "relationship"
        """
        save_dir = Path(save_dir).expanduser().resolve()
        save_dir.mkdir(parents=True, exist_ok=True)

        version_file = save_dir / "version"
        version_file.write_text(self.version)

        metadata_subdir = save_dir / metadata_subdir
        relationship_subdir = save_dir / relationship_subdir

        metadata_subdir.mkdir(parents=True, exist_ok=True)
        for name, metadata in self.named_metadata.items():
            metadata.save(metadata_subdir / f"{name}.json")

        relationship_subdir.mkdir(parents=True, exist_ok=True)
        for relationship in self.relationships:
            relationship.save(
                relationship_subdir / f"{relationship.parent_table}_{relationship.child_table}.json"
            )

    @classmethod
    def load(
        cls,
        save_dir: str | Path,
        metadata_subdir: str = "metadata",
        relationship_subdir: str = "relationship",
        version: None | str = None,
    ) -> "MetadataCombiner":
        """
        Load metadata from json file.

        Every entry found in the metadata subdirectory is loaded as a table
        (keyed by its file stem) and every entry in the relationship
        subdirectory is loaded as a relationship.

        Args:
            save_dir (str | Path): directory to load from
            metadata_subdir (str): subdirectory for metadata, default is "metadata"
            relationship_subdir (str): subdirectory for relationship, default is "relationship"
            version (str): Manual version, if not specified, try to load from version file
        """
        save_dir = Path(save_dir).expanduser().resolve()

        if not version:
            logger.debug("No version specified, try to load from version file.")
            version_file = save_dir / "version"
            if version_file.exists():
                version = version_file.read_text().strip()
            else:
                logger.info("No version file found, assume version is 1.0")
                version = "1.0"

        named_metadata = {p.stem: Metadata.load(p) for p in (save_dir / metadata_subdir).glob("*")}
        relationships = [Relationship.load(p) for p in (save_dir / relationship_subdir).glob("*")]

        cls.upgrade(version, named_metadata, relationships)

        return cls(
            version=version,
            named_metadata=named_metadata,
            relationships=relationships,
        )

    @classmethod
    def upgrade(
        cls,
        old_version: str,
        named_metadata: dict[str, Metadata],
        relationships: list[Relationship],
    ) -> None:
        """
        Upgrade metadata from old version to new version

        :ref:`Metadata.upgrade` and :ref:`Relationship.upgrade` will try upgrade when loading.
        So here we just do Combiner's upgrade.

        Currently there is nothing to upgrade, so this is a no-op.
        """
        pass

    @property
    def fields(self) -> Iterable[str]:
        """
        Return all fields in MetadataCombiner.

        Only model fields whose name ends with ``_columns`` are returned.
        """
        return chain(
            (k for k in self.model_fields if k.endswith("_columns")),
        )

    def __eq__(self, other):
        """
        Compare two combiners by content.

        Two combiners are equal when they share the same version, the same
        ``*_columns`` fields (if any), the same named metadata and the same
        relationships regardless of list order.
        """
        if not isinstance(other, MetadataCombiner):
            return super().__eq__(other)

        if self.version != other.version:
            return False

        # Compare the field names first so getattr below cannot hit a missing
        # attribute on either side.
        own_fields = set(self.fields)
        if own_fields != set(other.fields):
            return False
        if any(getattr(self, key) != getattr(other, key) for key in own_fields):
            return False

        if self.named_metadata != other.named_metadata:
            return False

        # Relationships are compared order-insensitively: ``load`` reads them in
        # directory-listing order, which need not match the original list order.
        unmatched = list(other.relationships)
        for relationship in self.relationships:
            try:
                unmatched.remove(relationship)
            except ValueError:
                return False
        return not unmatched