Commit 992d2004 authored by Mathias Walzer's avatar Mathias Walzer

Merge branch 'refactor_QC' into 'master'

Refactor qc

See merge request !5
parents 5c50da9a 4ae782fd
Pipeline #77905 passed with stages
in 15 minutes and 55 seconds
FROM python:3.6.9-slim-buster
#FROM python:3.6.5-slim-jessie
#FROM python:2.7-slim-jessie
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends --no-install-suggests \
......@@ -73,10 +71,11 @@ RUN Rscript -e "library('devtools'); install_github('twitter/AnomalyDetection')"
RUN pip install pip --upgrade
RUN pip install setuptools --upgrade
RUN pip install numpy mypy pyopenms==2.3.*
RUN pip install pronto pytest jupyter rpy2 biopython flask pandas requests
RUN pip install numpy mypy jupyter rpy2 flask pytest
RUN pip install --force-reinstall 'pytest<=5.0.1'
RUN pip install -U git+https://github.com/bigbio/mzqc-pylib.git#egg=mzqc-pylib
RUN pip install pronto biopython pandas requests plotly-express toposort click
RUN pip install pyopenms==2.5.*
#RUN pip install -e . # devcontainer.json: "postCreateCommand": "pip install -e ."
......
......@@ -2,7 +2,9 @@
"name": "MZQC Python Development container",
"dockerFile": "Dockerfile",
"context": "..",
"extensions": ["ms-python.python", "littlefoxteam.vscode-python-test-adapter", "ms-azuretools.vscode-docker", "hbenl.vscode-test-explorer"],
"extensions": ["ms-python.python", "littlefoxteam.vscode-python-test-adapter",
"ms-azuretools.vscode-docker", "hbenl.vscode-test-explorer",
"njpwerner.autodocstring"],
"settings": {"python.pythonPath": "/usr/local/bin/python"},
"postCreateCommand": "sudo pip install -U -e .",
"runArgs": ["-u", "vscode", "-v", "/media/walzer/My Passport2/mzqc-stuff:/data"]
......
This diff is collapsed.
......@@ -8,7 +8,8 @@ from typing import List
import pyopenms as oms
from mzqc import MZQCFile as qc
from .qccalculator import getBasicQuality, getIDQuality
# from .qccalculator import getBasicQuality, getIDQuality
from QCCalculator import utils, basicqc, idqc, idqcmq, enzymeqc, masstraceqc
rqs: List[qc.RunQuality] = list()
sqs: List[qc.SetQuality] = list()
......@@ -67,7 +68,7 @@ def full(filename, mzid=None, idxml=None):
"""Calculate all possible metrics for these files. These data sources will be included in set metrics."""
exp = oms.MSExperiment()
oms.MzMLFile().load(click.format_filename(filename), exp)
rq = getBasicQuality(exp)
rq = basicqc.getBasicQuality(exp)
if idxml and mzid:
with click.Context(command) as ctx:
......@@ -97,11 +98,11 @@ def full(filename, mzid=None, idxml=None):
oms_id = oms.MzIdentMLFile()
idf = mzid
if idxml:
oms_id = oms.MzIdentMLFile()
oms_id = oms.IdXMLFile()
idf = idxml
if idf:
oms_id.load(click.format_filename(idf), pros, peps)
rq.qualityMetrics.extend(getIDQuality(exp, pros, peps, ms2num))
rq.qualityMetrics.extend(idqc.getIDQuality(exp, pros, peps, ms2num))
rqs.append(rq)
finale()
......@@ -114,7 +115,7 @@ def maxq(filename, zipurl, rawname):
"""Calculate all possible metrics for these files. These data sources will be included in set metrics."""
exp = oms.MSExperiment()
oms.MzMLFile().load(click.format_filename(filename), exp)
rq = getBasicQuality(exp)
rq = basicqc.getBasicQuality(exp)
ms2num = 0
for x in rq.qualityMetrics:
......@@ -129,12 +130,12 @@ def maxq(filename, zipurl, rawname):
ms2num = 1
try:
mq,params = get_mq_zipped_evidence(mq_zip_url)
mq,params = idqcmq.loadMQZippedResults(mq_zip_url)
if not rawname:
logging.warning("Inferring rawname from mzML")
rawname = basename(exp.getExperimentalSettings().getSourceFiles()[0].getNameOfFile().decode()) # TODO split extensions
rq.qualityMetrics.extend(getMQMetrics(rawname, params,mq, ms2num))
rq.qualityMetrics.extend(idqcmq.getMQMetrics(rawname, params,mq, ms2num))
rqs.append(rq)
except:
logging.warn("Retrieving any results from the URL failed.")
......@@ -148,7 +149,7 @@ def basic(filename):
"""Calculate the basic metrics available from virtually every mzML file."""
exp = oms.MSExperiment()
oms.MzMLFile().load(click.format_filename(filename), exp)
rq = getBasicQuality(exp)
rq = basicqc.getBasicQuality(exp)
rqs.append(rq)
finale()
......
This diff is collapsed.
import io
import zipfile
import urllib.request
import warnings
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import pandas
import pronto
from Bio import SeqIO, SeqRecord
from Bio.SeqUtils import ProtParam
from mzqc import MZQCFile as mzqc
from QCCalculator import utils
"""
Calculate id based metrics from MaxQuant result files
"""
def loadMQZippedResults(url: str) -> Tuple[Optional[pandas.DataFrame], Optional[pandas.DataFrame]]:
    """
    loadMQZippedResults acquires the necessary MQ inputfiles from a URL to a zipped archive

    The predominant way identifications from MQ are stored in open mass spectrometry data
    repositories is in a zip file for the submission. The method loads the archive and
    retrieves the QC metric relevant result files (evidence.txt and parameters.txt),
    looking first at the archive root, then inside the archive's result folders.

    Parameters
    ----------
    url : str
        A URL to a zip file with MQ result files.

    Returns
    -------
    Tuple[Optional[pandas.DataFrame],Optional[pandas.DataFrame]]
        The parameters and evidence files from a MaxQuant result rehashed in accessible
        pandas dataframes; (None, None) if no complete result set could be located.
    """
    with urllib.request.urlopen(url, timeout=10) as dl:
        with zipfile.ZipFile(io.BytesIO(dl.read())) as z:
            ef = 'evidence.txt'
            pf = 'parameters.txt'
            ld = dict()    # result files found at the archive root, e.g. {'ev':'evidence.txt', 'pa':'parameters.txt'}
            dirs = dict()  # result files found per archive folder, e.g. {f: {'ev':..., 'pa':...}}
            for f in z.namelist():
                if z.getinfo(f).is_dir():
                    dirs[f] = dict()
                elif f == ef:
                    ld['ev'] = f
                elif f == pf:
                    ld['pa'] = f

            if len(ld) < 2:
                # root level incomplete - search the folders; exact expected match only,
                # otherwise oddities like 'SEARCH/._parameters.txt' are picked up
                for f in z.namelist():
                    for d in dirs.keys():
                        if f == d + ef:
                            dirs[d]['ev'] = f
                        elif f == d + pf:
                            dirs[d]['pa'] = f
                dirs = {k: v for k, v in dirs.items() if len(v) > 0}
                if len(dirs) > 1:
                    warnings.warn("MQ result zip contains more than one results folder.", Warning)
                elif len(dirs) < 1:
                    warnings.warn("MQ result zip contains no results, even in subfolders.", Warning)
                    return None, None
                # NOTE: with more than one candidate folder, an arbitrary one is used
                ld = next(iter(dirs.values()))
                if len(ld) < 2:
                    warnings.warn("MQ result zip contains no results.", Warning)
                    return None, None

            with z.open(ld['ev']) as e:
                ev = pandas.read_csv(e, sep='\t')
                ev.columns = map(str.lower, ev.columns)
            with z.open(ld['pa']) as p:
                pa = pandas.read_csv(p, sep='\t', dtype={'Parameter': str})
                pa.columns = map(str.lower, pa.columns)
                pa['parameter'] = pa['parameter'].str.lower()
                pa.set_index('parameter', inplace=True)
            return pa, ev
