Source code for sdgx.data_connectors.generator_connector

from __future__ import annotations

import os
from functools import cached_property
from typing import Callable, Generator

import pandas as pd

from sdgx.data_connectors.base import DataConnector


[docs] class GeneratorConnector(DataConnector): """ A virtual data connector that wrap `Generator <https://docs.python.org/3/glossary.html#term-generator>`_ into a DataConnector. Passing ``offset=0`` to ``read`` will reset the generator. Warning: ``offset`` and ``limit`` are ignored as ``Generator`` not supporting random access. But we can use :ref:`Cacher` to support it. See :ref:`Data Loader` for more details. Note: This connector is not been registered by default. So only be used with the library way. """ @cached_property def identity(self) -> str: return f"generator-{os.getpid()}-{id(self.generator_caller)}" def __init__( self, generator_caller: Callable[[], Generator[pd.DataFrame, None, None]], *args, **kwargs, ): super().__init__(*args, **kwargs) self.generator_caller = generator_caller self._generator = self.generator_caller()
[docs] def _read(self, offset: int = 0, limit: int | None = None) -> pd.DataFrame | None: """ Ingore limit and allow sequential reading. """ if offset == 0: self._generator = self.generator_caller() try: return next(self._generator) except StopIteration: return None
[docs] def _columns(self) -> list[str]: for df in self._iter(): return list(df.columns)
[docs] def _iter(self, offset=0, chunksize=0) -> Generator[pd.DataFrame, None, None]: """ Subclass should implement this for reading data in chunk. See ``iter`` for more details. """ return self.generator_caller()