Source code for pdb_prot_align.main

"""
=======
main
=======

Main PDB / protein alignment function.

You can run all of the command-line functionality of ``pdb_prot_align`` via
the :func:`run` function.

"""


import argparse
import os
import re
import sys
import tempfile
import textwrap

import Bio.AlignIO
import Bio.Seq
import Bio.SeqIO

import pandas as pd

import pdb_prot_align
import pdb_prot_align.utils


def _run_main():
    parser = get_parser()
    kwargs = vars(parser.parse_args(sys.argv[1:]))
    run(**kwargs)


def _str2bool(v):
    """Convert string to boolean value."""
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')


# arguments for command-line parser, specified this way so can also
# be used in docs for `run`
_ARGS = {
         'protsfile': {
              'required': True,
              'help': 'input FASTA file of protein sequences',
              'type': str,
              },
         'refprot_regex': {
              'required': True,
              'help': 'regex for reference protein header in `protsfile`',
              'type': str,
              },
         'pdbfile': {
              'required': True,
              'help': 'input PDB file',
              'type': str,
              },
         'chain_ids': {
              'required': True,
              'help': 'chains in PDB file to align; all chains aligning to a '
                      'site must share the same residue number and amino-acid '
                      'or an error will be raised',
              'type': str,
              'nargs': '+',
              },
         'outprefix': {
              'required': True,
              'help': 'prefix for output files (can be / include directory): '
                      '"alignment.fa" (alignment with gaps relative to '
                      'reference stripped); "alignment_unstripped.fa" ('
                      'non-stripped alignment with PDB chains still included);'
                      ' "sites.csv" (sequential sites in reference, PDB sites'
                      ', PDB chains, wildtype in reference, wildtype in PDB, '
                      'site entropy in bits, n effective amino acids at site, '
                      'amino acid, frequency of amino acid)',
              'type': str,
              },
         'ignore_gaps': {
              'required': False,
              'default': True,
              'help': 'ignore gaps (-) when calculating frequencies, number '
                      'effective amino acids, entropy',
              'type': _str2bool,
              },
         'drop_pdb': {
              'required': False,
              'default': True,
              'help': 'drop PDB protein chains from "alignment.fa" and '
                      'computation of stats in "sites.csv" output files',
              'type': _str2bool,
              },
         'drop_refprot': {
              'required': False,
              'default': False,
              'help': 'drop reference protein from "alignment.fa" and '
                      'computation of stats in "sites.csv" output files',
              'type': _str2bool,
              },
         'mafft': {
             'required': False,
             'default': 'mafft',
             'help': 'path to `mafft`, potentially with additional args '
                     'such as "mafft --reorder" (if multiple args, it all '
                     'needs to be in quotes)',
             'type': str,
             },
         }


