Source code for marsimage.download

"""Download images from the PDS archive."""

import concurrent.futures
import logging
import os
import re
import time
from pathlib import Path
from time import sleep

import pandas as pd
import requests
from tqdm.auto import tqdm

from .filename import Filename

logger = logging.getLogger(__name__)

# --- PDS W10N endpoints ---------------------------------------------------------
PDS3_URL = 'https://planetarydata.jpl.nasa.gov/w10n/msl/'  # MSL PDS3 archive root
# New MMM PDS4 bundle (published since release 33), nested below the PDS3 root
PDS4_URL = PDS3_URL + 'msl_mmm/'

# --- Sol coverage of the two archive formats (the two ranges overlap) -----------
PDS3_END = 3644  # PDS3 was used until Sol 3644
PDS4_START = 3570  # PDS4 was used from Sol 3570

# --- Supported cameras ----------------------------------------------------------
# Camera name -> id fragment that appears in the PDS bundle/collection folder names
CAM_IDS = {
    'mastcam': 'MSLMST',
    'mahli': 'MSLMHL',
    'navcam': 'MSLNAV_1',
    'mardi': 'MSLMRD',
    'hazcam': 'MSLHAZ_1',
}
# Camera names in the order of CAM_IDS: ['mastcam', 'mahli', 'navcam', 'mardi', 'hazcam']
CAM_LIST = list(CAM_IDS)

# Default product-id whitelist: radiometrically calibrated, non-linearized products
MSL_PROD_FILTER = ['DRXX', 'RAD_', 'MXY_']

TIMEOUT = 10 * 60  # HTTP request timeout in seconds (10 minutes)


def _urljoin(*args):
    """Concatenate URL components with '/' separators.

    Every component is converted with ``str`` and stripped of its *trailing*
    slashes only (leading slashes are kept) before the components are joined.
    Calling it with no components returns an empty string.
    """
    # Based on https://stackoverflow.com/questions/1793261/how-to-join-components-of-a-path-when-you-are-constructing-a-url-in-python
    stripped_parts = [str(part).rstrip('/') for part in args]
    return '/'.join(stripped_parts)


def _get_json(url, retries=3):
    """Fetch JSON data from a PDS W10N url with retries.

    Transient failures are retried with exponential backoff (1 s, 4 s, 16 s, ...).
    Transient failures are request timeouts, connection errors, and HTTP 408/502/504.
    Any other HTTP error is treated as final and is not retried.

    Parameters
    ----------
    url : str
        The W10N URL to fetch; ``'/?output=json'`` is appended to it.
    retries : int, optional
        The maximum number of attempts, by default 3.

    Returns
    -------
    dict
        The JSON data fetched from the URL. If the request fails, an empty listing
        ``{'nodes': {}, 'leaves': {}}`` is returned so callers can iterate
        ``['nodes']`` / ``['leaves']`` unconditionally.
    """
    for attempt in range(retries):
        try:
            response = requests.get(url + '/?output=json', timeout=TIMEOUT)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            if status not in {408, 504, 502}:  # only these HTTP codes are worth retrying
                logger.debug(f'Error {status} occurred for URL: {url}. Skipping...')
                return {'nodes': {}, 'leaves': {}}
            error = e
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            # No response exists in this case, so nothing may be read from `response`
            # (it is unbound on the first attempt and stale on later ones).
            error = e
        if attempt < retries - 1:
            logger.debug(
                f'Error occurred for URL: {url}. Retrying {attempt + 1}/{retries}... Error: {error}'
            )
            sleep(4**attempt)  # exponential backoff to avoid overloading the server
        else:
            # Last attempt: give up right away instead of sleeping for nothing.
            logger.error(f'Failed to fetch data from URL: {url}. {error}')
    return {'nodes': {}, 'leaves': {}}


