|
|
import concurrent
import concurrent.futures
import multiprocessing
import os

import netCDF4
import numpy as np

from ..features.io import load_batch
|
|
|
|
|
|
|
|
|
def ranks_ensemble(
    observation, forecasts,
    noise_scale=1e-6, rng=None
):
    """Compute the rank of each observed value within its forecast ensemble.

    The rank of a pixel is the number of ensemble members less than or equal
    to the observation, so ranks lie in ``0..n_members``. Small uniform noise
    is added to both observations and forecasts to break ties randomly.

    Parameters
    ----------
    observation : array of any shape
    forecasts : array with shape ``observation.shape + (n_members,)``
    noise_scale : float, amplitude of the tie-breaking noise
    rng : object providing ``rand(*shape)`` (e.g. ``np.random`` or a
        ``np.random.RandomState``); defaults to ``np.random``.
        NOTE(review): a ``np.random.Generator`` will NOT work here since it
        has ``random`` instead of ``rand``.

    Returns
    -------
    uint32 array with the same shape as ``observation``.
    """
    shape = observation.shape
    N = int(np.prod(shape))
    observation = observation.reshape((N,))
    forecasts = forecasts.reshape((N, forecasts.shape[-1]))
    N_threads = multiprocessing.cpu_count()

    ranks_all = np.zeros_like(observation, dtype=np.uint32)

    if rng is None:
        rng = np.random

    def rank_dist_chunk(k):
        # Chunk k covers indices [i0, i1); the rounding scheme partitions
        # the full range exactly across N_threads chunks.
        i0 = int(round((k / N_threads) * N))
        i1 = int(round(((k + 1) / N_threads) * N))
        obs = observation[i0:i1].astype(np.float64, copy=True)
        fc = forecasts[i0:i1, :].astype(np.float64, copy=True)

        # Tie-breaking noise: uniform in [-0.5, 0.5) * noise_scale.
        obs += (rng.rand(*obs.shape) - 0.5) * noise_scale
        fc += (rng.rand(*fc.shape) - 0.5) * noise_scale

        # Rank = number of members the (noisy) observation meets or exceeds.
        ranks = np.count_nonzero(obs[..., None] >= fc, axis=-1)
        # Each chunk writes a disjoint slice, so no locking is needed.
        ranks_all[i0:i1] = ranks

    with concurrent.futures.ThreadPoolExecutor(N_threads) as executor:
        futures = {
            executor.submit(rank_dist_chunk, k): k
            for k in range(N_threads)
        }
        concurrent.futures.wait(futures)

    return ranks_all.reshape(shape)
|
|
|
|
|
|
|
|
|
def ranks_multiscale(observation, forecasts):
    """Rank statistics at successively coarser spatial resolutions.

    Starting from full resolution, repeatedly applies 2x2 average pooling to
    the last two spatial axes (last axis of the forecasts is the ensemble
    dimension) and computes the ensemble ranks at each scale, stopping once
    the last spatial axis has size 1.

    Returns a dict mapping pooling factor (1, 2, 4, ...) to a rank array.
    NOTE(review): spatial dimensions are assumed to be powers of two —
    odd sizes would make the strided slices mismatch.
    """
    pooled_obs = observation
    pooled_fc = forecasts
    rank_scales = {}
    scale = 1

    while True:
        rank_scales[scale] = ranks_ensemble(pooled_obs, pooled_fc)
        scale *= 2
        if pooled_obs.shape[-1] == 1:
            break

        # 2x2 mean pooling over the last two spatial axes.
        pooled_obs = 0.25 * sum(
            pooled_obs[..., di::2, dj::2]
            for di in (0, 1) for dj in (0, 1)
        )
        pooled_fc = 0.25 * sum(
            pooled_fc[..., di::2, dj::2, :]
            for di in (0, 1) for dj in (0, 1)
        )

    return rank_scales
