Source code for campa.data._download_data

from typing import Union
from pathlib import Path
import os
import shutil
import tempfile

from tqdm import tqdm
import requests

from campa.constants import SCRIPTS_DIR

Path_t = Union[str, Path]


def load_example_data(data_dir: Path_t = None) -> Path_t:
    """
    Make the example dataset available locally, downloading it if needed.

    Parameters
    ----------
    data_dir
        Directory inside which the ``example_data`` folder is created.
        If ``None``, the ``notebooks`` folder located three directory levels
        above this file is used, i.e. the data ends up in ``notebooks/example_data``.

    Returns
    -------
    Path to folder where dataset is stored.
    """
    target_root = data_dir
    if target_root is None:
        # fall back to the notebooks folder relative to this module
        target_root = Path(__file__).parent.parent.parent / "notebooks"

    # load_dataset takes care of the "already unpacked / archive present / download" cases
    return load_dataset(
        dataset_path=target_root,
        fname="example_data",
        backup_url="https://figshare.com/ndownloader/files/38021094",
    )
def load_example_experiment(experiment_dir: Path_t = None) -> Path_t:
    """
    Make the pre-trained example experiment available locally.

    The experiment lives in ``experiment_dir/test_pre_trained``. If that folder
    already contains ``weights_epoch010.index`` nothing is done. Otherwise the
    archive ``experiment_dir/test_pre_trained.zip`` is unpacked into it, after
    being downloaded first when it is not present.

    Parameters
    ----------
    experiment_dir
        Directory holding the experiment folder and its archive.
        Defaults to ``notebooks/example_experiments`` (relative to three
        directory levels above this file).

    Returns
    -------
    Path to folder where experiment is stored
    """
    if experiment_dir is None:
        experiment_dir = Path(__file__).parent.parent.parent / "notebooks" / "example_experiments"

    root = Path(experiment_dir)
    target_dir = root / "test_pre_trained"
    zip_path = root / "test_pre_trained.zip"
    os.makedirs(target_dir, exist_ok=True)

    # weights already unpacked: nothing left to do
    if "weights_epoch010.index" in os.listdir(target_dir):
        return target_dir

    # fetch the archive only when there is no local copy to unpack
    if not zip_path.exists():
        print("Path or dataset does not yet exist. Attempting to download...")
        download(
            "https://figshare.com/ndownloader/files/38021925",
            output_path=zip_path,
        )

    shutil.unpack_archive(zip_path, target_dir)
    return target_dir
def load_test_data():
    """
    Download test data to ``SCRIPTS_DIR/tests``.

    Nothing is done when ``SCRIPTS_DIR/tests/_data`` already contains
    ``channels_metadata.csv`` and ``SCRIPTS_DIR/tests/_experiments`` already
    contains ``reference_experiment``. Otherwise ``SCRIPTS_DIR/tests/_test_data.zip``
    is unpacked into ``SCRIPTS_DIR/tests``, after being downloaded first when it
    is missing.
    """
    url = "https://figshare.com/ndownloader/files/34988353"
    base_dir = os.path.join(SCRIPTS_DIR, "tests")
    data_dir = os.path.join(base_dir, "_data")
    experiment_dir = os.path.join(base_dir, "_experiments")
    archive_path = os.path.join(base_dir, "_test_data.zip")

    # check if is downloaded already
    if os.path.isdir(data_dir) and os.path.isdir(experiment_dir):
        has_data = "channels_metadata.csv" in os.listdir(data_dir)
        has_experiments = "reference_experiment" in os.listdir(experiment_dir)
        if has_data and has_experiments:
            return

    # have to unpack/redownload
    if not os.path.exists(archive_path):
        print("Path or dataset does not yet exist. Attempting to download...")
        download(url, output_path=archive_path)
    shutil.unpack_archive(archive_path, base_dir)


