import os
import concurrent.futures
import multiprocessing

import netCDF4
import numpy as np

from ..features.io import load_batch


def ranks_ensemble(
    observation, forecasts,
    noise_scale=1e-6, rng=None
):
    """Rank of each observation within its forecast ensemble (last axis of forecasts)."""
    shape = observation.shape
    N = np.prod(shape)
    observation = observation.reshape((N,))
    forecasts = forecasts.reshape((N, forecasts.shape[-1]))
    N_threads = multiprocessing.cpu_count()
    ranks_all = np.zeros_like(observation, dtype=np.uint32)
    if rng is None:
        rng = np.random

    def rank_dist_chunk(k):
        # each thread handles one contiguous slice of the flattened arrays
        i0 = int(round((k/N_threads) * N))
        i1 = int(round(((k+1)/N_threads) * N))
        obs = observation[i0:i1].astype(np.float64, copy=True)
        fc = forecasts[i0:i1, :].astype(np.float64, copy=True)
        # add a tiny amount of noise to randomize ties
        # (important to add to both obs and fc!)
        obs += (rng.rand(*obs.shape) - 0.5) * noise_scale
        fc += (rng.rand(*fc.shape) - 0.5) * noise_scale
        ranks = np.count_nonzero(obs[..., None] >= fc, axis=-1)
        ranks_all[i0:i1] = ranks

    with concurrent.futures.ThreadPoolExecutor(N_threads) as executor:
        futures = {}
        for k in range(N_threads):
            futures[executor.submit(rank_dist_chunk, k)] = k
        concurrent.futures.wait(futures)

    return ranks_all.reshape(shape)
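
# Minimal usage sketch for ranks_ensemble. The function works with any observation
# shape as long as forecasts carries one extra trailing ensemble-member axis; the 5D
# layout below (sample, channel, time, h, w) is an assumption inferred from the
# NetCDF dimensions written by save_ranks_for_dataset, not a requirement here.
#
#     obs = np.random.rand(4, 1, 6, 64, 64)
#     fc = np.random.rand(4, 1, 6, 64, 64, 32)
#     r = ranks_ensemble(obs, fc)   # integer ranks in 0..32, same shape as obs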

def ranks_multiscale(observation, forecasts):
    """Compute ranks at successively coarser scales using 2x2 average pooling."""
    obs = observation
    fc = forecasts
    rank_scales = {}
    scale = 1
    while True:
        r = ranks_ensemble(obs, fc)
        rank_scales[scale] = r
        scale *= 2
        if obs.shape[-1] == 1:
            break
        # 2x2 average pooling over the two spatial dimensions
        obs = 0.25 * (
            obs[..., ::2, ::2] +
            obs[..., 1::2, ::2] +
            obs[..., ::2, 1::2] +
            obs[..., 1::2, 1::2]
        )
        # forecasts carry a trailing ensemble-member axis, so pooling acts on axes -3 and -2
        fc = 0.25 * (
            fc[..., ::2, ::2, :] +
            fc[..., 1::2, ::2, :] +
            fc[..., ::2, 1::2, :] +
            fc[..., 1::2, 1::2, :]
        )
    return rank_scales
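
# ranks_multiscale returns a dict keyed by pooling scale, e.g. {1: full-resolution
# ranks, 2: ranks after one 2x2 pooling, ...} up to the scale at which the domain
# collapses to a single pixel. The loop implicitly assumes the spatial dimensions
# are powers of two so that repeated halving terminates at width 1.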

def rank_distribution(ranks, num_forecasts=32):
    """Normalized histogram of ranks over the bins 0..num_forecasts."""
    N = np.prod(ranks.shape)
    # bin edges at -0.5, 0.5, ..., num_forecasts+0.5: one bin per possible rank
    bins = np.arange(-0.5, num_forecasts+0.6)
    N_threads = multiprocessing.cpu_count()
    ranks = ranks.ravel()
    hist = [None] * N_threads

    def hist_chunk(k):
        i0 = int(round((k/N_threads) * N))
        i1 = int(round(((k+1)/N_threads) * N))
        (h, _) = np.histogram(ranks[i0:i1], bins=bins)
        hist[k] = h

    with concurrent.futures.ThreadPoolExecutor(N_threads) as executor:
        futures = {}
        for k in range(N_threads):
            futures[executor.submit(hist_chunk, k)] = k
        concurrent.futures.wait(futures)

    hist = sum(hist)
    return hist / hist.sum()

def rank_KS(rank_dist, num_forecasts=32):
    """Kolmogorov-Smirnov-type distance between the rank CDF and a uniform baseline."""
    # num_forecasts is unused here but kept so all metrics share the same signature
    h = rank_dist
    h = h / h.sum()
    ch = np.cumsum(h)
    cb = np.linspace(0, 1, len(ch))
    return abs(ch-cb).max()

def rank_DKL(rank_dist, num_forecasts=32):
    """Kullback-Leibler divergence KL(uniform || rank distribution)."""
    h = rank_dist
    q = h / h.sum()
    p = 1/len(h)
    return p*np.log(p/q).sum()
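
# Both metrics compare the rank distribution against the flat histogram expected of a
# calibrated ensemble. Quick sanity check (a sketch, not part of the original code):
#
#     h = rank_distribution(ranks, num_forecasts=32)   # 33 bins for ranks 0..32
#     rank_DKL(h)   # ~0 for a flat histogram, larger for under-/overdispersed ensembles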

def rank_metric_by_leadtime(ranks, metric=None, num_forecasts=32):
    """Evaluate a rank metric separately for each forecast lead time (axis 2 of ranks)."""
    if metric is None:
        metric = rank_DKL
    metric_by_leadtime = []
    for t in range(ranks.shape[2]):
        ranks_time = ranks[:, :, t, ...]
        h = rank_distribution(ranks_time, num_forecasts=num_forecasts)
        m = metric(h, num_forecasts=num_forecasts)
        metric_by_leadtime.append(m)
    return np.array(metric_by_leadtime)

def rank_metric_by_bin(ranks, values, bins, metric=None, num_forecasts=32):
    """Evaluate a rank metric separately for each bin of a conditioning variable."""
    if metric is None:
        metric = rank_DKL
    metric_by_bin = []
    for (b0, b1) in zip(bins[:-1], bins[1:]):
        ranks_bin = ranks[(b0 <= values) & (values < b1)]
        h = rank_distribution(ranks_bin, num_forecasts=num_forecasts)
        m = metric(h, num_forecasts=num_forecasts)
        metric_by_bin.append(m)
    return np.array(metric_by_bin)
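
# Hedged sketch of conditioning calibration on the observed value; the names y and
# ranks below are illustrative, not defined in this module:
#
#     value_bins = np.array([0.0, 0.1, 1.0, 10.0])
#     dkl_by_bin = rank_metric_by_bin(ranks, values=y, bins=value_bins)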

def process_batch(fn, preproc_fc=None):
    """Load one batch file and compute its multiscale ranks."""
    print(fn)
    (_, y, y_pred) = load_batch(fn, preproc_fc=preproc_fc)
    return ranks_multiscale(y, y_pred)

def save_ranks_for_dataset(data_dir, result_fn, preproc_fc=None):
    """Compute multiscale ranks for every batch file in data_dir and write them to a NetCDF file."""
    files = sorted(os.listdir(data_dir))
    files = [os.path.join(data_dir, fn) for fn in files]
    N_threads = multiprocessing.cpu_count()

    # process batch files in parallel across processes
    futures = []
    with concurrent.futures.ProcessPoolExecutor(N_threads) as executor:
        for fn in files:
            futures.append(
                executor.submit(process_batch, fn, preproc_fc=preproc_fc)
            )
        ranks = [f.result() for f in futures]

    # concatenate the per-batch results along the sample axis, one array per scale
    scales = sorted(ranks[0].keys())
    ranks = {
        s: np.concatenate([r[s] for r in ranks], axis=0)
        for s in scales
    }

    with netCDF4.Dataset(result_fn, 'w') as ds:
        ds.createDimension("dim_sample", ranks[1].shape[0])
        ds.createDimension("dim_channel", ranks[1].shape[1])
        ds.createDimension("dim_time_future", ranks[1].shape[2])
        var_params = {"zlib": True, "complevel": 1}
        for s in scales:
            ds.createDimension(f"dim_h_pool{s}x{s}", ranks[s].shape[3])
            ds.createDimension(f"dim_w_pool{s}x{s}", ranks[s].shape[4])
            var = ds.createVariable(
                f"ranks_pool{s}x{s}", np.float32,
                (
                    "dim_sample", "dim_channel", "dim_time_future",
                    f"dim_h_pool{s}x{s}", f"dim_w_pool{s}x{s}",
                ),
                chunksizes=(1,) + ranks[s].shape[1:],
                **var_params
            )
            var[:] = ranks[s]
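
# Hedged usage sketch: the paths below are placeholders, not paths from the original
# project, and because of the relative import above the module has to be run from its
# parent package (e.g. with "python -m ..."). It writes one compressed rank variable
# per pooling scale.
if __name__ == "__main__":
    save_ranks_for_dataset(
        "data/test_batches",   # placeholder: directory of batch files readable by load_batch
        "results/ranks.nc",    # placeholder: output NetCDF path
    )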