def getMQMetrics(target_raw: str, params: pandas.DataFrame, evidence: pandas.DataFrame, ms2num: int = 0) -> List[mzqc.QualityMetric]:
    """
    getMQMetrics calculates id based QC metrics from MaxQuant results as close as possible to the way they are calculated from regular id files.

    For a given raw file (name), the respective results are extracted from dataframes derived off the parameters and evidence files from a
    MaxQuant result (of potentially multiple raw files combined analysis). As many metrics similar or equal to those dependent of regular id files
    are calculated.

    Parameters
    ----------
    target_raw : str
        The name of the raw file (as per MaxQuant usage without file type extension)
    params : pandas.DataFrame
        Dataframe with data from the parameters result file as produced by MaxQuant and stratified column names
    evidence : pandas.DataFrame
        Dataframe with data from the evidence result file as produced by MaxQuant and stratified column names
    ms2num : int, optional
        The total number of tandem spectra as from the id-free metrics, by default 0

    Returns
    -------
    List[mzqc.QualityMetric]
        A list of QualityMetrics close to what is calculated from a regular id-based QC calculation.
    """
    if target_raw not in evidence['raw file'].unique():
        return list()  # TODO warn

    mq_metrics: List[mzqc.QualityMetric] = list()
    # https://stackoverflow.com/questions/17071871/how-to-select-rows-from-a-dataframe-based-on-column-values
    target_mq = evidence.loc[(evidence['raw file'] == target_raw) & (evidence['ms/ms scan number'].notnull())]

    mq_metrics.append(
        mzqc.QualityMetric(cvRef="QC",
                accession="QC:0000000",
                name="Sequence database name",
                value=params.loc['fasta file']['value'])
    )

    proteins = len(target_mq['leading proteins'].unique())
    mq_metrics.append(
        mzqc.QualityMetric(cvRef="QC",
                accession="QC:0000000",
                name="Total number of identified proteins",
                value=proteins)
    )

    # # name="Total number of PSM",   # NA
    # metrics.append(
    #     mzqc.QualityMetric(cvRef="QC",
    #             accession="QC:0000000",
    #             name="Total number of PSM",
    #             value=psm_count)
    # )

    mq_metrics.append(
        mzqc.QualityMetric(cvRef="QC",
                accession="QC:0000000",
                name="Total number of identified peptide spectra",
                value=len(target_mq))
    )

    peptides = len(target_mq['sequence'].unique())
    mq_metrics.append(
        mzqc.QualityMetric(cvRef="QC",
                accession="QC:0000000",
                name="Total number identified unique peptide sequences",
                value=peptides)
    )

    # resolve the CV term for the score type via the psi-ms OBO
    score_type = "Andromeda:score"
    psims = utils.obtainOntology("psi-ms")

    name_indexed = {psims[x].name: psims[x] for x in psims}
    score_indexed = {x.name: x for x in chain(psims['MS:1001143'].subclasses(), psims['MS:1001153'].subclasses(), psims['MS:1002347'].subclasses(), psims['MS:1002363'].subclasses())}

    if score_type in name_indexed:
        if score_type not in score_indexed:
            warnings.warn("Score type does not correspond to a score type in the OBO, proceed at own risk.", Warning)
            score_col_name = name_indexed[score_type].id
        else:
            score_col_name = score_indexed[score_type].id
    else:
        warnings.warn("OBO does not contain any entry matching the identification score, proceed at own risk.", Warning)
        score_col_name = score_type
    # NOTE(review): score_col_name is resolved but not used below - the metric dict
    # keys on score_type instead; confirm which one is intended.

    identification_scoring_metrics = target_mq[['retention time', 'charge', 'score']].rename(columns={'retention time': 'RT', 'charge': 'c', 'score': score_type}).to_dict(orient='list')
    mq_metrics.append(
        mzqc.QualityMetric(cvRef="QC",
                accession="QC:0000000",
                name="Identification scoring metric values",
                value=identification_scoring_metrics)
    )

    # TODO comparison column with qccalculator dppm values
    # TODO RT/native id?
    identification_accuracy_metrics = target_mq[['ms/ms m/z', 'mass error [ppm]', 'uncalibrated mass error [da]']]\
        .rename(columns={'ms/ms m/z': 'MZ', 'mass error [ppm]': 'delta_ppm', 'uncalibrated mass error [da]': 'abs_error'})
    identification_accuracy_metrics['abs_error'] = identification_accuracy_metrics['abs_error'].abs()
    identification_accuracy_metrics = identification_accuracy_metrics.to_dict(orient='list')
    mq_metrics.append(
        mzqc.QualityMetric(cvRef="QC",
                accession="QC:0000000",
                name="Identifications accuracy metric values",
                value=identification_accuracy_metrics)
    )

    hydrophobicity_metrics = target_mq[['retention time', 'sequence']].rename(columns={'retention time': 'RT', 'sequence': 'peptide'})
    hydrophobicity_metrics['gravy'] = hydrophobicity_metrics['peptide'].apply(lambda x: ProtParam.ProteinAnalysis(x).gravy())
    hydrophobicity_metrics = hydrophobicity_metrics[['RT', 'gravy']].to_dict(orient='list')
    mq_metrics.append(
        mzqc.QualityMetric(cvRef="QC",
                accession="QC:0000000",
                name="Hydrophobicity metric values",
                value=hydrophobicity_metrics)
    )

    # TODO target/decoy info available??
    identification_sequence_metrics = target_mq[['sequence', 'retention time', 'ms/ms scan number']].rename(columns={'sequence': 'peptide', 'retention time': 'RT', 'ms/ms scan number': 'native_id'}).to_dict(orient='list')
    mq_metrics.append(
        mzqc.QualityMetric(cvRef="QC",
                accession="QC:0000000",
                name="Identifications sequence metric values",
                value=identification_sequence_metrics)
    )

    ## simple id metrics
    # guard: ms2num defaults to 0, which would otherwise raise ZeroDivisionError
    if ms2num:
        mq_metrics.append(
            mzqc.QualityMetric(cvRef="QC",
                    accession="QC:0000000",
                    name="Identification to tandem spectra ratio",
                    value=float(len(target_mq)) / float(ms2num))
        )
    else:
        warnings.warn("Number of tandem spectra given is 0 - omitting identification ratio metric.", Warning)

    return mq_metrics
