Source code for pdb_prot_align.utils

"""
=====
utils
=====

Utilities for aligning proteins to PDB.

"""


import collections
import re
import subprocess
import tempfile
import textwrap  # noqa: F401
import warnings

import Bio.AlignIO
import Bio.Data.IUPACData
import Bio.PDB
import Bio.SeqIO
import Bio.SeqUtils

import natsort

import numpy

import pandas as pd


[docs]def alignment_to_count_df(fasta_alignment, *, ignore_gaps=False, as_freqs=False, add_entropy_neff=False, ): """Convert FASTA alignment to counts data frame. Parameters ---------- fasta_alignment : str FASTA file with alignment. ignore_gaps : bool Ignore gap characters ('-') in counts. as_freqs : bool Return frequencies rather than counts. add_entropy_neff : bool Add columns with site entropy (bits) and number effective amino acids. Requires `as_freqs` to be `True`. Returns ------- pandas.DataFrame First column ('isite') is 1, 2, ... site numbering. Other columns give counts of each observed alignment character at that site. Example -------- >>> with tempfile.NamedTemporaryFile('w+') as f: ... _ = f.write(textwrap.dedent( ... ''' ... >seq1 ... GK-AC-L ... >seq2 ... -K-AM-L ... >seq3 ... G-NAC-L ... >seq4 ... GK-ACSI ... ''' ... )) ... f.flush() ... counts = alignment_to_count_df(f.name) ... counts_nogaps = alignment_to_count_df(f.name, ignore_gaps=True) ... freqs_nogaps = alignment_to_count_df(f.name, ... ignore_gaps=True, ... as_freqs=True) ... freqs_with_ent = alignment_to_count_df(f.name, ... ignore_gaps=True, ... as_freqs=True, ... add_entropy_neff=True, ... ) >>> counts isite - A C G I K L M N S 0 1 1 0 0 3 0 0 0 0 0 0 1 2 1 0 0 0 0 3 0 0 0 0 2 3 3 0 0 0 0 0 0 0 1 0 3 4 0 4 0 0 0 0 0 0 0 0 4 5 0 0 3 0 0 0 0 1 0 0 5 6 3 0 0 0 0 0 0 0 0 1 6 7 0 0 0 0 1 0 3 0 0 0 >>> counts_nogaps isite A C G I K L M N S 0 1 0 0 3 0 0 0 0 0 0 1 2 0 0 0 0 3 0 0 0 0 2 3 0 0 0 0 0 0 0 1 0 3 4 4 0 0 0 0 0 0 0 0 4 5 0 3 0 0 0 0 1 0 0 5 6 0 0 0 0 0 0 0 0 1 6 7 0 0 0 1 0 3 0 0 0 >>> freqs_nogaps.round(2) isite A C G I K L M N S 0 1 0.0 0.00 1.0 0.00 0.0 0.00 0.00 0.0 0.0 1 2 0.0 0.00 0.0 0.00 1.0 0.00 0.00 0.0 0.0 2 3 0.0 0.00 0.0 0.00 0.0 0.00 0.00 1.0 0.0 3 4 1.0 0.00 0.0 0.00 0.0 0.00 0.00 0.0 0.0 4 5 0.0 0.75 0.0 0.00 0.0 0.00 0.25 0.0 0.0 5 6 0.0 0.00 0.0 0.00 0.0 0.00 0.00 0.0 1.0 6 7 0.0 0.00 0.0 0.25 0.0 0.75 0.00 0.0 0.0 >>> (freqs_with_ent.drop(columns=['entropy', 'n_effective']) == ... freqs_nogaps).all(None) True >>> freqs_with_ent[['entropy', 'n_effective']].round(3) entropy n_effective 0 0.000 1.000 1 0.000 1.000 2 0.000 1.000 3 0.000 1.000 4 0.811 1.755 5 0.000 1.000 6 0.811 1.755 """ seqs = [str(s.seq) for s in Bio.AlignIO.read(fasta_alignment, 'fasta')] counts = {} for i, i_chars in enumerate(zip(*seqs)): counts[i + 1] = collections.Counter(i_chars) # nested dict to data frame: https://stackoverflow.com/a/54300940 df = (pd.concat({site: pd.DataFrame(site_counts, index=[0]) for site, site_counts in counts.items()}, axis=0, ignore_index=True, sort=True) .fillna(0) .astype(int) .assign(isite=lambda x: x.index + 1) ) char_cols = [char for char in df.columns[: -1] if char != '-' or not ignore_gaps] df = df[['isite'] + char_cols] if as_freqs: df = (df .set_index('isite') .div(df.set_index('isite').sum(axis=1), axis=0) .reset_index() ) row_sums = df[char_cols].sum(axis=1) if not numpy.allclose(1, row_sums): raise RuntimeError('rows do not sum to 1:\n' + str(sorted(row_sums.tolist()))) if add_entropy_neff: if not as_freqs: raise ValueError('cannot set `add_entropy_neff` without `as_freq`') df = df.assign( entropy=lambda x: (x[char_cols] .apply(lambda r: -(r[r > 0] * numpy.log(r[r > 0]) / numpy.log(2) ).sum(), axis=1) ), n_effective=lambda x: 2**x['entropy'] ) if df['entropy'].any() < 0: raise RuntimeError('negative entropy') else: # get rid of annoying -0 by changing to 0 df['entropy'] = numpy.abs(df['entropy']) return df
[docs]def align_prots_mafft(prots, *, mafft='mafft'): """Align protein sequences with ``mafft``. Parameters ---------- prots : list List of sequences as `Biopython.SeqRecord.SeqRecord` objects. mafft : str Command that resolves to ``mafft`` executable, potentially with arbitrary additional options, such as ``mafft --reorder``. Returns ------- Bio.Align.MultipleSeqAlignment The protein alignment. """ for p in prots: if not re.fullmatch( f"[{Bio.Data.IUPACData.extended_protein_letters}]+", str(p.seq)): raise ValueError(f"invalid amino acids for {p.description}\n" + str(p.seq)) mafft_cmds = mafft.split() mafft = mafft_cmds[0] mafft_args = [arg for arg in mafft_cmds[1:] if arg != '--amino'] try: _ = subprocess.run([mafft, '--version'], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) except FileNotFoundError: raise ValueError(f"cannot call `mafft` executable with: {mafft}") with tempfile.NamedTemporaryFile('w') as prots_in: Bio.SeqIO.write(prots, prots_in, 'fasta') prots_in.flush() res = subprocess.run([mafft, *mafft_args, '--amino', prots_in.name], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, universal_newlines=True, ) alignment_text = res.stdout with tempfile.TemporaryFile('w+') as f: f.write(alignment_text) f.flush() f.seek(0) aln = Bio.AlignIO.read(f, 'fasta') return aln
[docs]def aligned_site_map(aln, heads): """Get mapping between sequential site numbers of aligned sequences. Parameters ---------- aln : Bio.Align.MultipleSeqAlignment Alignment of sequences heads : list Headers of sequences of interest in `aln`, must be at least two. Returns ------- pandas.DataFrame Columns are sequence names in `heads`. There is a row for alignment positions that are non-gapped in at least one sequence in `heads`, and the entry in the row gives the number of that site in 1, 2, ... numbering of the sequence, or `NaN` if there is no residue for sequence at that position in alignment. Example ------- Create and parse an alignment: >>> with tempfile.TemporaryFile('w+') as f: ... _ = f.write(textwrap.dedent( ... ''' ... >seq1 ... GK-AC-L ... >seq2 ... -K-AM-L ... >seq3 ... G-NAC-L ... >seq4 ... GK-ACSI ... ''' ... )) ... f.flush() ... _ = f.seek(0) ... aln = Bio.AlignIO.read(f, 'fasta') Show the alignment site map. The gymnastics with converting to str and replacing 'nan' are to make output format the same in both `pandas` >= and < 1.0 and earlier versions so doctest passes in both: >>> (aligned_site_map(aln, ['seq1', 'seq2', 'seq3']) ... .astype(str).replace('nan', '<NA>')) seq1 seq2 seq3 0 1 <NA> 1 1 2 1 <NA> 2 <NA> <NA> 2 3 3 2 3 4 4 3 4 5 5 4 5 """ heads_set = set(heads) if len(heads) != len(heads_set): raise ValueError(f"non-unique entries in `heads`:\n{heads}") if len(heads) < 2: raise ValueError('`heads` does not have at least 2 entries') aln_heads = [s.description for s in aln] aln_heads_set = set(aln_heads) if len(aln_heads) != len(aln_heads_set): raise ValueError('alignment has non-unique headers') missing_heads = heads_set - aln_heads_set if missing_heads: raise ValueError('following headers in `heads` but not alignment:\n' + '\n'.join(missing_heads)) select_aln = [] for s in aln: if s.description in heads_set: select_aln.append((s.description, str(s.seq))) assert len(select_aln) == len(heads) aln_length = len(select_aln[0][1]) if any(aln_length != len(s) for _, s in select_aln): raise ValueError('aligned sequences not all of same length') site_lists = {head: [] for head in heads} indices = {head: 0 for head in heads} for i in range(aln_length): for head, s in select_aln: if s[i] == '-': site_lists[head].append(float('nan')) else: indices[head] += 1 site_lists[head].append(indices[head]) df = (pd.DataFrame(site_lists, dtype='Int64') [heads] .dropna(axis=0, how='all') .reset_index(drop=True) ) return df
[docs]def pdb_seq_to_number(pdbfile, chain_ids, chain_identity='require_same'): """Get sequence and number of chain(s) for some protein in PDB file. Parameters ---------- pdbfile : str Path to existing PDB file. chain_ids : list List of chains in PDB file. All these chains must correspond to the same underlying molecule (i.e., be monomers in homo-oligomer). chain_identity : {'union', 'intersection', 'require_same'} How to parse chains. They are required to share the same wildtype at all sites they have in common. If all sites are not shared between all chains, take the union, the intersection, or raise an error if they are not exactly the same. Returns ------- pandas.DataFrame Columns are: - 'sequential' : sequential 1, 2, ... numbering of residues - 'pdb_site' : PDB file residue number - 'wildtype' : wildtype residue identity (1-letter code) """ with warnings.catch_warnings(): warnings.simplefilter( 'ignore', category=Bio.PDB.PDBExceptions.PDBConstructionWarning) structure = Bio.PDB.PDBParser().get_structure('_', pdbfile) if len(structure) != 1: raise ValueError(f"{pdbfile} has multiple models") else: model = list(structure)[0] aa_3to1 = {three.upper(): one for three, one in Bio.SeqUtils.IUPACData.protein_letters_3to1.items()} df = pd.DataFrame({}, columns=['pdb_site', 'wildtype', 'chain']) for chain_id in chain_ids: chain = model[chain_id] pdb_sites = [] wildtypes = [] for res in chain.child_list: heteroflag, resnum, insertcode = res.get_id() if not heteroflag.strip(): # ignore hetero-residues pdb_sites.append(f"{resnum}{insertcode.strip()}") wildtypes.append(aa_3to1[res.resname]) if pdb_sites != natsort.natsorted(pdb_sites): raise ValueError(f"residues in {pdbfile} chain {chain_id} not " 'naturally sorted') df = df.append(pd.DataFrame({'pdb_site': pdb_sites, 'wildtype': wildtypes, 'chain': chain_id})) # check all chains have same wildtype at each position mismatched_wt = ( df .groupby('pdb_site', sort=False) .aggregate({'wildtype': 'nunique'}) .reset_index() .query('wildtype != 1') ['pdb_site'] .tolist() ) if mismatched_wt: raise ValueError(f"{pdbfile} chains {', '.join(chain_ids)} differ in " f"wildtype at sites:\n{', '.join(mismatched_wt)}") if chain_identity == 'require_same': not_exact = ( df .groupby(['pdb_site', 'wildtype'], sort=False) .aggregate({'chain': 'count'}) .reset_index() .query(f"chain != {len(chain_ids)}") ['pdb_site'] .tolist() ) if not_exact: raise ValueError(f"`chain_identity` is {chain_identity}, but " f"{pdbfile} chains {', '.join(chain_ids)} " f"don't all have sites:\n{', '.join(not_exact)}") else: df = df[['pdb_site', 'wildtype']].drop_duplicates() elif chain_identity == 'union': df = (df [['pdb_site', 'wildtype']] .drop_duplicates() ) elif chain_identity == 'intersection': df = (df .groupby(['pdb_site', 'wildtype'], sort=False) .aggregate({'chain': 'count'}) .reset_index() .query(f"chain == {len(chain_ids)}") ) else: raise ValueError(f"invalid `chain_identity` {chain_identity}") # natural sort on site as here: https://stackoverflow.com/a/29582718 df = df.set_index('pdb_site') df = (df .reindex(index=natsort.natsorted(df.index)) .reset_index() .assign(sequential=lambda x: x.reset_index().index + 1) [['sequential', 'pdb_site', 'wildtype']] ) return df
[docs]def strip_gaps_to_ref(aln, ref_head): """Strip gaps in alignment relative to reference. Parameters ---------- aln : Bio.Align.MultipleSeqAlignment The alignment of sequences. ref_head : str Header of reference sequence in `aln`. Returns ------- list Alignment as list of 2-tuples of strings (`header, sequence`) with all gaps relative to reference removed. Example ------- >>> with tempfile.TemporaryFile('w+') as f: ... _ = f.write(textwrap.dedent( ... ''' ... >seq1 ... GK-AC-L ... >seq2 ... -K-AM-L ... >seq3 ... G-NAC-L ... >seq4 ... GK-ACSI ... ''' ... )) ... f.flush() ... _ = f.seek(0) ... aln = Bio.AlignIO.read(f, 'fasta') >>> strip_gaps_to_ref(aln, 'seq1') # doctest: +NORMALIZE_WHITESPACE [('seq1', 'GKACL'), ('seq2', '-KAML'), ('seq3', 'G-ACL'), ('seq4', 'GKACI')] """ aln = [(s.description, str(s.seq)) for s in aln] refseq = [s for head, s in aln if head == ref_head] if not refseq: raise ValueError(f"alignment lacks `ref_head`:\n{ref_head}") elif len(refseq) > 1: raise ValueError(f"alignment has {len(refseq)} sequences with " f"`ref_head`:\n{ref_head}") else: refseq = refseq[0] ungapped_indices = [i for i, aa in enumerate(refseq) if aa != '-'] if not ungapped_indices: raise ValueError('no ungapped sites in reference') if any(len(refseq) != len(s) for _, s in aln): raise ValueError('sequences in `aln` not all same length') ungapped_aln = [] for head, s in aln: ungapped_aln.append((head, ''.join(s[i] for i in ungapped_indices))) return ungapped_aln
if __name__ == '__main__': import doctest doctest.testmod()