SampleRepertoire — Overview#

Loads multi-locus paired immune repertoires from the isalgo/airr_benchmark SRA tarball and visualises the per-chain composition of each sample as two stacked bar charts:

  • Top — total read count (duplicate_count) per locus per sample

  • Bottom — unique clonotype count per locus per sample

Data source: airr_benchmark/sra/samples.tar.gz Metadata: airr_benchmark/sra/meta.tsv (columns: PMID, Run, BioProject, Sample)

[1]:
import importlib.metadata as _meta
import sys as _sys
print(f"Python {_sys.version.split()[0]}")
for _pkg in ["mirpy-lib", "numpy", "pandas", "matplotlib", "scipy", "polars"]:
    try:
        print(f"  {_pkg}: {_meta.version(_pkg)}")
    except _meta.PackageNotFoundError:
        pass

import sys
from pathlib import Path

repo_root = Path.cwd().resolve().parent if Path.cwd().name == "notebooks" else Path.cwd().resolve()
if str(repo_root) not in sys.path:
    sys.path.insert(0, str(repo_root))
Python 3.12.12
  mirpy-lib: 1.1.0
  numpy: 1.26.4
  pandas: 3.0.3
  matplotlib: 3.10.9
  scipy: 1.17.1
  polars: 1.40.1
[2]:
import io
import tarfile
import warnings
from pathlib import Path

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd

from mir.common.clonotype import Clonotype
from mir.common.repertoire import SampleRepertoire, infer_locus
from mir.utils.notebook_assets import ensure_airr_benchmark, find_airr_benchmark_sra_meta
/Users/mikesh/vcs/mirpy/.venv/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm

Configuration#

[3]:
benchmark_root = ensure_airr_benchmark(repo_root, allow_patterns=["sra/**"])
TARBALL, META_CSV = find_airr_benchmark_sra_meta(benchmark_root)

# Loci shown in the plot (ordered and coloured consistently)
LOCI_ORDER = ["TRA", "TRB", "TRG", "TRD", "IGH", "IGK", "IGL"]
LOCI_COLORS = {
    "TRA": "#4e79a7",
    "TRB": "#f28e2b",
    "TRG": "#59a14f",
    "TRD": "#76b7b2",
    "IGH": "#e15759",
    "IGK": "#b07aa1",
    "IGL": "#ff9da7",
}

# Max samples shown in stacked bar charts (pie charts always use the full dataset)
N_DISPLAY = 30

Load SampleRepertoires#

Each TSV in the tarball follows the AIRR Rearrangement Schema with v_call / j_call columns.
Locus is inferred from the gene prefix (TRBV…TRB, TRAV…TRA, etc.).
[4]:
_CALL_RENAMES = {"v_call": "v_gene", "j_call": "j_gene", "c_call": "c_gene"}
_PREFIX_TO_LOCUS: dict[str, str] = {
    "TRA": "TRA", "TRB": "TRB", "TRG": "TRG", "TRD": "TRD",
    "IGH": "IGH", "IGK": "IGK", "IGL": "IGL",
}

# Single sequential pass through the tarball — avoids repeated seeks in .tar.gz.
_run_frames: dict[str, pd.DataFrame] = {}
with tarfile.open(TARBALL, mode="r:gz") as tar:
    for member in tar.getmembers():
        if not member.name.endswith(".tsv") or member.size == 0:
            continue
        run_id = member.name.replace("./", "").replace(".tsv", "")
        fh = tar.extractfile(member)
        if fh is None:
            continue
        df = pd.read_csv(fh, sep="\t").rename(columns=_CALL_RENAMES)
        # Vectorised locus inference from the V-gene prefix (no iterrows).
        df["locus"] = (
            df["v_gene"].fillna("").str[:3].str.upper()
            .map(_PREFIX_TO_LOCUS).fillna("")
        )
        _run_frames[run_id] = df

meta_df = pd.read_csv(META_CSV, sep="\t")
meta_df = meta_df[meta_df["Run"].isin(_run_frames)]
sample_map = meta_df.groupby("Sample")["Run"].apply(list).to_dict()

sample_repertoires: list[SampleRepertoire] = []
for sample_id, run_ids in sample_map.items():
    frames = [_run_frames[r] for r in run_ids if r in _run_frames]
    if not frames:
        continue
    merged = pd.concat(frames, ignore_index=True) if len(frames) > 1 else frames[0]
    sr = SampleRepertoire.from_pandas(merged, locus_column="locus", sample_id=sample_id)
    sample_repertoires.append(sr)