from collections import defaultdict
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import numpy as np
from toposort import toposort
from mzqc import MZQCFile as mzqc
import pyopenms as oms
def getLongestTag(spec: oms.MSSpectrum, aa_weights: List[float], tol: float=0.5) -> int:
    """
    Estimate the length of the longest sequence tag inferable from a spectrum.

    Peaks become graph nodes; an edge i->j is added whenever the m/z distance
    between peaks i and j is close (within `tol`) to any amino acid weight in
    `aa_weights`. The longest path in this DAG (counted in edges, i.e. residues)
    is computed via topological order and dynamic programming.

    Returns -1 for MS1 spectra, 0 when no tag edges were found, otherwise the
    longest path length.
    """
    # TODO spec.getPrecursors()[0].getCharge() > 2 consider aa_weights for doubly charged or always?
    # what about internal fragments and modifications
    # aa_weights_z1 = np.array(list({r.getMonoWeight(1) for r in oms.ResidueDB().getResidues('AllNatural')}),dtype=float)
    # aa_weights_z2 = np.array(list({r.getMonoWeight(2) for r in oms.ResidueDB().getResidues('AllNatural')}),dtype=float)/2
    if spec.getMSLevel() == 1:
        return -1
    if not spec.isSorted():
        spec.sortByPosition()

    edges: List[Tuple[int,int]] = list()
    node_dependencies: Dict[int,Set[Any]] = defaultdict(set)
    # NOTE(review): annotated Dict[int,int] but default is np.NINF (a float)
    path_score: Dict[int,int] = defaultdict(lambda: np.NINF)
    # all pairwise m/z gaps; j starts at i, but a zero gap cannot match an
    # amino acid weight so no self-edge is produced
    for i in range(0,spec.size()-1):
        for j in range(i,spec.size()):
            dist = spec[j].getMZ()-spec[i].getMZ()
            if np.any(np.isclose(dist,aa_weights, atol=tol)):
                edges.append((i,j))
                node_dependencies[j].add(i)

    topological = list(toposort(node_dependencies))
    if len(topological)<=1:
        # no edges at all (or a single layer) - no tag
        return 0
    # nodes without prerequisites are path starts with score 0
    for obound in topological[0]:
        path_score[obound] = 0

    # topological = list(toposort({2: {11},
    #         9: {11, 8, 10},
    #         10: {11, 3},
    #         11: {7, 5},
    #         8: {7, 3},
    #        }))
    # edges = [(3,8),(3,10),(5,11),(7,8),(7,11),(8,9),(11,2),(11,9),(11,10),(10,9)]
    # longest-path DP: relax every outgoing edge in topological order,
    # every edge has cost 1 (one residue per edge)
    edge_array = np.array(edges, ndmin = 2)
    edge_sort_order = list(chain.from_iterable(topological))
    for u in edge_sort_order:
        for edge in edge_array[edge_array[:,0] == u]:
            if path_score[edge[1]] < path_score[edge[0]] + 1:  # edgecost always 1
                path_score[edge[1]] = path_score[edge[0]] + 1
    return max(path_score.values())
