Source code for campa.data._nn_dataset

from typing import Any, Dict, List, Tuple, Union, Mapping, Iterable, Optional
import os
import json
import logging

import numpy as np
import tensorflow as tf

from campa.constants import campa_config
from campa.data._data import MPPData


def _subsample_or_add_neighborhood(mpp_data: "MPPData", p: Mapping[str, Any]) -> "MPPData":
    """Subsample ``mpp_data`` (optionally with neighbourhood) or only add the neighbourhood, as configured in ``p``."""
    if p["subsample"]:
        # subsample returns a new object; the neighbourhood is added as part of the subsampling call
        return mpp_data.subsample(
            add_neighborhood=p["neighborhood"],
            neighborhood_size=p["neighborhood_size"],
            **p["subsample_kwargs"],
        )
    if p["neighborhood"]:
        # add_neighborhood is called for its effect on mpp_data; its return value is not used
        mpp_data.add_neighborhood(p["neighborhood_size"])
    return mpp_data


def create_dataset(params: Mapping[str, Any]) -> None:
    """
    Create a :class:`NNDataset`.

    Parameters determine how the data should be selected and processed.
    The following keys in parameters are expected:

    - ``dataset_name``: name of the resulting dataset that is defined by these parameters
      (relative to ``DATA_DIR/datasets``)
    - ``data_config``: name of data configuration (registered in ``campa.ini``)
    - ``data_dirs``: where to read data from (relative to ``DATA_DIR`` defined in data config)
    - ``channels``: list of channel names to include in this dataset
    - ``condition``: list of conditions. Should be defined in data config.
      The suffix `_one_hot` will convert the condition in a one-hot encoded vector.
      Conditions are concatenated, except when they are defined as a list of lists.
      In this case the condition is defined as a pairwise combination of the conditions.
    - ``condition_kwargs``: kwargs to :meth:`MPPData.add_conditions`
    - ``split_kwargs``: kwargs to :meth:`MPPData.train_val_test_split`
    - ``test_img_size``: standard size of images in test set. Images are padded/truncated to this size
    - ``subset``: (bool) subset to objects with certain metadata.
    - ``subset_kwargs``: kwargs to :meth:`MPPData.subset` defining which object to subset to
    - ``subsample``: (bool) subsampling of pixels
    - ``subsample_kwargs``: kwargs for :meth:`MPPData.subsample` defining the fraction of pixels to be sampled
    - ``neighborhood``: (bool) add local neighbourhood to samples in NNDataset
    - ``neighborhood_size``: size of neighbourhood
    - ``normalise``: (bool) Intensity normalisation
    - ``normalise_kwargs``: kwargs to :meth:`MPPData.normalise`
    - ``seed``: random seed to make subsampling reproducible

    The function writes the folders ``train``, ``val``, ``test``, ``val_imgs`` and ``test_imgs``
    as well as ``params.json`` to the dataset directory.

    Parameters
    ----------
    params
        parameter dict
    """
    log = logging.getLogger()
    log.info("Creating train/val/test datasets with params:")
    log.info(json.dumps(params, indent=4))
    p = params

    # prepare outdir
    data_config = campa_config.get_data_config(p["data_config"])
    outdir = os.path.join(data_config.DATASET_DIR, p["dataset_name"])
    os.makedirs(outdir, exist_ok=True)

    # prepare datasets
    mpp_datas: Mapping[str, List[MPPData]] = {"train": [], "val": [], "test": []}
    for data_dir in p["data_dirs"]:
        mpp_data = MPPData.from_data_dir(data_dir, seed=p["seed"], data_config=p["data_config"])
        train, val, test = mpp_data.train_val_test_split(**p["split_kwargs"])
        # train data is subsampled per data_dir, before merging;
        # val and test are processed later, after the full images have been saved
        train = _subsample_or_add_neighborhood(train, p)
        mpp_datas["train"].append(train)
        mpp_datas["test"].append(test)
        mpp_datas["val"].append(val)

    # merge all datasets
    train = MPPData.concat(mpp_datas["train"])
    val = MPPData.concat(mpp_datas["val"])
    test = MPPData.concat(mpp_datas["test"])

    # prepare (channels, normalise, condition, subset)
    train.prepare(params)  # this has side-effects on params, st val + test use correct params
    val.prepare(params)
    test.prepare(params)

    # save test and val imgs
    val.write(os.path.join(outdir, "val_imgs"))
    test.write(os.path.join(outdir, "test_imgs"))

    # subsample and add neighbors to val and test (for prediction during training)
    val = _subsample_or_add_neighborhood(val, p)
    test = _subsample_or_add_neighborhood(test, p)

    log.info("-------------------")
    log.info("created datasets:")
    log.info(f"train: {str(train)}")
    log.info(f"val: {str(val)}")
    log.info(f"test: {str(test)}")
    log.info("-------------------")

    # save datasets
    train.write(os.path.join(outdir, "train"))
    val.write(os.path.join(outdir, "val"))
    test.write(os.path.join(outdir, "test"))

    # save params; the context manager guarantees the file is flushed and closed
    with open(os.path.join(outdir, "params.json"), "w") as f:
        json.dump(params, f, indent=4)