# Sort richest samples first so bar-chart selection is stable.
sample_repertoires.sort(key=lambda sr: sr.clonotype_count, reverse=True)

print(f"Loaded {len(sample_repertoires)} samples from {TARBALL}")
print(f"Total clonotypes : {sum(sr.clonotype_count for sr in sample_repertoires):,}")
print(f"Total reads      : {sum(sr.duplicate_count  for sr in sample_repertoires):,}")
Loaded 1764 samples from /Users/mikesh/vcs/mirpy/notebooks/assets/large/airr_benchmark/sra/samples.tar.gz
Total clonotypes : 874,440
Total reads      : 1,339,480
[5]:
# sample_repertoires already loaded in the previous cell.
samples = sample_repertoires
print(f"{len(samples)} samples ready for analysis")
1764 samples ready for analysis

Build summary table#

One row per (sample_id, locus) with duplicate_count and clonotype_count.

[6]:
def build_summary(samples: list[SampleRepertoire]) -> pd.DataFrame:
    """Return a long-form DataFrame with per-locus counts for every sample."""
    rows = []
    for sr in samples:
        for locus, lr in sr.loci.items():
            rows.append({
                "sample_id":       sr.sample_id,
                "locus":           locus,
                "duplicate_count": lr.duplicate_count,
                "clonotype_count": lr.clonotype_count,
            })
    return pd.DataFrame(rows)


summary = build_summary(samples)
summary.head()
[6]:
sample_id locus duplicate_count clonotype_count
0 SRX14820701 TRD 19 13
1 SRX14820701 TRA 216 188
2 SRX14820701 IGH 984 746
3 SRX14820701 TRB 305 276
4 SRX14820701 TRG 47 37

Locus coverage#

Which loci are absent from at least one sample?

[7]:
coverage = (
    summary
    .pivot_table(index="sample_id", columns="locus", values="clonotype_count", fill_value=0)
    .reindex(columns=[l for l in LOCI_ORDER if l in summary["locus"].unique()], fill_value=0)
)

missing_per_sample = (coverage == 0).sum(axis=1)
samples_with_gaps  = missing_per_sample[missing_per_sample > 0]

print(f"{len(samples_with_gaps)}/{len(coverage)} samples have at least one missing locus\n")
print("Missing locus frequency across samples:")
for locus in coverage.columns:
    n_miss = int((coverage[locus] == 0).sum())
    pct    = 100 * n_miss / len(coverage)
    print(f"  {locus}: absent in {n_miss:4d}/{len(coverage)} samples ({pct:.1f}%)")
974/1763 samples have at least one missing locus

Missing locus frequency across samples:
  TRA: absent in  271/1763 samples (15.4%)
  TRB: absent in  274/1763 samples (15.5%)
  TRG: absent in  324/1763 samples (18.4%)
  TRD: absent in  868/1763 samples (49.2%)
  IGH: absent in  366/1763 samples (20.8%)
  IGK: absent in  176/1763 samples (10.0%)
  IGL: absent in  190/1763 samples (10.8%)

Stacked bar charts#

[8]:
def _pivot_sorted(summary: pd.DataFrame, value_col: str) -> pd.DataFrame:
    """Pivot *summary* and sort rows by total value descending."""
    present_loci = [l for l in LOCI_ORDER if l in summary["locus"].unique()]
    wide = (
        summary
        .pivot_table(index="sample_id", columns="locus", values=value_col, fill_value=0)
        .reindex(columns=present_loci, fill_value=0)
    )
    return wide.loc[wide.sum(axis=1).sort_values(ascending=False).index]