def getMassTraceMatchingMS2(exp: oms.MSExperiment, tol: float=0.5) -> List[mzqc.QualityMetric]:
    """
    Match MS2 precursors to detected mass traces and categorise their sampling position.

    NOTE(review): this function appears unfinished - despite the declared
    List[mzqc.QualityMetric] return type, it returns a bare int category
    (-2, -1, 1, or 2) for the FIRST MS2 spectrum encountered and never
    iterates further, and none of the inspected trace properties below are
    collected into metrics. Confirm intended behavior before use.
    """
    mts: List[oms.MassTrace] = list()
    oms.MassTraceDetection().run(exp,mts,0)  # since 2.5.0 with 3rd argument
    # per-trace [centroid m/z, centroid RT] for vectorised matching
    mts_coord = np.array([[m.getCentroidMZ(),m.getCentroidRT()] for m in mts])
    # ms2_coord = np.array([[s.getPrecursors()[0].getMZ(), s.getRT()] for s in exp if s.getMSLevel()==2])
    for s in exp:
        if s.getMSLevel()==2:
            # traces whose centroid m/z is within tol of the precursor m/z
            mz_matches = np.isclose(mts_coord[:,0], s.getPrecursors()[0].getMZ(), atol=tol)
            rt_dist_per_match = np.abs(mts_coord[np.where(mz_matches)][:,1] - s.getRT())
            match_idx_in_dist = np.argwhere(mz_matches)  # indices of match only in mts and mts_coord
            closest_rt_rowidx = rt_dist_per_match.argmin()  # index in match_only distances array
            # rt_dist_per_match[closest_rt_rowidx] == mts_coord[match_idx[closest_rt_rowidx][0]][1]-s.getRT()
            closest_match_mt = mts[match_idx_in_dist[closest_rt_rowidx][0]]

            # NOTE(review): the following expressions are evaluated but their
            # results are discarded - presumably metric candidates
            np.partition(rt_dist_per_match,2)[2-1]  # 2nd closest dist
            np.partition(rt_dist_per_match,1)[1-1]  # closest dist
            closest_match_mt.getSize()  # peaks
            closest_match_mt.getTraceLength()  # seconds
            closest_match_mt.getFWHM()  # seconds - what if 0 or getTraceLength()?
            closest_match_mt.getMaxIntensity(False)
            # NB precursor intensity is always 0!
            # NB masstrace does not store peak intensities (except max and sum)

            # 4 categories for MS2 regarding sampling
            # -2 (out of trace, before centr RT) ; -1 (in trace, before centr RT) ;1 (in trace, after centr RT) ;2 (out of trace, after centr RT) ;
            rt_1st = np.min(closest_match_mt.getConvexhull().getHullPoints()[:,0])
            rt_last = np.max(closest_match_mt.getConvexhull().getHullPoints()[:,0])
            rt_centr = closest_match_mt.getCentroidRT()
            # np.digitize(s.getRT(),[rt_1st,rt_centr,rt_last])
            if s.getRT() > rt_centr:  # 'after' categ
                if s.getRT() > rt_last:
                    return 2
                else:
                    return 1
            else:  # 'before' categ
                if s.getRT() < rt_1st:
                    return -2
                else:
                    return -1

    # get mts
    # for each ms2 find mz matched within tol
    # pick closest match in RT np.array([1.1,2.2,3.3,.8,5.5,6.6,.7,8.8]
    # report:
    # how close is the closest
    # on which side is the precursor
    # how wobbly is the mt (mz sd)
    # how long is the mt (fwhm)
    # are there others close? (next closest)
    # computePeakArea(
    # computeSmoothedPeakArea(
    # findMaxByIntPeak(
    # estimateFWHM(
    # getFWHM(
    # getSmoothedIntensities(
    # getTraceLength(
    # match_all = np.apply_along_axis(lambda a :np.isclose(mts_coord[:,0],a[0],atol=tol),1,ms2_coord)  # boolean arrays indicating the (mis)matches in mst; shape = [ms2,mts]
    # where_matches = np.apply_along_axis(lambda a : np.where(a),1,match_all)  # does not work because each ms2 has different amount of matches; shape=[ms2,matching mts (49,60,...)]