def _msl_index_pds3_folders(camera, sol_list=None):
    """Build PDS3 folder index for the selected instrument ID.

    PDS3 was used until release 32, or Sol 3644. Since Sol 3570, PDS4 is used for MMM cameras.

    Parameters
    ----------
    camera : str
        Instrument name (a key of ``CAM_IDS``).
    sol_list : dict, optional
        Existing dictionary of sols and their URLs, by default None. It is updated in
        place and returned; a new dictionary is created when None.

    Returns
    -------
    dict
        Mapping of 5-digit zero-padded sol strings (e.g. ``'00042'``) to lists of
        sol folder URLs. A sol can have several URLs, for example when data was updated.
    """
    try:
        mission_node = _get_json(PDS3_URL)  # get mission root page
    except requests.exceptions.Timeout as e:
        logger.error(f'Timeout occurred for URL: {PDS3_URL}. {e}')
        mission_node = {}
    if sol_list is None:
        sol_list = {}
    cam_id = CAM_IDS[camera]
    # .get(): the timeout fallback above has no 'nodes' key
    bundle_nodes = [node for node in mission_node.get('nodes', []) if cam_id in node['name']]
    for bundle_node in tqdm(bundle_nodes, desc='PDS3 Sol index', leave=False):  # noqa: PLR1702
        # each camera keeps its sol folders at a different depth inside the bundle
        bundle_name = bundle_node['name']
        if 'MSLHAZ' in cam_id:
            data_urls = [_urljoin(PDS3_URL, bundle_name + '/DATA/')]
        elif 'MSLNAV' in cam_id:
            data_urls = [
                _urljoin(PDS3_URL, bundle_name + '/DATA/'),
                _urljoin(PDS3_URL, bundle_name + '/DATA_V1/'),
            ]
        else:
            data_urls = [_urljoin(PDS3_URL, bundle_name + '/DATA/RDR/SURFACE/')]

        for data_url in data_urls:
            logger.debug(f'Indexing {data_url}')
            data_node = _get_json(data_url)
            for sol_node in data_node.get('nodes', []):
                sol_name = sol_node['name']
                # the sol number is the first run of digits in the folder name
                sol_match = re.search(r'\d+', sol_name)
                if sol_match is None:
                    logger.debug(f'Skipping non-sol folder {sol_name} in {data_url}')
                    continue
                nsol = sol_match.group().zfill(5)  # pad sol number with zeros
                sol_url = _urljoin(data_url, sol_name)  # get full SOL folder url
                # several bundles may hold the same sol (data updates): keep every distinct URL
                sol_urls = sol_list.setdefault(nsol, [])
                if sol_url not in sol_urls:
                    sol_urls.append(sol_url)
    return sol_list


# index the PDS4 (MMM bundle) sol folders for a given camera
def _mmm_index_pds4_folders(camera, sol_list=None):
    """Build PDS4 folder index for the selected instrument ID.

    PDS3 was used until release 32, or Sol 3644. Since Sol 3570, PDS4 is used for MMM cameras.

    Parameters
    ----------
    camera : str
        Instrument name (a key of ``CAM_IDS``).
    sol_list : dict, optional
        Existing dictionary of sols and their URLs, by default None. It is updated in
        place and returned; a new dictionary is created when None.

    Returns
    -------
    dict
        Mapping of 5-digit zero-padded sol strings (e.g. ``'03600'``) to lists of
        sol folder URLs.
    """
    try:
        mmm_node = _get_json(PDS4_URL)  # get mission root page
    except requests.exceptions.Timeout as e:
        logger.error(f'Timeout occurred for URL: {PDS4_URL}. {e}')
        mmm_node = {}
    if sol_list is None:
        sol_list = {}
    cam_id = CAM_IDS[camera]
    # .get(): the timeout fallback above has no 'nodes' key
    collection_nodes = [
        node for node in mmm_node.get('nodes', []) if f'data_{cam_id}' in node['name']
    ]
    for collection in collection_nodes:  # noqa: PLR1702
        collection_name = collection['name']
        collection_url = _urljoin(PDS4_URL, collection_name)
        logger.debug(f'Indexing {collection_url}')
        collection_node = _get_json(collection_url)
        # NOTE(review): a substring test for 'calibrated' also matches names that contain
        # 'uncalibrated' - confirm this matches the archive layout.
        volume_list = [
            volume
            for volume in collection_node.get('nodes', [])
            if 'calibrated' in volume['name']
        ]
        for volume in tqdm(volume_list, desc='PDS4 Sol index', leave=False):
            data_url = _urljoin(collection_url, volume['name'] + '/SURFACE/')
            logger.debug(f'Indexing {data_url}')
            data_node = _get_json(data_url)
            for sol_node in data_node.get('nodes', []):
                sol_name = sol_node['name']
                # the sol number is the first run of digits in the folder name
                sol_match = re.search(r'\d+', sol_name)
                if sol_match is None:
                    logger.debug(f'Skipping non-sol folder {sol_name} in {data_url}')
                    continue
                nsol = sol_match.group().zfill(5)  # pad sol number with zeros
                sol_url = _urljoin(data_url, sol_name)  # get full SOL folder url
                # keep every distinct URL per sol (e.g. PDS3 + PDS4 copies, data updates)
                sol_urls = sol_list.setdefault(nsol, [])
                if sol_url not in sol_urls:
                    sol_urls.append(sol_url)
    return sol_list


