Source code for campa.tl._features
from copy import deepcopy
from typing import Any, Dict, List, Tuple, Union, Mapping, Iterable, Optional
from functools import partial
import os
import time
import logging
import multiprocessing
from numba import jit
from skimage.measure import label, regionprops
import numpy as np
import pandas as pd
import anndata as ad
import squidpy as sq
import numba.types as nt
from campa.data import MPPData
from campa.constants import CoOccAlgo, campa_config
from campa.tl._cluster import annotate_clustering
from campa.tl._experiment import Experiment
ft = nt.float32
it = nt.int64
ft = nt.float32
it = nt.int64
[docs]def thresholded_count(df, threshold=0.9):
"""
Count largest CSL objects per cell.
Sort objects by area (largest areas first) and count how many are needed to exceed threshold % of the total area.
This is essentially a small object invariant way of counting big objects.
Can be used as aggregation function in :meth:`FeatureExtractor.get_object_stats`.
Parameters
----------
threshold
Only consider large objects up to the cumulative sum of 90% of the total area.
Returns
-------
count
"""
df = df[_thresholded_mask(df, threshold)]
count = df.count()
return count
[docs]def thresholded_median(df, threshold=0.9):
"""
Calculate median area of large CSL objects per cell.
Sort objects by area (largest areas first)
and compute the median area of all objects that are below cumulative sum of threshold.
This is essentially a small object invariant way of computing median.
Can be used as aggregation function in :meth:`FeatureExtractor.get_object_stats`.
Parameters
----------
threshold
Only consider large objects up to the cumulative sum of 90% of the total area.
Returns
-------
median
"""
df = df[_thresholded_mask(df, threshold)]
median = df.median()
return median
def _thresholded_mask(df, threshold):
"""
Mask small objects in df.
Sort objects by area (largest areas first)
and mask all objects that are below cumulative sum of threshold.
This is used by thresholded_median and thresholded_count.
This is essentially a small object invariant way of computing median.
Can be used as aggregation function in :meth:`FeatureExtractor.get_object_stats`.
"""
total = df.sum()
cumsum = (df / total).sort_values(ascending=False).cumsum()
mask = cumsum < threshold
# object that will finally exceed threshold (would like to include in median calc, as is also included in count)
idx = (cumsum >= threshold).idxmax()
mask[idx] = True
res = pd.Series(index=df.index, data=False)
res[cumsum[mask].index] = True
return res
[docs]def extract_features(params: Mapping[str, Any]) -> None:
"""
Extract features from clustered dataset using :class:`FeaturesExtractor`.
Creates features :class:`anndata.AnnData` object.
Parameters determine what features are extracted from a given clustering.
The following keys in ``params`` are expected:
- ``experiment_dir``: path to experiment directory relative to campa_config.EXPERIMENT_DIR.
- ``cluster_name``: name of clustering to use.
- ``cluster_dir``: dir of subsampled clustering to load annotation.
Relative to experiment_dir.
Default is taking first of ``experiment_dir/aggregated/sub-*``.
- ``cluster_col``: cluster annotation to use. Defaults to ``cluster_name``.
- ``data_dirs``: data directories to be processed.
Relative to ``experiment_dir/aggregated/full_data``.
If None, all available data_dirs will be processed.
- ``save_name``: filename to use for saving extracted features.
- ``force``: force calculation even when adata exists.
- ``features``: type of features to extract. One or more of `intensity`, `co-occurrence`, `object-stats`.
- Intensity: per-cluster mean and size features. Needs to be calculated first to set up the adata.
- Co-occurrence: spatial co-occurrence between pairs of clusters at different distances.
- Object stats: number and area of connected components per cluster.
- ``co_occurrence_params``: parameters for co-occurrence calculation.
- ``min``, ``max``, ``nsteps``: size of distances interval.
- ``logspace``: use log spacing of co-occurrence intervals.
- ``num_processes``: number of processes to use to compute co-occurrence scores.
- ``object_stats_params``: parameter dict for object-stats calculation.
- ``features``: features to extract in mode object-stats.
Possible features: `area`, `circularity`, `elongation`, `extent`.
- ``channels``: intensity channels to extract mean per cluster from.
Parameters
----------
params
Parameter dictionary.
"""
# set up FeatureExtractor
log = logging.getLogger("extract_features")
exp = Experiment.from_dir(params["experiment_dir"])
data_dirs = params["data_dirs"]
if data_dirs is None or len(data_dirs) == 0:
data_dirs = exp.data_params["data_dirs"]
for data_dir in data_dirs:
log.info(f'extracting features {params["features"]} from {data_dir}')
adata_fname = os.path.join(exp.full_path, "aggregated/full_data", data_dir, params["save_name"])
if os.path.exists(adata_fname):
log.info(f"initialising from existing adata {adata_fname}")
extr = FeatureExtractor.from_adata(adata_fname)
else:
extr = FeatureExtractor(
exp,
data_dir=data_dir,
cluster_name=params["cluster_name"],
cluster_dir=params["cluster_dir"],
cluster_col=params["cluster_col"],
)
# extract features
if "intensity" in params["features"]:
extr.extract_intensity_size(force=params["force"], fname=params["save_name"])
if "co-occurrence" in params["features"]:
co_occ_params = params["co_occurrence_params"]
if co_occ_params["logspace"]:
interval = np.logspace(
np.log2(co_occ_params["min"]),
np.log2(co_occ_params["max"]),
co_occ_params["nsteps"],
base=2,
).astype(np.float32)
else:
interval = np.linspace(co_occ_params["min"], co_occ_params["max"], co_occ_params["nsteps"]).astype(
np.float32
)
extr.extract_co_occurrence(interval=interval, num_processes=co_occ_params["num_processes"])
if "object-stats" in params["features"]:
obj_params = params["object_stats_params"]
extr.extract_object_stats(
features=obj_params["features"],
intensity_channels=obj_params["channels"],
)
[docs]class FeatureExtractor:
"""
Extract features from clustering.
Parameters
----------
exp
Experiment to extract features from.
data_dir
Name of data to cluster. Relative to ``{exp.full_path}/aggregated/full_data``.
cluster_name
Name of clustering to use.
cluster_dir
Dir of subsampled clustering to load annotation.
Relative to ``exp.full_path```.
Default is taking first of ``{exp.full_path}/aggregated/sub-*``.
cluster_col
Cluster annotation to use. Defaults to ``cluster_name``.
adata
If existing, the features adata object containing extracted features.
"""
def __init__(
self,
exp: Experiment,
data_dir: str,
cluster_name: str,
cluster_dir: Optional[str] = None,
cluster_col: Optional[str] = None,
adata: Optional[ad.AnnData] = None,
):
self.log = logging.getLogger(self.__class__.__name__)
self.exp = exp
cluster_col = cluster_col if cluster_col is not None else cluster_name
self.params = {
"data_dir": data_dir,
"cluster_name": cluster_name,
"cluster_dir": cluster_dir,
"cluster_col": cluster_col,
"exp_name": exp.dir + "/" + exp.name,
}
self.adata = adata
self.annotation = self.exp.get_cluster_annotation(str(self.params["cluster_name"]), self.params["cluster_dir"])
clusters = list(np.unique(self.annotation[self.params["cluster_col"]]))
clusters.remove("")
self.clusters = clusters
self._mpp_data: Union[MPPData, None] = None
[docs] @classmethod
def from_adata(cls, fname: str) -> "FeatureExtractor":
"""
Initialise from existing features :class:`ad.AnnData` object.
Parameters
----------
fname
Full path to adata object.
"""
adata = ad.read(fname)
params = deepcopy(adata.uns["params"])
exp = Experiment.from_dir(params.pop("exp_name"))
self = cls(exp, adata=adata, **params)
self.fname = fname
return self
@property
def mpp_data(self) -> MPPData:
"""
:class:`MPPData` object containing pixel-wise clustered data from ``data_dir``.
"""
if self._mpp_data is None:
self._mpp_data = MPPData.from_data_dir(
str(self.params["data_dir"]),
base_dir=os.path.join(self.exp.full_path, "aggregated/full_data"),
keys=["x", "y", "mpp", "obj_ids", str(self.params["cluster_name"])],
data_config=self.exp.config["data"]["data_config"],
)
# ensure that cluster data is string
self._mpp_data._data[str(self.params["cluster_name"])] = self._mpp_data._data[
str(self.params["cluster_name"])
].astype(str)
# prepare according to data_params
data_params = deepcopy(self.exp.data_params)
if not self.exp.is_trainable:
# for non-trainable models, latent rep is mpp.
# if mpp is present in data_dir, this will overwrite the base_data_dir mpp
# but mpp present in data_dir is normalised already -> skip normalisation in mpp_data.prepare()
data_params["normalise"] = False
self._mpp_data.prepare(data_params)
return self._mpp_data
[docs] def extract_intensity_size(self, force: bool = False, fname: str = "features.h5ad") -> None:
"""
Calculate per cluster mean intensity and size for each object.
Saves adata in ``{exp.full_path}/aggregated/full_data/data_dir/{fname}``
Parameters
----------
force
Overwrite existing adata.
fname
Name of the saved adata.
"""
if self.adata is not None and not force:
self.log.info("extract_intensity_size: adata is not None. Specify force=True to overwrite. Exiting.")
return
self.log.info(
f"Calculating {self.params['cluster_name']} (col: {self.params['cluster_col']})"
+ f" mean and size for {self.params['data_dir']}"
)
df = pd.DataFrame(
data=self.mpp_data.center_mpp,
columns=list(self.mpp_data.channels.name),
index=self.mpp_data.obj_ids,
)
# create adata with X = mean intensity of "all" cluster
grouped = df.groupby(df.index)
adata = ad.AnnData(X=grouped.mean())
# add all metadata
OBJ_ID = self.mpp_data.data_config.OBJ_ID
metadata = self.mpp_data.metadata.copy()
metadata[OBJ_ID] = metadata[OBJ_ID].astype(str)
metadata = pd.merge(metadata, adata.obs, how="right", left_on=OBJ_ID, right_index=True)
metadata = metadata.reset_index(drop=True) # make new index, keep mapobject_id in column
metadata.index = metadata.index.astype(str) # make index str, because adata does not play nice with int indices
# add int col of mapobject_id for easier merging
metadata["obj_id_int"] = metadata[OBJ_ID].astype(int)
adata.obs = metadata
# add size of all cluster
# reindex to ensure all object are present
size = grouped[list(df.columns)[0]].count().reindex(adata.obs["obj_id_int"])
adata.obsm["size"] = pd.DataFrame(columns=["all"] + self.clusters, index=adata.obs.index)
adata.obsm["size"]["all"] = np.array(size)
# add uns metadata
adata.uns["clusters"] = self.clusters
adata.uns["params"] = self.params
# add intensities of each cluster as a layer in adata and fill size obsm
for c in self.clusters:
self.log.debug(f"processing {c}")
# get cluster ids to mask
c_ids = list(self.annotation[self.annotation[self.params["cluster_col"]] == c][self.params["cluster_name"]])
mask = np.where(np.isin(self.mpp_data.data(str(self.params["cluster_name"])), c_ids))
cur_df = df.iloc[mask]
# group by obj_id
grouped = cur_df.groupby(cur_df.index)
# add mean of cluster
# reindex to ensure all object are present
mean = grouped.mean().reindex(adata.obs["obj_id_int"])
mean = mean.fillna(0)
adata.layers[f"intensity_{c}"] = np.array(mean[adata.var.index])
# add size
# reindex to ensure all object are present
size = grouped[list(df.columns)[0]].count().reindex(adata.obs["obj_id_int"])
adata.obsm["size"][c] = np.array(size)
# fill nans in size obsm
adata.obsm["size"] = adata.obsm["size"].fillna(0)
self.adata = adata
# write to disk
fname = os.path.join(self.exp.full_path, "aggregated/full_data", str(self.params["data_dir"]), fname)
self.log.info(f"saving adata to {fname}")
self.fname = fname
self.adata.write(self.fname)
[docs] def extract_object_stats(
self,
features: Iterable[str] = ("area", "circularity", "elongation", "extent"),
intensity_channels: Iterable[str] = (),
) -> None:
"""
Extract features from connected components per cluster for each cell.
Implemented features are: `area`, `circlularity`, `elongation`, and `extent` of connected components
per cluster for each cell.
In addition, the mean intensity per component/region of
channels specified in intensity_channels is calculated.
Per component/region features are calculated and stored in ``uns['object_stats']``,
together with OBJ_ID and cluster that this region belongs to.
To aggregate these computed stats in a mean/median values per OBJ_ID,
use :meth:`FeatureExtractor.get_object_stats`.
$circularity = (4 * pi * Area) / Perimeter^2$
$elongation = (major_axis - minor_axis) / major_axis$
Parameters
----------
features
List of features to be calculated.
intensity_channels
List of channels for which the mean intensity should be extracted.
Returns
-------
Nothing, modifies ``adata``:
- Adds ``uns`` entry: ``object_stats`` and ``object_stats_params``
- Adds ``obs`` entries: ``mean_{channel_name}``
"""
if self.adata is None:
self.log.info(
"extract_object_stats: adata is None."
+ " Calculate it with extract_intensity_size before extracting object stats. Exiting."
)
return
if features is None:
features = []
features = list(features)
if intensity_channels is None:
intensity_channels = []
intensity_channels = list(intensity_channels)
assert len(features) + len(intensity_channels) > 0, "nothing to compute"
self.log.info(
f"calculating object stats {features} and channels {intensity_channels}"
+ f" for clustering {self.params['cluster_name']} (col: {self.params['cluster_col']})"
)
cluster_names = {n: i for i, n in enumerate(self.clusters + [""])}
feature_names = features
intensity_feature_names = [f"mean_{ch}" for ch in intensity_channels]
features: Dict[str, List[Any]] = {
feature: [] for feature in feature_names + intensity_feature_names + ["mapobject_id", "clustering"]
} # ignore: type[assignment]
for obj_id in self.mpp_data.unique_obj_ids:
mpp_data = self.mpp_data.subset(obj_ids=[obj_id], copy=True)
img, _ = mpp_data.get_object_img(
obj_id,
data=str(self.params["cluster_name"]),
annotation_kwargs={
"annotation": self.annotation,
"to_col": self.params["cluster_col"],
},
)
# convert labels to numbers
img = np.vectorize(cluster_names.__getitem__)(img[:, :, 0])
label_img = label(img, background=len(self.clusters), connectivity=2)
# define intensity image
intensity_img = img[:, :, np.newaxis]
if len(intensity_channels) > 0:
obj_img, _ = mpp_data.get_object_img(
obj_id,
data="mpp",
channel_ids=mpp_data.get_channel_ids(intensity_channels),
)
intensity_img = np.concatenate([intensity_img, obj_img], axis=-1)
# iterate over all regions in this image
for region in regionprops(label_img, intensity_image=intensity_img):
# filter out single pixel regions, they cause problems in elongation and circularity calculation
if region.area > 1:
# get cluster label for this region
assert region.min_intensity[0] == region.max_intensity[0]
c = int(region.min_intensity[0])
# add clustering and obj_id
features["clustering"].append((self.clusters + [""])[c])
features["mapobject_id"].append(obj_id)
# add all other features
for feature in feature_names:
if feature == "area":
features[feature].append(region.area)
elif feature == "circularity":
# circularity can max be 1,
# larger values are due to tiny regions where perimeter is overestimated
features[feature].append(min(4 * np.pi * region.area / region.perimeter**2, 1))
elif feature == "elongation":
features[feature].append(
(region.major_axis_length - region.minor_axis_length) / region.major_axis_length
)
elif feature == "extent":
features[feature].append(region.extent)
else:
raise NotImplementedError(feature)
# get intensity features for this region
for i, _ in enumerate(intensity_channels):
# mean intensity is in channel i+1 of intensity_img (channel 0 is cluster image)
features[intensity_feature_names[i]].append(region.mean_intensity[i + 1])
df = pd.DataFrame(features)
self.adata.uns["object_stats"] = df
self.adata.uns["object_stats_params"] = {
"features": feature_names,
"intensity_channels": intensity_channels,
}
# write adata
self.log.info(f"saving adata to {self.fname}")
self.log.info(f'adata params {self.adata.uns["params"]}')
self.adata.write(self.fname)
[docs] def extract_co_occurrence(
self,
interval: Iterable[float],
algorithm: Union[str, CoOccAlgo] = CoOccAlgo.OPT,
num_processes: Optional[int] = None,
) -> None:
"""
Extract co-occurrence for each cell individually.
TODO: add reset flag, that sets existing co-occurrence matrices to 0 before running
co-occurrence algorithm.
Parameters
----------
interval
Distance intervals for which to calculate co-occurrence score.
algorithm
Co-occurrence function to use:
- `squidpy`: use :func:`sq.gr.co_occurrence`.
- `opt`: use custom implementation which is optimised for a large number of pixels.
This implementation avoids recalculation of distances, using the fact that coordinates
in given images lie on a regular grid.
Use opt for very large inputs.
num_processes
only for ``algorithm='opt'``. Number if processes to use to compute scores.
Returns
-------
Nothing, modifies ``adata``
- Adds ``obsm`` entries: ``co_occurrence_{cluster1}_{cluster2}``
"""
if self.adata is None:
self.log.info(
"extract_co_occurrence: adata is None."
+ " Calculate it with extract_intensity_size before extracting co_occurrence. Exiting."
)
return
self.log.info(
f"calculating co-occurrence for intervals {interval} and clustering {self.params['cluster_name']}"
+ f" (col: {self.params['cluster_col']})"
)
if CoOccAlgo(algorithm) == CoOccAlgo.OPT:
cluster_names = {n: i for i, n in enumerate(self.clusters + [""])}
coords2_list = _prepare_co_occ(interval)
elif CoOccAlgo(algorithm) == CoOccAlgo.SQUIDPY:
cluster_names = {n: i for i, n in enumerate(self.clusters)}
obj_ids = []
co_occs = []
chunks = 20
i = 0
missing_obj_ids = self._missing_co_occ_obj_ids()
self.log.info(f"calculating co-occurrence for {len(missing_obj_ids)} objects")
for obj_id in missing_obj_ids:
if CoOccAlgo(algorithm) == CoOccAlgo.OPT:
mpp_data = self.mpp_data.subset(obj_ids=[obj_id], copy=True)
img, (pad_x, pad_y) = mpp_data.get_object_img(
obj_id,
data=str(self.params["cluster_name"]),
annotation_kwargs={
"annotation": self.annotation,
"to_col": self.params["cluster_col"],
},
)
# convert labels to numbers
img = np.vectorize(cluster_names.__getitem__)(img)
clusters1 = np.vectorize(cluster_names.__getitem__)(
annotate_clustering(
mpp_data._data[str(self.params["cluster_name"])],
self.annotation,
str(self.params["cluster_name"]),
self.params["cluster_col"],
)
)
# shift coords according to image padding, st coords correspond to img coords
coords1 = (np.array([mpp_data.x, mpp_data.y]) - np.array([pad_x, pad_y])[:, np.newaxis]).astype(
np.int64
)
self.log.info(f"co-occurrence for {obj_id}, with {len(mpp_data.x)} elements")
co_occ = _co_occ_opt(
coords1,
coords2_list,
clusters1,
img,
num_clusters=len(self.clusters),
num_processes=num_processes,
)
elif CoOccAlgo(algorithm) == CoOccAlgo.SQUIDPY:
adata = self.mpp_data.subset(obj_ids=[obj_id], copy=True).get_adata(
obs=[str(self.params["cluster_name"])]
)
# ensure that cluster annotation is present in adata
if self.params["cluster_name"] != self.params["cluster_col"]:
adata.obs[self.params["cluster_col"]] = annotate_clustering(
adata.obs[self.params["cluster_name"]],
self.annotation,
str(self.params["cluster_name"]),
self.params["cluster_col"],
)
adata.obs[str(self.params["cluster_col"])] = adata.obs[self.params["cluster_col"]].astype("category")
self.log.info(f"co-occurrence for {obj_id}, with shape {adata.shape}")
cur_co_occ, _ = sq.gr.co_occurrence(
adata,
cluster_key=self.params["cluster_col"],
spatial_key="spatial",
interval=interval,
copy=True,
show_progress_bar=False,
n_splits=1,
)
# ensure that co_occ has correct format incase of missing clusters
co_occ = np.zeros((len(self.clusters), len(self.clusters), len(list(interval)) - 1))
cur_clusters = np.vectorize(cluster_names.__getitem__)(
np.array(adata.obs[self.params["cluster_col"]].cat.categories)
)
grid = np.meshgrid(cur_clusters, cur_clusters)
co_occ[grid[0].flat, grid[1].flat] = cur_co_occ.reshape(-1, len(list(interval)) - 1)
co_occs.append(co_occ.copy())
obj_ids.append(obj_id)
i += 1
if (i % chunks == 0) or (obj_id == missing_obj_ids[-1]):
# save
self.log.info(f"Saving chunk {i-chunks}-{i}")
# add info to adata
co_occ = np.array(co_occs)
for i1, c1 in enumerate(self.clusters):
for i2, c2 in enumerate(self.clusters):
df = pd.DataFrame(
co_occ[:, i1, i2],
index=obj_ids,
columns=np.arange(len(list(interval)) - 1).astype(str),
)
df.index = df.index.astype(str)
# ensure obj_ids are in correct order
df = pd.merge(
df,
self.adata.obs,
how="right",
left_index=True,
right_on="mapobject_id",
suffixes=("", "right"),
)[df.columns]
df = df.fillna(0)
# add to adata.obsm
if f"co_occurrence_{c1}_{c2}" in self.adata.obsm:
# ensure that filled na values with 0 before trying to add some value on top of it.
# NOTE: this will convert all na values to 0
# (so co occ for coords outside of the cell will be 0)
self.adata.obsm[f"co_occurrence_{c1}_{c2}"] = self.adata.obsm[
f"co_occurrence_{c1}_{c2}"
].fillna(0)
self.adata.obsm[f"co_occurrence_{c1}_{c2}"] += df
else:
self.adata.obsm[f"co_occurrence_{c1}_{c2}"] = df
self.adata.uns["co_occurrence_params"] = {"interval": list(interval)}
self.log.info(f"saving adata to {self.fname}")
self.log.info(f'adata params {self.adata.uns["params"]}')
self.adata.write(self.fname)
# reset co_occ list and obj_ids
co_occs = []
obj_ids = []
[docs] def get_intensity_adata(self) -> ad.AnnData:
"""
Adata object with intensity per cluster combined in X.
Needed for intensity and dotplots.
"""
if self.adata is None:
raise ValueError("adata is None, need to calculate first.")
adata = self.adata
adatas = {}
cur_adata = ad.AnnData(X=adata.X, obs=adata.obs, var=adata.var)
cur_adata.obs["size"] = adata.obsm["size"]["all"]
adatas["all"] = cur_adata
for c in adata.uns["clusters"]:
cur_adata = ad.AnnData(X=adata.X, obs=adata.obs, var=adata.var)
cur_adata.X = adata.layers[f"intensity_{c}"]
cur_adata.obs["size"] = adata.obsm["size"][c]
adatas[c] = cur_adata
comb_adata = ad.concat(adatas, uns_merge="same", index_unique="-", label="cluster")
return comb_adata
[docs] def get_object_stats(
self, area_threshold: int = 10, agg: Union[Iterable[str], Mapping[str, str]] = ("median",), save: bool = False
) -> pd.DataFrame:
"""
Aggregate object stats per obj_id.
Parameters
----------
area_threshold
All components smaller than this threshold are discarded.
agg
List of aggregation function or dict with ``{feature: function}``.
Passed to :func:`pd.GroupBy.agg`
save
Save adata object with `object_stats_agg` entry in ``obsm``.
Returns
-------
pd.DataFrame
obj_id x (clustering, feature) dataframe
Additionally stores result in ``self.adata.obsm['object_stats_agg']``
"""
if self.adata is None:
raise ValueError("adata is None, need to calculate first.")
assert (
"object_stats" in self.adata.uns.keys()
), "No object stats found. Use self.extract_object_stats to calculate."
OBJ_ID = campa_config.get_data_config(self.exp.config["data"]["data_config"]).OBJ_ID
df = self.adata.uns["object_stats"]
# filter out small regions
df = df[df["area"] > area_threshold]
grp = df.groupby(["clustering", OBJ_ID])
agg_stats = grp.agg(agg)
# rename columns to feature_agg
agg_stats.columns = [f"{i}_{j}" for i, j in agg_stats.columns]
# add count column
agg_stats["count"] = grp.count()["area"]
# reshape to obj_id x (clustering, feature)
agg_stats = agg_stats.unstack(level=0)
# replace nan with 0 (clusters that do not exist in given obj)
agg_stats = agg_stats.fillna(0)
# ensure obj_ids are in correct order
agg_stats.index = agg_stats.index.astype(str)
agg_stats.columns = [(i, j) for i, j in agg_stats.columns]
agg_stats = pd.merge(
agg_stats,
self.adata.obs,
how="right",
left_index=True,
right_on=OBJ_ID,
suffixes=("", "right"),
)[agg_stats.columns]
agg_stats = agg_stats.fillna(0)
agg_stats.columns = pd.MultiIndex.from_tuples(agg_stats.columns)
# store result in adata
self.adata.obsm["object_stats_agg"] = deepcopy(agg_stats)
# flatten columns to allow saving adata
self.adata.obsm["object_stats_agg"].columns = [
f"{i}|{j}" for i, j in self.adata.obsm["object_stats_agg"].columns
]
if save:
self.adata.write(self.fname)
return agg_stats
[docs] def extract_intensity_csv(self, obs: Optional[Iterable[str]] = None) -> None:
"""
Extract csv file containing obj_id, mean cluster intensity and size for each channel.
Saves csv as ``export/intensity_{self.fname}.csv``.
Parameters
----------
obs
column names from `metadata.csv` that should be additionally stored.
"""
if self.adata is None:
self.log.warning(
"Intensity and size information is not present. Calculate extract_intensity_size first! Exiting."
)
return
adata = self.get_intensity_adata()
df = pd.DataFrame(data=adata.X, columns=adata.var_names)
# add size
df["size"] = np.array(adata.obs["size"])
# add cluster and obj_id
OBJ_ID = campa_config.get_data_config(self.exp.config["data"]["data_config"]).OBJ_ID
df["cluster"] = np.array(adata.obs["cluster"])
df[OBJ_ID] = np.array(adata.obs[OBJ_ID])
# add additional obs
if obs is not None:
for col in obs:
df[col] = np.array(adata.obs[col])
# save csv
dirname = os.path.join(os.path.dirname(self.fname), "export")
os.makedirs(dirname, exist_ok=True)
df.to_csv(os.path.join(dirname, f"intensity_{os.path.basename(os.path.splitext(self.fname)[0])}.csv"))
[docs] def extract_object_stats_csv(
self, obs: Optional[Iterable[str]] = None, features: Iterable[str] = None, clusters: Iterable[str] = None
) -> None:
"""
Extract csv files of object stats.
Saves csv as ``export/object_stats_{self.fname}.csv``.
Parameters
----------
obs
column names from `metadata.csv` that should be additionally stored.
clusters
List of cluster names for which pairwise co_occurrence scores should be calculated
features
List of features to display. Must be columns of ``adata.obsm['object_stats_agg']``.
If None, all features are displayed.
clusters
List of clusters to display. Must be columns of ``adata.obsm['object_stats_agg']``.
If None, all clusters are displayed.
"""
if self.adata is None or "object_stats_agg" not in self.adata.obsm:
self.log.warning(
"Object stats information is not present.\
Run extract_object_stats and get_objects_stats first! Exiting."
)
return
agg_stats = deepcopy(self.adata.obsm["object_stats_agg"])
if not isinstance(agg_stats.columns, pd.MultiIndex):
# restore multiindex for easier access
agg_stats.columns = pd.MultiIndex.from_tuples([tuple(i.split("|")) for i in agg_stats.columns])
if features is None:
features = agg_stats.columns.levels[0]
if clusters is None:
clusters = agg_stats.columns.levels[1]
df = {}
for feature in features:
for cluster in clusters:
name = f"{feature}|{cluster}"
df[name] = agg_stats[feature][cluster]
df = pd.DataFrame(df)
# add obj_id
OBJ_ID = campa_config.get_data_config(self.exp.config["data"]["data_config"]).OBJ_ID
df[OBJ_ID] = np.array(self.adata.obs[OBJ_ID])
# add additional obs
if obs is not None:
for col in obs:
df[col] = np.array(self.adata.obs[col])
# save csv
dirname = os.path.join(os.path.dirname(self.fname), "export")
os.makedirs(dirname, exist_ok=True)
df.to_csv(os.path.join(dirname, f"object_stats_{os.path.basename(os.path.splitext(self.fname)[0])}.csv"))
[docs] def extract_co_occurrence_csv(
self, obs: Optional[Iterable[str]] = None, clusters: Optional[Iterable[str]] = None
) -> None:
"""
Extract csv files of co_occurrence scores for every cluster-cluster pair.
Data will contain obj_id, co_occurrence scores at each distance
interval for every cluster-cluster pair.
Saves csv as ``export/co_occurrence_{cluster1}_{cluster2}_{self.fname}.csv``.
Parameters
----------
obs
column names from `metadata.csv` that should be additionally stored.
clusters
List of cluster names for which pairwise co_occurrence scores should be calculated
"""
if self.adata is None:
self.log.warning(
"Co-occurrence information is not present. Calculate extract_co_occurrence first! Exiting."
)
return
if clusters is None:
clusters = self.adata.uns["clusters"]
for c1 in clusters:
for c2 in clusters:
# columns = list(
# map(
# lambda x: f"{x[0]:.2f}-{x[1]:.2f}",
# zip(
# self.adata.uns["co_occurrence_params"]["interval"][:-1],
# self.adata.uns["co_occurrence_params"]["interval"][1:],
# ),
# )
# )
columns = [
f"{x0:.2f}-{x1:.2f}"
for x0, x1 in zip(
self.adata.uns["co_occurrence_params"]["interval"][:-1],
self.adata.uns["co_occurrence_params"]["interval"][1:],
)
]
df = self.adata.obsm[f"co_occurrence_{c1}_{c2}"].copy()
df.columns = columns
# add obj_id
OBJ_ID = campa_config.get_data_config(self.exp.config["data"]["data_config"]).OBJ_ID
df[OBJ_ID] = np.array(self.adata.obs[OBJ_ID])
# add additional obs
if obs is not None:
for col in obs:
df[col] = np.array(self.adata.obs[col])
# save csv
dirname = os.path.join(os.path.dirname(self.fname), "export")
os.makedirs(dirname, exist_ok=True)
df.to_csv(
os.path.join(
dirname, f"co_occurrence_{c1}_{c2}_{os.path.basename(os.path.splitext(self.fname)[0])}.csv"
)
)
def _missing_co_occ_obj_ids(self):
"""
Return those obj_ids that do not yet have co-occurence scores calculated.
"""
if self.adata is None:
raise ValueError("adata is None, need to calculate first.")
n = f"co_occurrence_{self.clusters[0]}_{self.clusters[0]}"
if n not in self.adata.obsm.keys():
# no co-occ calculated
return self.mpp_data.unique_obj_ids
else:
OBJ_ID = campa_config.get_data_config(self.exp.config["data"]["data_config"]).OBJ_ID
masks = []
for c1 in self.clusters:
for c2 in self.clusters:
self.adata.obsm[f"co_occurrence_{c1}_{c2}"]
mask1 = (self.adata.obsm[f"co_occurrence_{c1}_{c2}"] == 0).all(axis=1)
masks.append(mask1)
obj_ids = np.array(self.adata[np.array(masks).T.all(axis=1)].obs[OBJ_ID]).astype(np.uint32)
return obj_ids
def _compare(self, obj: "FeatureExtractor") -> Tuple[bool, Dict[str, Any]]:
"""
Compare feature extractors.
Compares all features contained in adata and annotation dict.
Parameters
----------
obj
Object to compare to.
Returns
-------
Tuple of (``overall_result``, ``results_dict``).
``overall_result`` is True, if all data in ``results_dict`` is True.
``results_dict`` contains for each tested key True or False.
"""
def array_comp(arr1, arr2):
if type(arr1) == pd.DataFrame:
if type(arr2) == pd.DataFrame:
if (arr1.columns != arr2.columns).any():
return False
for c in arr1:
if not array_comp(arr1[c], arr2[c]):
return False
return True
else:
return False
if arr1.shape != arr2.shape:
return False
if arr1.dtype == object:
comp = arr1 == arr2
else:
comp = np.isclose(arr1, arr2)
if comp is False:
return False
else:
return comp.all()
if self.adata is None or obj.adata is None:
self.log.warning("Cannot compare FeatureExtractors, one or more adatas is None")
return False, {}
results_dict = {}
results_dict["annotation"] = self.annotation.equals(obj.annotation)
res = {}
res["X"] = array_comp(self.adata.X, obj.adata.X)
res["obs"] = self.adata.obs.equals(obj.adata.obs)
res["obsm"] = {}
for key in self.adata.obsm.keys():
res["obsm"][key] = self.adata.obsm[key].equals(obj.adata.obsm[key])
res["layers"] = {}
for key in self.adata.layers.keys():
res["layers"][key] = array_comp(self.adata.layers[key], obj.adata.layers[key])
res["uns"] = {}
for key in self.adata.uns.keys():
if key == "params":
# params are experiment specific, but here we just care about the resulting values
continue
if key == "object_stats":
res["uns"][key] = array_comp(self.adata.uns[key], obj.adata.uns[key])
elif key == "clusters":
res["uns"][key] = array_comp(self.adata.uns[key], obj.adata.uns[key])
elif key in ("object_stats_params", "co_occurrence_params"):
res["uns"][key] = {}
for k in self.adata.uns[key].keys():
res["uns"][key][k] = array_comp(self.adata.uns[key][k], obj.adata.uns[key][k])
else:
res["uns"][key] = self.adata.uns[key] == obj.adata.uns[key]
results_dict["adata"] = res
# summarise results
# flatten dict
df = pd.json_normalize(results_dict, sep="_")
vals = df.to_dict(orient="records")[0].values()
return all(list(vals)), results_dict
@jit(ft[:, :](it[:], it[:], it), fastmath=True)
def _count_co_occ(clus1: np.ndarray, clus2: np.ndarray, num_clusters: int) -> np.ndarray:
co_occur = np.zeros((num_clusters, num_clusters), dtype=np.float32)
for i, j in zip(clus1, clus2):
co_occur[i, j] += 1
return co_occur
def _co_occ_opt_helper(coords2, coords1, clusters1, img, num_clusters):
"""
Calculate co-occurrence scores (helper).
Count occurrence of cluster pairs given lists of coords.
"""
# NOTE: order of arguments is important here, because of call to multiprocessing.Pool.map
# (coords2 is iterated over and therefore needs to be first argument)
# get img coords to consider for this interval (len(interval_coords), num obs)
cur_coords = np.expand_dims(coords2, 2) + np.expand_dims(coords1, 1)
# get cluster of center pixel + repeat for len(interval_coords)
clus1 = np.tile(clusters1, [cur_coords.shape[1], 1])
# reshape to (2, xxx)
cur_coords = cur_coords.reshape((2, -1))
clus1 = clus1.reshape([-1])
# filter cur_coords that are outside image
shape = np.expand_dims(np.array([img.shape[1], img.shape[0]]), 1)
mask = np.all((cur_coords >= 0) & (cur_coords < shape), axis=0)
cur_coords = cur_coords[:, mask]
clus1 = clus1[mask]
# get cluster of cur_coords
clus2 = img[cur_coords[1], cur_coords[0]].flatten()
# remove those pairs where clus2 is outside of this image (cluster id is not a valid id)
mask = clus2 < num_clusters
# assert (clus1 < num_clusters).all()
clus1 = clus1[mask]
clus2 = clus2[mask]
co_occur = _count_co_occ(clus1, clus2, num_clusters)
return co_occur
def _co_occ_opt(
coords1: np.ndarray, # int64
coords2_list: np.ndarray, # int64
clusters1: np.ndarray, # int64
img: np.ndarray, # int64
num_clusters: int,
num_processes: Optional[int] = None,
) -> np.ndarray:
"""
Calculate co-occurrence scores for several intervals.
For decreased memory usage coords1 x coords2 pairs are chunked in CO_OCC_CHUNK_SIZE chunks and processed.
If num_processes is specified, uses multiprocessing to calculate co-occurrence scores.
Args:
coords1: first list of coordiantes
coords2_list: second list of coordinates, for different intervals
clusters1: cluster assigments of coords1
img: cluster image used to look up cluster assigments of coords2
num_clusters: total number of clusters.
Every cluster assigment greater than this number is filtered (assuming eg background values)
num_processes: if not none, uses multiprocessing.Pool to calculate co-occurrence scores
Returns:
co-occurrence scores in num_clusters x num_clusters x num_intervals matrix
"""
log = logging.getLogger("_co_occ_opt")
if num_processes is not None:
pool = multiprocessing.Pool(num_processes)
log.info(f"using {num_processes} processes to calculate co-occ scores.")
if coords1.shape[1] > campa_config.CO_OCC_CHUNK_SIZE:
raise ValueError(
f"coords1 with size {coords1.shape[1]} is larger than CO_OCC_CHUNK_SIZE {campa_config.CO_OCC_CHUNK_SIZE}."
+ " Cannot compute _co_occ_opt"
)
out = np.zeros((num_clusters, num_clusters, len(coords2_list)), dtype=np.float32)
# iterate over each interval
for idx, coords2 in enumerate(coords2_list):
# log.info(f'co occ for interval {idx+1}/{len(coords2_list)},
# with {coords1.shape[1]} x {coords2.shape[1]} coord pairs')
if (coords1.shape[1] * coords2.shape[1]) > campa_config.CO_OCC_CHUNK_SIZE:
chunk_size = int(campa_config.CO_OCC_CHUNK_SIZE / coords1.shape[1])
coords2_chunks = np.split(coords2, np.arange(0, coords2.shape[1], chunk_size), axis=1)
# log.info(f'splitting coords2 in {len(coords2_chunks)} chunks')
else:
coords2_chunks = [coords2]
# calculate pairwise cluster counts
time.time()
co_occur = np.zeros((num_clusters, num_clusters), dtype=np.float32)
map_fn = partial(
_co_occ_opt_helper,
coords1=coords1,
clusters1=clusters1,
img=img,
num_clusters=num_clusters,
)
if num_processes is not None:
# for res in tqdm.tqdm(pool.imap_unordered(map_fn, coords2_chunks), total=len(coords2_chunks)):
for res in pool.imap_unordered(map_fn, coords2_chunks):
co_occur += res
else:
for res in map(map_fn, coords2_chunks):
co_occur += res
time.time()
# log.info(f'calculating co_occur for these coords took {t2-t1:.0f}s.')
# calculate co-occ scores
probs_matrix = co_occur / np.sum(co_occur)
probs = np.sum(probs_matrix, axis=1)
probs_con = np.zeros((num_clusters, num_clusters), dtype=np.float32)
for c in np.unique(img):
# do not consider background value in img
if c >= num_clusters:
continue
probs_conditional = co_occur[c] / np.sum(co_occur[c])
probs_con[c, :] = probs_conditional / probs
out[:, :, idx] = probs_con
return out
def _prepare_co_occ(interval):
"""
Return lists of coordinates to consider for each interval. Coordinates are relative to [0,0].
"""
arr = np.zeros((int(interval[-1]) * 2 + 1, int(interval[-1]) * 2 + 1))
# calc distances for interval range (assuming c as center px)
c = int(interval[-1]) + 1
c: np.ndarray = np.array([c, c]).T
xx, yy = np.meshgrid(np.arange(len(arr)), np.arange(len(arr)))
coords = np.array([xx.flatten(), yy.flatten()]).T
dists = np.sqrt(np.sum((coords - c) ** 2, axis=-1))
dists = dists.reshape(int(interval[-1]) * 2 + 1, -1)
# calc coords for each thresh interval
coord_lists = []
for thres_min, thres_max in zip(interval[:-1], interval[1:]):
xy = np.where((dists <= thres_max) & (dists > thres_min))
coord_lists.append(xy - c[:, np.newaxis])
return coord_lists