Source code for mhcmatch.store

"""MHC restriction & presentation from a reference epitope panel.

Productionizes the validated reverse-problem method (seqtree ``bench/bench_mhc_guess.py``):
index reference peptides by their anchored *presentation* signature
(:func:`seqtree.layout.presentation_features`), widen the search scope around a query until it
has enough neighbours, then rank presenting alleles by neighbour **vote fraction** and score
**confidence** by a binomial-tail enrichment over the panel background. The vote fraction is the
ranking statistic (robust to panel skew); the enrichment is the non-binder filter.

Significance theory: ``appendix/mhcmatch.tex`` §2-3 (forward per-allele E-value + reverse problem).
"""
from __future__ import annotations

import csv
import gzip
import math
import os
from collections import Counter, defaultdict
from dataclasses import dataclass

from seqtree import KmerIndex, SearchParams, layout

_CLASS = {"MHCI": "mhc1", "I": "mhc1", "mhc1": "mhc1",
          "MHCII": "mhc2", "II": "mhc2", "mhc2": "mhc2"}
_SPECIES = {"human": "HomoSapiens", "mouse": "MusMusculus"}
_AA = set("ACDEFGHIKLMNPQRSTVWY")
_SCOPES = (0, 1, 2, 3)
_DEFAULT_LENGTHS = {"mhc1": (8, 9, 10, 11), "mhc2": (13, 14, 15, 16, 17, 18)}