def _msl_index_pds_folders(camera, sol_start=None, sol_end=None):
    """Build PDS folder index for the selected instrument ID.

    Will index PDS3, PDS4, or both, depending on the sol range and camera.
    If a bound of the sol range is None, it does not restrict which archives are indexed.
    With no range at all, both PDS3 and PDS4 are indexed for MMM cameras.

    Parameters
    ----------
    camera : str
        Instrument name
    sol_start : int, optional
        Starting sol number, by default None.
    sol_end : int, optional
        Ending sol number, by default None.

    Returns
    -------
    dict
        Mapping of 5-digit zero-padded sol strings to lists of sol folder URLs.
    """
    # Non-MMM cameras (navcam, hazcam) are only indexed from PDS3.
    if camera not in {'mastcam', 'mahli', 'mardi'}:
        return _msl_index_pds3_folders(camera)
    # MMM cameras: skip an archive when the requested range cannot reach it.
    # The None checks come first so that `None < int` is never evaluated (TypeError).
    if sol_end is not None and sol_end < PDS4_START:
        return _msl_index_pds3_folders(camera)
    if sol_start is not None and sol_start >= PDS3_END:
        return _mmm_index_pds4_folders(camera)
    return _mmm_index_pds4_folders(camera, _msl_index_pds3_folders(camera))