def plot_stacked_bars(
    summary: pd.DataFrame,
    *,
    n_display: int = N_DISPLAY,
    figsize: tuple[int, int] = (16, 9),
) -> None:
    """Stacked bar charts of read depth and clonotype richness for the top samples.

    Selects the *n_display* samples with the highest total read count,
    sorted richest-first, and labels each bar with its sample ID.

    Parameters
    ----------
    summary:
        Long-form DataFrame with columns ``sample_id``, ``locus``,
        ``duplicate_count``, ``clonotype_count``.
    n_display:
        Number of richest samples to show.
    figsize:
        Overall figure size ``(width, height)`` in inches.
    """
    top_ids = (
        summary.groupby("sample_id")["duplicate_count"].sum()
        .nlargest(n_display).index
    )
    sub = summary[summary["sample_id"].isin(top_ids)]

    reads      = _pivot_sorted(sub, "duplicate_count")
    clonotypes = _pivot_sorted(sub, "clonotype_count")
    colors     = [LOCI_COLORS[l] for l in reads.columns]
    n          = len(reads)

    fig, (ax_reads, ax_clones) = plt.subplots(2, 1, figsize=figsize, sharex=True)

    reads.plot(kind="bar", stacked=True, ax=ax_reads,
               color=colors, legend=False, width=0.8)
    ax_reads.set_ylabel("Total reads (duplicate_count)")
    ax_reads.set_title(f"Per-chain read depth — top {n} samples by read count")
    ax_reads.yaxis.set_major_formatter(ticker.FuncFormatter(lambda x, _: f"{x:,.0f}"))
    ax_reads.tick_params(axis="x", labelbottom=False)

    clonotypes.plot(kind="bar", stacked=True, ax=ax_clones,
                    color=colors, legend=True, width=0.8)
    ax_clones.set_ylabel("Unique clonotypes")
    ax_clones.set_title(f"Per-chain clonotype richness — top {n} samples")
    ax_clones.set_xlabel("")
    ax_clones.tick_params(axis="x", rotation=45, labelsize=7)
    ax_clones.legend(title="Locus", bbox_to_anchor=(1.01, 1), loc="upper left", frameon=False)

    fig.tight_layout()
    plt.show()


plot_stacked_bars(summary)
../_images/notebooks_sample_repertoire_overview_13_0.png

Locus coverage pie charts#

For each chain, samples are classified as missing (locus absent), < 100 reads (low coverage), or ≥ 100 reads (adequate). All samples are included.

[9]:
def plot_locus_coverage_pies(
    summary: pd.DataFrame,
    n_samples: int,
    *,
    low_reads_threshold: int = 100,
    figsize: tuple[int, int] | None = None,
) -> None:
    """Pie charts showing missing / low-read / adequate coverage per locus.

    For every locus in LOCI_ORDER that appears in the data, samples are
    split into three categories:

    * **Missing** — locus absent from the sample entirely.
    * **< threshold** — locus detected but total reads below *low_reads_threshold*.
    * **≥ threshold** — adequately sequenced.

    Parameters
    ----------
    summary:
        Long-form per-(sample, locus) DataFrame from :func:`build_summary`.
    n_samples:
        Total number of samples (denominator for the missing category).
    low_reads_threshold:
        Read count below which a present locus is considered low-coverage.
    figsize:
        Figure size; auto-computed when ``None``.
    """
    present_loci = [l for l in LOCI_ORDER if l in summary["locus"].unique()]
    n_cols = 4
    n_rows = (len(present_loci) + n_cols - 1) // n_cols
    if figsize is None:
        figsize = (4 * n_cols, 3.5 * n_rows)

    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize)
    axes_flat = axes.flatten() if hasattr(axes, "flatten") else [axes]

    # Missing=red, low=orange, adequate=green
    PIE_COLORS = ["#d62728", "#ff7f0e", "#2ca02c"]

    for i, locus in enumerate(present_loci):
        ax = axes_flat[i]
        locus_reads = summary.loc[summary["locus"] == locus, "duplicate_count"]
        n_missing  = n_samples - len(locus_reads)          # locus absent from sample
        n_low      = int(((locus_reads > 0) & (locus_reads < low_reads_threshold)).sum())
        n_adequate = int((locus_reads >= low_reads_threshold).sum())
        # Fold zero-read detections into the missing bucket.
        n_missing += int((locus_reads == 0).sum())

        categories = [
            (f"Missing\n({n_missing})",                           n_missing,  PIE_COLORS[0]),
            (f"< {low_reads_threshold} reads\n({n_low})",         n_low,      PIE_COLORS[1]),
            (f"≥ {low_reads_threshold} reads\n({n_adequate})",    n_adequate, PIE_COLORS[2]),
        ]
        labels = [c[0] for c in categories if c[1] > 0]
        sizes  = [c[1] for c in categories if c[1] > 0]
        colors = [c[2] for c in categories if c[1] > 0]

        ax.pie(sizes, labels=labels, colors=colors,
               autopct="%1.0f%%", startangle=90,
               textprops={"fontsize": 8})
        ax.set_title(locus, fontweight="bold")

    for j in range(len(present_loci), len(axes_flat)):
        axes_flat[j].set_visible(False)

    fig.suptitle(
        f"Locus coverage across all {n_samples} samples "
        f"(threshold = {low_reads_threshold} reads)",
        fontsize=12,
    )
    fig.tight_layout()
    plt.show()


plot_locus_coverage_pies(summary, n_samples=len(samples), low_reads_threshold=100)
../_images/notebooks_sample_repertoire_overview_15_0.png