# Source code for rfmix_reader.readers.fb_read

from dask.delayed import delayed
from dask.array import from_delayed, Array, concatenate
from numpy import (
    float32,
    memmap,
    int32
)

# Public API of this module: only `read_fb` is exported by
# `from ... import *`; `_read_chunk` is a private helper.
__all__ = ["read_fb"]

def read_fb(
    filepath: str, nrows: int, ncols: int, row_chunk: int, col_chunk: int
) -> Array:
    """
    Build a lazy, chunked dask array over a binary matrix file.

    The ``nrows`` x ``ncols`` matrix is tiled into blocks of at most
    ``row_chunk`` x ``col_chunk`` elements. Each tile is wrapped in a
    delayed call to ``_read_chunk``, so nothing is read from disk until
    the returned array (or a slice of it) is computed.

    NOTE(review): earlier documentation stated that the first 2 rows
    (comments) and 4 columns (loci annotation) are skipped. No skipping
    happens in this function; presumably the binary file was written
    without them -- confirm against the code that produces the file.

    Parameters
    ----------
    filepath : str
        Path to the binary file.
    nrows : int
        Total number of rows in the dataset.
    ncols : int
        Total number of columns in the dataset.
    row_chunk : int
        Maximum number of rows in each chunk.
    col_chunk : int
        Maximum number of columns in each chunk.

    Returns
    -------
    dask.array.Array
        Array of shape ``(nrows, ncols)`` and dtype ``int32`` assembled
        from the individual tiles.

    Raises
    ------
    ValueError
        If ``row_chunk``, ``col_chunk``, ``nrows`` or ``ncols`` is not a
        positive integer.
    FileNotFoundError
        If the specified file does not exist. Because reading is delayed,
        this surfaces when the array is computed, not when it is built.
    IOError
        If there is an error reading the file (also at compute time).
    """
    # Validate input parameters
    if row_chunk <= 0 or col_chunk <= 0:
        raise ValueError("row_chunk and col_chunk must be positive integers.")
    # An empty dataset would leave nothing to concatenate below; fail early
    # with a message that names the real cause.
    if nrows <= 0 or ncols <= 0:
        raise ValueError("nrows and ncols must be positive integers.")

    row_blocks: list[Array] = []
    for row_start in range(0, nrows, row_chunk):
        # The last block along each axis may be shorter than the chunk size.
        row_end = min(row_start + row_chunk, nrows)

        tiles: list[Array] = []
        for col_start in range(0, ncols, col_chunk):
            col_end = min(col_start + col_chunk, ncols)
            chunk = delayed(_read_chunk)(
                filepath,
                nrows,
                ncols,
                row_start,
                row_end,
                col_start,
                col_end,
            )
            shape = (row_end - row_start, col_end - col_start)
            # dtype must match what _read_chunk returns (int32).
            tiles.append(from_delayed(chunk, shape, dtype=int32))

        # Stitch the tiles of this row band together side by side.
        row_blocks.append(
            concatenate(tiles, axis=1, allow_unknown_chunksizes=True)
        )

    # Stack the row bands on top of each other.
    X = concatenate(row_blocks, axis=0, allow_unknown_chunksizes=True)
    assert isinstance(X, Array)  # type narrowing for static checkers
    return X
def _read_chunk(
    filepath, nrows, ncols, row_start, row_end, col_start, col_end
):
    """
    Helper function to read a rectangular chunk from the binary file.

    The file is treated as a row-major (C-ordered) ``float32`` matrix with
    ``ncols`` values per row. Whole rows ``row_start:row_end`` are memory
    mapped and the requested columns are then sliced out, so that the row
    stride is ``ncols`` even when the chunk covers only part of each row.
    (Mapping a ``(rows, col_end - col_start)`` block directly would read a
    contiguous run of values and return wrong data for every row after
    the first whenever the chunk is narrower than the full row.)

    Parameters
    ----------
    filepath (str): Path to the binary file.
    nrows (int): Total number of rows in the dataset. Not used by the
        read itself; kept so the signature matches the caller.
    ncols (int): Total number of columns in the dataset.
    row_start (int): Starting row index for the chunk (inclusive).
    row_end (int): Ending row index for the chunk (exclusive).
    col_start (int): Starting column index for the chunk (inclusive).
    col_end (int): Ending column index for the chunk (exclusive).

    Returns
    -------
    np.ndarray: The chunk of data read from the file, converted to
        ``int32``, with shape ``(row_end - row_start, col_end - col_start)``.
    """
    base_size = float32().nbytes
    # Byte offset of the first element of row `row_start`.
    offset = row_start * ncols * base_size
    # Map complete rows; only the pages actually touched by the column
    # slice below need to be read from disk.
    rows = memmap(
        filepath,
        dtype=float32,
        mode="r",
        offset=offset,
        shape=(row_end - row_start, ncols),
    )
    # The float32 -> int32 conversion produces a fresh array, so the result
    # does not depend on the mapping staying open.
    return rows[:, col_start:col_end].astype(int32, copy=False)