def _msl_index_singlesol_products(
    camera,
    folder_urls,
    sol,
    product_filter=MSL_PROD_FILTER,
    remove_thumbnails=True,
    find_best=True,
):
    """Index the product files found in the folder(s) of a single sol.

    Every file listed in the folders is parsed with ``Filename``.
    - ``.IMG`` files fill the image and metadata columns.
    - ``.LBL`` files fill the 'pds3_label' column.
    - ``.xml`` files fill the 'pds4_label' column.
    Each file's row is keyed by its product id.

    Parameters
    ----------
    camera : str
        Camera name stored in the 'camera' column ('mastcam', 'mahli', 'mardi',
        'navcam' or 'hazcam').
    folder_urls : str or list of str
        URL(s) of the sol folder(s) to index.
    sol : int or str
        Sol identifier stored in the 'sol' column (the callers in this module pass
        the zero-padded sol string).
    product_filter : str or list of str, optional
        Substrings that a product id must contain to be kept. They are joined into a
        regex alternation. Defaults to ``MSL_PROD_FILTER``; a falsy value disables
        filtering.
    remove_thumbnails : bool, optional
        If True, keep only rows whose 'thumbnail' flag equals False, by default True.
    find_best : bool, optional
        If True, keep only the row with the highest 'desirability' per observation,
        by default True. This also removes non-unique thumbnails. Rows without an
        observation (no ``.IMG`` seen) are dropped by the grouping.

    Returns
    -------
    pd.DataFrame
        Product table indexed by 'productid'.
    """
    urls = [folder_urls] if isinstance(folder_urls, str) else folder_urls
    image_columns = ['observation', 'image', 'sol', 'camera', 'desirability', 'thumbnail']
    table = pd.DataFrame(
        index=pd.Index([], name='productid'),
        columns=[
            'observation',
            'image',
            'pds3_label',
            'pds4_label',
            'sol',
            'camera',
            'desirability',
            'thumbnail',
        ],
    )

    for url in urls:
        logger.debug(f'Indexing {url}')
        listing = _get_json(url)
        for leaf in listing['leaves']:
            leaf_name = leaf['name']
            parsed = Filename(leaf_name)
            # Not mutually exclusive on purpose: each suffix test is independent.
            if '.IMG' in leaf_name:
                table.loc[parsed.product_id, image_columns] = (
                    parsed._product,
                    _urljoin(url, leaf_name),
                    sol,
                    camera,
                    parsed._desirability,
                    parsed.is_thumbnail,
                )
            if '.LBL' in leaf_name:
                table.loc[parsed.product_id, 'pds3_label'] = _urljoin(url, leaf_name)
            if '.xml' in leaf_name:
                table.loc[parsed.product_id, 'pds4_label'] = _urljoin(url, leaf_name)

    if find_best:
        best_rows = table.groupby('observation')['desirability'].idxmax()
        table = table.loc[best_rows]

    if remove_thumbnails:
        # element-wise comparison; `is False` would not broadcast over the column
        table = table.loc[table['thumbnail'] == False]  # noqa: E712

    if product_filter:
        wanted = [product_filter] if isinstance(product_filter, str) else product_filter
        product_ids = table.index.get_level_values('productid')
        table = table.loc[product_ids.str.contains('|'.join(wanted), regex=True)]

    return table


# public API: index products and download them
def msl_index_products(
    camera,
    sol_start,
    sol_end,
    product_filter=MSL_PROD_FILTER,
    remove_thumbnails=True,
    find_best=True,
):
    """Index products from the PDS archive.

    Warning: This function may fail for very large sol folders (like MSL Navcam),
    because the PDS takes very long to reply to many of these requests.

    Parameters
    ----------
    camera : str
        The camera to index. Options are 'mastcam', 'mahli', 'mardi', 'navcam',
        'hazcam' (case-insensitive).
    sol_start : int
        The starting sol number (inclusive).
    sol_end : int
        The ending sol number (inclusive). If it is smaller than ``sol_start``, the
        two values are swapped.
    product_filter : list of str | str | None, optional
        A whitelist of strings to filter the product ids, by default
        `['DRXX', 'RAD_', 'MXY_']`. Only products containing one of these strings are
        kept. If None, all products are kept.
        TODO implement regex filtering
    remove_thumbnails : bool, optional
        If True, drop thumbnail products, by default True.
    find_best : bool, optional
        If True, only keep the highest quality version of each product, by default True.
        This will also remove non-unique thumbnails.

    Returns
    -------
    pd.DataFrame
        One row per product id. The columns are 'observation', 'image',
        'pds3_label', 'pds4_label', 'sol', 'camera', 'desirability' and 'thumbnail'.

    Raises
    ------
    ValueError
        If ``camera`` is not a supported camera.
    """
    camera = camera.lower()
    if camera not in CAM_LIST:
        raise ValueError(f'Camera incorrect. Available cameras are: {", ".join(CAM_LIST)}')
    # Same convention as download_msl: a reversed range is swapped, not silently empty.
    if sol_end < sol_start:
        logger.warning('sol_end is less than sol_start. Swapping values.')
        sol_start, sol_end = sol_end, sol_start

    # build PDS index for instrument and sol range, then keep only the requested sols
    sol_list = _msl_index_pds_folders(camera, sol_start=sol_start, sol_end=sol_end)
    sol_subset = {k: v for k, v in sol_list.items() if sol_start <= int(k) <= sol_end}

    # The empty frame carries the final schema, so the result has these columns
    # even when no sol matches.
    frames = [
        pd.DataFrame(
            index=pd.Index([], name='productid'),
            columns=[
                'observation',
                'image',
                'pds3_label',
                'pds4_label',
                'sol',
                'camera',
                'desirability',
                'thumbnail',
            ],
        )
    ]
    # iterate over each sol and collect its image and label files
    for sol in tqdm(sorted(sol_subset), desc='Indexing Products'):
        frames.append(
            _msl_index_singlesol_products(
                camera,
                sol_subset[sol],
                sol,
                product_filter=product_filter,
                remove_thumbnails=remove_thumbnails,
                find_best=find_best,
            )
        )
        sleep(4)  # sleep between requests to reduce server load
    # Concatenate once instead of growing the frame per sol (quadratic copying).
    return pd.concat(frames)