import numpy as np
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from mzqc import MZQCFile as mzqc
import pyopenms as oms
"""
Calculate noise related QC metrics
"""
def getSN_medianmethod(spec: oms.MSSpectrum, norm: bool=True) -> float:
    """
    getSN_medianmethod get S/N from a spectrum via the median method

    As calculated in signal processing, signal and noise are discerned by the median
    intensity. The ratio is formed by the intensity sum in each category, scaled by
    the number of peaks received in each category.

    Parameters
    ----------
    spec : oms.MSSpectrum
        Spectrum to compute S/N from
    norm : bool, optional
        Scale by the number of peaks received in each category, by default True

    Returns
    -------
    float
        The ratio of signal-to-noise intensities; 0.0 for an empty spectrum
    """
    if spec.size() == 0:
        return 0.0

    spec.sortByIntensity(False)
    mar = np.array([s.getIntensity() for s in spec])
    median: float = np.median(mar)

    if not norm:
        return np.max(mar) / median

    # FIX: peaks ABOVE the median intensity count towards signal, those at or
    # below towards noise - the two categories were previously swapped, which
    # yielded a "S/N" that was always <= 1
    above = mar[mar > median]
    below = mar[mar <= median]
    if above.size == 0:
        # all peaks share the same intensity - signal and noise indistinguishable
        return 1.0
    sig = np.sum(above) / above.size
    noi = np.sum(below) / below.size
    return sig / noi
