from __future__ import annotations
from tqdm import tqdm
from numpy import ndarray, full
from multiprocessing import cpu_count
from typing import List, Union, Tuple, TYPE_CHECKING
from ..utils import get_pops, get_sample_names
from ..backends import (
_configure_dask_backends,
_select_array_backend,
_select_dataframe_backend
)
if TYPE_CHECKING:
from dask.array import Array
import dask.dataframe as dd
def _get_array_backend():
return _select_array_backend()
def _get_dataframe_backend():
return _select_dataframe_backend()
[docs]
def admix_to_bed_individual(
loci: DataFrame, g_anc: DataFrame, admix: Array, sample_num: int,
chunk_size: int = 10_000, min_segment: int = 3, verbose: bool=True
) -> DataFrame:
"""
Returns loci and admixture data to a BED (Browser Extensible Data) file for
a specific chromosome.
This function processes genetic loci data along with admixture proportions
and returns BED format DataFrame for a specific chromosome.
Parameters
----------
loci : DataFrame
A DataFrame containing genetic loci information. Expected to have
columns for chromosome, position, and other relevant genetic markers.
g_anc : DataFrame
A DataFrame containing sample and population information. Used to derive
sample IDs and population names.
admix : Array
A Dask Array containing admixture proportions. The shape should be
compatible with the number of loci and populations.
sample_num : int
Zero-based integer index of the sample to extract from ``g_anc``.
For example, ``0`` selects the first sample, ``1`` the second, and
so on.
chunk_size : int, optional
Size of chunks to process at once (default=10_000)
Adjust based on available memory
min_segment : int, optional
Minimum length of a segment to consider it a true change (default=3)
verbose : bool
:const:`True` for progress information; :const:`False` otherwise.
Default:`True`.
Returns
-------
DataFrame: A DataFrame (pandas or cudf) in BED-like format with columns:
'chromosome', 'start', 'end', and ancestry data columns.
Raises
------
IndexError
If ``sample_num`` is negative or >= the number of samples in ``g_anc``.
Notes
-----
- The function internally calls _generate_bed() to perform the actual BED
formatting.
- Column names in the output file are formatted as "{sample}_{population}".
- The output file includes data for all chromosomes present in the input
loci DataFrame.
- Large datasets may require significant processing time and disk space.
Example
-------
>>> loci, g_anc, admix = read_rfmix(prefix_path)
>>> admix_to_bed_individual(loci_df, g_anc_df, admix_array, "chr22")
"""
_configure_dask_backends()
# Column annotations
pops = get_pops(g_anc)
sample_ids = get_sample_names(g_anc)
if sample_num < 0 or sample_num >= len(sample_ids):
raise IndexError(
f"sample_num {sample_num} is out of range "
f"[0, {len(sample_ids) - 1}]"
)
col_names = [f"{sample}_{pop}" for pop in pops for sample in sample_ids]
sample_name = f"{sample_ids[sample_num]}"
# Generate BED dataframe
ddf = _generate_bed(loci, admix, pops, col_names, sample_name, verbose,
chunk_size, min_segment)
return ddf.compute()
def _generate_bed(
df: DataFrame, dask_matrix: Array, pops: List[str],
col_names: List[str], sample_name: str, verbose: bool,
chunk_size: int, min_segment: int
) -> DataFrame:
"""
Generate BED records from loci and admixture data and subsets for specific
chromosome.
This function processes genetic loci data along with admixture proportions
and returns the results for a specific chromosome.
Parameters
----------
df : DataFrame
A DataFrame containing genetic loci information from `read_rfmix`
dask_matrix : Array
A Dask Array containing admixture proportions. The shape should be
compatible with the number of loci and populations. This is from
`read_rfmix`.
pops : List[str]
A list of admixtured genetic populations
col_names : List[str]
A list of column names for the admixture data. These should be formatted
as "{sample}_{population}".
chrom : str
The chromosome to generate BED format for.
chunk_size : int, optional
Size of chunks to process at once. Adjust based on available memory.
min_segment : int
Minimum length of a segment to consider for a true change.
Returns
-------
DataFrame: A DataFrame (pandas or cudf) in BED-like format with columns:
'chromosome', 'start', 'end', and ancestry data columns.
Notes
-----
- The function internally calls _process_chromosome() to process each
chromosome.
- Large datasets may require significant processing time and disk space.
"""
import dask.dataframe as dd
from dask.array import from_array
# Check if the DataFrame and Dask array have the same number of rows
assert df.shape[0] == dask_matrix.shape[0], "DataFrame and Dask array must have the same number of rows"
# Convert the DataFrame to a Dask DataFrame
parts = cpu_count()
ncols = dask_matrix.shape[1]
df_mod = _get_dataframe_backend()
use_gpu = df_mod.__name__ == "cudf"
if use_gpu and isinstance(df, df_mod.DataFrame):
ddf = dd.from_pandas(df.to_pandas(), npartitions=parts)
else:
ddf = dd.from_pandas(df, npartitions=parts)
# Add each column of the Dask array to the DataFrame
if isinstance(dask_matrix, ndarray):
from dask.array import from_array
dask_matrix = from_array(dask_matrix, chunks="auto")
dask_df = dd.from_dask_array(dask_matrix, columns=col_names)
ddf = dd.concat([ddf, dask_df], axis=1)
del dask_df
# Subset for chromosome
results = []
chromosomes = ddf["chromosome"].drop_duplicates().compute()
for chrom in tqdm(sorted(chromosomes), desc="Processing chromosomes",
disable=not verbose):
chrom_group = ddf[ddf['chromosome'] == chrom]
chrom_group = chrom_group.repartition(npartitions=parts)
results.append(_process_chromosome(chrom_group, sample_name, pops,
chunk_size, min_segment))
return dd.concat(results, axis=0)
def _process_chromosome(
group: dd.DataFrame, sample_name: str, pops: List[str],
chunk_size: int, min_segment: int
) -> DataFrame:
"""
Process genetic data for a single chromosome to identify ancestry
intervals.
Converts genetic positions into BED-like intervals with constant ancestry,
detecting change points where ancestry composition shifts.
Parameters
----------
group : dd.DataFrame
Dask DataFrame containing genetic data for a single chromosome.
Must contain columns: chromosome, physical_position, and [sample_name].
Must be sorted by physical position.
sample_name : str
Name of the ancestry data column to preserve in output.
pops : List[str]
A list of admixtured populations
chunk_size : int, optional
Size of chunks to process at once. Adjust based on available memory.
min_segment : int, optional
Minimum length of a segment to consider it a true change.
Returns
-------
DataFrame
BED-formatted DataFrame with columns:
- chromosome (str/int): Chromosome identifier
- start (int): Interval start position
- end (int): Interval end position
- [sample_name] (float): Ancestry proportion value
Raises
------
ValueError
If input contains data for multiple chromosomes
If physical positions are not sorted
Notes
-----
Processing Workflow:
1. Validates single-chromosome input
2. Converts positions and ancestry data to Dask arrays
3. Detects ancestry change points using _find_intervals
4. Generates BED records for constant-ancestry intervals
5. Returns formatted results as Dask DataFrame
Example
-------
>>> group = dd.from_pandas(pd.DataFrame({
... 'chromosome': [1,1,1,1],
... 'physical_position': [100,200,300,400],
... 'pop1': [1,1,0,0]
... }), npartitions=1)
>>> _process_chromosome(group, 'pop1').compute()
chromosome start end pop1
0 1 100 200 1
1 1 300 400 0
"""
import dask.dataframe as dd
# Fetch chromosome
chrom_val = group["chromosome"].drop_duplicates().compute()
if len(chrom_val) != 1:
raise ValueError(f"Only one chromosome expected got: {len(chrom_val)}")
chrom_val = chrom_val.values[0]
# Convert to a Dask array
positions = group['physical_position'].to_dask_array(lengths=True)
target_samples = [f"{sample_name}_{pop}" for pop in pops]
sample_cols = [col for col in group.columns if col in target_samples]
data_matrix = group[sample_cols].to_dask_array(lengths=True)
# Detect changes
change_indices = _find_intervals(data_matrix, chunk_size, min_segment)
# Create BED records
chrom_col, numeric_data = _create_bed_records(chrom_val, positions,
data_matrix, change_indices,
len(pops))
cnames = ['chromosome', 'start', 'end'] + sample_cols
import dask.dataframe as dd
df_numeric = dd.from_dask_array(numeric_data, columns=cnames[1:])
return df_numeric.assign(chromosome=chrom_val)[cnames]
def _find_intervals(data_matrix: Array, chunk_size: int,
min_segment_length: int) -> List[int]:
"""
Detect ancestry change points in genetic data matrix.
Parameters
----------
data_matrix : dask.array.Array
2D array (positions × populations) of ancestry counts
Each row sums to 2 (diploid), with values 0,1,2 per ancestry
Shape example: (175_000, 2) for two populations
chunk_size : int, optional
Size of chunks to process at once. Adjust based on available memory.
min_segment_length : int, optional
Minimum length of a segment to consider it a true change.
Returns
-------
List[int]
Sorted indices of ancestry change points (0-based)
"""
cp = _get_array_backend()
# Get dimensions
n_positions = data_matrix.shape[0]
n_chunks = (n_positions + chunk_size - 1) // chunk_size
# Initialize change points list
change_points = set([0]) # Always include start point
# Process data in chunks with overlap to handle boundaries
overlap = min_segment_length + 1
last_state = None
for chunk_idx in range(n_chunks):
# Calculate chunk boundaries
start_idx = chunk_idx * chunk_size
end_idx = min(start_idx + chunk_size + overlap, n_positions)
# Get chunk data
chunk = data_matrix[start_idx:end_idx].compute()
# Find state changes within chunk
for pos in range(1, len(chunk)):
# Compare full state vectors
current_state = tuple(chunk[pos])
prev_state = tuple(chunk[pos-1])
if current_state != prev_state:
# Check if change persists for min_segment_length
is_stable_change = True
if pos + min_segment_length <= len(chunk):
future_states = chunk[pos:pos + min_segment_length]
# Check if the new state is stable
if not all(cp.array_equal(future_states[i], chunk[pos])
for i in range(min_segment_length)):
is_stable_change = False
else:
# If we're near chunk boundary,
# mark for checking in next chunk
is_stable_change = False
if is_stable_change:
global_pos = start_idx + pos
change_points.add(global_pos)
# Save last state for next chunk comparison
last_state = tuple(chunk[-1])
# Add final position
change_points.add(n_positions - 1)
return sorted(change_points)
def _create_bed_records(
chrom_value: Union[int, str], pos: Array, data_matrix: Array,
idx: List[int], npops: int) -> Tuple[Array, Array]:
"""
Generate BED records from genetic intervals and ancestry data.
Parameters
----------
chrom_value : int or str
Chromosome identifier for all records
pos : dask.array.Array
1D array of physical positions (int)
data_matrix : dask.array.Array
2D array of ancestry proportions (positions × samples)
idx : List[int]
List of change point indices from _find_intervals
npops : int
The number of populations in the admixture data.
Returns
-------
Tuple[Array, Array]
(chromosome_col, numeric_data) where:
- chromosome_col: Dask array of chromosome identifiers
- numeric_data: Dask array with columns [start, end, sample_data]
Notes
-----
Interval Construction Rules:
- First interval starts at position 0
- Subsequent intervals start at previous change point +1
- Final interval ends at last physical position
- Ancestry values taken from interval end points
"""
from dask.array import array, concatenate, expand_dims, from_array
cp = _get_array_backend()
idx = cp.asarray(idx)
if len(idx) == 0:
ancestry = data_matrix[-1, 0] if npops == 2 else data_matrix[-1, :]
start_col = pos[0]; end_col = pos[-1]
chrom_col = from_array(full((1,), chrom_value)[:, None])
ancestry_col = ancestry.compute().reshape(1, -1) if npops > 2 else [ancestry.compute()]
numeric_cols = cp.hstack([
cp.array([start_col.compute()]).reshape(-1, 1),
cp.array([end_col.compute()]).reshape(-1, 1),
cp.array(ancestry_col).reshape(1, -1)
])
return chrom_col, from_array(numeric_cols)
# Start and end index arrays
max_idx = len(pos) - 1
start_idx = cp.concatenate([cp.array([0]), idx[:-1] + 1])
end_idx = idx
# Position slices
start_col = pos[start_idx]; end_col = pos[end_idx]
ancestry_col = data_matrix[end_idx, :]
last_ancestry = data_matrix[max_idx, :]
# Add final interval only if there is remaining data after last change point
last_end_index = int(end_idx[-1])
if last_end_index + 1 < len(pos):
last_start = pos[last_end_index + 1]
last_end = pos[-1]
start_col = concatenate([start_col, array([last_start])])
end_col = concatenate([end_col, array([last_end])])
ancestry_col = concatenate([ancestry_col, expand_dims(last_ancestry, axis=0)])
# Stack numeric columns: start, end, ancestry_cols
numeric_cols = cp.hstack([
cp.array(start_col.compute().reshape(-1, 1)),
cp.array(end_col.compute().reshape(-1, 1)),
cp.array(ancestry_col.compute())
])
chrom_col = from_array(full((numeric_cols.shape[0],), chrom_value)[:, None])
numeric_data = from_array(numeric_cols)
return chrom_col, numeric_data