# Source code for dustapprox.tools.downloader

""" A simple tool to download files from URLs """
import requests
import sys
import os
import time
from typing import Sequence
from io import TextIOWrapper


def _pretty_size_print(num_bytes: int) -> str:
    """
    Output number of bytes in a human readable format

    The value is scaled by powers of 1024 and labelled with the largest unit
    (Bytes, KB, MB, GB, TB, PB, EB, ZB, YB) for which the scaled value is at
    least 1, so an exact multiple such as 1024 reads ``1 KB``.

    Parameters
    ----------
    num_bytes: int
        number of bytes to convert

    Returns
    -------
    output: str
        string representation of the size with appropriate unit scale
        (about three significant digits), or None when `num_bytes` is None
    """
    if num_bytes is None:
        # Unknown size: None is passed through unchanged, as before.
        return None

    # Climb the unit ladder while the value fills at least one next-larger unit.
    scale, unit = 1, 'Bytes'
    for larger_unit in ('KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'):
        if num_bytes < scale * 1024:
            break
        scale *= 1024
        unit = larger_unit

    value = num_bytes / scale
    text = '%.3g' % value
    if 'e+' in text:
        # '%.3g' falls back to scientific notation from 1000 upwards
        # (e.g. 1000 Bytes, 1023 KB, or anything beyond 1000 YB);
        # print those as plain integers instead.
        text = '%.0f' % value

    return '%s %s' % (text, unit)


def _dl_ascii_progress(iterseq: Sequence,
                       total: int = 100,
                       progress_length: int = 50,
                       mininterval: float = 2,
                       buffer: TextIOWrapper = sys.stdout):
    """ A simplistic progress indicator in ascii format applicable to a sequence

    Yields every item of `iterseq` unchanged while drawing a
    ``[=====     ] (size)`` bar on `buffer`. Progress is counted with
    ``len(item)`` when the item has a length and as 1 per item otherwise.

    Parameters
    ----------
    iterseq: Sequence
        sequence to iter over
    total: int
        value corresponding to a full bar (default is 100; replaced by
        len(iterseq) whenever `iterseq` has a length)
    progress_length: int
        number of characters used by the indicator
    mininterval: float
        minimum number of seconds between two updates of the indicator
        (default 2 seconds)
    buffer: TextIOWrapper
        where to write the indicator to (default sys.stdout)

    Yields
    ------
    chunk
        the items of `iterseq`, in order
    """
    try:
        # A sized input overrides the `total` argument.
        total = len(iterseq)
    except TypeError:
        # Generators and other unsized iterables: keep the provided total.
        pass

    dl = 0
    message_length = 0
    last_print_t = time.time()

    def _show(previous_length: int) -> int:
        """ Draw the bar for the current `dl` and return the message length """
        if total:
            done = int(progress_length * dl / total)
            # Never draw past the bar, even when more than `total` arrived.
            done = max(0, min(progress_length, done))
        else:
            # Unknown or zero total: nothing meaningful to fill.
            done = 0
        message = "\r[%s%s] (%s)" % ('=' * done,
                                     ' ' * (progress_length - done),
                                     _pretty_size_print(dl))
        # Pad with spaces to wipe leftovers of a longer previous message.
        clear = ' ' * (max(1, previous_length - len(message)))
        buffer.write(message + clear)
        buffer.flush()
        return len(message)

    for chunk in iterseq:
        try:
            dl += len(chunk)
        except TypeError:
            dl += 1
        cur_t = time.time()
        if cur_t - last_print_t >= mininterval:
            message_length = _show(message_length)
            # Restart the throttle window so updates stay `mininterval` apart.
            last_print_t = cur_t
        yield chunk

    if message_length:
        # A bar is on screen: refresh it once so it shows the final state.
        _show(message_length)
    buffer.write("\n")
    buffer.flush()


def download_file(link: str, file_name: str, overwrite: bool = False) -> str:
    """ Download a file on disk from url

    The request is always issued. When `file_name` already exists and
    `overwrite` is False, the download is skipped if the server does not
    announce a size or if the announced size equals the size of the file
    on disk.

    Parameters
    ----------
    link: str
        url of the file
    file_name: str
        path and filename of the download location
    overwrite: bool
        set to re-download (default False)

    Returns
    -------
    file_name: str
        the filename of the data

    Raises
    ------
    requests.HTTPError
        if the server answers with an error status code
    """
    # The context manager releases the streamed connection on every exit
    # path, including the early "already downloaded" return.
    with requests.get(link, stream=True) as response:
        # Do not store an HTTP error page as if it were the requested data.
        response.raise_for_status()

        # The content-length header is optional; None means "size unknown".
        content_length = response.headers.get('content-length')
        try:
            total_length = int(content_length) if content_length is not None else None
        except ValueError:
            total_length = None

        if os.path.exists(file_name) and not overwrite:
            if (total_length is None) or (os.stat(file_name).st_size == total_length):
                print(f"file '{file_name}' already downloaded.")
                return file_name

        if total_length is None:
            size_label = 'unknown size'
        else:
            size_label = _pretty_size_print(total_length)

        with open(file_name, "wb") as f:
            print(f"Downloading '{file_name}'", end="")
            print(' ({0:s})'.format(size_label))
            # Always stream in chunks so large files never sit fully in memory;
            # the progress bar is only drawn when the total size is known.
            chunks = response.iter_content(chunk_size=4096)
            if total_length is not None:
                chunks = _dl_ascii_progress(chunks, total=total_length)
            for data in chunks:
                f.write(data)

    return file_name