def getSNMetrics(spectrum_acquisition_metrics_MS:mzqc.QualityMetric, ms_level: int) -> List[mzqc.QualityMetric]:
    """
    Derive S/N distribution metrics from a per-spectrum super metric.

    From the 'SN' values recorded per spectrum in the given spectrum acquisition
    metrics (first pass over the mzML), compute quartiles, standard deviation,
    mean, and the +/-1.5*IQR outliers for the given MS level.

    Parameters
    ----------
    spectrum_acquisition_metrics_MS : mzqc.QualityMetric
        QualityMetric object with the spectrum acquisition metrics
    ms_level : int
        The MS level to which the given spectrum acquisition metrics belong to

    Returns
    -------
    List[mzqc.QualityMetric]
        A list of new QualityMetric objects for mzQC deposition
    """
    sn_values = np.array(spectrum_acquisition_metrics_MS.value['SN'])
    quartiles = np.quantile(sn_values, [.25, .5, .75])
    # Tukey fences at 1.5 * IQR
    iqr = quartiles[2] - quartiles[0]
    lower_fence = quartiles[0] - (1.5 * iqr)
    upper_fence = quartiles[2] + (1.5 * iqr)
    outliers = np.extract((sn_values < lower_fence) | (sn_values > upper_fence), sn_values)

    named_values = [
        ("Signal-to-noise ratio Q1, Q2, Q3 for MS level {ms_level} collection", list(quartiles)),
        ("Signal-to-noise ratio sigma for MS level {ms_level} collection", np.std(sn_values)),
        ("Signal-to-noise ratio mean for MS level {ms_level} collection", np.mean(sn_values)),
        ("Signal-to-noise ratio +/-1.5*IQR outlier for MS level {ms_level} collection", outliers),
    ]
    return [mzqc.QualityMetric(cvRef="QC",
                               accession="QC:0000000",
                               name=template.format(ms_level=ms_level),
                               value=val)
            for template, val in named_values]
