| import concurrent.futures | |
| import multiprocessing | |
| import os | |
| import netCDF4 | |
| import numpy as np | |
| from ..features.io import load_batch, decode_saved_var_to_rainrate | |
def _pool2x2(field):
    """Average non-overlapping 2x2 windows over the last two axes.

    A trailing odd row/column is dropped first so that the four strided
    views always have identical shapes; otherwise NumPy would either raise
    or silently broadcast a length-1 view against a longer one.
    """
    rows = field.shape[-2] - field.shape[-2] % 2
    cols = field.shape[-1] - field.shape[-1] % 2
    field = field[..., :rows, :cols]
    return 0.25 * (
        field[..., ::2, ::2] +
        field[..., 1::2, ::2] +
        field[..., ::2, 1::2] +
        field[..., 1::2, 1::2]
    )


def fractions_ensemble(observation, forecasts, threshold, max_scale=256):
    """Compute threshold-exceedance fractions at dyadic pooling scales.

    Both inputs are binarized with ``>= threshold``. The binarized
    forecasts are then averaged over their last axis (presumably the
    ensemble-member axis -- confirm against ``load_batch``). Starting at
    scale 1, the last two axes of both fields are repeatedly averaged in
    2x2 blocks, doubling the scale each time.

    Args:
        observation: Array with at least two dimensions; the last two are
            the ones being pooled.
        forecasts: Array like ``observation`` with one extra trailing axis.
        threshold: Values ``>= threshold`` count as events.
        max_scale: Largest pooling scale to produce (inclusive).

    Returns:
        Tuple ``(obs_frac, fc_frac)`` of dicts mapping scale (1, 2, 4, ...)
        to float32 fraction arrays. Pooling stops early once either pooled
        axis is shorter than 2, so scales that cannot be formed are
        omitted instead of being returned as empty arrays.
    """
    obs = (observation >= threshold).astype(np.float32)
    fc = (forecasts >= threshold).astype(np.float32).mean(axis=-1)

    obs_frac = {}
    fc_frac = {}
    scale = 1
    while True:
        obs_frac[scale] = obs.copy()
        fc_frac[scale] = fc.copy()
        scale *= 2
        if scale > max_scale:
            break
        # A 2x2 window no longer fits: stop rather than emit empty fields.
        if min(obs.shape[-2:]) < 2 or min(fc.shape[-2:]) < 2:
            break
        obs = _pool2x2(obs)
        fc = _pool2x2(fc)

    return (obs_frac, fc_frac)
def frac_from_file(fn, threshold, preproc_fc):
    """Load one batch file and return its multi-scale fractions.

    Prints the file name (serves as a progress indicator when run in a
    worker), loads the batch via ``load_batch`` and forwards the second
    and third returned items to ``fractions_ensemble`` as observation and
    forecasts respectively.

    Args:
        fn: Path of the batch file to load.
        threshold: Event threshold passed on to ``fractions_ensemble``.
        preproc_fc: Passed through to ``load_batch`` as ``preproc_fc``.

    Returns:
        The ``(obs_frac, fc_frac)`` tuple produced by ``fractions_ensemble``.
    """
    print(fn)
    # The first element of the loaded batch is not needed here.
    _unused, observation, forecasts = load_batch(fn, preproc_fc=preproc_fc)
    fractions = fractions_ensemble(observation, forecasts, threshold)
    return fractions
def _write_fraction_variable(ds, name, data, dim_names, var_params):
    """Create float32 variable ``name`` in ``ds`` and fill it with ``data``.

    Chunks span one entry along the first axis and the full extent of all
    remaining axes.
    """
    var = ds.createVariable(
        name, np.float32, dim_names,
        chunksizes=(1,) + data.shape[1:],
        **var_params
    )
    var[:] = data


