Source code for sdgx.cachers.disk_cache
from __future__ import annotations
import shutil
from pathlib import Path
from typing import Generator, List
import pandas as pd
from sdgx.cachers.base import Cacher
from sdgx.cachers.extension import hookimpl
from sdgx.data_connectors.base import DataConnector
from sdgx.exceptions import CacheError
from sdgx.utils import logger
[docs]
class DiskCache(Cacher):
"""
Cacher that cache data in disk with parquet format
Args:
blocksize (int): The blocksize of the cache.
cache_dir (str | Path | None, optional): The directory where the cache will be stored. Defaults to None.
identity (str | None, optional): The identity of the data source. Defaults to None.
Todo:
* Add partial cache when blocksize > chunksize
* Improve cache invalidation
* Improve performance if blocksize > chunksize
"""
def __init__(
self,
cache_dir: str | Path | None = None,
identity: str | None = None,
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
if not cache_dir:
cache_dir = Path.cwd() / ".sdgx_cache"
if identity:
cache_dir = cache_dir / identity
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
[docs]
def clear_cache(self) -> None:
"""
Clear all cache in cache_dir.
"""
for f in self.cache_dir.glob("*.parquet"):
f.unlink()
shutil.rmtree(self.cache_dir, ignore_errors=True)
[docs]
def clear_invalid_cache(self):
"""
Clear all cache in cache_dir.
TODO: Improve cache invalidation
"""
return self.clear_cache()
[docs]
def _get_cache_filename(self, offset: int) -> Path:
"""
Get cache filename
"""
return self.cache_dir / f"{offset}.parquet"
[docs]
def is_cached(self, offset: int) -> bool:
"""
Check if the data is cached by checking if the cache file exists
"""
return self._get_cache_filename(offset).exists()
[docs]
def _refresh(self, offset: int, data: pd.DataFrame) -> None:
"""
Refresh cache, will write data to cache file in parquet format.
"""
self.cache_dir.mkdir(parents=True, exist_ok=True)
if len(data) < self.blocksize:
data.to_parquet(self._get_cache_filename(offset))
elif len(data) > self.blocksize:
for i in range(0, len(data), self.blocksize):
data[i : i + self.blocksize].to_parquet(self._get_cache_filename(offset + i))
else:
data.to_parquet(self._get_cache_filename(offset))
[docs]
def load(self, offset: int, chunksize: int, data_connector: DataConnector) -> pd.DataFrame:
"""
Load data from data_connector or cache
"""
if chunksize % self.blocksize != 0:
raise CacheError(
"chunksize must be multiple of blocksize, current chunksize is {} and blocksize is {}".format(
chunksize, self.blocksize
)
)
if chunksize != self.blocksize:
logger.warning("chunksize must be equal to blocksize, may cause performance issue.")
if self.is_cached(offset):
cached_data = pd.read_parquet(self._get_cache_filename(offset))
if len(cached_data) >= chunksize:
return cached_data[:chunksize]
return cached_data
limit = max(self.blocksize, chunksize)
data = data_connector.read(offset=offset, limit=limit)
if data is None:
return data
data_list: List[pd.DataFrame] = [data]
while len(data) < limit:
# When generator size is less than blocksize
# Continue to read until fit the limit
next_data = data_connector.read(offset=offset + len(data), limit=limit - len(data))
if next_data is None or len(next_data) == 0:
break
data_list.append(next_data)
data = (
pd.concat(
data_list,
ignore_index=True,
)
if len(data_list) > 1
else data
)
self._refresh(offset, data)
if len(data) < chunksize:
return data
return data[:chunksize]
[docs]
def iter(
self, chunksize: int, data_connector: DataConnector
) -> Generator[pd.DataFrame, None, None]:
"""
Load data from data_connector or cache in chunk
"""
offset = 0
while True:
data = self.load(offset, chunksize, data_connector)
if data is None or len(data) == 0:
break
yield data
offset += len(data)
@hookimpl
def register(manager):
manager.register("DiskCache", DiskCache)