Source code for campa.tl._cluster

from __future__ import annotations

from typing import Any, Mapping, Iterable, TYPE_CHECKING, MutableMapping

if TYPE_CHECKING:
    from campa.tl import Experiment
    import anndata as ad

from copy import deepcopy
import os
import json
import pickle
import urllib
import logging

from pynndescent import NNDescent
from matplotlib.colors import rgb2hex
import numpy as np
import pandas as pd
import scanpy as sc
import matplotlib.pyplot as plt

from campa.data import MPPData
from campa.utils import merged_config
from campa.constants import campa_config
from campa.tl._evaluate import Predictor


# annotation fns
def annotate_clustering(
    clustering: Iterable[str | int],
    annotation: pd.DataFrame,
    cluster_name: str,
    annotation_col: str | None = None,
) -> Iterable[str | int]:
    """
    Annotate clustering according to annotation.

    Parameters
    ----------
    clustering
        Clustering to annotate.
    annotation
        Annotation table containing mapping from clustering values to annotated values.
    cluster_name
        Column in ``annotation`` containing current clustering values.
    annotation_col
        Columns in ``annotation`` containing desired annotation.

    Returns
    -------
    Iterable[str, int]
        Annotated clustering.
    """
    if annotation_col is None:
        return clustering
    if cluster_name == annotation_col:
        return clustering
    return np.array(annotation.set_index(cluster_name)[annotation_col].loc[clustering])