[docs]def get_parser(): """Command-line `argparse.ArgumentParser` for ``pdb_prot_align``.""" parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Align proteins to reference and PDB.', ) parser.add_argument('-v', '--version', action='version', version='%(prog)s {0}'.format( pdb_prot_align.__version__)) for param, param_d in _ARGS.items(): parser.add_argument(f"--{param}", **param_d) return parser
[docs]def run(protsfile, refprot_regex, pdbfile, chain_ids, outprefix, ignore_gaps=True, drop_pdb=True, drop_refprot=True, mafft='mafft', ): """Run main function to align proteins to reference and PDB chain(s). Note ---- This function implements the full command-line functionality of ``pdb_prot_align`` via a Python function. Parameters ---------- {_PARAM_DOCS} """ # get output file names and make output directory if needed os.makedirs(os.path.dirname(outprefix), exist_ok=True) if not os.path.isdir(outprefix): outprefix = f"{outprefix}_" alignment = f"{outprefix}alignment.fa" alignment_unstripped = f"{outprefix}unstripped_alignment.fa" csv = f"{outprefix}sites.csv" print(f"\nRunning `pdb_prot_align` {pdb_prot_align.__version__}\n") if not os.path.isfile(pdbfile): raise IOError(f"no `pdbfile` of {pdbfile}") # parse PDB chains into dataframe mapping sequential to PDB residues print(f"Parsing PDB {pdbfile} chains {' '.join(chain_ids)}") pdb_dfs = {} # dataframe mapping sequential to PDB residue for each chain pdb_prots = {} # SeqRecord for each chain pdb_name = os.path.splitext(os.path.basename(pdbfile))[0] if len(chain_ids) != len(set(chain_ids)): raise ValueError(f"`chain_ids` not unique: {chain_ids}") for chain_id in chain_ids: pdb_dfs[chain_id] = pdb_prot_align.utils.pdb_seq_to_number(pdbfile, chain_id) with tempfile.NamedTemporaryFile('w+') as f: f.write(f">PDB_{pdb_name}_{chain_id}\n" + ''.join(pdb_dfs[chain_id]['wildtype'])) f.flush() f.seek(0) pdb_prots[chain_id] = Bio.SeqIO.read(f, 'fasta') print(f"For chain {chain_id}, parsed {len(pdb_dfs[chain_id])} residues" f", ranging from {pdb_dfs[chain_id]['pdb_site'].iloc[0]} to " f"{pdb_dfs[chain_id]['pdb_site'].iloc[-1]} in PDB numbering.") assert all(len(pdb_dfs[chain_id]) == len(pdb_prots[chain_id]) for chain_id in chain_ids) # read in all the proteins to align if not os.path.isfile(protsfile): raise IOError(f"no `protsfile` file {protsfile}") prots = list(Bio.SeqIO.parse(protsfile, 'fasta')) print(f"\nRead {len(prots)} sequences from {protsfile}") # ensure no proteins have same header we used for PDB pdb_prot_headers = [pdb_prot.description.strip() for pdb_prot in pdb_prots.values()] if any(p.description.strip() in pdb_prot_headers for p in prots): raise ValueError(f"sequences in {protsfile} cannot have header in " f"{pdb_prot_headers}") # get reference protein from set of proteins refprot = [s for s in prots if re.search(refprot_regex, s.description)] if not refprot: raise ValueError(f"no headers in {protsfile} match `refprot_regex` " f"{refprot_regex}") elif len(refprot) > 1: raise ValueError(f"multiple headers in {protsfile} match " f"`refprot_regex` {refprot_regex}:\n" + '\n'.join(s.description for s in refprot)) else: refprot = refprot[0] print(f"Reference protein is of length {len(refprot)} and has " f"the following header:\n{refprot.description}\n") # make alignment of all proteins and the PDB sequence print(f"Using `mafft` to align sequences to {alignment_unstripped}") aln = pdb_prot_align.utils.align_prots_mafft( prots=prots + list(pdb_prots.values()), mafft=mafft) assert len(aln) == len(prots) + len(pdb_prots) Bio.AlignIO.write(aln, alignment_unstripped, 'fasta') # get mapping of reference protein and PDB sequential sites ref_pdb_site_map = pdb_prot_align.utils.aligned_site_map( aln, [refprot.description, *pdb_prot_headers] ) assert all(len(p) == len(ref_pdb_site_map[p.description].dropna()) for p in [refprot, *pdb_prots.values()]) # strip gaps relative to reference sequence print(f"Stripping gaps relative to reference {refprot.description}") aln = pdb_prot_align.utils.strip_gaps_to_ref(aln, refprot.description) assert all(len(refprot) == len(s) for _, s in aln) # remove PDB sequences? if drop_pdb: print('Dropping PDB chains from alignment') n = len(aln) aln = [(head, s) for head, s in aln if head not in pdb_prot_headers] assert len(aln) == len(prots) == n - len(pdb_prot_headers) # remove reference protein? if drop_refprot: print('Dropping reference protein from alignment') n = len(aln) aln = [(head, s) for head, s in aln if head != refprot.description] assert len(aln) == n - 1 if any(all(s[1][r] == '-' for s in aln) for r in range(len(refprot))): if ignore_gaps: raise ValueError('Some sites all gaps after dropping refprot. ' 'To use `drop_refprot=True` for this ' 'alignment, also use `ignore_gaps=False`') # write alignment print(f"Writing gap-stripped alignment to {alignment}\n") with open(alignment, 'w') as f: f.write('\n'.join(f">{head}\n{s}" for head, s in aln)) # remove sites gapped in reference from `ref_pdb_site_map` ref_pdb_site_map = (ref_pdb_site_map .dropna(axis=0, subset=[refprot.description]) ) # map (pdb_chain, pdb_isite) to pdb_site or pdb_wildtype to_pdb = {} for valstr in ['pdb_site', 'pdb_wildtype']: to_pdb[valstr] = {('NaN', 'NaN'): 'NaN'} for chain_id, pdb_df in pdb_dfs.items(): for isite, val in (pdb_df .rename(columns={'wildtype': 'pdb_wildtype'}) .set_index('sequential') [valstr] .to_dict() .items() ): to_pdb[valstr][chain_id, isite] = val # now create data frame with PDB sites / wildtypes df = ( ref_pdb_site_map .rename(columns={old: new for old, new in [(refprot.description, 'isite'), *[(pdb_prot.description, key) for key, pdb_prot in pdb_prots.items()] ] } ) .assign(wildtype=lambda x: (x['isite'] .map({i + 1: wt for i, wt in enumerate(str(refprot.seq))}, na_action='ignore') ) ) .melt(id_vars=['isite', 'wildtype'], var_name='pdb_chain', value_name='pdb_isite', ) .assign(pdb_chain=lambda x: (x['pdb_chain'] # chain NaN if site is NaN .where(pd.notna(x['pdb_isite']), 'NaN') ), pdb_isite=lambda x: x['pdb_isite'].fillna('NaN'), ) .drop_duplicates() .assign( pdb_site=lambda x: x.apply(lambda r: (to_pdb['pdb_site'] [r.pdb_chain, r.pdb_isite] ), axis=1), pdb_wildtype=lambda x: x.apply(lambda r: (to_pdb['pdb_wildtype'] [r.pdb_chain, r.pdb_isite] ), axis=1), nchains=lambda x: x.groupby('isite').pdb_chain.transform('count') ) .query('(pdb_chain != "NaN") or (nchains == 1)') .reset_index(drop=True) .replace({'NaN': float('nan')}) [['isite', 'wildtype', 'pdb_chain', 'pdb_site', 'pdb_wildtype']] ) # check that each site aligns to unique PDB wildtype nonunique_isites = ( df .drop(columns=['pdb_chain']) .drop_duplicates() .groupby('isite') .size() .rename('n') .reset_index() .query('n > 1') ['isite'] .tolist() ) if nonunique_isites: raise ValueError('Sites map to non-unique PDB site / wildtype:\n' + str(df.query('isite in @nonunique_isites') .sort_values(['isite', 'pdb_chain']))) # add amino-acid frequencies and entropies / n_effective df = ( df .merge(pdb_prot_align.utils.alignment_to_count_df( alignment, ignore_gaps=ignore_gaps, as_freqs=True, add_entropy_neff=True), on='isite', validate='many_to_one', ) .melt(id_vars=['isite', 'wildtype', 'pdb_chain', 'pdb_site', 'pdb_wildtype', 'entropy', 'n_effective'], var_name='amino_acid', value_name='frequency', ) .sort_values(['isite', 'pdb_chain', 'amino_acid']) ) print(f"Writing CSV with detailed information to {csv}") df.to_csv(csv, index=False, float_format='%.5f') print('\nProgram complete.\n')
# complete docs for `run` with parameter specs parsed from `_ARGS` _PARAM_DOCS = [] for param, param_d in _ARGS.items(): if 'choices' in param_d: param_type = f"{{{', '.join(param_d['choices'])}}}" elif 'nargs' in param_d and param_d['nargs'] == '+': param_type = 'list' elif param_d['type'] == _str2bool: param_type = 'bool' else: param_type = param_d['type'].__name__ param_help = textwrap.indent('\n'.join(textwrap.wrap(param_d['help'], 70)), ' ') _PARAM_DOCS.append(f" {param} : {param_type}\n{param_help}") _PARAM_DOCS = '\n'.join(_PARAM_DOCS) del param, param_d, param_type, param_help # add parameter specs to docs for `run` run.__doc__ = run.__doc__.format(_PARAM_DOCS=_PARAM_DOCS.lstrip()) if __name__ == '__main__': import doctest doctest.testmod()