Commit 78230679 authored by Mathias Walzer

refactored project for easier extension

* split value collection/metric calculation per topic
* extensive documentation (numpy doc) for functions
TODO:
- improve naming consistency
- migrate unit tests
- replace rpy2 with plotly.express
parent 50f09afd
Pipeline #77751 passed with stages in 14 minutes and 45 seconds
FROM python:3.6.9-slim-buster
#FROM python:3.6.5-slim-jessie
#FROM python:2.7-slim-jessie
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends --no-install-suggests \
......@@ -73,10 +71,11 @@ RUN Rscript -e "library('devtools'); install_github('twitter/AnomalyDetection')"
RUN pip install pip --upgrade
RUN pip install setuptools --upgrade
RUN pip install numpy mypy pyopenms==2.5.*
RUN pip install jupyter rpy2 flask pytest pronto biopython pandas requests plotly-express toposort
RUN pip install numpy mypy jupyter rpy2 flask pytest
RUN pip install --force-reinstall 'pytest<=5.0.1'
RUN pip install -U git+https://github.com/bigbio/mzqc-pylib.git#egg=mzqc-pylib
RUN pip install pronto biopython pandas requests plotly-express toposort click
RUN pip install pyopenms==2.5.*
#RUN pip install -e . # devcontainer.json: "postCreateCommand": "pip install -e ."
......
......@@ -2,7 +2,9 @@
"name": "MZQC Python Development container",
"dockerFile": "Dockerfile",
"context": "..",
"extensions": ["ms-python.python", "littlefoxteam.vscode-python-test-adapter", "ms-azuretools.vscode-docker", "hbenl.vscode-test-explorer"],
"extensions": ["ms-python.python", "littlefoxteam.vscode-python-test-adapter",
"ms-azuretools.vscode-docker", "hbenl.vscode-test-explorer",
"njpwerner.autodocstring"],
"settings": {"python.pythonPath": "/usr/local/bin/python"},
"postCreateCommand": "sudo pip install -U -e .",
"runArgs": ["-u", "vscode", "-v", "/media/walzer/My Passport2/mzqc-stuff:/data"]
......
This diff is collapsed.
......@@ -8,7 +8,8 @@ from typing import List
import pyopenms as oms
from mzqc import MZQCFile as qc
from .qccalculator import getBasicQuality, getIDQuality
# from .qccalculator import getBasicQuality, getIDQuality
from QCCalculator import utils, basicqc, idqc, idqcmq, enzymeqc, masstraceqc
rqs: List[qc.RunQuality] = list()
sqs: List[qc.SetQuality] = list()
......@@ -67,7 +68,7 @@ def full(filename, mzid=None, idxml=None):
"""Calculate all possible metrics for these files. These data sources will be included in set metrics."""
exp = oms.MSExperiment()
oms.MzMLFile().load(click.format_filename(filename), exp)
rq = getBasicQuality(exp)
rq = basicqc.getBasicQuality(exp)
if idxml and mzid:
with click.Context(command) as ctx:
......@@ -97,11 +98,11 @@ def full(filename, mzid=None, idxml=None):
oms_id = oms.MzIdentMLFile()
idf = mzid
if idxml:
oms_id = oms.MzIdentMLFile()
oms_id = oms.IdXMLFile()
idf = idxml
if idf:
oms_id.load(click.format_filename(idf), pros, peps)
rq.qualityMetrics.extend(getIDQuality(exp, pros, peps, ms2num))
rq.qualityMetrics.extend(idqc.getIDQuality(exp, pros, peps, ms2num))
rqs.append(rq)
finale()
......@@ -114,7 +115,7 @@ def maxq(filename, zipurl, rawname):
"""Calculate all possible metrics for these files. These data sources will be included in set metrics."""
exp = oms.MSExperiment()
oms.MzMLFile().load(click.format_filename(filename), exp)
rq = getBasicQuality(exp)
rq = basicqc.getBasicQuality(exp)
ms2num = 0
for x in rq.qualityMetrics:
......@@ -129,12 +130,12 @@ def maxq(filename, zipurl, rawname):
ms2num = 1
try:
mq,params = get_mq_zipped_evidence(mq_zip_url)
mq,params = idqcmq.loadMQZippedResults(mq_zip_url)
if not rawname:
logging.warning("Infering rawname from mzML")
rawname = basename(exp.getExperimentalSettings().getSourceFiles()[0].getNameOfFile().decode()) # TODO split extensions
rq.qualityMetrics.extend(getMQMetrics(rawname, params,mq, ms2num))
rq.qualityMetrics.extend(idqcmq.getMQMetrics(rawname, params,mq, ms2num))
rqs.append(rq)
except:
logging.warn("Retrieving any results from the URL failed.")
......@@ -148,7 +149,7 @@ def basic(filename):
"""Calculate the basic metrics available from virtually every mzML file."""
exp = oms.MSExperiment()
oms.MzMLFile().load(click.format_filename(filename), exp)
rq = getBasicQuality(exp)
rq = basicqc.getBasicQuality(exp)
rqs.append(rq)
finale()
......
import warnings
import requests
import re
from collections import defaultdict
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, Pattern
from Bio import SeqIO, SeqRecord
from Bio.SeqUtils import ProtParam
from mzqc import MZQCFile as mzqc
import pyopenms as oms
import numpy as np
from QCCalculator import utils
"""
Methods to calculate quality metrics related to digestion enzyme use during sample preparation and the identification process
"""
def getCoverageRatios(pro_ids: oms.ProteinIdentification,
pep_ids: List[oms.PeptideIdentification],
fasta: Optional[Dict[str,SeqRecord.SeqRecord]] = None, fetch: bool = False) -> List[mzqc.QualityMetric]:
"""
getCoverageRatios calculates the coverage ratios per protein from the identified search space.
Calculating the coverage from all individual peptide identifications also requires all expected
protein sequences to be known. For this there are two options: either provide the sequences
from the originally used fasta, or try to retrieve the sequences via UniProt through the
accessions of the PeptideHits.
Parameters
----------
pro_ids : oms.ProteinIdentification
The PyOpenMS ProteinIdentification as from reading a common identification file
pep_ids : List[oms.PeptideIdentification]
List of PyOpenMS PeptideIdentification as from reading a common identification file
fasta : Dict[str,SeqRecord.SeqRecord], optional
Dictionary of sequences from a fasta file, keyed by accession (Dict[accession,SeqRecord]), by default None
fetch : bool, optional
If set true, will attempt to retrieve missing sequences from UniProt by accession; ignored if `fasta` is provided, by default False
Returns
-------
List[mzqc.QualityMetric]
List of resulting QualityMetrics (protein coverage table)
"""
metrics: List[mzqc.QualityMetric] = list()
# check that all ProteinHits have their sequence set:
# first via the ProteinHits themselves, missing ones then either via fasta or UniProt,
# then calculate the coverage
missing_acc = list()
nup = list()
for p in pro_ids.getHits():
ac = p.getAccession()
nup.append(oms.ProteinHit(p))
if not p.getSequence():
if fasta:
nup[-1].setSequence(str(fasta.get(ac,SeqRecord.SeqRecord('')).seq))
# if still no sequence
if not p.getSequence():
missing_acc.append(ac)
if missing_acc and fetch:
# index retrieved records by their plain id and by the accession part of 'db|accession|name' style ids
urx = re.compile(r'\w*\|(\w*)\|\w*')
uniprot = dict()
for x in utils.getUniProtSequences(missing_acc):
uniprot[x.id] = x
m = urx.search(x.id)
if m:
uniprot[m.group(1)] = x
for n in nup:
ac = n.getAccession()
if not n.getSequence():
n.setSequence(str(uniprot.get(ac,SeqRecord.SeqRecord('')).seq))
coverage_tab: Dict[str,List[Any]] = defaultdict(list)
na = [n for n in nup if not n.getSequence()]
nup = [n for n in nup if n.getSequence()]
pro_ids.setHits(nup)
pro_ids.computeCoverage(pep_ids)
for p in pro_ids.getHits():
coverage_tab['Accession'].append(p.getAccession())
coverage_tab['Coverage'].append(p.getCoverage())
coverage_tab['Length'].append(len(p.getSequence()))
# TODO figure out decoy string by fasta
coverage_tab['TD'].append('decoy' if 'decoy' in p.getAccession().lower() else 'target')
for n in na:
coverage_tab['Accession'].append(n.getAccession())
coverage_tab['Coverage'].append('NA')
coverage_tab['Length'].append('NA')
coverage_tab['TD'].append('decoy' if 'decoy' in n.getAccession().lower() else 'target')
metrics.append(
mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Protein coverage",
value=coverage_tab)
)
return metrics
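# Hedged usage sketch (not part of the original module): illustrates how getCoverageRatios
# might be called on an identification file read with pyopenms; the file paths are hypothetical.
def _sketch_getCoverageRatios_usage():
    pros: List[oms.ProteinIdentification] = list()
    peps: List[oms.PeptideIdentification] = list()
    # assumed example inputs; replace with real identification and fasta files
    oms.IdXMLFile().load("tests/example.idXML", pros, peps)
    fasta = {rec.id: rec for rec in SeqIO.parse("tests/example.fasta", "fasta")}
    # the first ProteinIdentification run is used; coverage is computed in place via computeCoverage
    return getCoverageRatios(pros[0], peps, fasta=fasta)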
def getPeptideLengthMetrics(identification_sequence_metrics:mzqc.QualityMetric) -> List[mzqc.QualityMetric]:
"""
getPeptideLengthMetrics calculates the descriptive statistics metrics for identified sequences' lengths
From the proto-metrics on identification sequences, the function calculates descriptive statistics
for the distribution of identified peptide sequence lengths.
Namely mean, standard deviation, quartiles, and 1.5*IQR outliers.
Parameters
----------
identification_sequence_metrics : mzqc.QualityMetric
QualityMetric with 'peptide' value, filtered for final outcome
Returns
-------
List[mzqc.QualityMetric]
List of resulting QualityMetrics
"""
metrics: List[mzqc.QualityMetric] = list()
regex_mod = r'(\([^\(]*\))'
regex_noaa = r'([^A-Za-z])'
# TODO test this: '.(iTRAQ4plex)M(Oxidation)C(Carbamidomethyl)HNVNR'
lengths = np.array([len(re.sub(regex_noaa, '', re.sub(regex_mod, '', x))) for x in identification_sequence_metrics.value['peptide'] ])
q1, q2, q3, s, m, ol = utils.extractDistributionStats(lengths)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Identified peptide lengths Q1, Q2, Q3",
value=[q1, q2, q3])
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Identified peptide lengths sigma",
value=s)
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Identified peptide lengths mean",
value=m)
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Identified peptide lengths +/-1.5*IQR outlier",
value=ol)
)
return metrics
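# Hedged sketch (not part of the original module): shows how the modification-stripping
# regexes above reduce an annotated sequence to its plain amino acid length, using the
# example sequence from the TODO note.
def _sketch_peptide_length_stripping():
    regex_mod = r'(\([^\(]*\))'
    regex_noaa = r'([^A-Za-z])'
    example = '.(iTRAQ4plex)M(Oxidation)C(Carbamidomethyl)HNVNR'
    # remove the bracketed modification annotations, then any remaining non-letter characters
    stripped = re.sub(regex_noaa, '', re.sub(regex_mod, '', example))
    # expected plain sequence 'MCHNVNR' with length 7
    return len(stripped)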
def matchEnzyme(enzre: Pattern, pepseq: str) -> Tuple[int,int]:
"""
matchEnzyme matches a peptide sequence against a regular expression pattern
The regular expression pattern is matched against the peptide sequence
and the matches are counted according to their position/type. For that, the
peptide sequence should include the amino acids before and after as they
occur in the protein. A distinction is made between full matches, semi matches (at
only one of the peptide ends), and internal matches. This makes for the return values of
match/semi/none and the number of internal matches (a.k.a. missed cleavages).
Parameters
----------
enzre : Pattern
Pattern as created by re.compile(...)
pepseq : str
amino acid sequence string
Returns
-------
Tuple[int,int]
First int indicates fully matched (2), semi matched (1), or no match (0); the second int is the count of internal matches (missed cleavages)
"""
matches = np.array([x.start() if x.start()==x.end() else None for x in enzre.finditer(pepseq)])
is_matched = False
is_semi = False
if 1 in matches and len(pepseq)-1 in matches:
is_matched = True
internal_matches = len(matches) -2
elif 1 in matches or len(pepseq)-1 in matches:
internal_matches = len(matches) - 1
is_semi = True
else:
internal_matches = len(matches)
return (2 if is_matched else 1 if is_semi else 0, internal_matches)
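# Hedged example (not part of the original module): demonstrates matchEnzyme with a
# trypsin-like zero-width pattern (cleavage after K/R, not before P); the peptide is
# given with its flanking residues as the function expects.
def _sketch_matchEnzyme_example():
    trypsin_like = re.compile(r'(?<=[KR])(?!P)')
    # 'K' + 'ACDKEFGHIK' + 'L': fully tryptic with one internal (missed) cleavage site
    matched, missed = matchEnzyme(trypsin_like, 'KACDKEFGHIKL')
    # expected result: matched == 2 (both termini match), missed == 1
    return matched, missed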
def getEnzymeContaminationMetrics(pep, pro, force_enzymes = False) -> List[mzqc.QualityMetric]:
"""
getEnzymeContaminationMetrics calculates enzyme and enzyme contamination metrics from the
identifications given.
The function calculates the number of missed (internal) cleavages, the peptide length distribution,
and peptide boundaries matching known enzyme patterns from the given identifications. Matching
against digestion enzyme patterns other than the enzyme used for the identification process has to
be switched on with 'force_enzymes' and is sensible if the identification was conducted with
unspecific cleavage, to detect enzyme contamination, or if an enzyme setting mixup is suspected.
Parameters
----------
pro : List[oms.ProteinIdentification]
List of PyOpenMS ProteinIdentification as from reading a common identification file
pep : List[oms.PeptideIdentification]
List of PyOpenMS PeptideIdentification as from reading a common identification file
force_enzymes : bool, optional
If set, will force checking the identified peptide sequences against other known
digestion enzyme patterns. By default False
Returns
-------
List[mzqc.QualityMetric]
List of resulting QualityMetrics
"""
metrics: List[mzqc.QualityMetric] = list()
# including all PSMs does not actually make much sense for assessing the enzyme efficiency
gre = {pro[0].getSearchParameters().digestion_enzyme.getName():
re.compile(pro[0].getSearchParameters().digestion_enzyme.getRegEx())}
#TODO pyopenms wrappers for DigestionEnzymeDB etc
# li: List = list()
# oms.DigestionEnzymeDB().getAllNames(li)
# ore = {e: re.compile(oms.DigestionEnzymeDB().getEnzyme(e).getRegEx()) for e in li
# if e not in gre and e != 'no cleavage'}
enzymematch_tab: Dict[str,List[Any]] = defaultdict(list)
missed_ranks = list()
matched_ranks = list()
# alt = dict()
for i,pepi in enumerate(pep):
pepi.sort()
spec_id = pepi.getMetaValue('spectrum_reference') \
if pepi.metaValueExists('spectrum_reference') else i
for j,h in enumerate(pepi.getHits()):
pepseq = h.getPeptideEvidences()[0].getAABefore() \
+ h.getSequence().toUnmodifiedString() \
+ h.getPeptideEvidences()[0].getAAAfter()
is_matched, internal_matches = matchEnzyme(next(iter(gre.values())), pepseq)
if j == 0:
enzymematch_tab['native_id'].append(spec_id)
enzymematch_tab['matched'].append(is_matched)
enzymematch_tab['missed'].append(internal_matches)
else:
missed_ranks.append(internal_matches)
matched_ranks.append(is_matched)
# if force_enzymes or not is_matched:
# oth_enz_matched = {k: matchEnzyme(v, pepseq) for k,v in ore.items()}
# alt[spec_id] = oth_enz_matched
if len(missed_ranks):
arr = np.array(missed_ranks)
q1, q2, q3, s, m, ol = utils.extractDistributionStats(arr)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Q1, Q2, Q3 of missed clevage counts for all lower rank identifications.",
value=[q1, q2, q3])
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Sigma of missed clevage counts for all lower rank identifications.",
value=s)
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Mean of missed clevage counts for all lower rank identifications.",
value=m)
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Missed clevage count for all lower rank identifications +/-1.5*IQR outlier",
value=ol)
)
if len(matched_ranks):
mdl: Dict[int,int] = defaultdict(int)
arr = np.array(matched_ranks)
uniq, counts = np.unique(arr, return_counts=True)
mdl.update(dict(zip(uniq,counts)))
metrics.append(
mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Match/semi/none counts for all lower rank identifications.",
value=[mdl[2], mdl[1],mdl[0]])
)
metrics.append(
mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Missed cleavages",
value=enzymematch_tab)
)
arr = np.array(enzymematch_tab['missed'])
q1, q2, q3, s, m, ol = utils.extractDistributionStats(arr)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Q1, Q2, Q3 of missed clevage counts for top identifications.",
value=[q1, q2, q3])
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Sigma of missed clevage counts for top identifications.",
value=s)
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Mean of missed clevage counts for top identifications.",
value=m)
)
metrics.append(mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Missed clevage count for top identifications +/-1.5*IQR outlier",
value=ol)
)
return metrics
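# Hedged usage sketch (not part of the original module): how the enzyme metrics might be
# obtained from an identification file read with pyopenms; the file path is hypothetical.
def _sketch_getEnzymeContaminationMetrics_usage():
    pros: List[oms.ProteinIdentification] = list()
    peps: List[oms.PeptideIdentification] = list()
    oms.IdXMLFile().load("tests/example.idXML", pros, peps)
    # note the parameter order: peptide identifications first, protein identifications second
    return getEnzymeContaminationMetrics(peps, pros)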
# def calcCoverageHelperDatabase():
# import xml.etree.cElementTree as cet
# sdbs: List = list()
# source = "tests/CPTAC_CompRef_00_iTRAQ_01_2Feb12_Cougar_11-10-09.mzid"
# for event, elem in cet.iterparse(source):
# sdb: Dict = dict()
# if elem.tag.endswith("SearchDatabase"):
# sdb = elem.attrib
# sdb['cvs'] = list()
# for chil in elem.getchildren():
# if chil.tag.endswith("DatabaseName"):
# for subchil in chil.getchildren():
# # print("#" + subchil)
# if subchil.tag.endswith("cvParam") and 'accession' in subchil.attrib and subchil.attrib['accession'] == "MS:1001013":
# sdb['databasename'] = subchil.attrib['value']
# # print(subchil.attrib)
# subchil.clear()
# elif chil.tag.endswith("cvParam"):
# print(chil.tag)
# sdb['cvs'].append(chil.attrib)
# print(chil.attrib)
# chil.clear()
# sdbs.append(sdb)
# elif not(elem.tag.endswith("DatabaseName") or elem.tag.endswith("cvParam")):
# elem.clear()
#TODO unit tests
# XMAGHHHEHEQERDHEQEHEHDSLQRP
# KPNPASMX
# RPSTNDPTSCCSX
This diff is collapsed.
import io
import zipfile
import urllib.request
import warnings
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import pandas
import pronto
from Bio import SeqIO, SeqRecord
from Bio.SeqUtils import ProtParam
from mzqc import MZQCFile as mzqc
from QCCalculator import utils
"""
Calculate id based metrics from MaxQuant result files
"""
def loadMQZippedResults(url: str) -> Tuple[pandas.DataFrame,pandas.DataFrame]:
"""
loadMQZippedResults acquires the necessary MaxQuant input files from a URL to a zipped archive
The predominant way identifications from MaxQuant are stored in open mass spectrometry data repositories is in a zip file of the submission.
The method loads the archive and retrieves the QC-metric-relevant result files (evidence.txt and parameters.txt) from the archive.
Parameters
----------
url : str
A URL to a zip file with MQ result files.
Returns
-------
Tuple[pandas.DataFrame,pandas.DataFrame]
The parameters and evidence files from a MaxQuant result, loaded into accessible pandas dataframes.
"""
with urllib.request.urlopen(url, timeout=10) as dl:
with zipfile.ZipFile(io.BytesIO(dl.read())) as z:
ef = 'evidence.txt'
pf = 'parameters.txt'
ld = dict() # {'ev':'evidence.txt', 'pa':'parameters.txt'}
dirs = dict() # {f: {'ev':'evidence.txt', 'pa':'parameters.txt'} }
for f in z.namelist():
if z.getinfo(f).is_dir():
dirs[f] = dict()
elif f == ef:
ld['ev'] = f
elif f == pf:
ld['pa'] = f
if len(ld) < 2:
for f in z.namelist():
for d in dirs.keys():
# exact expected match otherwise oddities like 'SEARCH/._parameters.txt' are picked up
if f == d+ef:
dirs[d]['ev'] = f
elif f == d+pf:
dirs[d]['pa'] = f
dirs = {k:v for k,v in dirs.items() if len(v)>0}
if len(dirs) > 1:
warnings.warn("MQ result zip contains more than one results folder.", Warning)
elif len(dirs) < 1:
warnings.warn("MQ result zip contains no results, even in subfolders.", Warning)
return None, None
ld = next(iter(dirs.values()))
if len(ld) < 2:
warnings.warn("MQ result zip contains no results.", Warning)
return None, None
with z.open(ld['ev']) as e:
ev = pandas.read_csv(e,sep='\t')
ev.columns = map(str.lower, ev.columns)
with z.open(ld['pa']) as p:
pa = pandas.read_csv(p,sep='\t', dtype={'Parameter':str})
pa.columns = map(str.lower, pa.columns)
pa['parameter'] = pa['parameter'].str.lower()
pa.set_index('parameter', inplace=True)
return pa,ev
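# Hedged usage sketch (not part of the original module): loads MaxQuant results from a
# zipped submission and feeds them into getMQMetrics; URL and raw file name are hypothetical.
def _sketch_loadMQZippedResults_usage():
    params, evidence = loadMQZippedResults("https://example.org/maxquant_results.zip")
    if params is None or evidence is None:
        return list()
    # raw file name as used by MaxQuant, without the file type extension
    return getMQMetrics("20181113_some_run", params, evidence, ms2num=0)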
def getMQMetrics(target_raw: str, params: pandas.DataFrame, evidence: pandas.DataFrame, ms2num: int = 0) -> List[mzqc.QualityMetric]:
"""
getMQMetrics calculates ID-based QC metrics from MaxQuant results, as close as possible to the way they are calculated from regular identification files.
For a given raw file (name), the respective results are extracted from dataframes derived from the parameters and evidence files of a
MaxQuant result (potentially a combined analysis of multiple raw files). As many metrics as possible, similar or equal to those dependent on
regular identification files, are calculated.
Parameters
----------
target_raw : str
The name of the raw file (as per MaxQuant usage without file type extension)
params : pandas.DataFrame
Dataframe with data from the parameters result file as produced by MaxQuant, with normalised (lower-cased) column names
evidence : pandas.DataFrame
Dataframe with data from the evidence result file as produced by MaxQuant, with normalised (lower-cased) column names
ms2num : int, optional
The total number of tandem spectra as from the id-free metrics, by default 0
Returns
-------
List[mzqc.QualityMetric]
A list of QualityMetrics close to what is calculated from a regular id-based QC calculation.
"""
if target_raw not in evidence['raw file'].unique():
return list() # TODO warn
else:
mq_metrics : List[mzqc.QualityMetric] = list()
#https://stackoverflow.com/questions/17071871/how-to-select-rows-from-a-dataframe-based-on-column-values
target_mq = evidence.loc[(evidence['raw file'] == target_raw) & (evidence['ms/ms scan number'].notnull())]
mq_metrics.append(
mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Sequence database name",
value=params.loc['fasta file']['value'])
)
proteins = len(target_mq['leading proteins'].unique())
mq_metrics.append(
mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Total number of identified proteins",
value=proteins)
)
# # name="Total number of PSM", # NA
# metrics.append(
# mzqc.QualityMetric(cvRef="QC",
# accession="QC:0000000",
# name="Total number of PSM",
# value=psm_count)
# )
mq_metrics.append(
mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Total number of identified peptide spectra",
value=len(target_mq))
)
peptides = len(target_mq['sequence'].unique())
mq_metrics.append(
mzqc.QualityMetric(cvRef="QC",
accession="QC:0000000",
name="Total number identified unique peptide sequences",
value=peptides)
)
score_type = "Andromeda:score"
psims = utils.obtainOntology("psi-ms")
name_indexed = {psims[x].name: psims[x] for x in psims}
score_indexed = {x.name: x for x in chain(psims['MS:1001143'].subclasses(),psims['MS:1001153'].subclasses(),psims['MS:1002347'].subclasses(),psims['MS:1002363'].subclasses())}
if score_type in name_indexed:
if score_type not in score_indexed:
warnings.warn("Score type does not correspond to a score type in the OBO, proceed at own risk.", Warning)
score_col_name = name_indexed[score_type].id
else:
score_col_name = score_indexed[score_type].id
else:
warnings.warn("OBO does not contain any entry matching the identification score, proceed at own risk.", Warning)
score_col_name = score_type
identification_scoring_metrics = target_mq[['retention time',