[docs]def add_clustering_to_adata( data_dir: str, cluster_name: str, adata: ad.AnnData, annotation: pd.DataFrame, added_name: str = None, annotation_col: str = None, ) -> None: """ Add clustering to adata. Adds `cluster_name` to `adata.obs[added_name]` and colour values for each cluster stored in the annotation dict. Assumes that adata has the same dimensionality as the clustering. Parameters ---------- data_dir Full path to directory containing ``{cluster_name}.npy``. cluster_name Column in ``annotation`` containing current clustering values. adata Adata to which to add the clustering to. annotation Annotation table containing mapping from clustering values to annotated values. added_name Name to use for clustering in adata. Defaults to `annotation_col` annotation_col Columns in ``annotation`` containing desired annotation. Defaults to `cluster_name`. Returns ------- Nothing, adds clustering to `adata.obs[added_name]`. """ if added_name is None: added_name = annotation_col if added_name is None: added_name = cluster_name if annotation_col is None: annotation_col = cluster_name # load clustering clustering = np.load(os.path.join(data_dir, f"{cluster_name}.npy"), allow_pickle=True) # map clustering to annotation clustering = annotate_clustering(clustering, annotation, cluster_name, annotation_col) # add to adata adata.obs[added_name] = clustering adata.obs[added_name] = adata.obs[added_name].astype("category") # add cmap cmap = annotation.drop_duplicates(subset=annotation_col).set_index(annotation_col)[annotation_col + "_colors"] adata.uns[added_name + "_colors"] = list(cmap.loc[adata.obs[added_name].cat.categories])
[docs]class Cluster: """ Cluster data. Contains functions to create a (subsampled) :class:`MPPData` for clustering, cluster it, and to project the clustering to other MPPDatas. Cluster is initialised from a cluster config dictionary with the following keys: - ``data_config``: name of the data config to use, should be registered in ``campa.ini`` - ``data_dirs``: where to read data from (relative to ``DATA_DIR`` defined in data config) - ``process_like_dataset``: name of dataset that gives parameters for processing (except subsampling/subsetting) - ``subsample``: (bool) subsampling of pixels - ``subsample_kwargs``: kwargs for :meth:`MPPData.subsample` defining the fraction of pixels to be sampled - ``subset``: (bool) subset to objects with certain metadata. - ``subset_kwargs``: kwargs to :meth:`MPPData.subset` defining which object to subset to - ``seed``: random seed to make subsampling reproducible - ``cluster_data_dir``: name of the dir containing the mpp_data that is clustered. Relative to EXPERIMENT_DIR - ``cluster_name``: name of the cluster assignment file - ``cluster_rep``: representation that should be clustered (name of existing file, should be predicted with :meth:`Predictor.get_representation`). - ``cluster_method``: `leiden` or `kmeans` (`kmeans` not tested). - ``leiden_resolution``: resolution parameter for leiden clustering. - ``kmeans_n``: number of clusters for `kmeans`. - ``umap``: (bool) predict UMAP of ``cluster_rep``. Parameters ---------- config Cluster config. cluster_mpp Data to cluster. save_config Save cluster config in ``{config['cluster_data_dir']}/cluster_params.json``. """ config: MutableMapping[str, Any] = { # --- cluster data creation (mpp_data is saved to cluster_data_dir) --- "data_config": "NascentRNA", "data_dirs": [], # name of dataset that gives params for processing (except subsampling/subsetting) "process_like_dataset": None, "subsample": False, "subsample_kwargs": {}, "subset": False, "subset_kwargs": {}, "seed": 42, # name of the dir containing the mpp_data that is clustered. Relative to campa_config.EXPERIMENT_DIR "cluster_data_dir": None, # --- cluster params --- # name of the cluster assignment file "cluster_name": "clustering", # representation that should be clustered (name of existing file) "cluster_rep": "latent", "cluster_method": "leiden", # leiden or kmeans "leiden_resolution": 0.8, "kmeans_n": 20, "umap": True, # calculate umap of cluster data } def __init__(self, config: Mapping[str, Any], cluster_mpp: MPPData = None, save_config: bool = False): self.log = logging.getLogger(self.__class__.__name__) self.config = merged_config(self.config, config) """Cluster config.""" self.data_config_name = self.config["data_config"] self.data_config = campa_config.get_data_config(self.data_config_name) # load dataset_params self.dataset_params = None if self.config["process_like_dataset"] is not None: params_fname = os.path.join( self.data_config.DATASET_DIR, self.config["process_like_dataset"], "params.json", ) self.dataset_params = json.load(open(params_fname)) # initialise cluster_mpp self._cluster_mpp = cluster_mpp # try to load it from disk if not already initialised self.cluster_mpp # initialise annotation self._cluster_annotation: pd.DataFrame | None = None self.cluster_annotation # save config if save_config: config_fname = os.path.join( campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"], "cluster_params.json" ) os.makedirs(os.path.dirname(config_fname), exist_ok=True) json.dump(self.config, open(config_fname, "w"), indent=4)
[docs] @classmethod def from_cluster_data_dir(cls, data_dir: str) -> Cluster: """ Initialise from existing ``cluster_data_dir``. Parameters ---------- data_dir Dir containing complete :class:`MPPData` with ``cluster_rep`` and ``cluster_name`` files. Relative to ``campa_config.EXPERIMENT_DIR``. """ # load mpp_data and cluster_params (for reference) from data_dir # TODO config_fname = os.path.join(campa_config.EXPERIMENT_DIR, data_dir, "cluster_params.json") config = json.load(open(config_fname)) return cls(config, save_config=False)
[docs] @classmethod def from_exp( cls, exp: Experiment, cluster_config: Mapping[str, Any] | None = None, data_dir: str = None ) -> Cluster: """ Initialise from experiment for clustering of entire data that went into creating training data. Cluster parameters are read from ``Experiment.config['cluster']``. Parameters ---------- exp Trained Experiment. cluster_config Additional cluster parameters (like subsampling etc). Overwrites default cluster parameters. data_dir Directory containing ``cluster_mpp`` (relative to ``{exp.dir}/{exp.name}``). """ add_cluster_config = cluster_config if cluster_config is not None else {} cur_cluster_config = deepcopy(exp.config["cluster"]) # add information on data to cluster cur_cluster_config["data_config"] = exp.config["data"]["data_config"] cur_cluster_config["data_dirs"] = exp.data_params["data_dirs"] # only process data for model if experiment has model to evaluate cur_cluster_config["process_like_dataset"] = exp.config["data"]["dataset_name"] cur_cluster_config["seed"] = exp.data_params["seed"] if data_dir is None: data_dir = os.path.join("aggregated", "sub-" + cur_cluster_config["subsample_kwargs"]["frac"]) cur_cluster_config["cluster_data_dir"] = os.path.join(exp.dir, exp.name, data_dir) # add passed cluster_config cur_cluster_config = merged_config(cur_cluster_config, add_cluster_config) return cls(cur_cluster_config, save_config=True)
[docs] @classmethod def from_exp_split(cls, exp: Experiment) -> Cluster: """ Initialise from experiment for clustering of val/test split. Parameters ---------- exp Trained Experiment. """ # TODO load exp # est = Estimator(exp.set_to_evaluate()) # mpp_data = est.ds.data[exp.evaluate_config['split']] cluster_config = deepcopy(exp.config["cluster"]) # add data_config cluster_config["data_config"] = exp.config["data"]["data_config"] cluster_config["subsample"] = False cluster_config["cluster_data_dir"] = os.path.join( exp.dir, exp.name, f"results_epoch{exp.epoch:03d}", exp.evaluate_config["split"], ) return cls(cluster_config, save_config=True)
[docs] def set_cluster_name(self, cluster_name): """ Change the cluster name and reloads ``cluster_mpp``, and ``cluster_annotation``. """ if self.config["cluster_name"] != cluster_name: self.config["cluster_name"] = cluster_name self._cluster_mpp = self._load_cluster_mpp() self._cluster_annotation = self._load_cluster_annotation()
# --- Properties and loading fns --- @property def cluster_mpp(self) -> MPPData | None: """ :class:`MPPData` that is used for clustering. None if data could not be loaded. """ if self._cluster_mpp is None: self._cluster_mpp = self._load_cluster_mpp() return self._cluster_mpp def _load_cluster_mpp(self) -> MPPData | None: """ Load MPPData that is used for clustering. Tries to read MPPData with cluster_rep and cluster_name from cluster_data_dir. Returns ------- :class:`MPPData` or None if data could not be loaded. """ data_dir = self.config["cluster_data_dir"] rep = self.config["cluster_rep"] name = self.config["cluster_name"] # check that dir is defined if data_dir is None: self.log.warning("Cluster_data_dir is None, cannot load cluster_mpp") return None # load data try: mpp_data = MPPData.from_data_dir( data_dir, base_dir=campa_config.EXPERIMENT_DIR, optional_keys=["mpp", rep, name, "umap"], data_config=self.data_config_name, ) self.log.info(f"Loaded cluster_mpp {mpp_data}") return mpp_data except FileNotFoundError: self.log.warning(f"Could not load MPPData from {data_dir}") return None @property def cluster_annotation(self) -> pd.DataFrame: """ Cluster annotation `pd.DataFrame`, read from ``{cluster_name}_annotation.csv``. """ if self._cluster_annotation is None: self._cluster_annotation = self._load_cluster_annotation() return self._cluster_annotation def _load_cluster_annotation(self, recreate: bool = False) -> pd.DataFrame: """ Read cluster annotation file / create it. """ fname = os.path.join( campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"], f"{self.config['cluster_name']}_annotation.csv", ) # try to read file if os.path.exists(fname) and not recreate: annotation = pd.read_csv(fname, index_col=0, dtype=str, keep_default_na=False) return annotation else: # can create? if (self.cluster_mpp is None) or (self.cluster_mpp.data(self.config["cluster_name"]) is None): self.log.info("cannot create annotation without clustering in cluster_mpp") return None # create empty annnotation from unique clusters and empty cluster for background in images annotation = pd.DataFrame( { self.config["cluster_name"]: sorted( np.unique(self.cluster_mpp.data(self.config["cluster_name"])), key=int, ) + [""] } ) annotation.index.name = "index" # save annotation annotation.to_csv(fname) self._cluster_annotation = annotation # add colors self.add_cluster_colors(colors=None) return self._cluster_annotation # --- fns modifying annotation ---
[docs] def add_cluster_annotation( self, annotation: Mapping[str, str], to_col: str, from_col: str | None = None, colors: Mapping[str, str] | None = None, ) -> None: """ Add annotation and colormap to clustering. Is saved in ``{cluster_name}_annotation.csv`` Parameters ---------- annotation: Dict mapping from values of ``from_col`` to the annotation. to_col Name under which the annotation should be saved. from_col Optionally set the annotation name from which to annotate. Default is ``cluster_name``. colors Colour dict, mapping from annotations to hex colours. Default is using tab20 colormap. """ if from_col is None: from_col = self.config["cluster_name"] df = pd.DataFrame.from_dict(annotation, orient="index", columns=[to_col]) # remove to_col and to_col_colors if present annotation = self.cluster_annotation annotation.drop(columns=[to_col], errors="ignore", inplace=True) # add annotation col annotation = pd.merge(annotation, df, how="left", left_on=from_col, right_index=True) self._cluster_annotation = annotation # save to disk fname = os.path.join( campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"], f"{self.config['cluster_name']}_annotation.csv", ) self._cluster_annotation.to_csv(fname) # add colors self.add_cluster_colors(colors, to_col)
[docs] def add_cluster_colors(self, colors: Mapping[str, str] | None, from_col: str | None = None) -> None: """ Add colours to clustering or to annotation. Adds column ``{from_col}_colors`` to :attr:`Cluster.cluster_annotation` and saves it to ``{cluster_name}_annotation.csv``. Parameters ---------- colors Colour dict, mapping from unique clustering values from ``from_col`` to hex colours. Default is using tab20 colormap. from_col Optionally set clustering name for which to add colours. Default is ``cluster_name``. """ if from_col is None: from_col = self.config["cluster_name"] to_col = from_col + "_colors" # remove previous colors annotation = self.cluster_annotation annotation.drop(columns=[to_col], errors="ignore", inplace=True) # add colors if colors is None: # get unique values, removing nan and empty string values = list(np.unique(annotation[from_col].dropna())) if "" in values: values.remove("") N = len(values) cmap = plt.get_cmap("tab20", N) colors = {k: rgb2hex(cmap(i)) for i, k in enumerate(values)} df = pd.DataFrame.from_dict(colors, orient="index", columns=[to_col]) annotation = pd.merge(annotation, df, how="left", left_on=from_col, right_index=True) # fill nan values with white background color annotation[to_col] = annotation[to_col].fillna("#ffffff") self._cluster_annotation = annotation # save to disk fname = os.path.join( campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"], f"{self.config['cluster_name']}_annotation.csv", ) self._cluster_annotation.to_csv(fname)
# --- getters ---
[docs] def get_nndescent_index(self, recreate=False): """ Calculate and return pynndescent index of existing clustering for fast prediction of new data. """ index_fname = os.path.join( campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"], "pynndescent_index.pickle" ) if os.path.isfile(index_fname) and not recreate: try: # load and return index return pickle.load(open(index_fname, "rb")) except AttributeError as e: if "'NNDescent' object has no attribute 'parallel_batch_queries'" in str(e): # need to recreate index self.log.info( "Version of pynndescent does not match version that index was created with. Recreating index." ) else: raise (e) # need to create index # check that cluster_rep has been computed already for cluster_mpp assert self.cluster_mpp is not None assert self.cluster_mpp.data(self.config["cluster_rep"]) is not None self.log.info(f"Creating pynndescent index for {self.config['cluster_rep']}") data = self.cluster_mpp._data[self.config["cluster_rep"]] if self.config["cluster_rep"] == "mpp": data = self.cluster_mpp.center_mpp index = NNDescent(data.astype(np.float32)) pickle.dump(index, open(index_fname, "wb")) return index
# --- functions creating and adding data to cluster_mpp ---
[docs] def create_cluster_mpp(self): """ Use cluster parameters to create and save :attr:`Cluster.cluster_mpp` to use for clustering. Raises ------ ValueError if config does not contain keys ``data_dirs`` and ``process_like_dataset``. """ # TODO: add option how to process data (e.g. for MPPcluster, do not need to add neighborhood) self.log.info("creating cluster mpp from config") # check that have required information if len(self.config["data_dirs"]) == 0: raise ValueError("Cannot create cluster data without data_dirs") self.log.info(f"processing cluster_mpp like dataset {self.config['process_like_dataset']}") # load params to use when processing data_config = campa_config.get_data_config(self.config["data_config"]) data_params = json.load( open( os.path.join( data_config.DATASET_DIR, self.config["process_like_dataset"], "params.json", ), ) ) # load and process data mpp_data = [] for data_dir in self.config["data_dirs"]: mpp_data.append( MPPData.from_data_dir( data_dir, seed=self.config["seed"], data_config=self.config["data_config"], ) ) # subset if necessary (do before subsampling, to get expected # of samples) if data_params["subset"]: mpp_data[-1].subset(**data_params["subset_kwargs"]) mpp_data = MPPData.concat(mpp_data) # TODO after have reproduced data, could do subsampling inside for loop (after subsetting) if self.config["subsample"]: mpp_data = mpp_data.subsample( add_neighborhood=data_params["neighborhood"], neighborhood_size=data_params["neighborhood_size"], **self.config["subsample_kwargs"], ) mpp_data.prepare(data_params) self._cluster_mpp = mpp_data if self.config["cluster_data_dir"] is not None: self._cluster_mpp.write(os.path.join(campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"]))
[docs] def predict_cluster_rep(self, exp: Experiment) -> None: """ Use experiment to predict the necessary cluster representation. Saves predicted representations to ``cluster_data_dir``. Parameters ---------- exp Experiment to use for predicting cluster_rep. """ assert self.cluster_mpp is not None if self.cluster_mpp.data(self.config["cluster_rep"]) is not None: self.log.info(f"cluster_mpp already contains key {self.config['cluster_rep']}. Not recalculating.") return pred = Predictor(exp) cluster_rep = pred.get_representation(self.cluster_mpp, rep=self.config["cluster_rep"]) self.cluster_mpp._data[self.config["cluster_rep"]] = cluster_rep # save cluster_rep if self.config["cluster_data_dir"] is not None: self.cluster_mpp.write( os.path.join(campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"]), save_keys=[self.config["cluster_rep"]], )
[docs] def create_clustering(self) -> None: """ Cluster :attr:`Cluster.cluster_mpp` using ``cluster_method`` defined in :attr:`Cluster.config`. If ``cluster_data_dir`` is defined, saves clustering there. Raises ------ ValueError if cluster_rep is not available """ # check that have cluster_rep assert self.cluster_mpp is not None if self.cluster_mpp.data(self.config["cluster_rep"]) is None: raise ValueError(f"Key {self.config['cluster_rep']} is not available for clustering.") save_keys = [self.config["cluster_name"]] # cluster if self.config["cluster_method"] == "leiden": self.log.info("Creating leiden clustering") # leiden clustering adata = self.cluster_mpp.get_adata(X=self.config["cluster_rep"]) sc.pp.neighbors(adata) sc.tl.leiden( adata, resolution=self.config["leiden_resolution"], key_added="clustering", ) self.cluster_mpp._data[self.config["cluster_name"]] = np.array(adata.obs["clustering"]) if self.config["umap"]: self.log.info("Calculating umap") sc.tl.umap(adata) self.cluster_mpp._data["umap"] = adata.obsm["X_umap"] save_keys.append("umap") elif self.config["cluster_method"] == "kmeans": self.log.info("Creating kmeans clustering") from sklearn.cluster import KMeans est = KMeans(n_clusters=self.config["kmeans_n"], random_state=0) kmeans = est.fit(self.cluster_mpp.data(self.config["cluster_rep"])).labels_ # TODO: cast kmeans to str? self.cluster_mpp._data[self.config["cluster_name"]] = kmeans else: raise NotImplementedError(self.config["cluster_method"]) # create and save pynndescent index _ = self.get_nndescent_index(recreate=True) # save to cluster_data_dir if self.config["cluster_data_dir"] is not None: self.cluster_mpp.write( os.path.join(campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"]), save_keys=save_keys, ) # add umap if not exists already if self.config["umap"] and not self.config["cluster_method"] == "leiden": self.add_umap() # recreate annotation file self._cluster_annotation = self._load_cluster_annotation(recreate=True)
[docs] def add_umap(self) -> None: """ If umap does not yet exist, but should be calculated, calculates umap. """ assert self.cluster_mpp is not None if self.config["umap"]: if self.cluster_mpp.data("umap") is not None: # already have umap, no need to calculate self.log.info("found existing umap, not recalculating.") return self.log.info("Calculating umap") adata = self.cluster_mpp.get_adata(X=self.config["cluster_rep"]) sc.pp.neighbors(adata) sc.tl.umap(adata) self.cluster_mpp._data["umap"] = adata.obsm["X_umap"] # save to cluster_data_dir if self.config["cluster_data_dir"] is not None: self.cluster_mpp.write( os.path.join(campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"]), save_keys=["umap"], )
# --- using existing cluster_mpp, project clustering ---
[docs] def project_clustering(self, mpp_data: MPPData, save_dir: str | None = None, batch_size: int = 200000) -> MPPData: """ Project already computed clustering from :attr:`Cluster.cluster_mpp` to ``mpp_data``. Parameters ---------- mpp_data Data to project the clustering to. Should contain ``cluster_rep``. save_dir Full path to dir where the clustering should be saved. batch_size Iterate over data in batches of size ``batch_size``. Returns ------- :class:`MPPData` Data with clustering. """ # check that clustering has been computed already for cluster_mpp assert self.cluster_mpp is not None assert self.cluster_mpp.data(self.config["cluster_name"]) is not None assert mpp_data.data(self.config["cluster_rep"]) is not None # get NNDescent index for fast projection index = self.get_nndescent_index() self.log.info(f"Projecting clustering to {len(mpp_data.obj_ids)} samples") # func for getting max count cluster in each row def most_frequent(arr): els, counts = np.unique(arr, return_counts=True) return els[np.argmax(counts)] # project clusters clustering = [] samples = mpp_data._data[str(self.config["cluster_rep"])] if self.config["cluster_rep"] == "mpp": # use center mpp in this special case samples = mpp_data.center_mpp for i in np.arange(0, samples.shape[0], batch_size): self.log.info(f"processing chunk {i}") cur_samples = samples[i : i + batch_size] neighs = index.query(cur_samples.astype(np.float32), k=15)[0] # NOTE: do not use apply_along_axis, because dtype is inferred incorrectly! clustering.append( np.array( [most_frequent(row) for row in self.cluster_mpp._data[self.config["cluster_name"]][neighs]], dtype=self.cluster_mpp._data[self.config["cluster_name"]].dtype, ) ) # convert from str to int to save space when saving full data NOTE new mpp_data._data[self.config["cluster_name"]] = np.concatenate(clustering) # save if save_dir is not None: mpp_data.write(save_dir, save_keys=[self.config["cluster_name"]]) return mpp_data
[docs] def predict_cluster_imgs(self, exp: Experiment) -> MPPData | None: """ Predict cluster images from experiment. Parameters ---------- exp Experiment. """ if exp.is_trainable: # create Predictor pred = Predictor(exp) # predict clustering on imgs img_save_dir = os.path.join( exp.full_path, f"results_epoch{pred.est.epoch:03d}", exp.config["evaluation"]["split"] + "_imgs", ) mpp_imgs = pred.est.ds.imgs[exp.config["evaluation"]["split"]] # add latent space to mpp_imgs + subset try: mpp_imgs.add_data_from_dir( img_save_dir, keys=[self.config["cluster_rep"]], subset=True, base_dir="", ) except FileNotFoundError: self.log.warning( f"Did not find {self.config['cluster_rep']} in {img_save_dir}. Run create_clustering first." ) return None else: img_save_dir = os.path.join( exp.full_path, "results_epoch000", exp.config["evaluation"]["split"] + "_imgs", ) mpp_imgs = MPPData.from_data_dir(img_save_dir, base_dir="", data_config=self.data_config_name) self.log.info(f"Projecting cluster_imgs for {exp.dir}/{exp.name}") return self.project_clustering(mpp_imgs, save_dir=img_save_dir)
# --- use cluster_mpp to query HPA ---
[docs] def get_hpa_localisation( self, cluster_name: str = "clustering_res0.5", thresh: float = 1, max_num_channels: int = 3, limit_to_groups: Mapping[str, str | list[str]] | None = None, **kwargs: Any, ) -> Mapping[str, Mapping[str, Any]]: """ Query subcellular localisation for each cluster from Human Protein Atlas (https://www.proteinatlas.org). Calculates cluster loadings and returns the subcellular localisations of the channels that are enriched for each cluster. Requires "hpa_gene_name" column in channel_metadata.csv file in DATA_DIR to map channel names to genes available in HPA. Parameters ---------- cluster_name Clustering to calculate localisations for. Must exist already. thresh Minimum z-scored intensity value of channel in cluster to be considered for HPA query. thresh=0 considers all enriched channel of this cluster max_num_channels Maximal number of channels to be considered for HPA query. Channels with highest z-scored intensity value will be used. If None, all channels passing `thresh` will be used. limit_to_groups Dict with obs as keys and groups from obs as values, to subset data before calculating loadings. kwargs Keyword arguments for :func:`campa.tl.query_hpa_subcellular_location`. Returns ------- Mapping[str, Mapping[str, Any]]: Results dictionary with clusters as keys, and return value from :func:`campa.tl.query_hpa_subcellular_location` """ if (self.cluster_mpp is None) or (self.cluster_mpp.data(cluster_name) is None): self.log.info(f"cannot query HPA: cluster_mpp does not exist or {cluster_name} does not exist") return {} self.set_cluster_name(cluster_name) adata = self.cluster_mpp.get_adata(X="mpp", obsm={"X_latent": "latent", "X_umap": "umap"}) # ensure that clustering is available in adata add_clustering_to_adata( os.path.join(campa_config.EXPERIMENT_DIR, self.config["cluster_data_dir"]), cluster_name, adata, self.cluster_annotation, ) # subset data if limit_to_groups is None: limit_to_groups = {} for key, groups in limit_to_groups.items(): if not isinstance(groups, list): groups = [groups] adata = adata[adata.obs[key].isin(groups)] # calculate mean z-scored intensity values means = ( pd.concat( [ pd.DataFrame(adata.X, columns=adata.var_names).reset_index(drop=True), adata.obs[[cluster_name]].reset_index(drop=True), ], axis=1, ) .groupby(cluster_name) .aggregate("mean") ) means = (means - means.mean()) / means.std() # means = means.apply(zscore, axis=1) # for each cluster, determine channels that localise to this cluster (mean z-scored intensity > thresh) # and map channel names to gene names used in HPA channels_metadata = pd.read_csv(os.path.join(self.data_config.DATA_DIR, "channels_metadata.csv"), index_col=0) cluster_localisation = {} for idx, row in means.iterrows(): channels = np.array(row[row > thresh].index) weights = np.array(row[row > thresh]) if max_num_channels is not None: selected_indices = np.argsort(weights)[::-1][:max_num_channels] weights = weights[selected_indices] channels = channels[selected_indices] cluster_localisation[idx] = (list(channels_metadata.set_index("name").loc[channels].gene_name_hpa), weights) # query hpa for each cluster results = {} for idx, (genes, weights) in cluster_localisation.items(): results[idx] = query_hpa_subcellular_location(genes, gene_weights=weights, **kwargs) return results
def query_hpa_subcellular_location( genes: Iterable[str], gene_weights: Iterable[float] | None = None, filter_reliability: Iterable[str] = ("Uncertain",), ) -> Mapping[str, Any]: """ Query the Human Protein Atlas for a consensus subcellular locations from a list of genes. HPA is availablen at https://www.proteinatlas.org Parameters ---------- genes List of genes to query in the HPA (using field "gene_name"). gene_weights List of weights for each gene, used to compute main subcellular locations. filter_reliability Do not return genes with this subcellular location reliability ("Reliability IF"). Available reliabilities (in order from most reliable to least reliable) are: Enhanced, Supported, Approved, Uncertain. See also https://www.proteinatlas.org/about/assays+annotation#if_reliability_score Returns ------- Mapping[str, Any]: Results dictionary with keys: - hpa_data: data frame of available genes and their subcellular locations according to HPA data. - subcellular_locations: pd.Series of all subcellular locations ocurring for this list of genes, sorted by most frequent """ if gene_weights is None: gene_weights = [1] * len(list(genes)) url_str = "http://www.proteinatlas.org/api/search_download.php?search=gene_name:{gene}&format=json&columns=g,gs,scml,scal,relce&compress=no" # noqa: <E501> data = [] index = [] for gene in genes: if gene is None or gene == np.nan: # type: ignore[comparison-overlap] continue cur_url_str = url_str.format(gene=gene) with urllib.request.urlopen(cur_url_str) as url: res = json.load(url) if len(res) > 0: index.append(gene) data.append(res[0]) else: print(f"No result for {gene}") # no resulting data? (either len(genes)==0 or no genes found) if len(data) == 0: return {"hpa_data": None, "subcellular_locations": None} data = pd.DataFrame(data, index=index) # filter out any columns with filter_reliability or None reliability score data = data[~data["Reliability (IF)"].isin(list(filter_reliability) + [None])] data = pd.merge( data, pd.DataFrame({"gene_weights": gene_weights}, index=genes), how="left", right_index=True, left_index=True ) # summarise locations & their occurrence counts summary = {} # type: ignore[var-annotated] for locations, weight in zip(data["Subcellular main location"], data["gene_weights"]): for loc in locations: if loc not in summary.keys(): summary[loc] = weight else: summary[loc] += weight summary = pd.Series(summary).sort_values(ascending=False) return {"hpa_data": data, "subcellular_locations": summary}
[docs]def prepare_full_dataset( experiment_dir: str, save_dir: str = "aggregated/full_data", data_dirs: list[str] | None = None ) -> None: """ Prepare all data for clustering by predicting cluster-rep. Parameters ---------- experiment_dir Experiment directory relative to ``EXPERIMENT_PATH``. save_dir Directory to save prepared full data to, relative to ``experiment_dir``. data_dirs Data to prepare. Defaults for ``exp.data_params['data_dirs']``. """ from campa.tl import Experiment log = logging.getLogger("prepare full dataset") exp = Experiment.from_dir(experiment_dir) # iterate over all data dirs if data_dirs is None: data_dirs = exp.data_params["data_dirs"] print("iterating over data dirs", data_dirs) for data_dir in data_dirs: log.info(f"Processing data_dir {data_dir}") mpp_data = MPPData.from_data_dir( data_dir, data_config=exp.config["data"]["data_config"], ) # params for partial saving of mpp_data mpp_params = {"base_data_dir": data_dir, "subset": True} # prepare mpp_data log.info("Preparing data") mpp_data.prepare(exp.data_params) if exp.config["cluster"]["cluster_rep"] == "mpp": # just save mpp mpp_data.write( os.path.join(exp.full_path, save_dir, data_dir), mpp_params=mpp_params, save_keys=["mpp"], ) else: # need to predict rep - prepare neighborhood if exp.data_params["neighborhood"]: mpp_data.add_neighborhood(exp.data_params["neighborhood_size"]) # predict rep log.info("Predicting latent") pred = Predictor(exp) pred.predict( mpp_data, reps=[exp.config["cluster"]["cluster_rep"]], save_dir=os.path.join(exp.full_path, save_dir, data_dir), mpp_params=mpp_params, )
[docs]def create_cluster_data( experiment_dir: str, subsample: bool = False, frac: float = 0.005, save_dir: str | None = None, cluster: bool = False, ) -> None: """ Create (subsampled) data for clustering. Uses dataset used to train experiment. Parameters ---------- experiment_dir Experiment directory relative to ``EXPERIMENT_PATH``. subsample Subsample the data. frac Fraction of pixels to use for clustering if ``subsample`` is True. save_dir Directory to save subsampled cluster data, relative to ``experiment_dir``. Default is ``aggregated/sub-FRAC``. cluster Use cluster parameters in Experiment config to cluster the subsetted data. """ from campa.tl import Experiment exp = Experiment.from_dir(experiment_dir) cluster_config = { "subsample": subsample, "subsample_kwargs": {"frac": frac}, } save_dir = save_dir if save_dir is not None else f"aggregated/sub-{frac}" cl = Cluster.from_exp(exp, cluster_config=cluster_config, data_dir=save_dir) # create cluster_mpp cl.create_cluster_mpp() # predict rep cl.predict_cluster_rep(exp) if cluster: # cluster (also gets umap) cl.create_clustering() else: # get umap cl.add_umap()
[docs]def project_cluster_data( experiment_dir: str, cluster_data_dir: str, cluster_name: str = "clustering", save_dir: str = "aggregated/full_data", data_dir: str | None = None, ) -> None: """ Project existing clustering to new data. Parameters ---------- experiment_dir Experiment directory relative to ``EXPERIMENT_PATH``. cluster_data_dir Directory in which clustering is stored relative to experiment dir. Usually in ``aggregated/sub-FRAC``. cluster_name Name of clustering to project. save_dir Directory in which the data to be projected is stored, relative to ``experiment_dir``. data_dir Data directory to project. If not specified, project all ``data_dir``s in ``save_dir``. Relative to ``save_dir``. """ from campa.tl import Experiment exp = Experiment.from_dir(experiment_dir) # set up cluster data cl = Cluster.from_cluster_data_dir(os.path.join(exp.dir, exp.name, cluster_data_dir)) cl.set_cluster_name(cluster_name) assert cl.cluster_mpp is not None assert cl.cluster_mpp.data(cluster_name) is not None, f"cluster data needs to contain clustering {cluster_name}" # iterate over all data dirs data_dirs: Iterable[str] = exp.data_params["data_dirs"] if data_dir is None else [data_dir] for cur_data_dir in data_dirs: # load mpp_data with cluster_rep mpp_data = load_full_data_dict( exp, keys=["x", "y", "obj_ids", cl.config["cluster_rep"]], data_dirs=[cur_data_dir], save_dir=save_dir )[cur_data_dir] cl.project_clustering(mpp_data, save_dir=os.path.join(exp.full_path, save_dir, cur_data_dir))
[docs]def load_full_data_dict( exp: Experiment, keys: Iterable[str] = ("x", "y", "obj_ids", "latent"), data_dirs: Iterable[str] | None = None, save_dir: str = "aggregated/full_data", ) -> Mapping[str, MPPData]: """ Load mpp_datas used in experiment in a dict. NOTE: this might take a few minutes, as all data needs to be loaded. Parameters ---------- exp Experiment from which to load the mpp_datas. keys Controls which numpy data matrices are being loaded. Passed to :meth:`MPPData.from_data_dir`, with `optional_keys` set to empty list. Excluding mpp here speeds up loading. data_dirs Directories that should be loaded, if None, all ``data_dirs`` are loaded. save_dir Directory in which the data to be loaded is stored, relative to ``{exp.dir}/{exp.name}``. Returns ------- Dictionary with `data_dirs` as keys and :class:`MPPData` as values. """ if data_dirs is None: data_dirs = exp.data_params["data_dirs"] mpp_datas = {} for data_dir in data_dirs: mpp_datas[data_dir] = MPPData.from_data_dir( data_dir, base_dir=os.path.join(exp.full_path, save_dir), keys=keys, optional_keys=[], data_config=exp.config["data"]["data_config"], ) return mpp_datas
[docs]def get_clustered_cells( mpp_datas: Mapping[str, MPPData], cl: Cluster, cluster_name: str, num_objs: int = 5 ) -> Mapping[str, Mapping[str, MPPData]]: """ Get `num_objs` example cells for each `mpp_data`. Parameters ---------- mpp_datas Data from which to sample and cluster cells. cl Clustering object to use. cluster_name Name of the clustering. num_objs Number of objects (cells) to sample from each mpp_data. Returns ------- dictionary containing clustered cells in `cluster_name` and coloured cells in `cluster_name_colored`. """ from campa.pl import annotate_img cl.set_cluster_name(cluster_name) res: dict[str, dict[str, Any]] = {cluster_name: {}, cluster_name + "_colored": {}} for data_dir, mpp_data in mpp_datas.items(): print(data_dir) # get random obj_ids for this mpp_data rng = np.random.default_rng(seed=42) obj_ids = rng.choice(mpp_data.unique_obj_ids, num_objs, replace=False) # subset mpp_data to obj_ids sub_mpp_data = mpp_data.subset(obj_ids=obj_ids, copy=True) # project_clustering sub_mpp_data = cl.project_clustering(sub_mpp_data) # if only need colored cells, can pass annotation_kwargs to get_object_imgs res[cluster_name][data_dir] = sub_mpp_data.get_object_imgs( data=cluster_name ) # annotation_kwargs={'color': True, 'annotation': cl.cluster_annotation}) res[cluster_name + "_colored"][data_dir] = [ annotate_img(img, annotation=cl.cluster_annotation, from_col=cluster_name, color=True) for img in res[cluster_name][data_dir] ] return res