Source code for sdgx.data_loader

from __future__ import annotations

from functools import cached_property
from typing import Any, Generator

import pandas as pd

from sdgx.cachers.base import Cacher, NoCache
from sdgx.cachers.disk_cache import DiskCache
from sdgx.cachers.manager import CacherManager
from sdgx.data_connectors.base import DataConnector
from sdgx.data_connectors.dataframe_connector import DataFrameConnector
from sdgx.data_connectors.generator_connector import GeneratorConnector
from sdgx.exceptions import DataLoaderInitError
from sdgx.utils import cache


class DataLoader:
    """
    Combine :ref:`Cacher` and :ref:`DataConnector` to load data in an efficient way.

    The default Cacher is :ref:`DiskCache`, or :ref:`NoCache` when the connector is a
    :ref:`DataFrameConnector`. Use ``cacher`` to specify a :ref:`Cacher` instance,
    a Cacher class, or a registered cacher name.

    :ref:`GeneratorConnector` must be combined with a real Cacher. The cache is warmed up
    for generators during initialization so that random access (indexing/slicing) works.

    Args:
        data_connector (:ref:`DataConnector`): The data connector.
        chunksize (int, optional): The chunksize used for the cacher and for iteration.
            Defaults to 10000.
        cacher (:ref:`Cacher` | str | type, optional): A Cacher instance, or the name/class
            of a cacher to build through :ref:`CacherManager`. Defaults to None (use the
            default cacher described above).
        cacher_kwargs (dict, optional): The kwargs for building the cacher. ``blocksize``
            defaults to ``chunksize`` and ``identity`` defaults to the data connector's
            identity. The dict passed in is not modified. Defaults to None.
        identity (str, optional): The identity of the data source. When using
            :ref:`GeneratorConnector`, it can point to the original data source, which
            makes it possible to work with :ref:`MetadataCombiner`.

    Raises:
        DataLoaderInitError: If a :ref:`GeneratorConnector` is combined with :ref:`NoCache`.

    Example:

        Load and cache data from an existing csv file or other data source.

        .. code-block:: python

            from sdgx.data_loader import DataLoader
            from sdgx.data_connectors.csv_connector import CsvConnector
            from sdgx.utils import download_demo_data

            dataset_csv = download_demo_data()
            data_connector = CsvConnector(path=dataset_csv)
            # Use DataConnector to initialize
            dataloader = DataLoader(data_connector)

            # Access data
            dataloader.load_all()  # This will read all data from csv, and cache it.
            dataloader.load_all()  # This will read all data from cache.
            dataloader[:10]  # dataloader supports slicing
            for df in dataloader.iter():  # dataloader supports iteration
                print(df.shape)

    Advanced usage:

        Load and cache data from a generator.

        .. code-block:: python

            from typing import Generator

            import pandas as pd

            from sdgx.data_loader import DataLoader
            from sdgx.data_connectors.generator_connector import GeneratorConnector

            def generator() -> Generator[pd.DataFrame, None, None]:
                for i in range(100):
                    yield pd.DataFrame({"a": [i], "b": [i]})

            data_connector = GeneratorConnector(generator)

            # Use DataConnector to initialize.
            # Generators do not support random access, but caching makes it possible.
            dataloader = DataLoader(data_connector)

            # Access data
            dataloader.load_all()  # This will read all data from cache
            dataloader.load_all()  # This will read all data from cache.
            dataloader[:10]  # dataloader supports slicing
            for df in dataloader.iter():  # dataloader supports iteration
                print(df.shape)
    """

    DEFAULT_CACHER_INITIAL = DiskCache

    def __init__(
        self,
        data_connector: DataConnector,
        chunksize: int = 10000,
        cacher: Cacher | str | type[Cacher] | None = None,
        cacher_kwargs: None | dict[str, Any] = None,
        identity: str | None = None,
    ) -> None:
        if isinstance(data_connector, DataFrameConnector):
            self.DEFAULT_CACHER = NoCache
        else:
            self.DEFAULT_CACHER = DataLoader.DEFAULT_CACHER_INITIAL

        self.data_connector = data_connector
        self.chunksize = chunksize
        self.cache_manager = CacherManager()
        self.identity = identity or self.data_connector.identity or str(id(self))

        # Work on a copy: the defaults below must not leak into the caller's dict.
        cacher_kwargs = dict(cacher_kwargs or {})
        cacher_kwargs.setdefault("blocksize", self.chunksize)
        cacher_kwargs.setdefault("identity", self.data_connector.identity)

        if isinstance(cacher, Cacher):
            self.cacher = cacher
        elif isinstance(cacher, (str, type)):
            self.cacher = self.cache_manager.init_cacher(cacher, **cacher_kwargs)
        else:
            self.cacher = self.cache_manager.init_cacher(self.DEFAULT_CACHER, **cacher_kwargs)

        self.cacher.clear_invalid_cache()

        if isinstance(data_connector, GeneratorConnector):
            if isinstance(self.cacher, NoCache):
                raise DataLoaderInitError("NoCache can't be used with GeneratorConnector")
            # Warmup cache for generator, this allows random access
            self.load_all()

    def iter(self) -> Generator[pd.DataFrame, None, None]:
        """
        Load data from cache in chunks of ``chunksize`` rows.
        """
        yield from self.cacher.iter(self.chunksize, self.data_connector)

    def keys(self) -> list:
        """
        Same as ``columns``
        """
        return self.data_connector.keys()

    def columns(self) -> list:
        """
        Peek columns.

        Returns:
            list: name of columns
        """
        return self.data_connector.columns()

    def load_all(self) -> pd.DataFrame:
        """
        Load all data from cache.
        """
        return self.cacher.load_all(self.data_connector)

    def finalize(self, clear_cache=False) -> None:
        """
        Finalize the dataloader: finalize the data connector and, if ``clear_cache``
        is True, clear the cacher's cache.
        """
        self.data_connector.finalize()
        if clear_cache:
            self.cacher.clear_cache()

    def __getitem__(self, key: int | slice | list) -> pd.DataFrame:
        """
        Support get data by index, slice, or list of column names.

        - ``int``: the row at that position, as a single-row DataFrame.
          Negative indices count from the end.
        - ``list``: the listed columns for every row.
        - ``slice``: the selected rows; negative bounds and negative steps are supported.

        Raises:
            IndexError: If an integer index is out of range.
            TypeError: If ``key`` is not an int, slice or list.
            ValueError: If the slice step is zero.
        """
        if isinstance(key, int):
            return self._get_row(key)
        if isinstance(key, list):
            return pd.concat((d[key] for d in self.iter()), ignore_index=True)
        if not isinstance(key, slice):
            raise TypeError(
                f"DataLoader indices must be int, slice or list, not {type(key).__name__}"
            )
        return self._get_slice(key)

    def _load_span(self, start: int, stop: int) -> tuple[pd.DataFrame, int]:
        """Load the chunks covering rows ``[start, stop)``; return them with the first chunk's offset."""
        offset = (start // self.chunksize) * self.chunksize
        # Count chunks from the aligned offset (not from ``start``) so the last
        # partially-covered chunk is included; load at least one so pd.concat
        # always receives a frame.
        n_chunks = max(1, (stop - offset + self.chunksize - 1) // self.chunksize)
        tables = (
            self.cacher.load(
                offset=offset + i * self.chunksize,
                chunksize=self.chunksize,
                data_connector=self.data_connector,
            )
            for i in range(n_chunks)
        )
        return pd.concat(tables, ignore_index=True), offset

    def _get_row(self, key: int) -> pd.DataFrame:
        """Return the row at integer position ``key`` as a single-row DataFrame."""
        if key < 0:
            key += len(self)
            if key < 0:
                raise IndexError("DataLoader index out of range")
        chunk, offset = self._load_span(key, key + 1)
        position = key - offset
        if position >= len(chunk):
            raise IndexError("DataLoader index out of range")
        return chunk.iloc[[position]]

    def _get_slice(self, key: slice) -> pd.DataFrame:
        """Return the rows selected by ``key``, loading only the chunks that cover them."""
        step = 1 if key.step is None else key.step
        if step == 0:
            raise ValueError("slice step cannot be zero")

        start, stop = key.start, key.stop
        if step > 0 and (start is None or start >= 0) and stop is not None and stop >= 0:
            # Explicit non-negative bounds: the total length (a full pass over the
            # data on first use) is not needed.
            start = start or 0
        else:
            start, stop, step = key.indices(len(self))

        if step > 0:
            stop = max(stop, start)  # an empty selection must not load a negative span
            frame, offset = self._load_span(start, stop)
            return frame.iloc[start - offset : stop - offset : step]

        # Negative step: bounds were normalized by slice.indices, so every row is in range.
        rows = range(start, stop, step)
        low, high = (rows[-1], rows[0] + 1) if rows else (0, 0)
        frame, offset = self._load_span(low, high)
        return frame.iloc[[row - offset for row in rows]]

    def __len__(self) -> int:
        # Memoized per instance. A method-level functools cache keyed by ``self``
        # would keep every DataLoader alive for the lifetime of the process.
        length = self.__dict__.get("_length")
        if length is None:
            length = sum(len(chunk) for chunk in self.iter())
            self._length = length
        return length

    @cached_property
    def shape(self):
        return (len(self), len(self.columns()))