def save_fractions_for_dataset(data_dir, result_fn, threshold, preproc_fc=None):
    """Compute fractions for every file in a directory and save them.

    Each entry of ``data_dir`` (in sorted order) is processed by
    ``frac_from_file`` in a process pool with one worker per CPU. The
    per-file results are concatenated along axis 0 for every scale and
    written to a netCDF4 file with one ``obs_frac_scale{s}x{s}`` and one
    ``fc_frac_scale{s}x{s}`` variable per scale.

    Args:
        data_dir: Directory whose entries are all treated as batch files.
        result_fn: Path of the netCDF4 file to create (overwritten).
        threshold: Event threshold forwarded to ``frac_from_file``.
        preproc_fc: Forwarded to ``frac_from_file``.

    Raises:
        ValueError: If ``data_dir`` contains no entries.
    """
    files = [os.path.join(data_dir, fn) for fn in sorted(os.listdir(data_dir))]
    if not files:
        # Without this check the unpacking of an empty zip() below fails
        # with an unhelpful "not enough values to unpack" ValueError.
        raise ValueError(f"No input files found in {data_dir!r}")

    n_workers = multiprocessing.cpu_count()
    with concurrent.futures.ProcessPoolExecutor(n_workers) as executor:
        futures = [
            executor.submit(frac_from_file, fn, threshold, preproc_fc)
            for fn in files
        ]
        # Collect in submission order so samples follow the sorted file order.
        (obs_parts, fc_parts) = zip(*(f.result() for f in futures))

    scales = list(obs_parts[0].keys())
    obs_frac = {
        s: np.concatenate([part[s] for part in obs_parts], axis=0)
        for s in scales
    }
    fc_frac = {
        s: np.concatenate([part[s] for part in fc_parts], axis=0)
        for s in scales
    }

    # The leading three axes are shared by all scales; take them from the
    # first scale produced.
    base_shape = obs_frac[scales[0]].shape
    var_params = {"zlib": True, "complevel": 1}
    with netCDF4.Dataset(result_fn, 'w') as ds:
        ds.createDimension("dim_sample", base_shape[0])
        ds.createDimension("dim_channel", base_shape[1])
        ds.createDimension("dim_time_future", base_shape[2])

        for s in scales:
            ds.createDimension(f"dim_h_pool{s}x{s}", obs_frac[s].shape[3])
            ds.createDimension(f"dim_w_pool{s}x{s}", obs_frac[s].shape[4])
            dim_names = (
                "dim_sample", "dim_channel", "dim_time_future",
                f"dim_h_pool{s}x{s}", f"dim_w_pool{s}x{s}",
            )
            _write_fraction_variable(
                ds, f"obs_frac_scale{s}x{s}", obs_frac[s],
                dim_names, var_params
            )
            _write_fraction_variable(
                ds, f"fc_frac_scale{s}x{s}", fc_frac[s],
                dim_names, var_params
            )
def load_fractions(fn):
    """Load multi-scale fractions from a netCDF4 file.

    The set of scales is derived from the variable names: the integer
    after the last ``"x"`` of every variable name in the file is taken as
    a scale, so every variable is expected to end in ``{s}x{s}``.

    Args:
        fn: Path of the netCDF4 file to read.

    Returns:
        Tuple ``(obs_frac, fc_frac)`` of dicts mapping scale to the arrays
        stored as ``obs_frac_scale{s}x{s}`` / ``fc_frac_scale{s}x{s}``,
        inserted in ascending scale order.
    """
    obs_frac = {}
    fc_frac = {}
    with netCDF4.Dataset(fn, 'r') as ds:
        scales = sorted({int(name.split("x")[-1]) for name in ds.variables})
        for s in scales:
            # np.asarray copies only when required; np.array(..., copy=False)
            # raises under NumPy 2 whenever a copy cannot be avoided.
            obs_frac[s] = np.asarray(ds[f"obs_frac_scale{s}x{s}"][:])
            fc_frac[s] = np.asarray(ds[f"fc_frac_scale{s}x{s}"][:])
    return (obs_frac, fc_frac)
def fractions_skill_score(
    obs_frac, fc_frac,
    frac_axes=None, fss_axes=None, use_timesteps=None
):
    """Compute the fractions skill score ``1 - FBS / FBS_ref``.

    ``FBS`` is the mean squared difference between the observed and
    forecast fractions and ``FBS_ref`` the sum of their mean squares, both
    averaged over ``frac_axes``. Where the score is not finite (both
    fields are all zero, giving 0/0) it is set to 1. The result is then
    averaged over ``fss_axes``.

    Args:
        obs_frac: Array of observed fractions, or a dict mapping scale to
            such arrays.
        fc_frac: Forecast fractions, structured like ``obs_frac``.
        frac_axes: Axes over which the squared terms are averaged
            (``None`` averages over everything).
        fss_axes: Axes over which the resulting score is averaged.
        use_timesteps: If given, only the first ``use_timesteps`` entries
            along axis 2 are used.

    Returns:
        The score as a NumPy scalar or array; for dict inputs, a dict
        mapping each scale (ascending) to its score.
    """
    if isinstance(obs_frac, dict):
        return {
            s: fractions_skill_score(
                obs_frac[s], fc_frac[s],
                frac_axes=frac_axes, fss_axes=fss_axes,
                use_timesteps=use_timesteps
            )
            for s in sorted(obs_frac)
        }

    if use_timesteps is not None:
        obs_frac = obs_frac[:, :, :use_timesteps, ...]
        fc_frac = fc_frac[:, :, :use_timesteps, ...]

    fbs = ((obs_frac - fc_frac)**2).mean(axis=frac_axes)
    fbs_ref = (obs_frac**2).mean(axis=frac_axes) + \
        (fc_frac**2).mean(axis=frac_axes)

    # 0/0 is expected when neither field contains an event; it is mapped
    # to a perfect score below, so the floating-point warning is noise.
    with np.errstate(divide="ignore", invalid="ignore"):
        fss = 1 - fbs / fbs_ref

    if isinstance(fss, np.ndarray):
        fss[~np.isfinite(fss)] = 1
    elif not np.isfinite(fss):
        # Full reductions yield a NumPy scalar, which cannot be assigned
        # into; replace it while keeping its scalar type.
        fss = type(fss)(1)

    fss = fss.mean(axis=fss_axes)
    return fss