class NNDataset:
    """
    Dataset for training and evaluation of neural networks.

    A ``NNDataset`` is stored within ``DATASET_DIR/dataset_name``, where ``DATASET_DIR``
    is taken from the data config.
    This folder contains `train`/`val`/`test`/`val_imgs`/`test_imgs` folders
    with :class:`MPPData` objects and a ``params.json`` file.

    Parameters
    ----------
    dataset_name:
        name of the dataset, relative to ``DATASET_DIR``
    data_config:
        name of the data config to use, should be registered in ``campa.ini``.
        If None, ``"NascentRNA"`` is used and a warning is logged.
    """

    def __init__(self, dataset_name: str, data_config: Optional[str] = None):
        self.log = logging.getLogger(self.__class__.__name__)
        if data_config is None:
            self.data_config_name = "NascentRNA"
            self.log.warning(f"Using default data_config {self.data_config_name}")
        else:
            self.data_config_name = data_config
        self.data_config = campa_config.get_data_config(self.data_config_name)
        self.dataset_folder = os.path.join(self.data_config.DATASET_DIR, dataset_name)

        # data
        self.data: Dict[str, MPPData] = {split: self._load_split(split) for split in ("train", "val", "test")}
        """
        Train, val, and test MPPDatas.
        """
        self.imgs: Dict[str, MPPData] = {split: self._load_split(f"{split}_imgs") for split in ("val", "test")}
        """
        Val and test MPPDatas containing entire images for visualisation.
        """
        self.channels = self.data["train"].channels.reset_index().set_index("name")
        # context manager so the params file handle is closed right after reading
        with open(os.path.join(self.dataset_folder, "params.json")) as f:
            self.params = json.load(f)

    def _load_split(self, folder_name: str) -> "MPPData":
        """Load the MPPData stored in the sub-folder ``folder_name`` of the dataset folder."""
        return MPPData.from_data_dir(
            os.path.join(self.dataset_folder, folder_name), base_dir="", data_config=self.data_config_name
        )

    def __str__(self):
        s = f"NNDataset for {self.data_config_name} (shape {self.data['train'].mpp.shape[1:]})."
        s += f" train: {len(self.data['train'].mpp)}, val: {len(self.data['val'].mpp)},"
        s += f" test: {len(self.data['test'].mpp)}"
        return s

    def x(self, split: str, is_conditional: bool = False) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
        """
        Neural network inputs.

        Parameters
        ----------
        split
            One of `train`, `val`, `test`.
        is_conditional
            Whether to add condition information to x

        Returns
        -------
        The ``mpp`` array of the split cast to float32, or a tuple ``(mpp, conditions)``
        (both float32) if ``is_conditional`` is True.
        """
        x = self.data[split].mpp.astype(np.float32)
        if is_conditional:
            c = self.data[split].conditions.astype(np.float32)  # type: ignore[union-attr]
            x = (x, c)  # type: ignore[assignment]
        return x

    def y(self, split: str, output_channels: Optional[Iterable[str]] = None) -> np.ndarray:
        """
        Groundtruth outputs.

        Parameters
        ----------
        split
            One of `train`, `val`, `test`.
        output_channels
            Channels that should be predicted by the neural network.
            Defaults to all input channels.

        Returns
        -------
        ``center_mpp`` of the split, restricted along axis 1 to ``output_channels`` if given.
        """
        y = self.data[split].center_mpp
        if output_channels is not None:
            channel_ids = self.data[split].get_channel_ids(list(output_channels))
            y = y[:, channel_ids]
        return y

    @staticmethod
    def _iter_samples(x: Any, y: Any, indices: Iterable[int]) -> Iterable[Tuple[Any, Any]]:
        """
        Yield ``(x_i, y_i)`` for every ``i`` in ``indices``.

        ``x`` and ``y`` are each either a single array or a tuple of arrays.
        Tuples are indexed component-wise, so the structure of every yielded element
        mirrors the structure of ``x`` / ``y``.
        """

        def pick(arrays: Any, i: int) -> Any:
            if isinstance(arrays, tuple):
                return tuple(arr[i] for arr in arrays)
            return arrays[i]

        for i in indices:
            yield pick(x, i), pick(y, i)

    def get_tf_dataset(
        self,
        split: str = "train",
        output_channels: Optional[Iterable[str]] = None,
        is_conditional: bool = False,
        repeat_y: Union[bool, int] = False,
        add_c_to_y: bool = False,
        shuffled: bool = False,
    ) -> "tf.data.Dataset":
        """
        :class:`tf.data.Dataset` of the desired split.

        Parameters
        ----------
        split
            One of `train`, `val`, `test`.
        output_channels
            Channels that should be predicted by the neural network.
            Defaults to all input channels.
        is_conditional
            Whether to add condition information to x
        repeat_y:
            Match output length to number of losses
            (otherwise keras will not work, even if its losses that do not need y).
        add_c_to_y
            Append condition to y. Needed for adversarial loss.
            Requires ``is_conditional`` to be True.
        shuffled
            Shuffle indices before generating data. Will produce same order every time.

        Returns
        -------
        :class:`tf.data.Dataset`
            The dataset.
        """
        output_types = []
        output_shapes = []

        # x
        x = self.x(split, is_conditional)
        if is_conditional:
            num = x[0].shape[0]
            output_types.append((tf.float32, tf.float32))
            output_shapes.append((tf.TensorShape(x[0].shape[1:]), tf.TensorShape(x[1].shape[1:])))
        else:
            num = x.shape[0]  # type: ignore[union-attr]
            output_types.append(tf.float32)
            output_shapes.append(tf.TensorShape(x.shape[1:]))  # type: ignore[union-attr]

        # y
        y = self.y(split, output_channels)
        output_types.append(tf.float32)
        output_shapes.append(tf.TensorShape(y.shape[1:]))
        if repeat_y is not False:
            # TODO concat c here instead of y! (for adv loss)
            output_types[1] = tuple(tf.float32 for _ in range(repeat_y))  # type: ignore[assignment]
            y = tuple(y for _ in range(repeat_y))  # type: ignore[assignment]
            output_shapes[1] = tuple(output_shapes[1] for _ in range(repeat_y))  # type: ignore[assignment]
        if add_c_to_y is not False:
            assert is_conditional, "add_c_to_y requires is_conditional=True"
            # get output_type and shape for c from first output
            c_output_type = output_types[0][1]
            c_output_shape = output_shapes[0][1]
            # add c to y and output data to types and shapes
            if isinstance(output_types[1], tuple):
                output_types[1] = tuple(list(output_types[1]) + [c_output_type])  # type: ignore[assignment]
                y = tuple(list(y) + [x[1]])  # type: ignore[assignment]
                output_shapes[1] = tuple(list(output_shapes[1]) + [c_output_shape])  # type: ignore[assignment]
            else:
                output_types[1] = (output_types[1], c_output_type)
                y = (y, x[1])  # type: ignore[assignment]
                output_shapes[1] = (output_shapes[1], c_output_shape)

        # create a generator dataset:
        indices = np.arange(num)
        if shuffled:
            # fixed seed: the shuffled order is identical on every call
            rng = np.random.default_rng(seed=0)
            rng.shuffle(indices)

        def gen():
            # y is a tuple whenever repeat_y or add_c_to_y is set; _iter_samples indexes
            # tuples component-wise, so the add_c_to_y-without-repeat_y case is handled too
            return self._iter_samples(x, y, indices)

        dataset = tf.data.Dataset.from_generator(gen, tuple(output_types), tuple(output_shapes))
        return dataset