'''Functions to download GOES ABI imagery and DEMs, orthorectify and assemble them into zarr files, and fetch test/example data'''
# based on https://github.com/GlacioHack/xdem/blob/d91bf1cc9b3f36d77f3729649bc8e9edc6b42f9f/xdem/examples.py#L33
import os
import tarfile
import urllib.request
import shutil
import subprocess
import sys
import goes_ortho as go
from goespy.Downloader import ABI_Downloader
import json
from dateutil import rrule, parser
import datetime as dt
import xarray as xr
import zarr
import gtsa
from pathlib import Path
def build_zarr(downloadRequest_filepath):
'''Download the GOES ABI imagery specified by a request JSON file, orthorectify it, and assemble the images into a single zarr store.'''
# download requested imagery
print('download requested imagery')
image_path_list = download_abi(downloadRequest_filepath)
# parse json request file
print('parse json request file')
_, _, bounds, _, _, _, _, variables, apiKey, _, outputFilepath = parse_json(downloadRequest_filepath)
# orthorectify all images
print('orthorectify all images')
new_image_path_list = []
print(image_path_list)
for goes_image_path in image_path_list:
print('filename: ', goes_image_path)
new_goes_filename = goes_image_path.with_name(goes_image_path.stem + '_o').with_suffix('.nc')
print('renamed to: ', new_goes_filename)
new_image_path_list.append(new_goes_filename)
go.orthorectify.ortho(goes_image_path, variables, bounds, apiKey, new_goes_filename, keep_dem=True)
print(new_image_path_list)
# add time dimension, fix CRS, build zarr file
print('add time dimension, fix CRS, build zarr file')
for variable in variables:
print('add_datetime_crs')
new_image_path_list, datetimes_list = add_datetime_crs(new_image_path_list, variable)
# start Dask cluster
print('start Dask cluster')
client = gtsa.io.dask_start_cluster(
workers=6,
threads=2,
open_browser=False,
verbose=True,
)
print(new_image_path_list)
# sort the netCDF file paths by their observation start datetimes
nc_files = [img_path for img_dt, img_path in sorted(zip(datetimes_list, new_image_path_list))]
print(nc_files)
# Open all the netCDF files as a single dataset (concatenated along the time dimension)
# chunks={'time': 500} was chosen with the ~100 MB per-chunk guidance in mind, see
# https://docs.xarray.dev/en/stable/user-guide/dask.html#optimization-tips
print('open all rasters')
ds = xr.open_mfdataset(nc_files, chunks={'time': 500})
#
## if 'Rad' is our variable, check if we should add reflectance 'ref', or brightness temperature 'tb' to the list too
#if variable == 'Rad':
# if ds.band_id.values[0] <= 6:
# print('adding ref to variables list')
# variables.append('ref')
# else:
# print('adding tb to variables list')
# variables.append('tb')
#
# rechunk along the time dimension
# Dask's rechunk documentation: https://docs.dask.org/en/stable/generated/dask.array.rechunk.html
# 0:-1 requests a single chunk along the 0th (time) dimension, so each chunk holds the full time series for its block of pixels
# 1:'auto', 2:'auto' with balance=True lets dask freely rechunk along the latitude and longitude dimensions to attain uniformly sized blocks
print('rechunk')
ds[variable].data = ds[variable].data.rechunk(
{0: -1, 1: 'auto', 2: 'auto'},
block_size_limit=1e8,
balance=True
)
# Assign the dimensions of a chunk to variables to use for encoding afterwards
t,y,x = ds[variable].data.chunks[0][0], ds[variable].data.chunks[1][0], ds[variable].data.chunks[2][0]
# Create an output zarr file and write these chunks to disk
# remove if file already exists
shutil.rmtree(outputFilepath, ignore_errors=True)
# chunk
ds[variable].encoding = {'chunks': (t, y, x)}
# output zarr file
print('saving zarr file')
ds.to_zarr(outputFilepath)
# Display
source_group = zarr.open(outputFilepath)
source_array = source_group[variable]
print(source_group.tree())
print(source_array.info)
del source_group
del source_array
print('Done.')
return None
def make_request_json(workflowName, startDatetime, endDatetime, bounds, satellite, product, band, variable, apiKey):
'''For running through GitHub Actions: build a request JSON file from user inputs, to be read by the build_zarr function.'''
request_dict = {
"dateRange" : {
"startDatetime" : startDatetime,
"endDatetime" : endDatetime
},
"bounds" : {
"min_lon" : bounds[0],
"min_lat" : bounds[1],
"max_lon" : bounds[2],
"max_lat" : bounds[3]
},
"satellite" : satellite,
"product" : product,
"bands" : [band],
"variables" : [variable],
"downloadDirectory" : "./",
"outputFilepath" : "./{}.zarr".format(workflowName),
"apiKey" : apiKey
}
filename = workflowName + '.json'
with open(filename, "w") as f:
json.dump(request_dict, f)
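# A minimal sketch of the request-then-build workflow these two functions support;
# the workflow name, dates, bounds, satellite, product, band, variable, and API key
# below are hypothetical placeholders:
#
#   make_request_json("example-workflow",
#                     "2022-06-21T00:00:00Z", "2022-06-21T06:00:00Z",
#                     [-120.0, 37.5, -119.0, 38.5],
#                     "goes18", "ABI-L1b-RadC", 2, "Rad",
#                     "YOUR_OPENTOPOGRAPHY_API_KEY")
#   build_zarr("example-workflow.json")
#   # the resulting store can then be opened lazily with xarray:
#   # ds = xr.open_zarr("example-workflow.zarr")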
def get_start_date_from_abi_filename(s):
'''Return the scan start time string (the "s" field) from a GOES ABI filename.'''
return s.split('_s')[1].split('_')[0]
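# For example, for a typically-formatted ABI filename (this particular file is hypothetical):
#   get_start_date_from_abi_filename("OR_ABI-L1b-RadC-M6C02_G18_s20221721801140_e20221721803513_c20221721803546.nc")
# returns "20221721801140" (year, day of year, HHMMSS, and a tenth-of-second digit).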
def add_datetime_crs(files, variable, crs='EPSG:4326'):
'''Add a time coordinate (parsed from each ABI filename) and a CRS to each file, writing new single-variable netCDF files and returning their paths and datetimes.'''
print(files)
print(variable)
print(crs)
new_files = []
datetimes = [
dt.datetime.strptime(
f.stem.split('_')[3][1:-1], # parse the scan start time from the "s" field of the filename (dropping the leading "s" and the trailing tenth-of-second digit)
"%Y%j%H%M%S"
) for f in files
]
print(datetimes)
for i,file in enumerate(files):
print(f"Processing {i} of {len(files)}...")
print(datetimes[i])
try:
ds = xr.open_dataset(file)
ds = ds.assign_coords({"time": datetimes[i]})
ds = ds.expand_dims("time")
ds = ds.reset_coords(drop=True)
da = ds[variable]
new_file_name = file.with_name(file.stem + '_{}'.format(variable)).with_suffix('.nc')
da = da.rio.write_crs(crs)
da.to_netcdf(new_file_name)
new_files.append(new_file_name)
except Exception as err:
print(f"Failed on {file}")
print(f"Error: {err}")
return new_files, datetimes
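# A usage sketch (the file path below is hypothetical): each input netCDF gets a time
# coordinate parsed from its filename plus a CRS, and is rewritten as a single-variable
# "*_Rad.nc" file:
#   new_files, datetimes = add_datetime_crs(
#       [Path("OR_ABI-L1b-RadC-M6C02_G18_s20221721801140_e20221721803513_c20221721803546_o.nc")],
#       "Rad",
#   )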
def parse_json(downloadRequest_filepath):
'''Load a JSON file that specifies what we'd like to download, and parse its contents.'''
with open(downloadRequest_filepath, "r") as f:
downloadRequest = json.load(f)
startDatetime = parser.parse(downloadRequest['dateRange']['startDatetime'])
endDatetime = parser.parse(downloadRequest['dateRange']['endDatetime'])
bounds = [ downloadRequest['bounds']["min_lon"],
downloadRequest['bounds']["min_lat"],
downloadRequest['bounds']["max_lon"],
downloadRequest['bounds']["max_lat"]
] # bounds = [min_lon, min_lat, max_lon, max_lat]
satellite = downloadRequest['satellite']
bucket = 'noaa-' + satellite
product = downloadRequest['product']
channels = downloadRequest['bands']
variables = downloadRequest['variables']
apiKey = downloadRequest['apiKey']
outDir = downloadRequest['downloadDirectory']
outputFilepath = downloadRequest['outputFilepath']
return startDatetime, endDatetime, bounds, satellite, bucket, product, channels, variables, apiKey, outDir, outputFilepath
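# parse_json expects a request file with the same structure that make_request_json writes,
# for example (all values are placeholders):
#
#   {
#     "dateRange": {"startDatetime": "2022-06-21T00:00:00Z", "endDatetime": "2022-06-21T06:00:00Z"},
#     "bounds": {"min_lon": -120.0, "min_lat": 37.5, "max_lon": -119.0, "max_lat": 38.5},
#     "satellite": "goes18",
#     "product": "ABI-L1b-RadC",
#     "bands": [2],
#     "variables": ["Rad"],
#     "apiKey": "YOUR_OPENTOPOGRAPHY_API_KEY",
#     "downloadDirectory": "./",
#     "outputFilepath": "./example-workflow.zarr"
#   }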
def download_abi(downloadRequest_filepath):
'''Download GOES ABI imagery as specified by an input JSON file (this function wraps goespy's ABI_Downloader).'''
startDatetime, endDatetime, bounds, satellite, bucket, product, channels, _, _, outDir, _ = parse_json(downloadRequest_filepath)
output_filepaths = []
# parse channels/bands and start a download for each
for channel in channels:
if isinstance(channel, int):
channel = 'C{:02}'.format(channel) # format an integer channel/band number as a zero-padded string like "C02"
# Show us the bounds we'll crop images to
print('\nFiles will be downloaded and then cropped to these bounds:')
print('\t({w},{n}).\t.({e},{n})\n\n\n\n\t({w},{s}).\t.({e},{s})\n'.format(n=bounds[3],w=bounds[0],e=bounds[2],s=bounds[1]))
# For each S3 bucket, download the corresponding observations if we don't have them already
download_filepaths = [] # store filepaths where our downloaded files are
print('For each S3 bucket, download the corresponding observations')
i = 0
for this_dt in rrule.rrule(rrule.HOURLY, dtstart=startDatetime, until=endDatetime):
# L1b-Rad and L2-CMIP products are organized by channel in the download directory; other products are not
if ("L1b-Rad" in product) or ("L2-CMIP" in product):
this_filepath = Path(outDir) / satellite / str(this_dt.year) / str(this_dt.month) / str(this_dt.day) / product / '{:02}'.format(this_dt.hour) / channel
else:
this_filepath = Path(outDir) / satellite / str(this_dt.year) / str(this_dt.month) / str(this_dt.day) / product / '{:02}'.format(this_dt.hour)
print(this_filepath)
download_filepaths.append(this_filepath)
if not download_filepaths[i].is_dir():
ABI = ABI_Downloader(outDir, bucket, this_dt.year, this_dt.month, this_dt.day, f'{this_dt.hour:02}', product, channel)
# crop the downloaded files to the requested bounds so they take up less disk space
# (subsetting each file individually is inefficient, but simple)
if download_filepaths[i].is_dir(): # make sure the directory exists (meaning we downloaded something) before running subsetNetCDF
print('\nSubsetting files in...{}'.format(download_filepaths[i]))
for file in download_filepaths[i].glob('*.nc'):
print(file)
output_filepaths.append(file)
go.clip.subsetNetCDF(file, bounds)
i += 1
print("Done")
return output_filepaths
def get_dem(demtype, bounds, api_key, out_fn=None, proj='EPSG:4326'):
"""
Download a DEM of choice from the OpenTopography global DEM API (modified by Shashank Bhushan, first written by David Shean)
Parameters
------------
demtype : str
type of DEM to fetch (e.g., SRTMGL1, SRTMGL1_E, SRTMGL3 etc)
bounds : list
geographic aoi extent in format (minx,miny,maxx,maxy)
api_key : str
OpenTopography API key
out_fn : str
path to output filename
proj : str
output DEM projection
Returns
-----------
out_DEM : str
path to output DEM (useful if the downloaded DEM is reprojected to custom proj)
Examples
------------
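A minimal, hypothetical call (requires a valid OpenTopography API key; values shown are placeholders):

>>> dem_path = get_dem('SRTMGL1', [-120.0, 37.5, -119.0, 38.5], 'YOUR_API_KEY', out_fn='dem.tif')  # doctest: +SKIP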
"""
import requests
### From David Shean
base_url="https://portal.opentopography.org/API/globaldem?demtype={}&west={}&south={}&east={}&north={}&outputFormat=GTiff&API_Key={}"
if out_fn is None:
out_fn = '{}.tif'.format(demtype)
if not os.path.exists(out_fn):
#Prepare API request url
#Bounds should be [minlon, minlat, maxlon, maxlat]
url = base_url.format(demtype, *bounds, api_key)
print(url)
# Get the DEM from the OpenTopography API
response = requests.get(url)
# Check for a successful response before writing to disk
response.raise_for_status()
with open(out_fn, 'wb') as f:
f.write(response.content)
if proj != 'EPSG:4326':
# Could avoid writing to disk and reproject directly with rasterio; using gdalwarp for simplicity
proj_fn = os.path.splitext(out_fn)[0]+'_proj.tif'
if not os.path.exists(proj_fn):
output_res = 30
gdalwarp = shutil.which('gdalwarp') # locate the gdalwarp executable on the PATH
gdalwarp_call = f"{gdalwarp} -r cubic -co COMPRESS=LZW -co TILED=YES -co BIGTIFF=IF_SAFER -tr {output_res} {output_res} -t_srs '{proj}' {out_fn} {proj_fn}"
print(gdalwarp_call)
run_bash_command(gdalwarp_call)
out_DEM = proj_fn
else:
out_DEM = out_fn
return out_DEM
def run_bash_command(cmd):
#written by Scott Henderson
# move to asp_binder_utils
"""Call a system command through the subprocess python module."""
print(cmd)
try:
retcode = subprocess.call(cmd, shell=True)
if retcode < 0:
print("Child was terminated by signal", -retcode, file=sys.stderr)
else:
print("Child returned", retcode, file=sys.stderr)
except OSError as e:
print("Execution failed:", e, file=sys.stderr)
def download_example_data() -> None:
"""
Fetch the GOES ABI example files.
"""
# Static commit hash (to be updated as needed)
# commit = "16756d3aff6ca41ebb0be999a82d2f66930e7851"
# The URL from which to download the tarball (replace "main" with the commit hash above to pin a specific version)
url = "https://github.com/spestana/goes-ortho-data/tarball/main"
# Make resources directory
tmp_dir = "./tests/resources/"
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
# Path and filename for tarball
tar_path = tmp_dir + "data.tar.gz"
if not os.path.exists(tar_path):
response = urllib.request.urlopen(url)
# If the response was right, download the tarball
if response.getcode() == 200:
with open(tar_path, "wb") as outfile:
outfile.write(response.read())
else:
raise ValueError(f"Example GOES ABI data fetch gave non-200 response: {response.status_code}")
# Extract the tarball
with tarfile.open(tar_path) as tar:
tar.extractall(tmp_dir)
def remove_example_data() -> None:
"""Remove the example GOES ABI files fetched by download_example_data()."""
tmp_dir = "./tests/resources/"
shutil.rmtree(tmp_dir)