Source code for dms_variants.fastq

"""
===========
fastq
===========

Tools for processing FASTQ files.

"""

import collections
import gzip
import itertools
import os
import tempfile  # noqa: F401

import numpy


[docs] def qual_str_to_array(q_str, *, offset=33): """Convert quality score string to array of integers. Parameters ---------- q_str : str Quality score string. offset : int Offset in ASCII encoding of Q-scores. Returns ------- numpy.ndarray Array of integer quality scores. Example ------- >>> qual_str_to_array('!I:0G') array([ 0, 40, 25, 15, 38]) """ return numpy.array([ord(q) - offset for q in q_str], dtype="int")
[docs] def iterate_fastq_pair( r1filename, r2filename, *, r1trim=None, r2trim=None, qual_format="str" ): r"""Iterate over paired R1 and R2 FASTQ files. Parameters ---------- r1filename : str R1 FASTQ file name, can be gzipped (extension ``.gz``). r2filename : str R2 FASTQ file name, can be gzipped (extension ``.gz``). r1trim : int or None If not `None`, trim R1 reads and Q scores to be longer than this. r2trim : int or None If not `None`, trim R2 reads and Q scores to be longer than this. qual_format : {'str', 'array'} Return the quality scores as string of ASCII codes or array of numbers? Yields ------ namedtuple The entries in the tuple are (in order): - `id` : read id - `r1_seq` : R1 read sequence - `r2_seq` : R2 read sequence - `r1_qs` : R1 Q scores (`qual_format` parameter determines format) - `r2_qs` : R2 Q scores (`qual_format` parameter determines format) - `fail` : did either read fail chastity filter? (`None` if no info) Example ------- >>> f1 = tempfile.NamedTemporaryFile(mode='w') >>> _ = f1.write( ... '@DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1984 1:N:0:CGATGT\n' ... 'ATGCAATTG\n' ... '+\n' ... 'GGGGGIIII\n' ... '@DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985 1:Y:0:CGATGT\n' ... 'ACGCTATTC\n' ... '+\n' ... 'GHGGGIKII\n' ... ) >>> f1.flush() >>> f2 = tempfile.NamedTemporaryFile(mode='w') >>> _ = f2.write( ... '@DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1984 2:N:0:CGATGT\n' ... 'CAGCATA\n' ... '+\n' ... 'AGGGGII\n' ... '@DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985 2:Y:0:CGATGT\n' ... 'CTGAATA\n' ... '+\n' ... 'GHBGGIK\n' ... ) >>> f2.flush() >>> for tup in iterate_fastq_pair(f1.name, f2.name, r1trim=8, r2trim=5, ... qual_format='array'): ... print(tup) ... # doctest: +NORMALIZE_WHITESPACE FastqPairEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1984', r1_seq='ATGCAATT', r2_seq='CAGCA', r1_qs=array([38, 38, 38, 38, 38, 40, 40, 40]), r2_qs=array([32, 38, 38, 38, 38]), fail=False) FastqPairEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985', r1_seq='ACGCTATT', r2_seq='CTGAA', r1_qs=array([38, 39, 38, 38, 38, 40, 42, 40]), r2_qs=array([38, 39, 33, 38, 38]), fail=True) >>> for tup in iterate_fastq_pair(f1.name, f2.name, r1trim=8, r2trim=5): ... print(tup) ... # doctest: +NORMALIZE_WHITESPACE FastqPairEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1984', r1_seq='ATGCAATT', r2_seq='CAGCA', r1_qs='GGGGGIII', r2_qs='AGGGG', fail=False) FastqPairEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985', r1_seq='ACGCTATT', r2_seq='CTGAA', r1_qs='GHGGGIKI', r2_qs='GHBGG', fail=True) >>> f1.close() >>> f2.close() """ FastqPairEntry = collections.namedtuple( "FastqPairEntry", "id r1_seq r2_seq r1_qs r2_qs fail" ) r1_iterator = iterate_fastq( r1filename, trim=r1trim, check_pair=1, qual_format=qual_format ) r2_iterator = iterate_fastq( r2filename, trim=r2trim, check_pair=2, qual_format=qual_format ) for r1_entry, r2_entry in itertools.zip_longest(r1_iterator, r2_iterator): if (r1_entry is None) or (r2_entry is None): raise OSError( f"{r1filename} and {r2filename} have unequal " "number of entries" ) if r1_entry[0] != r2_entry[0]: raise OSError( f"{r1filename} and {r2filename} specify different " f"read IDs:\n{r1_entry[0]}\n{r2_entry[0]}" ) yield FastqPairEntry( id=r1_entry[0], r1_seq=r1_entry[1], r2_seq=r2_entry[1], r1_qs=r1_entry[2], r2_qs=r2_entry[2], fail=(r1_entry[3] or r2_entry[3]), )
[docs] def iterate_fastq(filename, *, trim=None, check_pair=None, qual_format="str"): r"""Iterate over a FASTQ file. Parameters ---------- filename : str FASTQ file name, can be gzipped (extension ``.gz``). trim : int or None If not `None`, trim reads and Q scores to be longer than this. check_pair : {1, 2, None} If not `None`, check reads are read 1 or read 2 if this info given. Assumes Casava 1.8 or SRA header format. qual_format : {'str', 'array'} Return the quality scores as string of ASCII codes or array of numbers? Yields ------ namedtuple The entries in the tuple are (in order): - `id` : read id - `seq` : read sequence - `qs` : Q scores (`qual_format` parameter determines format) - `fail` : did read fail chastity filter? (`None` if no filter info) Example ------- >>> f = tempfile.NamedTemporaryFile(mode='w') >>> _ = f.write( ... '@DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1984 1:N:0:CGATGT\n' ... 'ATGCAATTG\n' ... '+\n' ... 'GGGGGIIII\n' ... '@DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985 1:Y:0:CGATGT\n' ... 'ACGCTATTC\n' ... '+\n' ... 'GHGGGIKII\n' ... '@DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985 2:Y:0:CGATGT\n' ... 'ACGCTATTC\n' ... '+\n' ... 'GHGGGIKII\n' ... ) >>> f.flush() >>> try: ... for tup in iterate_fastq(f.name, trim=5, check_pair=1): ... print(tup) ... except ValueError as e: ... print(f"ValueError: {e}") ... # doctest: +NORMALIZE_WHITESPACE FastqEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1984', seq='ATGCA', qs='GGGGG', fail=False) FastqEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985', seq='ACGCT', qs='GHGGG', fail=True) ValueError: header not for R1: @DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985 2:Y:0:CGATGT >>> for tup in iterate_fastq(f.name, qual_format='array'): ... print(tup) ... # doctest: +NORMALIZE_WHITESPACE FastqEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1984', seq='ATGCAATTG', qs=array([38, 38, 38, 38, 38, 40, 40, 40, 40]), fail=False) FastqEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985', seq='ACGCTATTC', qs=array([38, 39, 38, 38, 38, 40, 42, 40, 40]), fail=True) FastqEntry(id='DH1DQQN1:933:HMLH5BCXY:1:1101:2165:1985', seq='ACGCTATTC', qs=array([38, 39, 38, 38, 38, 40, 42, 40, 40]), fail=True) >>> f.close() """ FastqEntry = collections.namedtuple("FastqEntry", "id seq qs fail") if check_pair is not None: check_pair = str(check_pair) if check_pair not in {"1", "2"}: raise ValueError(f"invalid `check_pair` of {check_pair}") if not os.path.isfile(filename): raise OSError(f"no FASTQ file {filename}") if qual_format == "array": qual_to_array = True elif qual_format == "str": qual_to_array = False else: raise ValueError(f"invalid value for `qual_format`: {qual_format}") if os.path.splitext(filename)[1].lower() == ".gz": openfunc = gzip.open else: openfunc = open with openfunc(filename, mode="rt") as f: head = f.readline() while head: if head[0] != "@": raise OSError(f"id starts with {head[0]}, not @:\n{head}") else: head = head.rstrip() headspl = head[1:].split() read_id = headspl[0] seq = f.readline().rstrip() plusline = f.readline().rstrip() qs = f.readline().rstrip() if (not seq) or (len(seq) != len(qs)) or (plusline != "+"): raise OSError( f"invalid entry for {read_id} in {filename}:\n" f"{head}\n{seq}\n{plusline}\n{qs}" ) # trim last two characters (needed for SRA downloads) if read_id[-2:] in {".1", ".2"}: if check_pair and (read_id[-1] != check_pair): raise ValueError(f"header not for R{check_pair}:\n{head}") read_id = read_id[:-2] if check_pair and len(headspl) > 1 and headspl[1][0] != check_pair: raise ValueError(f"header not for R{check_pair}:\n{head}") # parse chastity filter assuming CASAVA 1.8 header try: chastity = headspl[1][2] if chastity == "Y": fail = True elif chastity == "N": fail = False else: raise ValueError(f"cannot parse chastity filter in {head}") except IndexError: fail = None # header does not specify chastity filter if trim is not None: seq = seq[:trim] qs = qs[:trim] if qual_to_array: qs = qual_str_to_array(qs) yield FastqEntry( id=read_id, seq=seq, qs=qs, fail=fail, ) head = f.readline()
if __name__ == "__main__": import doctest doctest.testmod()