|
|
|
|
|
|
|
|
|
def rank_distribution(ranks, num_forecasts=32):
    """Normalized histogram of integer ranks over bins 0..num_forecasts.

    Bin edges are placed at half-integers so each integer rank falls into
    its own bin. The histogram is computed in parallel over chunks of the
    flattened array and the partial counts are summed.

    Returns a float array of length ``num_forecasts + 1`` summing to 1.
    """
    total = np.prod(ranks.shape)
    edges = np.arange(-0.5, num_forecasts + 0.6)
    n_workers = multiprocessing.cpu_count()
    flat = ranks.ravel()

    partial = [None] * n_workers

    def histogram_chunk(j):
        # Chunk j covers [lo, hi); rounding partitions the range exactly.
        lo = int(round((j / n_workers) * total))
        hi = int(round(((j + 1) / n_workers) * total))
        counts, _ = np.histogram(flat[lo:hi], bins=edges)
        partial[j] = counts

    with concurrent.futures.ThreadPoolExecutor(n_workers) as executor:
        futures = {
            executor.submit(histogram_chunk, j): j
            for j in range(n_workers)
        }
        concurrent.futures.wait(futures)

    combined = sum(partial)
    return combined / combined.sum()
|
|
|
|
|
|
|
|
|
def rank_KS(rank_dist, num_forecasts=32):
    """Kolmogorov-Smirnov distance between a rank histogram and uniformity.

    Compares the cumulative rank distribution against the cumulative uniform
    distribution evaluated at the same (upper) bin edges. Returns 0 for a
    perfectly uniform rank histogram.

    Parameters
    ----------
    rank_dist : 1-D array of (possibly unnormalized) bin frequencies
    num_forecasts : unused; kept for interface consistency with the other
        rank metrics.
    """
    h = rank_dist
    h = h / h.sum()
    ch = np.cumsum(h)
    # Uniform CDF at the upper edge of bin i is (i+1)/n. The previous
    # np.linspace(0, 1, n) evaluated at i/(n-1), which misaligns with the
    # cumulative sum and gives KS = 0.5 even for a uniform histogram.
    cb = np.arange(1, len(ch) + 1) / len(ch)
    return abs(ch - cb).max()
|
|
|
|
|
|
|
|
|
def rank_DKL(rank_dist, num_forecasts=32):
    """KL divergence D_KL(uniform || rank_dist) of a rank histogram.

    The reference distribution is uniform over the bins, so the constant
    probability p = 1/n factors out of the sum. Returns 0 for a perfectly
    uniform histogram.

    Parameters
    ----------
    rank_dist : 1-D array of (possibly unnormalized) bin frequencies
    num_forecasts : unused; kept for interface consistency with the other
        rank metrics.
    """
    q = rank_dist / rank_dist.sum()
    p = 1 / len(rank_dist)
    # sum_i p * log(p / q_i) == p * sum_i log(p / q_i) since p is constant
    return p * np.log(p / q).sum()
|
|
|
|
|
|
|
|
|
def rank_metric_by_leadtime(ranks, metric=None, num_forecasts=32):
    """Evaluate a rank-histogram metric separately for each lead time.

    Axis 2 of ``ranks`` is treated as the lead-time axis; for each lead time
    the rank distribution is computed and reduced with ``metric``
    (default: ``rank_DKL``). Returns a 1-D array of metric values.
    """
    if metric is None:
        metric = rank_DKL

    scores = [
        metric(rank_distribution(ranks[:, :, t, ...]),
               num_forecasts=num_forecasts)
        for t in range(ranks.shape[2])
    ]
    return np.array(scores)
