Module themisasi.web
Source code
from dateutil.parser import parse
from datetime import datetime, timedelta
from pathlib import Path
import numpy as np
import asyncio
import logging
import sys
import os
from aiohttp_requests import requests
from typing import Sequence, Union, Iterator, Dict
TIMEOUT = 600 # arbitrary, seconds
if sys.version_info < (3, 7):
    raise RuntimeError('Python >= 3.7 required')
async def urlretrieve(url: str, fn: Path, overwrite: bool = False):
    # skip files already on disk, unless they are suspiciously small (< 10 kB, likely a failed download)
    if not overwrite and fn.is_file() and fn.stat().st_size > 10000:
        print(f'SKIPPED {fn}')
        return
    # %% download
    R = await requests.get(url, allow_redirects=True, timeout=TIMEOUT)
    if R.status != 200:
        logging.error(f'could not download {url} {R.status}')
        return
    print(url)
    data = await R.read()
    with fn.open('wb') as f:
        f.write(data)
def download(treq: Sequence[Union[str, datetime]],
             site: Sequence[str], odir: Path,
             urls: Dict[str, str], overwrite: bool = False):
    """
    concurrently download video and calibration files

    Parameters
    ----------
    treq : datetime or list of datetime
        time request e.g. '2012-02-01T03' or a time range ['2012-02-01T03', '2012-02-01T05']
    site : str or list of str
        camera site e.g. gako, fykn
    odir : pathlib.Path
        output directory e.g. ~/data
    urls : dict of str
        stems of URLs to the website(s) hosting the files
    overwrite : bool
        overwrite existing files; normally wasteful of bandwidth, unless a prior download was corrupted
    """
    # %% sanity checks
    odir = Path(odir).expanduser().resolve()
    odir.mkdir(parents=True, exist_ok=True)

    if isinstance(site, str):
        site = [site]
    if isinstance(treq, (str, datetime)):
        treq = [treq]

    start = parse(treq[0]) if isinstance(treq[0], str) else treq[0]
    if len(treq) == 2:
        end = parse(treq[1]) if isinstance(treq[1], str) else treq[1]
    else:
        end = start
    if end < start:
        raise ValueError('start time must be before end time!')
    # %% start download
    if os.name == 'nt' and sys.version_info < (3, 8):
        # the Proactor loop only became the Windows default in Python 3.8
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())  # type: ignore

    asyncio.run(arbiter(site, start, end, odir, overwrite, urls))
async def arbiter(sites: Sequence[str],
                  start: datetime, end: datetime,
                  odir: Path, overwrite: bool,
                  urls: Dict[str, str]):
    """
    This starts len(sites) tasks concurrently.
    Thus if you only have one site, it downloads only one file at a time.
    A more elegant way is to set up a task pool.
    However, normally we are downloading N sites across the same timespan,
    where N is typically in the 3..10 range.

    Parameters
    ----------
    sites : str or list of str
        sites to download video from
    start : datetime
        starting time
    end : datetime
        ending time
    odir : Path
        where to download data to
    overwrite : bool, optional
        overwrite existing data
    urls : dict of str
        sites hosting data

    Example of task pool:
    https://github.com/ec500-software-engineering/asyncio-subprocess-ffmpeg/blob/c82d3a243078d8e865740cd9c3328fe7fd6ea52e/asyncioffmpeg/ffplay.py#L45
    """
    # calibration files first, then the hourly video files: one task per site
    futures = [_download_cal(site, odir, urls['cal_stem'], overwrite) for site in sites]
    await asyncio.gather(*futures)

    futures = [_download_video(site, odir, start, end, urls['video_stem'], overwrite) for site in sites]
    await asyncio.gather(*futures)
async def _download_video(site: str, odir: Path,
                          start: datetime, end: datetime,
                          url_stem: str, overwrite: bool):
    for url in _urlgen(site, start, end, url_stem):
        print('GEN: ', url)
        await urlretrieve(url, odir / url.split('/')[-1], overwrite)
def _urlgen(site: str, start: datetime, end: datetime, url_stem: str) -> Iterator[str]:
    # yield one URL per hour, inclusive of both endpoints
    for T in np.arange(start, end + timedelta(hours=1), timedelta(hours=1)):
        t = T.astype(datetime)
        fpath = (f'{url_stem}{site}/{t.year:4d}/{t.month:02d}/'
                 f'thg_l1_asf_{site}_{t.year:4d}{t.month:02d}{t.day:02d}{t.hour:02d}_v01.cdf')
        yield fpath
async def _download_cal(site: str, odir: Path, url_stem: str,
                        overwrite: bool = False):
    fpath = f"{url_stem}thg_l2_asc_{site}_19700101_v01.cdf"
    await urlretrieve(fpath, odir / fpath.split('/')[-1], overwrite)
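To make the hourly file pattern from _urlgen concrete, a quick check follows; the stem is a placeholder (real stems come from the urls dict passed to download), and _urlgen is a private helper imported here for illustration only:

from datetime import datetime
from themisasi.web import _urlgen  # private helper, shown for illustration

stem = 'https://example.invalid/themis/'  # hypothetical stem, not a real server
for u in _urlgen('gako', datetime(2012, 2, 1, 3), datetime(2012, 2, 1, 5), stem):
    print(u)
# one URL per hour, inclusive of both endpoints:
# https://example.invalid/themis/gako/2012/02/thg_l1_asf_gako_2012020103_v01.cdf
# https://example.invalid/themis/gako/2012/02/thg_l1_asf_gako_2012020104_v01.cdf
# https://example.invalid/themis/gako/2012/02/thg_l1_asf_gako_2012020105_v01.cdf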
Functions
async def arbiter(sites, start, end, odir, overwrite, urls)
This starts len(sites) tasks concurrently. Thus if you only have one site, it downloads only one file at a time.
A more elegant way is to set up a task pool. However, normally we are downloading N sites across the same timespan, where N is typically in the 3..10 range.

Parameters

sites : str or list of str
    sites to download video from
start : datetime
    starting time
end : datetime
    ending time
odir : Path
    where to download data to
overwrite : bool, optional
    overwrite existing data
urls : dict of str
    sites hosting data
Example of task pool: https://github.com/ec500-software-engineering/asyncio-subprocess-ffmpeg/blob/c82d3a243078d8e865740cd9c3328fe7fd6ea52e/asyncioffmpeg/ffplay.py#L45
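For reference, a minimal sketch of such a task pool using asyncio.Semaphore; gather_limited and the limit of 4 are illustrative names and values, not part of this module:

import asyncio

async def _bounded(sem: asyncio.Semaphore, coro):
    # wait for a free slot before awaiting the wrapped coroutine
    async with sem:
        return await coro

async def gather_limited(coros, limit: int = 4):
    # like asyncio.gather, but at most `limit` coroutines run concurrently
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*(_bounded(sem, c) for c in coros))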
def download(treq, site, odir, urls, overwrite=False)
Concurrently download video and calibration files.

Parameters

treq : datetime or list of datetime
    time request e.g. '2012-02-01T03' or a time range ['2012-02-01T03', '2012-02-01T05']
site : str or list of str
    camera site e.g. gako, fykn
odir : pathlib.Path
    output directory e.g. ~/data
urls : dict of str
    stems of URLs to the website(s) hosting the files
overwrite : bool
    overwrite existing files; normally wasteful of bandwidth, unless a prior download was corrupted
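A usage sketch, assuming placeholder URL stems (the real stems depend on which THEMIS ASI server hosts the data):

from pathlib import Path
from themisasi.web import download

urls = {'video_stem': 'https://example.invalid/asi/',  # hypothetical video host
        'cal_stem': 'https://example.invalid/cal/'}    # hypothetical calibration host

# video for 03:00..05:00 UT plus calibration files, for two sites concurrently
download(['2012-02-01T03', '2012-02-01T05'], ['gako', 'fykn'], Path('~/data'), urls)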
async def urlretrieve(url, fn, overwrite=False)
Download a single file to fn, skipping the download if the file already exists and is larger than about 10 kB, unless overwrite=True.
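Since urlretrieve is a coroutine, a standalone call needs an event loop; a minimal sketch with a placeholder URL:

import asyncio
from pathlib import Path
from themisasi.web import urlretrieve

url = 'https://example.invalid/thg_l2_asc_gako_19700101_v01.cdf'  # placeholder
# existing files larger than ~10 kB are skipped unless overwrite=True
asyncio.run(urlretrieve(url, Path('/tmp') / url.split('/')[-1]))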