# Source code for campa.tl._experiment

from copy import deepcopy
from typing import Any, List, Mapping, Iterable, Optional, MutableMapping
import os
import re
import glob
import json
import logging

import pandas as pd
import tensorflow as tf

from campa.tl import LossEnum, ModelEnum
from campa.data import MPPData
from campa.utils import load_config, merged_config
from campa.constants import campa_config


class Experiment:
    """
    Experiment stored on disk with neural network.

    Initialised with config dictionary with keys:

    - `experiment`: where to save experiment

      - `dir`: experiment folder
      - `name`: name of the experiment
      - `save_config`: (bool), whether to save this config in the folder

    - `data`: which dataset to use for training

      - `data_config`: name of the data config to use, should be registered in ``campa.ini``
      - `dataset_name`: name of the dataset, relative to ``DATA_DIR``
      - `output_channels`: Channels that should be predicted by the neural network.
        Defaults to all input channels.

    - `model`: model definition

      - `model_cls`: instance or value of :class:`ModelEnum`
      - `model_kwargs`: keyword arguments passed to the model class
      - `init_with_weights`: if true, looks for saved weights in experiment_dir.
        if a path, loads these weights

    - `training`: training hyper-parameters

      - `learning_rate`: learning rate to use
      - `epochs`: number of epochs to train
      - `batch_size`: number of samples per batch
      - `loss`: mapping of model output names to values of :class:`LossEnum`.
        Possible names are `decoder` and `latent`.
      - `metrics`: mapping of model output names to values of :class:`LossEnum`.
      - `save_model_weights`: (bool) whether or not to save the model.
      - `save_history`: (bool) save csv with losses and metrics at each epoch.
      - `overwrite_history`: overwrite existing history csv file. Otherwise concatenate to it.

    - `evaluation`: evaluation on val/test split

      - `split`: `train`, `val`, or `test`
      - `predict_reps`: (list) model output that should be predicted.
        Possible values: `decoder`, `latent`.
      - `img_ids`: number of images to predict, or list of image ids.
      - `predict_imgs`: (bool) whether to predict reconstructed images.
      - `predict_cluster_imgs`: (bool) whether to predict clustered images.

    - `cluster`: clustering on val/test split

      - `cluster_name`: name of the clustering, used to save `npy` file.
      - `cluster_rep`: model output name to use for clustering, or "mpp".
      - `cluster_method`: `leiden` or `kmeans`.
      - `leiden_resolution`: resolution parameter for leiden clustering.
      - `subsample`: None or "subsample", whether or not to subsample data before clustering.
      - `subsample_kwargs`: passed to :meth:`campa.data.MPPData.subsample` for creating the
        subsample data for clustering.
      - `umap`: (bool) predict UMAP of cluster_rep.

    Parameters
    ----------
    config
        Experiment config.
    """

    # base experiment config
    config: MutableMapping[str, Any] = {
        "experiment": {
            "dir": None,
            "name": "experiment",
            "save_config": True,
        },
        "data": {
            "data_config": None,
            "dataset_name": None,
            "output_channels": None,
        },
        "model": {
            "model_cls": ModelEnum.BaseAEModel,  # instance or value of ModelEnum
            "model_kwargs": {},
            # if true, looks for saved weights in experiment_dir
            # if a path, loads these weights
            "init_with_weights": False,
        },
        "training": {
            "learning_rate": 0.001,
            "epochs": 10,
            "batch_size": 128,
            "loss": {"decoder": LossEnum.MSE},  # instance or value of LossEnum
            "loss_weights": {"decoder": 1},
            "loss_warmup_to_epoch": {},
            "metrics": {"decoder": LossEnum.MSE},  # instance or value of LossEnum
            # saving models
            "save_model_weights": True,
            "save_history": True,
            "overwrite_history": True,
        },
        "evaluation": {  # TODO change this to fit to aggregation params
            "split": "val",
            "predict_reps": ["latent", "decoder"],
            "img_ids": 25,
            "predict_imgs": True,
            # documented as an evaluation key and read from this section by run_experiments
            "predict_cluster_imgs": True,
        },
        "cluster": {  # cluster config, also used in this format for whole data clustering
            # kept here as well so that code reading it from the cluster section keeps working
            "predict_cluster_imgs": True,
            "cluster_name": "clustering",
            "cluster_rep": "latent",
            "cluster_method": "leiden",  # leiden or kmeans
            "leiden_resolution": 0.8,
            "subsample": None,  # 'subsample' or 'som'
            "subsample_kwargs": {},
            "som_kwargs": {},
            "umap": True,
        },
    }

    def __init__(self, config: Mapping[str, Any]):
        self.config = merged_config(self.config, config)
        """
        Experiment config, see :class:`Experiment`.
        """
        self.log = logging.getLogger(self.__class__.__name__)
        self.log.info(f"Setting up experiment {self.dir}/{self.name}")
        data_config = campa_config.get_data_config(self.config["data"]["data_config"])
        # load data_params
        params_fname = os.path.join(
            data_config.DATASET_DIR,
            self.config["data"]["dataset_name"],
            "params.json",
        )
        with open(params_fname) as params_file:
            self.data_params = json.load(params_file)
        # create exp_path
        # NOTE: check the raw config value; the ``dir`` property stringifies it,
        # so ``self.dir`` is never None (it would be the string "None").
        if self.config["experiment"]["dir"] is not None:
            os.makedirs(self.full_path, exist_ok=True)
            if self.config["experiment"]["save_config"]:
                self.log.info(f"Saving config to {self.dir}/{self.name}/config.json")
                # context manager guarantees the config is flushed and the handle closed
                with open(os.path.join(self.full_path, "config.json"), "w") as config_file:
                    json.dump(self.config, config_file, indent=4)
        else:
            self.log.info("exp_dir is None, did not save config")

    @classmethod
    def from_dir(cls, exp_path: str) -> "Experiment":
        """
        Initialise experiment from trained experiment in exp_path.

        Changes ``save_config`` to False, to avoid overwriting the stored config.
        ``init_with_weights`` is left as stored in the config;
        use :meth:`set_to_evaluate` to set it to True.

        Parameters
        ----------
        exp_path
            path to experiment, relative to campa_config.EXPERIMENT_DIR

        Returns
        -------
        The initialised experiment.
        """
        # load config from json
        config_fname = os.path.join(campa_config.EXPERIMENT_DIR, exp_path, "config.json")
        assert os.path.exists(config_fname), f"no config.json in {campa_config.EXPERIMENT_DIR}/{exp_path}"
        with open(config_fname) as config_file:
            config = json.load(config_file)
        # set save_config to False to avoid overwriting
        config["experiment"]["save_config"] = False
        exp = cls(config)
        exp.log.info(f"Initialised from existing experiment in {exp.dir}/{exp.name}")
        return exp

    def set_to_evaluate(self) -> "Experiment":
        """
        Prepare Experiment for evaluation.

        Changes ``init_with_weights`` to True to load correct weights in :class:`Estimator`.

        Returns
        -------
        This experiment (modified in place), to allow chaining.
        """
        self.config["model"]["init_with_weights"] = True
        return self

    @property
    def is_trainable(self) -> bool:
        """
        Return false, if this is not a trainable experiment.

        An experiment is trainable if both its ``model`` and ``training`` config sections are set.
        """
        return self.config["model"] is not None and self.config["training"] is not None

    @property
    def name(self) -> str:
        """
        Experiment name.
        """
        return str(self.config["experiment"]["name"])

    @property
    def dir(self) -> str:  # noqa: A003
        """
        Experiment directory.

        The config value is converted with ``str``, i.e. an unset directory reads as ``"None"``.
        """
        return str(self.config["experiment"]["dir"])

    @property
    def full_path(self) -> str:
        """
        Full path to Experiment.
        """
        return os.path.join(campa_config.EXPERIMENT_DIR, self.dir, self.name)

    @property
    def estimator_config(self) -> Mapping[str, Any]:
        """
        Config dictionary to initialise :class:`campa.tl.Estimator`.
        """
        estimator_config = {
            key: val for key, val in self.config.items() if key in ["experiment", "data", "model", "training"]
        }
        # return copy to avoid side effects on self.config
        return deepcopy(estimator_config)

    @property
    def evaluate_config(self) -> Mapping[str, Any]:
        """
        Config dictionary to initialise :class:`campa.tl.Predictor`.
        """
        # return copy to avoid side effects on self.config
        return deepcopy(self.config["evaluation"])

    def get_history(self) -> Optional[pd.DataFrame]:
        """
        Training history.

        Returns
        -------
        training history read from ``history.csv`` in the experiment folder,
        or None if this file does not exist.
        """
        history_path = os.path.join(self.full_path, "history.csv")
        if os.path.isfile(history_path):
            return pd.read_csv(history_path, index_col=0)
        return None

    @property
    def epoch(self) -> int:
        """
        Last epoch for which there is a trained model.

        Parsed from the name of the latest checkpoint in the experiment folder.
        Returns 0 if there is no checkpoint, or its name contains no epoch number.
        """
        weights_path = tf.train.latest_checkpoint(self.full_path)
        if weights_path is None:
            return 0
        # find epoch in weights_path
        # at least three digits, so that epochs >= 1000 are not truncated to their first three digits
        res = re.findall(r"epoch(\d{3,})", os.path.basename(weights_path))
        if len(res) == 0:
            return 0
        return int(res[0])

    def _read_results_mpp_data(self, sub_dir: str) -> Optional[MPPData]:
        """
        Read :class:`MPPData` from ``results_epoch{self.epoch}/{sub_dir}``.

        Returns None if this directory does not exist.
        """
        data_dir = os.path.join(self.full_path, f"results_epoch{self.epoch:03d}", sub_dir)
        if not os.path.isdir(data_dir):
            return None
        cluster_config = self.config["cluster"]
        # remove duplicated keys (e.g. cluster_rep == "latent") while keeping a deterministic order
        optional_keys = list(
            dict.fromkeys(
                [
                    cluster_config["cluster_rep"],
                    "latent",
                    "decoder",
                    cluster_config["cluster_name"],
                    "umap",
                ]
            )
        )
        return MPPData.from_data_dir(
            data_dir,
            base_dir="",
            keys=["x", "y", "obj_ids", "mpp"],
            optional_keys=optional_keys,
            data_config=self.config["data"]["data_config"],
        )

    def get_split_mpp_data(self) -> Optional[MPPData]:
        """
        Val or test :class:`MPPData` read from ``results_epoch{self.epoch}``.

        Whether val or test is returned depends on evaluation split defined in config.
        Returns None if the results directory does not exist.
        """
        return self._read_results_mpp_data(self.config["evaluation"]["split"])

    def get_split_imgs_mpp_data(self) -> Optional[MPPData]:
        """
        `Val_imgs` / `test_imgs` :class:`MPPData` read from ``results_epoch{self.epoch}``.

        Whether val or test is returned depends on evaluation split defined in config.
        Returns None if the results directory does not exist.
        """
        return self._read_results_mpp_data(self.config["evaluation"]["split"] + "_imgs")

    def get_split_cluster_annotation(self, cluster_name: str = "clustering") -> pd.DataFrame:
        """
        Read cluster_annotation file for evaluation split from disk.

        Looks for file ``{cluster_name}_annotation.csv`` in ``results_epoch{self.epoch}``.

        Parameters
        ----------
        cluster_name
            Name of clustering.

        Returns
        -------
        The cluster annotation file.
        """
        fname = os.path.join(
            self.full_path,
            f"results_epoch{self.epoch:03d}",
            self.config["evaluation"]["split"],
            f"{cluster_name}_annotation.csv",
        )
        # TODO this reading is duplicated in Cluster (where annotation is first created)
        return pd.read_csv(fname, index_col=0, dtype=str, keep_default_na=False)

    def get_cluster_annotation(
        self, cluster_name: str = "clustering", cluster_dir: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Read cluster_annotation file for full data from disk.

        Looks for file ``{cluster_name}_annotation.csv`` in ``cluster_dir``.
        If ``cluster_dir`` is None, it is set to the first dir (in sorted order) of ``aggregated/sub-*``.

        Parameters
        ----------
        cluster_name
            Name of clustering.
        cluster_dir
            Directory in which to find the clustering.

        Returns
        -------
        The cluster annotation file.

        Raises
        ------
        FileNotFoundError
            If ``cluster_dir`` is None and no ``aggregated/sub-*`` directory exists.
        """
        # TODO need to somehow figure out sub dir!
        if cluster_dir is None:
            # sorted, because the order returned by glob is arbitrary
            candidates = sorted(glob.glob(os.path.join(self.full_path, "aggregated/sub-*")))
            if not candidates:
                raise FileNotFoundError(
                    f"cluster_dir is None and no aggregated/sub-* directory exists in {self.full_path}"
                )
            cluster_dir = "aggregated/" + os.path.basename(candidates[0])
            self.log.info(f"Cluster annotation: using cluster data in {cluster_dir}")
        fname = os.path.join(self.full_path, cluster_dir, f"{cluster_name}_annotation.csv")
        return pd.read_csv(fname, index_col=0, dtype=str, keep_default_na=False)

    @staticmethod
    def get_experiments_from_config(config_fname: str, exp_names: Optional[Iterable[str]] = None) -> List["Experiment"]:
        """
        Initialise and return experiments from configs in config file.

        Parameters
        ----------
        config_fname
            full path to config file containing experiment definitions.
        exp_names
            List of experiment names to load. If None, all are loaded.

        Returns
        -------
        `typing.Iterable[Experiment]`
            Initialised experiments.
        """
        config = load_config(config_fname)
        exps = []
        for exp_config in config.variable_config:
            # each variable config is merged with the base config shared by all experiments
            cur_config = merged_config(config.base_config, exp_config)
            if exp_names is None or cur_config["experiment"]["name"] in exp_names:
                exps.append(Experiment(cur_config))
        return exps

    @staticmethod
    def get_experiments_from_dir(
        exp_dir: str, exp_names: Optional[Iterable[str]] = None, only_trainable: bool = False
    ) -> List["Experiment"]:
        """
        Initialise and return experiments from experiment directory.

        Parameters
        ----------
        exp_dir
            Experiment directory, relative to campa_config.EXPERIMENT_DIR.
        exp_names
            List of experiment names to load. If None, all are loaded.
        only_trainable
            Only return trainable experiments.

        Returns
        -------
        Initialised experiments.
        """
        exps = []
        # only direct sub-directories of exp_dir are considered as experiments
        for exp_name in next(os.walk(os.path.join(campa_config.EXPERIMENT_DIR, exp_dir)))[1]:
            config_fname = os.path.join(campa_config.EXPERIMENT_DIR, exp_dir, exp_name, "config.json")
            if os.path.exists(config_fname) and ((exp_names is None) or (exp_name in exp_names)):
                exp = Experiment.from_dir(os.path.join(exp_dir, exp_name))
                if not only_trainable or exp.is_trainable:
                    exps.append(exp)
        return exps
def run_experiments(exps: Iterable[Experiment], mode: str = "all") -> None:
    """
    Execute experiments.

    Runs all given experiments in the given mode.
    The following modes are available:

    - `train`: train experiments (if trainable)
    - `evaluate`: predict experiments on val set and cluster results (on val set)
    - `trainval`: both train and evaluate
    - `compare`: generate comparative plots of experiments
    - `all`: trainval and compare

    Parameters
    ----------
    exps
        Experiments to run. For comparison, all experiments are assumed to
        have the same experiment dir.
    mode
        mode, one of "train", "evaluate", "trainval", "compare", "all".
    """
    from campa.tl import Cluster, Estimator, Predictor, ModelComparator

    assert mode in ["train", "evaluate", "trainval", "compare", "all"], f"unknown mode {mode}"
    # exps is needed several times below; materialise it once to also support one-shot iterators
    exp_list = list(exps)
    exp_names = [exp.name for exp in exp_list]
    print(f"Running experiment for {exp_names} with mode {mode}")
    for exp_name, exp in zip(exp_names, exp_list):
        if mode in ("all", "train", "trainval"):
            if exp.is_trainable:
                print(f"Training model for {exp_name}")
                est = Estimator(exp)
                _ = est.train_model()
        if mode in ("all", "evaluate", "trainval"):
            if exp.is_trainable:
                # evaluate model
                print(f"Evaluating model for {exp_name}")
                pred = Predictor(exp)
                pred.evaluate_model()
            else:
                _prepare_exp_split(exp)
            # cluster model
            print(f"Clustering results for {exp_name}")
            cl = Cluster.from_exp_split(exp)
            cl.create_clustering()
            # predict cluster for images
            # the flag is an evaluation setting; fall back to the cluster section
            # (and to the base config value True) for configs that define it there
            cluster_config = exp.config.get("cluster") or {}
            predict_cluster_imgs = exp.config["evaluation"].get(
                "predict_cluster_imgs", cluster_config.get("predict_cluster_imgs", True)
            )
            if predict_cluster_imgs:
                cl.predict_cluster_imgs(exp)
    # compare models (nothing to compare without experiments)
    if mode in ("all", "compare") and exp_list:
        # assumes that all experiments have the same experiment_dir
        first_exp = exp_list[0]
        img_size = first_exp.data_params["test_img_size"]
        comp = ModelComparator(exp_list, save_dir=os.path.join(campa_config.EXPERIMENT_DIR, first_exp.dir))
        comp.plot_history(values=["val_loss", "val_decoder_loss"])
        comp.plot_final_score(
            score="val_decoder_loss",
            fallback_score="val_loss",
            save_prefix="decoder_loss_",
        )
        comp.plot_per_channel_mse()
        comp.plot_predicted_images(img_ids=[0, 1, 2, 3, 4], img_size=img_size)
        comp.plot_cluster_images(img_ids=[0, 1, 2, 3, 4], img_size=img_size)
        comp.plot_umap()
def _prepare_exp_split(exp: Experiment) -> None:
    """
    Set up exp split data for non trainable model.

    Mimicks results folders created with predictor: for the evaluation split and
    the corresponding ``_imgs`` split, the dataset is read and written (without
    data keys) to ``results_epoch000/{split}`` in the experiment folder.

    Parameters
    ----------
    exp
        Experiment to prepare.
    """
    import numpy as np

    from campa.data import MPPData

    eval_split = exp.config["evaluation"]["split"]
    # create results mpp_data for not trainable experiment to allow usage with Cluster
    for split in [eval_split, eval_split + "_imgs"]:
        base_data_dir = os.path.join("datasets", exp.data_params["dataset_name"], split)
        mpp_params = {"base_data_dir": base_data_dir, "subset": True}
        mpp_data = MPPData.from_data_dir(base_data_dir, data_config=exp.config["data"]["data_config"])
        if "_imgs" in split:
            # img_ids is either the number of images to use, or an explicit list of image ids
            img_ids = exp.config["evaluation"]["img_ids"]
            if isinstance(img_ids, (int, np.integer)):
                # choose random img_ids from available ones (fixed seed for reproducibility)
                rng = np.random.default_rng(seed=42)
                img_ids = rng.choice(
                    mpp_data.unique_obj_ids,
                    img_ids,
                    replace=False,
                )
            # subset mpp_data to these img_ids
            # NOTE(review): an explicit list is assumed to contain obj_ids - confirm against Predictor
            mpp_data.subset(obj_ids=img_ids)
        mpp_data.write(
            save_dir=os.path.join(exp.full_path, "results_epoch000", split),
            mpp_params=mpp_params,
            save_keys=[],
        )