|
|
|
|
|
|
|
|
|
def rank_metric_by_bin(ranks, values, bins, metric=None, num_forecasts=32):
    """Evaluate a rank-histogram metric for ranks stratified by value bins.

    For each consecutive pair of bin edges ``[b0, b1)`` in ``bins``, selects
    the ranks whose corresponding ``values`` fall in that half-open interval,
    then computes the rank distribution and reduces it with ``metric``
    (default: ``rank_DKL``). Returns a 1-D array with ``len(bins) - 1``
    entries.
    """
    if metric is None:
        metric = rank_DKL

    scores = []
    for lo, hi in zip(bins[:-1], bins[1:]):
        selected = ranks[(lo <= values) & (values < hi)]
        dist = rank_distribution(selected)
        scores.append(metric(dist, num_forecasts=num_forecasts))
    return np.array(scores)
|
|
|
|
|
|
|
|
|
def process_batch(fn, preproc_fc=None):
    """Load one batch file and compute its multiscale rank statistics.

    ``preproc_fc`` is forwarded to ``load_batch`` to preprocess the
    forecasts before ranking. Returns the dict produced by
    ``ranks_multiscale``.
    """
    print(fn)  # progress indicator when processing many files
    _, obs, pred = load_batch(fn, preproc_fc=preproc_fc)
    return ranks_multiscale(obs, pred)
|
|
|
|
|
|
|
|
|
def save_ranks_for_dataset(data_dir, result_fn, preproc_fc=None):
    """Compute multiscale rank statistics for every batch file in a
    directory and save them to a netCDF file.

    Each file in ``data_dir`` is processed with ``process_batch`` in a
    separate worker process; the per-file rank dicts are concatenated along
    the sample axis and written as one variable per pooling scale.

    Parameters:
        data_dir: directory whose (sorted) files are each a loadable batch.
        result_fn: path of the netCDF output file (overwritten).
        preproc_fc: optional forecast preprocessing, forwarded to
            ``process_batch``.
    """
    files = sorted(os.listdir(data_dir))
    files = [os.path.join(data_dir,fn) for fn in files]

    # CPU-bound work: one worker process per core.
    N_threads = multiprocessing.cpu_count()
    futures = []
    with concurrent.futures.ProcessPoolExecutor(N_threads) as executor:
        for fn in files:
            args = (process_batch, fn)
            kwargs = {"preproc_fc": preproc_fc}
            futures.append(executor.submit(*args, **kwargs))

    # futures preserve submission order, so results stay aligned with files.
    ranks = [f.result() for f in futures]
    # Each result maps pooling scale -> rank array; merge across files by
    # concatenating along axis 0 (the sample axis).
    scales = sorted(ranks[0].keys())
    ranks = {
        s: np.concatenate([r[s] for r in ranks], axis=0)
        for s in scales
    }

    with netCDF4.Dataset(result_fn, 'w') as ds:
        # Shared dimensions are taken from the full-resolution (scale 1)
        # ranks. NOTE(review): assumes rank arrays are 5-D
        # (sample, channel, time_future, h, w) — shape[3]/shape[4] are used
        # below as the spatial dims; confirm against ranks_multiscale output.
        ds.createDimension("dim_sample", ranks[1].shape[0])
        ds.createDimension("dim_channel", ranks[1].shape[1])
        ds.createDimension("dim_time_future", ranks[1].shape[2])
        var_params = {"zlib": True, "complevel": 1}  # light compression

        for s in scales:
            # Per-scale spatial dimensions shrink with pooling, so each
            # scale gets its own pair of dimensions.
            ds.createDimension(f"dim_h_pool{s}x{s}", ranks[s].shape[3])
            ds.createDimension(f"dim_w_pool{s}x{s}", ranks[s].shape[4])
            var = ds.createVariable(
                f"ranks_pool{s}x{s}", np.float32,
                (
                    "dim_sample", "dim_channel", "dim_time_future",
                    f"dim_h_pool{s}x{s}", f"dim_w_pool{s}x{s}",
                ),
                # One chunk per sample for efficient per-sample reads.
                chunksizes=(1,)+ranks[s].shape[1:],
                **var_params
            )
            var[:] = ranks[s]
|
|
|
|