"""
Defines a function to run the Percolator algorithm.
"""
import logging
import copy
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from .model import PercolatorModel
LOGGER = logging.getLogger(__name__)
# Functions -------------------------------------------------------------------
[docs]def brew(psms, model=None, test_fdr=0.01, folds=3, max_workers=1, rng=None):
"""Re-score one or more collection of PSMs.
The provided PSMs analyzed using the semi-supervised learning
algorithm that was introduced by
`Percolator <http://percolator.ms>`_. Cross-validation is used to
ensure that the learned models to not overfit to the PSMs used for
model training. If a multiple collections of PSMs are provided, they
are aggregated for model training, but the confidence estimates are
calculated separately for each collection.
A list of previously trained models can be provided to the ``model``
argument to rescore the PSMs in each fold. Note that the number of
models must match ``folds``. Furthermore, it is valid to use the
learned models on the same dataset from which they were trained,
but they must be provided in the same order, such that the
relationship of the cross-validation folds is maintained.
Parameters
----------
psms : PsmDataset object or list of PsmDataset objects
One or more :doc:`collections of PSMs <dataset>` objects.
PSMs are aggregated across all of the collections for model
training, but the confidence estimates are calculated and
returned separately.
model: Model object or list of Model objects, optional
The :py:class:`mokapot.Model` object to be fit. The default is
:code:`None`, which attempts to mimic the same support vector
machine models used by Percolator. If a list of
:py:class:`mokapot.Model` objects is provided, they are assumed
to be previously trained models and will and one will be
used to rescore each fold.
test_fdr : float, optional
The false-discovery rate threshold at which to evaluate
the learned models.
folds : int, optional
The number of cross-validation folds to use. PSMs originating
from the same mass spectrum are always in the same fold.
max_workers : int, optional
The number of processes to use for model training. More workers
will require more memory, but will typically decrease the total
run time. An integer exceeding the number of folds will have
no additional effect. Note that logging messages will be garbled
if more than one worker is enabled.
rng : int, np.random.Generator, optional
A seed or generator used to generate splits, or None to use the
default random number generator state.
Returns
-------
Confidence object or list of Confidence objects
An object or a list of objects containing the
:doc:`confidence estimates <confidence>` at various levels
(i.e. PSMs, peptides) when assessed using the learned score.
If a list, they will be in the same order as provided in the
`psms` parameter.
list of Model objects
The learned :py:class:`~mokapot.model.Model` objects, one
for each fold.
"""
rng = np.random.default_rng(rng)
if model is None:
model = PercolatorModel()
try:
iter(psms)
except TypeError:
psms = [psms]
# Set random seeds:
for dset in psms:
dset.rng = rng
try:
model.estimator
model.rng = rng
except AttributeError:
pass
# Check that all of the datasets have the same features:
feat_set = set(psms[0].features.columns)
if not all([set(p.features.columns) == feat_set for p in psms]):
raise ValueError("All collections of PSMs must use the same features.")
if len(psms) > 1:
LOGGER.info("")
LOGGER.info("Found %i total PSMs.", sum([len(p.data) for p in psms]))
LOGGER.info("Splitting PSMs into %i folds...", folds)
test_idx = [p._split(folds) for p in psms]
train_sets = _make_train_sets(psms, test_idx)
if max_workers != 1:
# train_sets can't be a generator for joblib :(
train_sets = [copy.copy(d) for d in train_sets]
# If trained models are provided, use the them as-is.
try:
fitted = [[m, False] for m in model if m.is_trained]
assert len(fitted) == len(model) # Test that all models are fitted.
assert len(model) == folds
except AssertionError as orig_err:
if len(model) != folds:
err = ValueError(
f"The number of trained models ({len(model)}) "
f"must match the number of folds ({folds})."
)
else:
err = RuntimeError(
"One or more of the provided models was not previously trained"
)
raise err from orig_err
except TypeError:
fitted = Parallel(n_jobs=max_workers, require="sharedmem")(
delayed(_fit_model)(d, copy.deepcopy(model), f)
for f, d in enumerate(train_sets)
)
# Sort models to have deterministic results with multithreading.
fitted.sort(key=lambda x: x[0].fold)
models, resets = list(zip(*fitted))
# Determine if the models need to be reset:
reset = any(resets)
# If we reset, just use the original model on all the folds:
if reset:
scores = [
p._calibrate_scores(model.predict(p), test_fdr) for p in psms
]
# If we don't reset, assign scores to each fold:
elif all([m.is_trained for m in models]):
scores = [
_predict(p, i, models, test_fdr) for p, i in zip(psms, test_idx)
]
# If model training has failed
else:
scores = [np.zeros(len(p.data)) for p in psms]
# Find which is best: the learned model, the best feature, or
# a pretrained model.
if not all([m.override for m in models]):
best_feats = [p._find_best_feature(test_fdr) for p in psms]
feat_total = sum([best_feat[1] for best_feat in best_feats])
else:
feat_total = 0
preds = [p._update_labels(s, test_fdr) for p, s in zip(psms, scores)]
pred_total = sum([(pred == 1).sum() for pred in preds])
# Here, f[0] is the name of the best feature, and f[3] is a boolean
if feat_total > pred_total:
using_best_feat = True
scores = []
descs = []
for dat, (feat, _, _, desc) in zip(psms, best_feats):
scores.append(dat.data[feat].values)
descs.append(desc)
else:
using_best_feat = False
descs = [True] * len(psms)
if using_best_feat:
LOGGER.warning(
"Learned model did not improve over the best feature. "
"Now scoring by the best feature for each collection "
"of PSMs."
)
elif reset:
LOGGER.warning(
"Learned model did not improve upon the pretrained "
"input model. Now re-scoring each collection of PSMs "
"using the original model."
)
LOGGER.info("")
res = [
p.assign_confidence(s, eval_fdr=test_fdr, desc=d)
for p, s, d in zip(psms, scores, descs)
]
if len(res) == 1:
return res[0], models
return res, models
# Utility Functions -----------------------------------------------------------
def _make_train_sets(psms, test_idx):
"""
Parameters
----------
psms : list of PsmDataset
The PsmDataset to get a subset of.
test_idx : list of list of numpy.ndarray
The indicies of the test sets
Yields
------
PsmDataset
The training set.
"""
train_set = copy.copy(psms[0])
all_idx = [set(range(len(p.data))) for p in psms]
for idx in zip(*test_idx):
train_set._data = None
data = []
for i, j, dset in zip(idx, all_idx, psms):
data.append(dset.data.loc[list(j - set(i)), :])
train_set._data = pd.concat(data, ignore_index=True)
yield train_set
def _predict(dset, test_idx, models, test_fdr):
"""
Return the new scores for the dataset
Parameters
----------
dset : PsmDataset
The dataset to rescore
test_idx : list of numpy.ndarray
The indicies of the test sets
models : list of Model
The models for each dataset and whether it
was reset or not.
test_fdr : the fdr to calibrate at.
Returns
-------
numpy.ndarray
A :py:class:`numpy.ndarray` containing the new scores.
"""
test_set = copy.copy(dset)
scores = []
for fold_idx, mod in zip(test_idx, models):
test_set._data = dset.data.loc[list(fold_idx), :]
# Don't calibrate if using predict_proba.
try:
mod.estimator.decision_function
scores.append(
test_set._calibrate_scores(mod.predict(test_set), test_fdr)
)
except AttributeError:
scores.append(mod.predict(test_set))
except RuntimeError:
raise RuntimeError(
"Failed to calibrate scores between cross-validation folds, "
"because no target PSMs could be found below 'test_fdr'. Try "
"raising 'test_fdr'."
)
rev_idx = np.argsort(sum(test_idx, [])).tolist()
return np.concatenate(scores)[rev_idx]
def _fit_model(train_set, model, fold):
"""
Fit the estimator using the training data.
Parameters
----------
train_set : PsmDataset
A PsmDataset that specifies the training data
model : tuple of Model
A Classifier to train.
fold : int
The fold number. Only used for logging.
Returns
-------
model : mokapot.model.Model
The trained model.
reset : bool
Whether the models should be reset to their original parameters.
"""
LOGGER.info("")
LOGGER.info("=== Analyzing Fold %i ===", fold + 1)
model.fold = fold + 1
reset = False
try:
model.fit(train_set)
except RuntimeError as msg:
if str(msg) != "Model performs worse after training.":
raise
if model.is_trained:
reset = True
return model, reset