"""Fonduer featurizer."""
import itertools
import logging
from collections import defaultdict
from typing import (
Any,
Collection,
DefaultDict,
Dict,
Iterable,
List,
Optional,
Type,
Union,
)
from scipy.sparse import csr_matrix
from sqlalchemy.orm import Session
from fonduer.candidates.models import Candidate
from fonduer.features.feature_extractors import FeatureExtractor
from fonduer.features.models import Feature, FeatureKey
from fonduer.parser.models.document import Document
from fonduer.utils.udf import UDF, UDFRunner
from fonduer.utils.utils_udf import (
ALL_SPLITS,
batch_upsert_records,
drop_all_keys,
drop_keys,
get_docs_from_split,
get_mapping,
get_sparse_matrix,
get_sparse_matrix_keys,
upsert_keys,
)
logger = logging.getLogger(__name__)
[docs]class Featurizer(UDFRunner):
"""An operator to add Feature Annotations to Candidates.
:param session: The database session to use.
:param candidate_classes: A list of candidate_subclasses to featurize.
:param parallelism: The number of processes to use in parallel. Default 1.
"""
def __init__(
self,
session: Session,
candidate_classes: List[Candidate],
feature_extractors: FeatureExtractor = FeatureExtractor(),
parallelism: int = 1,
) -> None:
"""Initialize the Featurizer."""
super().__init__(
session,
FeaturizerUDF,
parallelism=parallelism,
candidate_classes=candidate_classes,
feature_extractors=feature_extractors,
)
self.candidate_classes = candidate_classes
[docs] def update(
self,
docs: Optional[Collection[Document]] = None,
split: int = 0,
parallelism: Optional[int] = None,
progress_bar: bool = True,
) -> None:
"""Update the features of the specified candidates.
:param docs: If provided, apply features to all the candidates in these
documents.
:param split: If docs is None, apply features to the candidates in this
particular split.
:param parallelism: How many threads to use for extraction. This will
override the parallelism value used to initialize the Featurizer if
it is provided.
:param progress_bar: Whether or not to display a progress bar. The
progress bar is measured per document.
"""
self.apply(
docs=docs,
split=split,
train=True,
clear=False,
parallelism=parallelism,
progress_bar=progress_bar,
)
[docs] def apply( # type: ignore
self,
docs: Optional[Collection[Document]] = None,
split: int = 0,
train: bool = False,
clear: bool = True,
parallelism: Optional[int] = None,
progress_bar: bool = True,
) -> None:
"""Apply features to the specified candidates.
:param docs: If provided, apply features to all the candidates in these
documents.
:param split: If docs is None, apply features to the candidates in this
particular split.
:param train: Whether or not to update the global key set of features
and the features of candidates.
:param clear: Whether or not to clear the features table before
applying features.
:param parallelism: How many threads to use for extraction. This will
override the parallelism value used to initialize the Featurizer if
it is provided.
:param progress_bar: Whether or not to display a progress bar. The
progress bar is measured per document.
"""
if docs:
# Call apply on the specified docs for all splits
# TODO: split is int
split = ALL_SPLITS # type: ignore
super().apply(
docs,
split=split,
train=train,
clear=clear,
parallelism=parallelism,
progress_bar=progress_bar,
)
# Needed to sync the bulk operations
self.session.commit()
else:
# Only grab the docs containing candidates from the given split.
split_docs = get_docs_from_split(
self.session, self.candidate_classes, split
)
super().apply(
split_docs,
split=split,
train=train,
clear=clear,
parallelism=parallelism,
progress_bar=progress_bar,
)
# Needed to sync the bulk operations
self.session.commit()
[docs] def upsert_keys(
self,
keys: Iterable[str],
candidate_classes: Union[Candidate, Iterable[Candidate], None] = None,
) -> None:
"""Upsert the specified keys to FeatureKey.
:param keys: A list of FeatureKey names to upsert.
:param candidate_classes: A list of the Candidates to upsert the key for.
If None, upsert the keys for all candidate classes associated with
this Featurizer.
"""
# Make sure keys is iterable
keys = keys if isinstance(keys, (list, tuple)) else [keys]
# Make sure candidate_classes is iterable
if candidate_classes:
candidate_classes = (
candidate_classes
if isinstance(candidate_classes, Iterable)
else [candidate_classes]
)
# Ensure only candidate classes associated with the featurizer
# are used.
candidate_classes = [
_.__tablename__
for _ in candidate_classes
if _ in self.candidate_classes
]
if len(candidate_classes) == 0:
logger.warning(
"You didn't specify valid candidate classes for this featurizer."
)
return
# If unspecified, just use all candidate classes
else:
candidate_classes = [_.__tablename__ for _ in self.candidate_classes]
# build dict for use by utils
key_map = dict()
for key in keys:
key_map[key] = set(candidate_classes)
upsert_keys(self.session, FeatureKey, key_map)
[docs] def drop_keys(
self,
keys: Iterable[str],
candidate_classes: Union[Candidate, Iterable[Candidate], None] = None,
) -> None:
"""Drop the specified keys from FeatureKeys.
:param keys: A list of FeatureKey names to delete.
:param candidate_classes: A list of the Candidates to drop the key for.
If None, drops the keys for all candidate classes associated with
this Featurizer.
"""
# Make sure keys is iterable
keys = keys if isinstance(keys, (list, tuple)) else [keys]
# Make sure candidate_classes is iterable
if candidate_classes:
candidate_classes = (
candidate_classes
if isinstance(candidate_classes, Iterable)
else [candidate_classes]
)
# Ensure only candidate classes associated with the featurizer
# are used.
candidate_classes = [
_.__tablename__
for _ in candidate_classes
if _ in self.candidate_classes
]
if len(candidate_classes) == 0:
logger.warning(
"You didn't specify valid candidate classes for this featurizer."
)
return
# If unspecified, just use all candidate classes
else:
candidate_classes = [_.__tablename__ for _ in self.candidate_classes]
# build dict for use by utils
key_map = dict()
for key in keys:
key_map[key] = set(candidate_classes)
drop_keys(self.session, FeatureKey, key_map)
[docs] def get_keys(self) -> List[FeatureKey]:
"""Return a list of keys for the Features.
:return: List of FeatureKeys.
"""
return list(get_sparse_matrix_keys(self.session, FeatureKey))
def _add(self, session: Session, records_list: List[List[Dict[str, Any]]]) -> None:
# Make a flat list of all records from the list of list of records.
# This helps reduce the number of queries needed to update.
all_records = list(itertools.chain.from_iterable(records_list))
batch_upsert_records(session, Feature, all_records)
[docs] def clear(self, train: bool = False, split: int = 0) -> None: # type: ignore
"""Delete Features of each class from the database.
:param train: Whether or not to clear the FeatureKeys
:param split: Which split of candidates to clear features from.
"""
# Clear Features for the candidates in the split passed in.
logger.info(f"Clearing Features (split {split})")
if split == ALL_SPLITS:
sub_query = self.session.query(Candidate.id).subquery()
else:
sub_query = (
self.session.query(Candidate.id)
.filter(Candidate.split == split)
.subquery()
)
query = self.session.query(Feature).filter(Feature.candidate_id.in_(sub_query))
query.delete(synchronize_session="fetch")
# Delete all old annotation keys
if train:
logger.debug(f"Clearing all FeatureKeys from {self.candidate_classes}...")
drop_all_keys(self.session, FeatureKey, self.candidate_classes)
[docs] def clear_all(self) -> None:
"""Delete all Features."""
logger.info("Clearing ALL Features and FeatureKeys.")
self.session.query(Feature).delete(synchronize_session="fetch")
self.session.query(FeatureKey).delete(synchronize_session="fetch")
def _after_apply(self, train: bool = False, **kwargs: Any) -> None:
# Insert all Feature Keys
if train:
key_map: DefaultDict[str, set] = defaultdict(set)
for feature in self.session.query(Feature).all():
cand = feature.candidate
for key in feature.keys:
key_map[key].add(cand.__class__.__tablename__)
self.session.query(FeatureKey).delete(synchronize_session="fetch")
# TODO: upsert is too much. insert is fine as all keys are deleted.
upsert_keys(self.session, FeatureKey, key_map)
[docs] def get_feature_matrices(
self, cand_lists: List[List[Candidate]]
) -> List[csr_matrix]:
"""Load sparse matrix of Features for each candidate_class.
:param cand_lists: The candidates to get features for.
:return: A list of MxN sparse matrix where M are the candidates and N is the
features.
"""
return get_sparse_matrix(self.session, FeatureKey, cand_lists)
class FeaturizerUDF(UDF):
"""UDF for performing candidate extraction."""
def __init__(
self,
candidate_classes: Iterable[Type[Candidate]],
feature_extractors: FeatureExtractor,
**kwargs: Any,
) -> None:
"""Initialize the FeaturizerUDF."""
self.candidate_classes = (
candidate_classes
if isinstance(candidate_classes, (list, tuple))
else [candidate_classes]
)
self.feature_extractors = feature_extractors
super().__init__(**kwargs)
def apply(self, doc: Document, **kwargs: Any) -> List[List[Dict[str, Any]]]:
"""Extract candidates from the given Context.
:param doc: A document to process.
"""
logger.debug(f"Document: {doc}")
# Get all the candidates in this doc that will be featurized
cands_list = [
getattr(doc, candidate_class.__tablename__ + "s")
for candidate_class in self.candidate_classes
]
records_list = [
list(get_mapping(Feature, cands, self.feature_extractors.extract))
for cands in cands_list
]
return records_list