def load_dataset(dataset_path: Path_t, fname: str, backup_url: Union[str, None]) -> Path_t:
    """
    Load dataset (from URL).

    If unpacked files are already stored in `dataset_path/fname` don't do anything.
    Otherwise check for archive file in `dataset_path/fname/fname.zip` and unpack it
    into `dataset_path/fname`. If no archive is present there, attempt to load the
    dataset from URL into `dataset_path/fname/fname.zip` and unpack it from there.

    The dataset counts as unpacked when `dataset_path/fname` contains
    ``channels_metadata.csv``.

    Parameters
    ----------
    dataset_path
        Path where folder for the dataset will be created.
    fname
        Desired name of the dataset
    backup_url
        Link from which dataset will be loaded. May be ``None`` when the data
        or its archive is already available locally.

    Returns
    -------
    path to a folder where unpacked dataset is stored

    Raises
    ------
    FileNotFoundError
        If neither the unpacked data nor the archive exist and ``backup_url`` is ``None``.
    """
    unpacked_dir = Path(dataset_path) / fname
    archive_path = unpacked_dir / f"{fname}.zip"
    os.makedirs(unpacked_dir, exist_ok=True)

    if "channels_metadata.csv" in os.listdir(unpacked_dir):
        return unpacked_dir

    if not archive_path.exists():
        if backup_url is None:
            raise FileNotFoundError(
                f"File or directory {archive_path} does not exist and no backup_url was provided.\n"
                f"Please provide a backup_url or check whether path is spelled correctly."
            )
        print("Path or dataset does not yet exist. Attempting to download...")
        download(
            backup_url,
            output_path=archive_path,
        )

    shutil.unpack_archive(archive_path, unpacked_dir)
    return unpacked_dir


def getFilename_fromCd(cd):
    """
    Get filename from a ``Content-Disposition`` header value.

    Parameters
    ----------
    cd
        Value of the ``Content-Disposition`` header, or ``None``/empty when the
        header is absent.

    Returns
    -------
    The bare file name given by the ``filename`` parameter (surrounding quotes,
    trailing parameters and any directory components removed), or ``None`` if no
    usable name is present.
    """
    import re

    if not cd:
        return None

    # either a quoted value, or an unquoted token that ends at the next parameter
    match = re.search(r'filename\s*=\s*(?:"([^"]*)"|([^;]+))', cd, flags=re.IGNORECASE)
    if match is None:
        return None
    quoted, bare = match.groups()
    raw = quoted if quoted is not None else bare.replace('"', "")

    # the header is supplied by the remote server: never let it pick a directory
    name = os.path.basename(raw.strip().replace("\\", "/"))
    return name or None


def _is_supported_archive(filename: str) -> bool:
    """Return whether ``filename`` ends with an extension ``shutil.unpack_archive`` can unpack."""
    extensions = tuple(ext for _, exts, _ in shutil.get_unpack_formats() for ext in exts)
    return filename.endswith(extensions)


def download(
    url: str,
    output_path: Union[Path_t, None] = None,
    block_size: int = 1024,
    overwrite: bool = False,
) -> None:
    """
    Download a dataset irrespective of the format.

    The data is streamed into ``<destination>.part`` and only moved to the final
    destination once the transfer completed, so an interrupted download never
    leaves a truncated archive behind.

    Parameters
    ----------
    url
        URL to download
    output_path
        File to store the download in. If this is an existing directory, or
        ``None`` (system temporary directory), the file is stored inside that
        directory under the name announced by the server.
    block_size
        Block size for downloads in bytes (default: 1024)
    overwrite
        Whether to overwrite existing files (default: False)

    Raises
    ------
    ValueError
        If the file name cannot be determined, or does not have an archive
        extension known to :mod:`shutil`.
    requests.HTTPError
        If the server answers with an error status.
    """
    target = Path(tempfile.gettempdir() if output_path is None else output_path)

    # the context manager releases the connection on every exit path
    with requests.get(url, stream=True, timeout=60) as response:
        # fail early instead of saving an HTML error page as the archive
        response.raise_for_status()

        filename = getFilename_fromCd(response.headers.get("content-disposition"))
        if target.is_dir():
            if filename is None:
                raise ValueError(
                    f"Cannot determine a file name for {url}; please pass a file path as output_path."
                )
            target = target / filename
        elif filename is None:
            # content-disposition is empty, try to guess filename
            filename = target.name
            print(f"Guessed filename: {filename}")

        # only formats that shutil can unpack again are accepted
        if not _is_supported_archive(filename):
            raise ValueError(f"File {filename} is not a supported archive format.")

        if target.exists():
            warning = f"File {target} already exists!"
            if not overwrite:
                print(warning)
                return
            print(f"{warning} Overwriting...")

        os.makedirs(target.parent, exist_ok=True)
        total = int(response.headers.get("content-length", 0))
        print(f"Downloading... {total}")

        partial = target.with_name(f"{target.name}.part")
        try:
            # total=None lets tqdm show a plain counter when the size is unknown
            with open(partial, "wb") as file, tqdm(total=total or None, unit="B", unit_scale=True) as progress:
                for data in response.iter_content(block_size):
                    file.write(data)
                    progress.update(len(data))
            os.replace(partial, target)
        finally:
            # only still present if the transfer failed before the final move
            if partial.exists():
                partial.unlink()