def download(url, path, skip_existing=True, session=None):
    """Download a file from a URL.

    The data is streamed into a sibling ``<name>.part`` file. That file is moved to
    its final name only after the transfer finished. An interrupted download therefore
    never leaves a truncated file that ``skip_existing`` would later take for a
    complete one.

    Parameters
    ----------
    url : str
        The URL to download the file from.
    path : str
        The path to save the file to. If a directory is provided, the file will be
        saved with the same name as the URL.
    skip_existing : bool, optional
        If True, skip downloading if the file already exists, by default True.
    session : requests.Session, optional
        A requests session to use for the download. This allows for reusing the same
        connection for multiple downloads. By default None, in which case a temporary
        session is created and closed again.

    Returns
    -------
    pathlib.Path
        The absolute path of the target file. It is also returned when the server
        answered with a non-200 status. In that case the error is only logged and the
        file may not exist.

    Raises
    ------
    requests.exceptions.RequestException
        Network errors (e.g. a timeout after ``TIMEOUT`` seconds without data)
        propagate to the caller.
    """
    path = Path(path)
    if path.is_dir():
        path = path / Path(url).name
    if skip_existing and path.exists():
        logger.debug(f'Skipping existing file: {path}')
        return path.absolute()

    owns_session = session is None
    if owns_session:
        session = requests.Session()
    partial = path.with_name(path.name + '.part')
    try:
        # The context manager releases the streamed connection even when the body is
        # not read (non-200 status).
        with session.get(url, stream=True, timeout=TIMEOUT) as r:
            if r.status_code == requests.codes.ok:
                with open(partial, 'wb') as f:
                    # iterating the response directly would yield 128-byte chunks
                    for chunk in r.iter_content(chunk_size=1024 * 1024):
                        f.write(chunk)
                partial.replace(path)
            else:
                logger.error(f'Error downloading file: {url}. Status code: {r.status_code}')
    finally:
        partial.unlink(missing_ok=True)  # remove leftovers of a failed transfer
        if owns_session:
            session.close()
    return path.absolute()
def download_pds3(product_url, path, skip_existing=True, session=None):
    """Download a PDS3 product (.IMG) and its label (.LBL) from a URL.

    Only the folder and the file stem of ``product_url`` are used. The ``.IMG`` and
    ``.LBL`` file names are derived from them.

    Parameters
    ----------
    product_url : str
        The URL to download the product from.
    path : str
        The directory to save the product and label to. It is created if missing.
    skip_existing : bool, optional
        If True, skip downloading if the file already exists, by default True.
    session : requests.Session, optional
        A requests session to use for the download. This allows for reusing the same
        connection for multiple downloads. By default None, in which case a temporary
        session is created and always closed again.

    Returns
    -------
    pathlib.Path
        The path to the downloaded product.

    Raises
    ------
    ValueError
        If ``path`` has a file suffix, i.e. does not look like a directory.
    """
    path = Path(path)
    # Validate before touching the filesystem or opening a session, so a bad path
    # leaves neither a stray directory nor an open session behind.
    if path.suffix != '':
        raise ValueError('Path must be a directory.')

    product = Path(product_url).stem + '.IMG'
    product_url = _urljoin(*product_url.split('/')[:-1], product)
    label = Path(product_url).stem + '.LBL'
    label_url = _urljoin(*product_url.split('/')[:-1], label)
    product_path = path / product
    label_path = path / label
    path.mkdir(parents=True, exist_ok=True)

    owns_session = session is None
    if owns_session:
        session = requests.Session()
    try:
        try:
            download(label_url, label_path, skip_existing=skip_existing, session=session)
        except requests.exceptions.HTTPError:
            # NOTE(review): download() logs non-200 responses instead of raising, so this
            # handler only matters if that behavior changes.
            logger.debug(f'Label not found: {label_url}')
        download(product_url, product_path, skip_existing=skip_existing, session=session)
    finally:
        if owns_session:
            session.close()
    return product_path
