# Source code for sdgx.data_connectors.csv_connector

from __future__ import annotations

import hashlib
from functools import cached_property
from typing import Generator

import pandas as pd

from sdgx.data_connectors.base import DataConnector


class CsvConnector(DataConnector):
    """
    Wraps csv file into :ref:`DataConnector`

    Args:
        path (str): Path to csv file
        sep (str, optional): Separator. Defaults to ','.
        header (str | int | None, optional): Header, forwarded to ``pd.read_csv``.
            Defaults to 'infer'.
        read_csv_kwargs (dict, optional): kwargs for pd.read_csv, please refer to
            https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html

    Example:

        .. code-block:: python

            from sdgx.data_connectors.csv_connector import CsvConnector
            connector = CsvConnector(
                path="data.csv",
            )
            df = connector.read()
    """

    def __init__(
        self,
        path,
        sep=",",
        header="infer",
        **read_csv_kwargs,
    ):
        self.path = path
        self.sep = sep
        self.header = header
        self.read_csv_kwargs = read_csv_kwargs

    @cached_property
    def identity(self):
        """
        Identity of the data source is the sha256 of the file

        The file is hashed in fixed-size chunks so that large csv files are
        never loaded into memory as a whole; the digest is the same as hashing
        the full content at once.
        """
        chunk_size = 1 << 20  # 1 MiB per read
        digest = hashlib.sha256()
        with open(self.path, "rb") as f:
            while chunk := f.read(chunk_size):
                digest.update(chunk)
        return f"csvfile-{digest.hexdigest()}"

    def _first_data_line(self) -> int:
        """
        Return the 0-based file line on which the data rows start.

        This mirrors how ``pd.read_csv`` interprets ``header``:

        - ``None``: there is no header line, data starts on line 0.
        - ``"infer"``: behaves like ``header=0`` unless explicit ``names`` are
          passed, in which case it behaves like ``header=None``.
        - an ``int`` ``h``: line ``h`` is the header, data starts on ``h + 1``.

        Any other value keeps the historical assumption of a single header
        line at the top of the file.
        """
        header = self.header
        if header is None:
            return 0
        if header == "infer":
            return 0 if self.read_csv_kwargs.get("names") is not None else 1
        # bool is a subclass of int but is not a valid header position.
        if isinstance(header, int) and not isinstance(header, bool):
            return header + 1
        return 1

    def _skiprows(self, offset: int) -> range:
        """
        Return the file lines to skip so that the first ``offset`` data rows
        are dropped while the header line (if any) is kept.
        """
        start = self._first_data_line()
        return range(start, start + offset)

    def _read(self, offset: int = 0, limit: int | None = None) -> pd.DataFrame | None:
        """
        Read up to ``limit`` rows of the csv file, starting after ``offset`` data rows.

        Args:
            offset (int): Number of leading data rows to skip. Defaults to 0.
            limit (int | None): Maximum number of rows to read; ``None`` reads
                everything after ``offset``.

        Returns:
            pd.DataFrame: The selected rows.
        """
        return pd.read_csv(
            self.path,
            sep=self.sep,
            header=self.header,
            skiprows=self._skiprows(offset),  # don't skip header
            nrows=limit,
            **self.read_csv_kwargs,
        )

    def _columns(self) -> list[str]:
        """
        Return the column labels of the csv file without reading any data row.
        """
        empty = pd.read_csv(
            self.path,
            sep=self.sep,
            header=self.header,
            nrows=0,
            **self.read_csv_kwargs,
        )
        return empty.columns.tolist()

    def _iter(
        self, offset: int = 0, chunksize: int | None = 1000
    ) -> Generator[pd.DataFrame, None, None]:
        """
        Iterate over the csv file in chunks, starting after ``offset`` data rows.

        Args:
            offset (int): Number of leading data rows to skip. Defaults to 0.
            chunksize (int | None): Rows per yielded chunk. ``None`` yields the
                whole remainder as a single DataFrame.

        Yields:
            pd.DataFrame: Consecutive chunks of the file.
        """
        if chunksize is None:
            yield self._read(offset=offset)
            return

        reader = pd.read_csv(
            self.path,
            sep=self.sep,
            header=self.header,
            skiprows=self._skiprows(offset),  # don't skip header
            chunksize=chunksize,
            **self.read_csv_kwargs,
        )
        try:
            for chunk in reader:
                yield chunk
        finally:
            # Release the file handle even when the consumer stops early.
            reader.close()
from sdgx.data_connectors.extension import hookimpl


@hookimpl
def register(manager):
    """
    Hook implementation that registers :class:`CsvConnector` on ``manager``
    under the name ``"CsvConnector"``.
    """
    manager.register("CsvConnector", CsvConnector)