Source code for sdgx.data_connectors.manager
from __future__ import annotations
from typing import Any
from sdgx import data_connectors
from sdgx.data_connectors import extension
from sdgx.data_connectors.base import DataConnector
from sdgx.data_connectors.extension import project_name as PROJECT_NAME
from sdgx.manager import Manager
[docs]
class DataConnectorManager(Manager):
    """Manager specialised for :class:`DataConnector` classes.

    This subclass adds no logic of its own: it sets three class-level
    attributes and exposes connector-named wrappers around the inherited
    ``registed_cls``, ``_load_dir`` and ``init`` members of ``Manager``.
    """

    # NOTE(review): the three attributes below are presumably read by the
    # ``Manager`` base class (defined elsewhere) -- confirm their exact role there.
    register_type = DataConnector
    # Re-exported from ``sdgx.data_connectors.extension``.
    project_name = PROJECT_NAME
    # The ``sdgx.data_connectors.extension`` module itself.
    hookspecs_model = extension

    @property
    def registed_data_connectors(self):
        """Return ``self.registed_cls`` under a connector-specific name.

        The ``registed`` spelling is part of the public interface and is kept
        as-is so existing callers keep working.
        """
        return self.registed_cls

    def load_all_local_model(self) -> None:
        """Call the inherited ``_load_dir`` on the ``sdgx.data_connectors`` package.

        NOTE(review): presumably this discovers and registers the connectors
        bundled with that package -- confirm against ``Manager._load_dir``.
        """
        self._load_dir(data_connectors)

    def init_data_connector(self, connector_name, **kwargs: Any) -> DataConnector:
        """Return the result of ``self.init(connector_name, **kwargs)``.

        Args:
            connector_name: Identifier forwarded unchanged to ``Manager.init``.
            **kwargs: Arbitrary keyword arguments forwarded unchanged to
                ``Manager.init``. Annotated as ``Any`` because a ``**kwargs``
                annotation describes each individual value, not the mapping.

        Returns:
            Whatever ``Manager.init`` returns, declared as a ``DataConnector``.
        """
        return self.init(connector_name, **kwargs)