[docs] def download_products( products_df, output_dir='.', groupby='sol/camera', skip_existing=True, pbar=None, num_threads=None, ): """Download products from the DataFrame. Parameters ---------- products_df : pd.DataFrame The DataFrame containing the product index created by msl_index_products or msl_index_pds_folders. output_dir : str, optional The output directory to save the products to, by default '.'. groupby : str, optional The column to group by, by default 'sol'. num_threads : int, optional The number of threads to use for downloading, by default number of CPUs. Returns ------- pd.DataFrame A DataFrame containing the paths to the downloaded files. """ download_index = pd.Index([], name='productid') files_downloaded = pd.DataFrame( index=download_index, columns=[ 'observation', 'image', 'pds3_label', 'pds4_label', 'sol', 'camera', 'desirability', 'thumbnail', ], ) session = requests.Session() def download2(url, path): sleep(0.1) # sleep to increase interval between requests return download(url, path, skip_existing=skip_existing, session=session) def download_row(row): match groupby: case 'sol': group = row['sol'].zfill(5) case 'sol/camera': group = f'{row["sol"].zfill(5)}/{row["camera"].upper()}' case 'camera': group = row['camera'].upper() case 'site': raise NotImplementedError('Grouping by site is not yet implemented.') case None: group = '' case _: raise ValueError(f'Invalid groupby value: {groupby}') folder = Path(output_dir) / group folder.mkdir(parents=True, exist_ok=True) if pd.notna(row['image']): file = download2(row['image'], folder / Path(row['image']).name) files_downloaded.loc[ row.name, ['observation', 'image', 'sol', 'camera', 'desirability', 'thumbnail'] ] = ( row['observation'], file, row['sol'], row['camera'], row['desirability'], row['thumbnail'], ) # if the image exists, also try to download the label if pd.notna(row['pds3_label']): file = download2(row['pds3_label'], folder / Path(row['pds3_label']).name) files_downloaded.loc[row.name, 
'pds3_label'] = file if pd.notna(row['pds4_label']): file = download2(row['pds4_label'], folder / Path(row['pds4_label']).name) files_downloaded.loc[row.name, 'pds4_label'] = file if num_threads is None: num_threads = min(os.cpu_count(), 8) if pbar is None: pbar = tqdm(total=len(products_df), desc='Downloading Sol') with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: future_to_url = { executor.submit(download_row, row): row for _, row in products_df.iterrows() } for future in concurrent.futures.as_completed(future_to_url): future_to_url[future] pbar.update(1) session.close() return files_downloaded
def download_msl(
    cameras,
    sol_start,
    sol_end,
    output_dir='.',
    groupby='sol/camera',
    product_filter=MSL_PROD_FILTER,
    skip_existing=True,
    **kwargs,
):
    """Download images from the PDS archive.

    Parameters
    ----------
    cameras : str | list of str
        The cameras to download images from. Options are 'mastcam', 'mahli', 'mardi',
        'navcam', 'hazcam'. If a list is provided, images from all cameras in the list
        will be downloaded. If 'all' is provided, images from all cameras will be
        downloaded. The caller's list is not modified.
    sol_start : int
        The starting sol number (inclusive).
    sol_end : int
        The ending sol number (inclusive). If smaller than ``sol_start``, the two
        values are swapped.
    output_dir : str, optional
        The output directory to save the products to, by default '.'.
    groupby : str | None, optional
        Sub-directory layout passed to ``download_products``, by default 'sol/camera'.
    product_filter : list, optional
        A whitelist of strings to filter the product ids, by default
        `['DRXX', 'RAD_', 'MXY_']`. Only products containing these strings will be
        downloaded. If None, all products will be downloaded.
        TODO implement regex filtering
    skip_existing : bool, optional
        If True, files that already exist locally are not downloaded again,
        by default True.
    **kwargs
        ``num_threads`` (int) is forwarded to ``download_products``. Every other
        keyword (e.g. ``find_best``, ``remove_thumbnails``) is forwarded to the
        per-sol product indexer.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the paths to the downloaded files. If the run is
        interrupted with Ctrl+C, the results collected so far are returned.

    Raises
    ------
    ValueError
        If one of the requested cameras is not supported.
    """
    start_time = time.time()
    if sol_end < sol_start:
        logger.warning('sol_end is less than sol_start. Swapping values.')
        sol_start, sol_end = sol_end, sol_start

    # Build a *new* lowercase list: lowercasing in place would mutate the caller's
    # list, or the module-level CAM_LIST when cameras == 'all'.
    if isinstance(cameras, str):
        cameras = CAM_LIST if cameras.lower() == 'all' else [cameras]
    cameras = [camera.lower() for camera in cameras]

    print(  # noqa: T201
        f'Downloading images from Sol {sol_start} to Sol {sol_end} for {", ".join(cameras).upper()}.'
    )
    for camera in cameras:
        if camera not in CAM_LIST:
            raise ValueError(f'Camera incorrect. Available cameras are: {", ".join(CAM_LIST)}')

    # num_threads belongs to the download step; the indexer does not accept it.
    num_threads = kwargs.pop('num_threads', None)

    total_downloaded = dict.fromkeys(cameras, 0)
    frames = [pd.DataFrame()]

    def log_summary(status):
        """Log the elapsed wall time and the number of products per camera."""
        hours, rem = divmod(time.time() - start_time, 3600)
        minutes, seconds = divmod(rem, 60)
        logger.info(f'{status} after {int(hours)}h {int(minutes)}m {seconds:.0f}s.')
        for camera_name, count in total_downloaded.items():
            logger.info(f'{camera_name.upper()}: {count} products')

    try:
        for camera in cameras:
            with tqdm(
                total=1,
                desc=f'Downloading {camera.upper()} Products (Sol {sol_start}-{sol_end})',
                smoothing=0.05,
            ) as pbar_camera:
                # Index data for each folder from sol sol_start to sol sol_end
                sol_list = _msl_index_pds_folders(camera, sol_start=sol_start, sol_end=sol_end)
                sol_subset = {
                    k: v for k, v in sol_list.items() if sol_end >= int(k) >= sol_start
                }
                pbar_camera.reset(total=len(sol_subset))
                # index and download one sol at a time
                for sol in sorted(sol_subset):
                    pbar_camera.set_description(
                        f'Downloading {camera.upper()} (Sol {int(sol)} of {sol_start}-{sol_end})'
                    )
                    with tqdm(total=1, desc=f'Indexing Sol {int(sol)}', leave=False) as pbar_sol:
                        sol_df = _msl_index_singlesol_products(
                            camera, sol_subset[sol], sol, product_filter=product_filter, **kwargs
                        )
                        pbar_sol.reset(total=len(sol_df))
                        pbar_sol.set_description(f'Downloading Sol {int(sol)} products')
                        files = download_products(
                            sol_df,
                            output_dir=output_dir,
                            groupby=groupby,
                            skip_existing=skip_existing,
                            pbar=pbar_sol,
                            num_threads=num_threads,
                        )
                    total_downloaded[camera] += len(files)
                    frames.append(files)
                    pbar_camera.update(1)
    except KeyboardInterrupt:
        log_summary('Download interrupted')
        return pd.concat(frames)
    log_summary('Download finished')
    # Concatenate once at the end instead of once per sol (quadratic copying).
    return pd.concat(frames)