[docs] def infer_class(peptide: str) -> str: """Heuristic class from length: MHC-I if <=11, else MHC-II. Pass ``cls`` to override.""" return "mhc1" if len(peptide) <= 11 else "mhc2"
[docs] @dataclass class Restriction: allele: str vote: float # neighbour vote fraction P(allele | neighbours) -- ranking score enrichment: float # -log10 binomial-tail p vs panel background -- confidence n_votes: int binder: bool anchor_score: float | None = None # diffused anchor log-odds (set only when diffuse=True) def __iter__(self): return iter((self.allele, self.vote, self.enrichment, self.binder))
[docs] @dataclass class Decomposition: peptide: str tcr_facing: str # anchors masked with X (recognition readout) presentation: str # TCR-facing masked with X (anchor readout) anchors: tuple # 0-based anchor indices
def _binom_sf(k, n, p): """P(Binomial(n, p) >= k) -- upper tail.""" if k <= 0: return 1.0 if p <= 0: return 0.0 if p >= 1: return 1.0 return min(1.0, sum(math.comb(n, i) * p**i * (1 - p) ** (n - i) for i in range(k, n + 1))) def _bh_cutoff(pvals, alpha): """Benjamini-Hochberg p-value cutoff controlling FDR <= ``alpha``: the largest ``p_(r)`` with ``p_(r) <= r/m * alpha`` (``0`` if none qualify, so nothing is called).""" m = len(pvals) if m == 0: return 0.0 cutoff = 0.0 for r, p in enumerate(sorted(pvals), 1): if p <= r / m * alpha: cutoff = p return cutoff def _mhc2_core_anchors(peptide: str) -> tuple: """0-based P1/P4/P6/P9 indices of the register-anchored 9-mer core (one-pass register trick).""" if len(peptide) < 9: return () best_s = max(range(len(peptide) - 8), key=lambda s: layout._core_anchor_score(peptide[s:s + 9])) return tuple(best_s + j for j in (0, 3, 5, 8))
[docs] def anchor_indices(peptide: str, cls: str) -> tuple: """0-based anchor positions for a peptide: class-I P2/PΩ, class-II core P1/P4/P6/P9.""" if cls == "mhc2": return _mhc2_core_anchors(peptide) return tuple(sorted(layout.spec_for(cls).resolve(len(peptide))))
[docs] def resolve_anchor_index(peptide: str, cls: str, anchor: int): """0-based index of a scoring ``anchor`` in ``peptide`` (or None if out of range). MHC-I: ``anchor`` is a 1-based peptide position (negatives count from the C-terminus). MHC-II: ``anchor`` is a 1-based position *within the register-anchored 9-mer core* (P1..P9). """ if cls == "mhc2": if len(peptide) < 9: return None s = max(range(len(peptide) - 8), key=lambda i: layout._core_anchor_score(peptide[i:i + 9])) idx = s + (anchor - 1) return idx if s <= idx < s + 9 else None idx = (anchor - 1) if anchor > 0 else (len(peptide) + anchor) return idx if 0 <= idx < len(peptide) else None
class _Panel: """One MHC class: presentation-signature KmerIndex + allele bookkeeping.""" def __init__(self, cls): self.cls = cls self.epitopes = [] self.alleles = [] self.weights = [] def add(self, epitope, allele, weight=1.0): self.epitopes.append(epitope) self.alleles.append(allele) self.weights.append(weight) def build(self): feats = [layout.presentation_features(e, self.cls, register="anchored") for e in self.epitopes] self.allele_to_id = {} ids = [] for a in self.alleles: self.allele_to_id.setdefault(a, len(self.allele_to_id)) ids.append(self.allele_to_id[a]) self.index = KmerIndex.build(feats, alphabet="aa", allele_ids=ids) if feats else None counts = Counter(self.alleles) total = len(self.alleles) or 1 self.panel = sorted(counts) self.freq = {a: counts[a] / total for a in self.panel} def tally(self, query, lo=10, hi=100): """Counter(allele -> votes) from the query's anchored-signature neighbours, scope-widened.""" if self.index is None: return None feats = layout.presentation_features(query, self.cls, register="anchored") cands = [] for sc in _SCOPES: p = SearchParams(max_subs=sc, engine="seqtm") cands = [c for c in self.index.seed_and_gather([feats], p, 1, -1, 1)[0] if self.epitopes[c.peptide_id] != query] if len(cands) >= lo: break if not cands: return None return Counter(self.alleles[c.peptide_id] for c in cands[:hi])
[docs] class Store: """Searchable reference panel of presented peptides, partitioned by MHC class.""" def __init__(self): self._panel = {"mhc1": _Panel("mhc1"), "mhc2": _Panel("mhc2")} self._am = {} # cls -> AnchorModel (lazy, for diffuse=True) # -- construction ---------------------------------------------------------
[docs] @classmethod def from_records(cls, records): """records: dicts with ``epitope``, ``mhc_a`` (or ``mhc``), ``mhc_class``; optional ``weight`` (default 1.0) confidence-weights the peptide in anchor-preference estimation.""" from .pseudoseq import class2_key store = cls() for r in records: c = _CLASS.get(str(r.get("mhc_class", "")).strip()) ep = str(r.get("epitope", "")).strip().upper() allele = str(r.get("mhc_a") or r.get("mhc") or "").strip() if c is None or not ep or not allele or not all(x in _AA for x in ep): continue if c == "mhc2": # key class II by the alpha-beta pair (locus-aware) allele = class2_key(allele, str(r.get("mhc_b") or "").strip()) store._panel[c].add(ep, allele, float(r.get("weight", 1.0) or 1.0)) for p in store._panel.values(): p.build() return store
[docs] @classmethod def from_pmhc(cls, path=None, tier="full", species=None, classes=("mhc1", "mhc2")): """Load the isalgo/pmhc_data TSV(.gz). ``species`` filters the *MHC* species (``"human"`` / ``"mouse"``). If ``path`` is None, uses ``$MHCMATCH_PMHC/pmhc_<tier>.tsv.gz``.""" if path is None: base = os.environ.get("MHCMATCH_PMHC") if not base: raise ValueError("pass path= or set MHCMATCH_PMHC to the pmhc_data directory") path = os.path.join(base, f"pmhc_{tier}.tsv.gz") sp = _SPECIES.get(species) if species else None keep = {_CLASS[c] for c in classes} csv.field_size_limit(10 ** 7) op = gzip.open if str(path).endswith(".gz") else open recs = [] with op(path, "rt") as fh: for row in csv.DictReader(fh, delimiter="\t"): c = _CLASS.get(str(row.get("mhc_class", "")).strip()) if c is None or c not in keep: continue if sp and row.get("mhc_species") != sp: continue recs.append(row) return cls.from_records(recs)
def __len__(self): return sum(len(p.epitopes) for p in self._panel.values())
[docs] def alleles(self, cls): return list(self._panel[cls].panel)
# -- forward problem: restriction / presentation -------------------------- def _allele_set(self, panel, alleles): if alleles == "all": return panel.panel if isinstance(alleles, str): alleles = [alleles] return [a for a in alleles if a in panel.freq] def _anchor_model(self, cls): if cls not in self._am: self._am[cls] = self.anchor_model(cls) return self._am[cls]
[docs] def restriction(self, peptide, cls=None, alleles="all", top=10, alpha=0.05, diffuse=False): """Rank presenting alleles for ``peptide`` (vote fraction), flag binders (enrichment). ``alleles``: ``"all"``, a single allele, or a list. ``alpha``: per-allele significance for the non-binder flag (binder iff binomial-tail p <= alpha and the allele got votes). With ``diffuse=True`` the diffusion-shrunk anchor log-odds (:class:`mhcmatch.diffusion.AnchorModel`) **ranks** and the neighbour vote/enrichment **gates**: an allele is a binder if it is vote-significant *or* its anchors are plausible (``anchor_score > 0``). On held-out (novel) peptides the anchor log-odds is the far better ranker---the vote method relies on same-allele signature neighbours, which are sparse for a genuinely new peptide, so vote-first ranking buries the true allele; the diffused anchor score scores every allele directly and rescues rare ones. Vote breaks ties. Without diffusion, vote fraction ranks and the call returns ``[]`` when there are no neighbours. """ peptide = peptide.strip().upper() cls = cls or infer_class(peptide) panel = self._panel[cls] tally = panel.tally(peptide) if tally is None and not diffuse: return [] n = sum(tally.values()) if tally else 0 thr = -math.log10(alpha) am = self._anchor_model(cls) if diffuse else None out = [] for a in self._allele_set(panel, alleles): k = tally.get(a, 0) if tally else 0 vote = k / n if n else 0.0 enr = -math.log10(max(_binom_sf(k, n, panel.freq[a]), 1e-300)) if (k and n) else 0.0 if diffuse: s = am.score(peptide, a) binder = (enr >= thr and k > 0) or s > 0.0 out.append(Restriction(a, vote, enr, k, binder, round(s, 3))) else: out.append(Restriction(a, vote, enr, k, enr >= thr and k > 0)) out.sort(key=(lambda r: (r.anchor_score, r.vote)) if diffuse else (lambda r: (r.vote, r.enrichment)), reverse=True) return out[:top]
[docs] def is_binder(self, peptide, allele, cls=None, alpha=0.05): res = self.restriction(peptide, cls=cls, alleles=[allele], top=1, alpha=alpha) return bool(res and res[0].binder)
[docs] def is_presented(self, peptide, cls=None, alpha=0.05): """Overall presentation: does any panel allele present this peptide?""" return any(r.binder for r in self.restriction(peptide, cls=cls, alpha=alpha))
[docs] def scan_protein(self, protein, cls="mhc1", alleles="all", lengths=None, alpha=0.05, top=3, correction=None): """Slide all binding-length windows over ``protein`` and return presented peptides. Returns ``[(position, peptide, [Restriction, ...]), ...]`` for windows with >=1 binder. ``correction`` controls multiple testing over the (window, allele) presentation tests in the scan (appendix §5): ``None`` (default) keeps the per-window per-allele ``alpha``; ``"bonferroni"`` controls the family-wise error rate (threshold ``alpha/m``); ``"bh"`` controls the Benjamini-Hochberg false-discovery rate. ``m`` is the number of voted (window, allele) tests. The vote tail p-value is ``10**(-enrichment)``; corrected calls replace the per-window binder flag. """ protein = "".join(protein.split()).upper() lengths = lengths or _DEFAULT_LENGTHS[cls] nA = len(self._allele_set(self._panel[cls], alleles)) hits = [] # windows that returned candidate alleles, before multiple-testing control for L in lengths: for i in range(len(protein) - L + 1): pep = protein[i:i + L] if not all(c in _AA for c in pep): continue rs = self.restriction(pep, cls, alleles, top=nA, alpha=alpha) if rs: hits.append((i, pep, rs)) if correction is None: return [(i, pep, [r for r in rs if r.binder][:top]) for i, pep, rs in hits if any(r.binder for r in rs)] pvals = [10 ** (-r.enrichment) for _, _, rs in hits for r in rs if r.n_votes > 0] if correction == "bonferroni": cutoff = alpha / len(pvals) if pvals else 0.0 elif correction == "bh": cutoff = _bh_cutoff(pvals, alpha) else: raise ValueError(f"unknown correction {correction!r} (None|'bonferroni'|'bh')") out = [] for i, pep, rs in hits: keep = sorted((r for r in rs if r.n_votes > 0 and 10 ** (-r.enrichment) <= cutoff), key=lambda r: r.enrichment, reverse=True) if keep: out.append((i, pep, keep[:top])) return out
# -- anchor / TCR-facing split -------------------------------------------
[docs] def decompose(self, peptide, cls=None, allele=None): """Split ``peptide`` into anchor and TCR-facing parts, each masked with ``X``. ``tcr_facing``: anchors -> X (the recognition readout). ``presentation``: TCR-facing -> X (the anchor readout). ``allele`` is accepted for forward-compat (allele-specific learned anchors, Phase 1); v0 uses class-default anchor positions. """ peptide = peptide.strip().upper() cls = cls or infer_class(peptide) anchors = set(anchor_indices(peptide, cls)) tcr = "".join(layout.MASK if i in anchors else c for i, c in enumerate(peptide)) present = "".join(c if i in anchors else layout.MASK for i, c in enumerate(peptide)) return Decomposition(peptide, tcr, present, tuple(sorted(anchors)))
# -- diffusion-powered forward scorer -------------------------------------
[docs] def anchor_model(self, cls="mhc1", h=2.0, prior_strength=10.0, anchors=None, learn_weights=True, prune_dpi=False, weights="learned"): """Anchor-factored presentation model with cross-allele kernel-shrinkage diffusion. See :class:`mhcmatch.diffusion.AnchorModel`. The diffusion rescues rare alleles by borrowing anchor preferences from groove-similar frequent ones, with a bounded prior strength so a large neighbour cannot swamp a rare allele's own peptides. """ from .diffusion import AnchorModel return AnchorModel(self, cls=cls, anchors=anchors, h=h, prior_strength=prior_strength, learn_weights=learn_weights, prune_dpi=prune_dpi, weights=weights)
# -- per-allele anchor preferences (feeds pseudoseq diffusion) ------------
[docs] def anchor_preferences(self, cls, anchor): """{allele: Counter(residue)} at a 1-based ``anchor`` position (negative from C-term).""" panel = self._panel[cls] prefs = defaultdict(Counter) for ep, a, w in zip(panel.epitopes, panel.alleles, panel.weights): idx = resolve_anchor_index(ep, cls, anchor) if idx is not None: prefs[a][ep[idx]] += w return prefs