import hashlib
import re
import requests
import warnings
from io import StringIO
import urllib
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import pyopenms as oms
import numpy as np
import pronto
from Bio import SeqIO, SeqRecord
"""
Utility functions that do not contribute directly to QC calculations
"""
def sha256fromfile(abs_file_path: str) -> str:
    """
    sha256fromfile will create a sha256 digest from the file at given path.

    To preserve memory and speed up the digest,
    the file is digested with the help of a memoryview and hashlib.sha256().update.

    Parameters
    ----------
    abs_file_path : str
        The absolute path to the file to digest

    Returns
    -------
    str
        The hex string of the file's sha256 digest

    Raises
    ------
    FileNotFoundError
        If abs_file_path is not a file
    """
    sha = hashlib.sha256()
    b = bytearray(128 * 1024)
    mv = memoryview(b)
    # readinto fills the reusable buffer in place; iter stops at the 0-byte read (EOF)
    with open(abs_file_path, 'rb', buffering=0) as f:
        for n in iter(lambda: f.readinto(mv), 0):
            sha.update(mv[:n])
    return sha.hexdigest()
def cast_if_int(pot_int: Any) -> Union[int,Any]:
    """
    cast_if_int convenience function to cast to int

    Due to the frequent use of numpy.dtypes and pyOpenMS return of binary encode strings,
    this function will ease the level of verbosity.

    Parameters
    ----------
    pot_int : Any
        The potential int value

    Returns
    -------
    Union[int,Any]
        In case the argument is cast-able into int, will return that int, unchanged argument otherwise.
    """
    try:
        return int(pot_int)
    except (ValueError, TypeError):
        # TypeError covers non-numeric types (e.g. None), which int() rejects
        # with TypeError rather than ValueError
        return pot_int
def spec_native_id(spec: oms.MSSpectrum) -> Union[int,None]:
    """
    spec_native_id convenience function to retrieve the native id number from a spectrum

    Since the spectrums native id is a string formatted with much additional, albeit
    usually redundant information, this method cuts through the clutter and extracts
    the numerical id.

    Parameters
    ----------
    spec : oms.MSSpectrum
        Spectrum to get the native id from

    Returns
    -------
    Union[int,None]
        Return is None if spectrum native id cannot be interpreted (e.g. not of scan=number format)
    """
    spre = spec.getNativeID()
    if not spre:
        return None
    # raw string avoids the invalid escape sequence \d on modern Python
    matches = re.findall(r"scan=(\d+)$", spre)
    if len(matches) != 1:  # should really never be >1 with the `$`
        return None
    return cast_if_int(matches[0])
def pep_native_id(p: oms.Peptide) -> Union[int,None]:
"""
pep_native_id convenience function to retrieve the native id number from an identification
Counterpart to spec_native_id.
Identifications loaded from mzid et al. should carry the native id to which spectra they
carry the identification information (as 'spectrum_reference'). Since the spectrums
native id is a string formatted with much additional, albeit usually redundant
information, this method cuts through the clutter and extracts the numerical id.
Parameters
----------
p : oms.Peptide
PeptideIdentification from which to get the native id of the involved spectrum
Returns
-------
Union[int,None]
Return is None if native id cannot be interpreted (e.g. not of scan=number format)
"""
spre = p.getMetaValue('spectrum_reference')
if spre:
matches = re.findall("scan=(\d+)$", spre)
if len(matches)!=1: # should really never be >1 with the `$`
return None
else:
return cast_if_int(matches[0])