Source code for protfasta

from __future__ import annotations

import os
from typing import Callable, Optional, Union

from protfasta import utilities as _utilities
from protfasta import io as _io
from protfasta._configs import STANDARD_AAS, STANDARD_CONVERSION
from protfasta import protfasta as _protfasta
from protfasta.protfasta_exceptions import ProtfastaException

# Re-export the streaming parser for callers handling very large files.
from protfasta.io import iter_fasta  # noqa: F401

## ------------------------------------------------------------
# READTHEDOCS versioning hack
#
# Generate _version.py if missing and in the Read the Docs environment
if os.getenv("READTHEDOCS") == "True" and not os.path.isfile('../protfasta/_version.py'):   
    import versioningit            
    __version__ = versioningit.get_version('../')
else:
    from ._version import __version__
    
    
## ------------------------------------------------------------




_ROOT = os.path.abspath(os.path.dirname(__file__))


def _get_data(path: str) -> str:
    """Return the absolute path to a bundled data directory.

    Used internally to locate test data shipped with the package.

    Parameters
    ----------
    path : str
        Relative path (under the package ``data/`` directory) to resolve.

    Returns
    -------
    str
        Absolute filesystem path.
    """
    return os.path.join(_ROOT, 'data', path)


[docs] def read_fasta( filename: str, expect_unique_header: bool = True, header_parser: Optional[Callable[[str], str]] = None, check_header_parser: bool = True, duplicate_sequence_action: str = 'ignore', duplicate_record_action: str = 'fail', invalid_sequence_action: str = 'fail', alignment: bool = False, return_list: bool = False, output_filename: Optional[str] = None, correction_dictionary: Optional[dict[str, str]] = None, verbose: bool = False, ) -> Union[dict[str, str], list[list[str]]]: """Read a FASTA file, sanitize sequences, and return a dict or list. This is the primary entry point for **protfasta**. At its simplest:: sequences = read_fasta('proteins.fasta') returns a dictionary whose keys are FASTA headers and whose values are amino-acid sequences. Many optional parameters allow automatic handling of duplicates, invalid residues, alignment gap characters, and more. Sanitization is applied in the following order: 1. File is read, custom headers are parsed, and header uniqueness is checked (when *expect_unique_header* is ``True``). 2. Duplicate records are processed (*duplicate_record_action*). 3. Duplicate sequences are processed (*duplicate_sequence_action*). 4. Invalid residues are processed (*invalid_sequence_action*). 5. Final sequences are optionally written to *output_filename*. 6. A dictionary or list is returned to the caller. Incompatible option combinations are caught before the file is read. Parameters ---------- filename : str Path to the FASTA file to read. expect_unique_header : bool, optional If ``True`` (default), an exception is raised when a duplicate header is encountered during parsing. Set to ``False`` when the file is known to contain duplicate headers -- in that case *return_list* should typically be ``True`` as well so that no entries are silently lost via dictionary-key overwriting. header_parser : callable or None, optional A function ``(str) -> str`` applied to every raw header before any uniqueness checks. Useful for extracting accession IDs from structured headers. When *check_header_parser* is ``True`` (the default) the function is smoke-tested with the string ``'this test string should work'`` before parsing begins. check_header_parser : bool, optional If ``True`` (default), *header_parser* is tested with a dummy string before the file is read to catch obvious problems early. Set to ``False`` to skip this pre-check. duplicate_record_action : str, optional How to handle records that are identical in both header **and** sequence. Default ``'fail'``. * ``'ignore'`` -- keep all occurrences (requires *expect_unique_header* = ``False``). * ``'fail'`` -- raise an exception. * ``'remove'`` -- keep only the first occurrence. duplicate_sequence_action : str, optional How to handle entries that share the same sequence regardless of header. Default ``'ignore'``. * ``'ignore'`` -- keep all occurrences. * ``'fail'`` -- raise an exception. * ``'remove'`` -- keep only the first occurrence. invalid_sequence_action : str, optional How to handle sequences containing non-standard amino-acid characters. Default ``'fail'``. * ``'ignore'`` -- silently accept invalid residues. * ``'fail'`` -- raise an exception. * ``'remove'`` -- discard the entire sequence. * ``'convert'`` -- convert non-standard residues using *correction_dictionary* (or built-in defaults); fail if any unconvertible residues remain. * ``'convert-ignore'`` -- convert what can be converted, then ignore any remaining invalid residues. * ``'convert-remove'`` -- convert what can be converted, then discard sequences that still contain invalid residues. alignment : bool, optional If ``True``, dash (``'-'``) characters are treated as valid gap characters and are neither flagged as invalid nor converted. Default ``False``. return_list : bool, optional If ``True``, return a list of ``[header, sequence]`` pairs instead of a dictionary. Required when duplicate headers are present and you want to keep all of them. Default ``False``. output_filename : str or None, optional If provided, the final (sanitized) set of sequences is written to a new FASTA file at this path before the function returns. correction_dictionary : dict or None, optional A mapping of non-standard characters to replacement strings used when *invalid_sequence_action* involves conversion. When ``None``, the built-in table is used: * ``B`` -> ``N``, ``U`` -> ``C``, ``X`` -> ``G``, ``Z`` -> ``Q`` * ``*`` -> ``''``, ``-`` -> ``''``, ``' '`` -> ``''`` A custom dictionary **replaces** the built-in table entirely. verbose : bool, optional If ``True``, informational messages are printed to stdout during each processing step. Default ``False``. Returns ------- dict[str, str] or list[list[str]] When *return_list* is ``False`` (default), a dictionary mapping headers to sequences. When ``True``, a list of two-element lists ``[header, sequence]``. Ordering always matches the original file. Raises ------ ProtfastaException If any validation check fails or incompatible options are provided. """ # first we sanity check all of the inputs provided. NOTE. If additional functionality is added, new # keywords MUST be sanity checked in this function _io.check_inputs(expect_unique_header, header_parser, check_header_parser, duplicate_record_action, duplicate_sequence_action, invalid_sequence_action, alignment, return_list, output_filename, verbose, correction_dictionary) # the actual file i/o happens here raw = _io.internal_parse_fasta_file(filename, expect_unique_header=expect_unique_header, header_parser=header_parser, verbose=verbose) # first deal with duplicate records updated = _protfasta._deal_with_duplicate_records(raw, duplicate_record_action, verbose) # deal with duplicate sequences updated = _protfasta._deal_with_duplicate_sequences(updated, duplicate_sequence_action, verbose) # next decide how we deal with invalid amino acid sequences ## ## If we're using the convert-remove action... if invalid_sequence_action == 'convert-remove': # first run a convert ignore updated = _protfasta._deal_with_invalid_sequences(updated, 'convert-ignore', alignment=alignment, verbose=verbose, correction_dictionary=correction_dictionary) # then a remove on those that are left updated = _protfasta._deal_with_invalid_sequences(updated, 'remove', alignment=alignment, verbose=verbose, correction_dictionary=False) else: updated = _protfasta._deal_with_invalid_sequences(updated, invalid_sequence_action, alignment=alignment, verbose=verbose, correction_dictionary=correction_dictionary) # If we wanted to write the final set of sequences we're going to use...: if output_filename: write_fasta(updated, output_filename) # if we asked for a list... if return_list is True: pass else: updated = _utilities.convert_list_to_dictionary(updated, verbose) return updated
# ------------------------------------------------------------------ #
[docs] def write_fasta( fasta_data: Union[dict[str, str], list[list[str]]], filename: str, linelength: Union[int, bool, None] = 60, append_to_fasta: bool = False, ) -> None: """Write sequences to a FASTA file. Accepts sequence data as either a dictionary (header -> sequence) or a list of ``[header, sequence]`` pairs and writes a standards- compliant FASTA file to *filename*. Parameters ---------- fasta_data : dict[str, str] or list[list[str]] Sequence data. If a dictionary, keys are headers and values are amino-acid sequences. If a list, each element must be a two-element list ``[header, sequence]``. filename : str Destination file path. Should conventionally end with ``.fasta`` or ``.fa``, but this is not enforced. linelength : int, bool, or None, optional Maximum number of residues per line in the output. Default is ``60`` (the UniProt convention). Values below ``5`` are clamped to ``5``. Set to ``0``, ``None``, or ``False`` to write each sequence on a single line. append_to_fasta : bool, optional If ``True``, new entries are appended to *filename* if it already exists; otherwise the file is created. If ``False`` (default), any existing file is overwritten. Returns ------- None Raises ------ ProtfastaException If a sequence is empty or a list element does not contain exactly two items. """ # This part of code means we can pass either a dictionary or a list of lists # in for write_fasta to deal with if type(fasta_data) == list: def get_sequence(): return (entry[0], entry[1]) # quick validate for i in fasta_data: if len(i) != 2: raise ProtfastaException('While processing a list for write_fasta_file at least one of the elements was not a 2-position sublist:\n%s'%(str(i))) if type(fasta_data) == dict: def get_sequence(): return (entry, fasta_data[entry]) # override line length for sane input. N if linelength == False or linelength == None or linelength < 1: linelength = False else: # cast linelength to an integer here as a soft type checking, and # if it's shorter than 5 then reset to 5 linelength = int(linelength) if linelength < 5: linelength = 5 # set the 'mode' for open. If append_to_file==False, use 'w' and overwrite # existing .fasta file. Otherwise use 'a' and add to existing file if it exists. if append_to_fasta==False: open_mode='w' else: open_mode='a' # Use a large write buffer (1 MiB) to minimise syscall overhead when # writing very large files. with open(filename, open_mode, buffering=1024 * 1024) as fh: # for each entry for entry in fasta_data: (header, seq) = get_sequence() if len(seq) < 1: raise ProtfastaException('Seqence associated with [%s] is empty'%(header)) # write the header line fh.write('>') fh.write(header) fh.write('\n') if linelength: # Slice the sequence into line-length chunks and write # each chunk followed by a newline. This is O(n) with # a handful of Python-level write calls, vs one write # per residue in the naive implementation. seq_len = len(seq) for start in range(0, seq_len, linelength): fh.write(seq[start:start + linelength]) fh.write('\n') # Blank separator line between records. fh.write('\n') else: # Single-line sequence output. fh.write(seq) fh.write('\n\n')