Source code for rain2flow._hbv._numba


__all__ = ['hbv_nb']

from typing import Dict

try:
    import numba as nb
except (ImportError, ModuleNotFoundError):
    nb = None

import numpy as np

from ..utils import to_oneD_array


[docs] def hbv_nb( prec:np.ndarray, temp:np.ndarray, pet:np.ndarray, parameters:Dict[str, float], initialize:bool = True, routing:bool = True, )->np.ndarray: """ Same as hbv function in :py:module:`rain2flow._hbv._main` but compatible with numba! parameters : dict a dictionary of 15 model parameters routing : bool whether to apply triangular routing to the output or not initialize : bool whether to initialize the model by running it once with the whole input data or not Returns ------- np.ndarray array of simulated discharge values (mm/day) """ if nb is None: raise ImportError("numba is not installed, cannot use hbv_nb function") prec = to_oneD_array(prec, dtype=np.float32) temp = to_oneD_array(temp, dtype=np.float32) pet = to_oneD_array(pet, dtype=np.float32) assert len(prec) == len(temp) == len(pet), "Input arrays must have the same length" assert isinstance(parameters, dict), "Parameters must be a dictionary" assert all([k in parameters for k in [ 'PCORR', 'TT', 'SFCF', 'CFMAX', 'CFR', 'CWH', 'FC', 'BETA', 'LP', 'K0', 'K1', 'K2', 'UZL', 'PERC', 'MAXBAS']]), "Missing required parameters" assert all([isinstance(v, (int, float)) for v in parameters.values()]), "All parameters must be floats" assert len(parameters) == 15, f"Expected 15 parameters for HBV, got {len(parameters)}" parameters = {k:float(v) for k,v in parameters.items()} return hbv_( prec, temp, pet, #**{k:np.array([v]) for k,v in parameters.items()}, **parameters, initialize=initialize, routing=routing, )
@nb.jit(nopython=True) def hbv_( p, #:np.ndarray, temp, pet, # ** parameters ** PCORR, TT, SFCF, CFMAX, CFR, CWH, FC, BETA, LP, K0, K1, K2, UZL, PERC, MAXBAS, initialize, routing, # ** states ** q0 = 0.001, snowpack0 = 0.001, MELTWATER0 = 0.001, SM0 = 0.001, SUZ0 = 0.001, SLZ0 = 0.001, ETact0 = 0.001, ): # Initialize time series of model variables SNOWPACK = snowpack0 MELTWATER = MELTWATER0 SM = SM0 SUZ = SUZ0 SLZ = SLZ0 ETact = ETact0 Qsim = np.full(len(p), np.nan, dtype=np.float32) Qsim[0] = q0 # Start loop if initialize: init_days = p.shape[0]-1 else: init_days = 0 time_step = 0 init_day_counter = 0 init_done = False total_steps = 0 while time_step < p.shape[0]: # Separate precipitation into liquid and solid components PRECIP = p[time_step] * PCORR RAIN = np.multiply(PRECIP, temp[time_step]>= TT).item() SNOW = np.multiply(PRECIP, temp[time_step]< TT).item() SNOW = SNOW * SFCF # Snow SNOWPACK = SNOWPACK+SNOW melt = CFMAX * (temp[time_step] - TT) # melt = np.clip(melt, 0., SNOWPACK) melt = max(0., min(melt, SNOWPACK)) MELTWATER = MELTWATER + melt SNOWPACK = SNOWPACK - melt refreezing = CFR * CFMAX * (TT- temp[time_step]) #refreezing = np.clip(refreezing, 0, MELTWATER) refreezing = max(0., min(refreezing, MELTWATER)) SNOWPACK = SNOWPACK + refreezing MELTWATER = MELTWATER - refreezing tosoil = MELTWATER-(CWH * SNOWPACK) # tosoil = np.clip(tosoil, 0, None) tosoil = max(0., tosoil) MELTWATER = MELTWATER - tosoil # Soil and evaporation soil_wetness = (SM / FC) ** BETA #soil_wetness = soil_wetness.clip(0,1.0) # soil_wetness = np.clip(soil_wetness, 0,1.0) soil_wetness = max(0., min(soil_wetness, 1.0)) recharge = (RAIN + tosoil) * soil_wetness SM = SM+RAIN + tosoil - recharge excess = SM - FC # excess = np.clip(excess, 0, None) excess = max(0., excess) SM = SM-excess evapfactor = SM/(LP * FC) # evapfactor = np.clip(evapfactor, 0, 1.0) evapfactor = max(0., min(evapfactor, 1.0)) ETact = pet[time_step] * evapfactor ETact = np.minimum(SM, ETact) SM = SM-ETact # Groundwater boxes SUZ = SUZ + recharge + excess #perc = np.minimum(SUZ, PERC) perc = min(SUZ, PERC) SUZ = SUZ - perc # Q0 = K0 * np.maximum(SUZ- UZL, 0.0) Q0 = K0 * max(SUZ - UZL, 0.) SUZ = SUZ-Q0 Q1 = K1 * SUZ SUZ = SUZ-Q1 SLZ = SLZ + perc Q2 = K2 * SLZ SLZ = SLZ-Q2 Qsim[time_step] = Q0+Q1+Q2 time_step = time_step+1 init_day_counter = init_day_counter+1 total_steps += 1 # Go back to date_start once we've reached the initialization period if (init_done==False) & (init_day_counter==init_days): time_step = 0 init_done = True ## Check water balance closure #storage_diff = SM[-1]-SM[0]+SUZ[-1]-SUZ[0]+SLZ[-1]-SLZ[0]+SNOWPACK[-1]-SNOWPACK[0]+MELTWATER[-1]-MELTWATER[0] #error = np.mean(P*365.25) - np.mean(Qsim*365.25) - np.mean(ETact*365.25) - (365.25*storage_diff/len(P)) #if error > 1: # print "Warning: Water balance error of "+str(round(error*1000) / 1000)+" mm/yr" if routing: # Add routing delay to simulated runoff, uses MAXBAS parameter MAXBAS = np.round(MAXBAS * 100) / 100 window = MAXBAS * 100 if int(window) < 0: raise ValueError(f"MAXBAS parameter must be greater than 0 {MAXBAS}") w = np.empty(int(window), dtype=np.float32) for x in range(0, int(window)): w[x] = window/2 - abs(window/2-x-0.5) # w = np.concatenate([w, [0.0]*200]) # w = np.concatenate([w, np.full(200, 0.0, dtype=np.float32)]) w = np.append(w, np.full(200, 0.0, dtype=np.float32)) # w_small = [0.0] * int(np.ceil(MAXBAS)) w_small = np.full(int(np.ceil(MAXBAS)) , fill_value=0.0, dtype=np.float32) #w_small = np.arange(2, 10, dtype=np.float) for x in range(0, int(np.ceil(MAXBAS))): w_small[x] = np.sum(w[x*100:x*100+100]) w_small = w_small/np.sum(w_small) Qsim_smoothed = np.full(Qsim.shape, fill_value=0.0, dtype=np.float32) for ii in range(len(w_small)): Qsim_smoothed[ii:len(Qsim_smoothed)] = Qsim_smoothed[ii:len(Qsim_smoothed)] \ + Qsim[0:len(Qsim_smoothed)-ii]*w_small[ii] Qsim = Qsim_smoothed return Qsim