Skip to content

Reference

Main functions

time_march(p, cycles=None)

Run the actual simulation(s) as defined in the input json file p.

Source code in HGD/main.py
def time_march(p, cycles=None):
    """
    Run the actual simulation(s) as defined in the input json file `p`.
    """

    state = init(p, cycles)
    if p.t_f is not None:
        for tstep in tqdm(range(1, p.nt), leave=False, desc=p.this_sim, position=p.concurrent_index + 1):
            state = time_step(p, state)
    else:
        chi_progress_bar = tqdm(total=1.0, leave=False, desc=p.this_sim, position=p.concurrent_index + 1)

        p.stopped_times = 0
        while not p.stop_event:
            chi = state[6]
            # Update progress bar for chi using a log scale
            with np.errstate(divide="ignore"):
                progress = (np.log10(p.max_chi) - np.log10(chi.mean())) / (
                    np.log10(p.max_chi) - np.log10(p.min_chi)
                )
            chi_progress_bar.n = max(0, min(1.0, progress))  # Ensure bounds
            chi_progress_bar.refresh()

            state = time_step(p, state)

            if p.stopped_times > p.stop_after:
                p.stop_event = True

    plotter.update(p, state)

Operators

empty_nearby(nu, p)

Identifies empty spaces adjacent to each point in a grid based on a given solid fraction matrix.

Parameters: - nu (numpy.ndarray): A 2D array representing the solid fraction at each point in the grid. - p (object): An object containing simulation parameters, not used in this function but included for consistency with the interface.

Returns: - numpy.ndarray: A boolean array where True indicates an empty space adjacent to the corresponding point in the input grid.

This function applies a maximum filter with a cross-shaped kernel to the solid fraction matrix 'nu'. The kernel is defined to consider the four cardinal directions (up, down, left, right) adjacent to each point. The maximum filter operation identifies the maximum solid fraction value in the neighborhood defined by the kernel for each point. Points where the maximum solid fraction in their neighborhood is 0 are considered adjacent to an empty space, and the function returns a boolean array marking these points as True.

Source code in HGD/operators.py
def empty_nearby(nu, p):
    """
    Identifies empty spaces adjacent to each point in a grid based on a given solid fraction matrix.

    Parameters:
    - nu (numpy.ndarray): A 2D array representing the solid fraction at each point in the grid.
    - p (object): An object containing simulation parameters, not used in this function but included for consistency with the interface.

    Returns:
    - numpy.ndarray: A boolean array where True indicates an empty space adjacent to the corresponding point in the input grid.

    This function applies a maximum filter with a cross-shaped kernel to the solid fraction matrix 'nu'. The kernel is defined to consider the four cardinal directions (up, down, left, right) adjacent to each point. The maximum filter operation identifies the maximum solid fraction value in the neighborhood defined by the kernel for each point. Points where the maximum solid fraction in their neighborhood is 0 are considered adjacent to an empty space, and the function returns a boolean array marking these points as True.
    """
    kernel = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]])
    nu_max = maximum_filter(nu, footprint=kernel)  # , mode='constant', cval=0.0)

    return nu_max == 0

get_average(s, loc=None)

Calculate the mean size over the microstructural co-ordinate.

Source code in HGD/operators.py
def get_average(s, loc: list | None = None):
    """
    Calculate the mean size over the microstructural co-ordinate.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        if loc is None:
            s_bar = np.nanmean(s, axis=2)
        else:
            s_bar = np.nanmean(s[loc[0], loc[1], :])
    return s_bar

get_depth(nu, p, debug=False)

Calculate the depth array based on the top indices and y-coordinates.

nu : array-like An array or list of values used to determine the top indices. p : object An object containing the attributes 'nx', 'ny', and 'y'. 'nx' and 'ny' are the dimensions of the grid, and 'y' is an array of y-coordinates.

numpy.ndarray A 2D array of shape (p.nx, p.ny) representing the depth values.

Source code in HGD/operators.py
def get_depth(nu, p, debug=False):
    """
    Calculate the depth array based on the top indices and y-coordinates.

    Parameters:
    nu : array-like
        An array or list of values used to determine the top indices.
    p : object
        An object containing the attributes 'nx', 'ny', and 'y'. 'nx' and 'ny' are the dimensions
        of the grid, and 'y' is an array of y-coordinates.

    Returns:
    numpy.ndarray
        A 2D array of shape (p.nx, p.ny) representing the depth values.
    """
    top = get_top(nu, p)
    depth = np.zeros([p.nx, p.ny])

    for i in range(p.nx):
        depth[i, :] = p.y[top[i]] - p.y

    if debug:
        import matplotlib.pyplot as plt

        plt.figure(77)
        plt.ion()
        plt.clf()
        plt.pcolormesh(p.x, p.y, depth.T)
        plt.colorbar()
        plt.pause(0.01)

    return depth

get_hyperbolic_average(s, loc=None)

Calculate the hyperbolic mean size over the microstructural co-ordinate.

Source code in HGD/operators.py
def get_hyperbolic_average(s: np.ndarray, loc: list | None = None) -> float:
    """
    Calculate the hyperbolic mean size over the microstructural co-ordinate.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        if loc is None:
            return 1.0 / np.nanmean(1.0 / s, axis=2)
        else:
            return 1.0 / np.nanmean(1.0 / s[loc[0], loc[1], :])

get_solid_fraction(s, loc=None)

Calculate solid fraction of a single physical in a 3D array.

Parameters:

Name Type Description Default
s ndarray

a 3D numpy array

required
loc list | None

Either None or a list of two integers.

None

Returns:

Type Description
float

The fraction of the solid phase in s at (i, j) as a float.

Source code in HGD/operators.py
def get_solid_fraction(s: np.ndarray, loc: list | None = None) -> float:
    """Calculate solid fraction of a single physical in a 3D array.

    Args:
        s: a 3D numpy array
        loc: Either None or a list of two integers.

    Returns:
        The fraction of the solid phase in s at (i, j) as a float.
    """
    # return np.mean(~np.isnan(s[i, j, :]))
    if loc is None:
        return 1.0 - np.mean(np.isnan(s), axis=2)
    else:
        return 1.0 - np.mean(np.isnan(s[loc[0], loc[1], :]))

get_top(nu, p, nu_lim=0)

Determine the top void in each column of a 2D array.

Parameters: nu (ndarray): A 2D numpy array where each element indicates the presence (0) or absence (non-zero) of a void. p (object): An object with attributes nx (number of columns) and ny (number of rows).

ndarray: A 1D numpy array of integers where each element represents the row index of the top void in the corresponding column. If a column has no voids, the value will be p.ny - 1.

Source code in HGD/operators.py
def get_top(nu, p, nu_lim=0):
    """
    Determine the top void in each column of a 2D array.

    Parameters:
    nu (ndarray): A 2D numpy array where each element indicates the presence (0) or absence (non-zero) of a void.
    p (object): An object with attributes `nx` (number of columns) and `ny` (number of rows).

    Returns:
    ndarray: A 1D numpy array of integers where each element represents the row index of the top void in the corresponding column.
             If a column has no voids, the value will be `p.ny - 1`.
    """
    solid = nu > nu_lim
    # now get top void in each column
    top = np.zeros(p.nx)
    for i in range(p.nx):
        for j in range(p.ny - 1, 0, -1):
            if solid[i, j]:
                top[i] = j
                break
    # print(f"Top voids: {top}")
    return top.astype(int)

locally_solid(s, i, j, p)

Determines if a given point in the simulation grid is locally solid based on the solid fraction threshold.

Parameters: - s (object): The simulation state, containing the grid and relevant data. - i (int): The row index of the point in the grid. - j (int): The column index of the point in the grid. - p (object): An object containing simulation parameters, including the critical solid fraction threshold (nu_cs).

Returns: - bool: True if the solid fraction at the given point is greater than or equal to the critical solid fraction threshold (nu_cs), indicating the point is locally solid. False otherwise.

This function calculates the solid fraction at the specified point (i, j) in the simulation grid. It then compares this value to the critical solid fraction threshold (nu_cs) defined in the parameter object p. If the solid fraction at the point is greater than or equal to nu_cs, the function returns True, indicating the point is considered locally solid. Otherwise, it returns False.

Source code in HGD/operators.py
def locally_solid(s, i, j, p):
    """
    Determines if a given point in the simulation grid is locally solid based on the solid fraction threshold.

    Parameters:
    - s (object): The simulation state, containing the grid and relevant data.
    - i (int): The row index of the point in the grid.
    - j (int): The column index of the point in the grid.
    - p (object): An object containing simulation parameters, including the critical solid fraction threshold (nu_cs).

    Returns:
    - bool: True if the solid fraction at the given point is greater than or equal to the critical solid fraction threshold (nu_cs), indicating the point is locally solid. False otherwise.

    This function calculates the solid fraction at the specified point (i, j) in the simulation grid. It then compares this value to the critical solid fraction threshold (nu_cs) defined in the parameter object p. If the solid fraction at the point is greater than or equal to nu_cs, the function returns True, indicating the point is considered locally solid. Otherwise, it returns False.
    """
    nu = get_solid_fraction(s, [i, j])
    if isinstance(p.nu_cs, np.ndarray):
        return nu >= p.nu_cs[i, j]
    else:
        return nu >= p.nu_cs

stable_slope(s, i, j, dest, p)

Determines if the slope between two points is stable based on the solid fraction difference.

Parameters: - s (object): The simulation state, containing the grid and relevant data. - i (int): The current row index in the grid. - j (int): The current column index in the grid. - dest (int): The destination row index for comparison. - p (object): An object containing simulation parameters, including the delta_limit.

  • bool: True if the difference in solid fraction between the current point and the destination point is less than or equal to the delta_limit, indicating a stable slope. False otherwise.

This function calculates the solid fraction at the current point (i, j) and at a destination point (dest, j), then compares the difference in solid fraction to a threshold (delta_limit) defined in the parameter object p. If the difference is less than or equal to the threshold, the function returns True, indicating the slope is stable. Otherwise, it returns False.

Source code in HGD/operators.py
def stable_slope(s, i, j, dest, p):
    """
    Determines if the slope between two points is stable based on the solid fraction difference.

    Parameters:
    - s (object): The simulation state, containing the grid and relevant data.
    - i (int): The current row index in the grid.
    - j (int): The current column index in the grid.
    - dest (int): The destination row index for comparison.
    - p (object): An object containing simulation parameters, including the delta_limit.

    Returns:
    - bool: True if the difference in solid fraction between the current point and the destination
        point is less than or equal to the delta_limit, indicating a stable slope. False otherwise.

    This function calculates the solid fraction at the current point (i, j) and at a destination point
    (dest, j), then compares the difference in solid fraction to a threshold (delta_limit) defined in
    the parameter object p. If the difference is less than or equal to the threshold, the function
    returns True, indicating the slope is stable. Otherwise, it returns False.
    """
    nu_here = get_solid_fraction(s, [i, j])
    nu_dest = get_solid_fraction(s, [dest, j])
    delta_nu = nu_dest - nu_here

    return delta_nu <= p.delta_limit

stable_slope_fast(s, d, p, chi=None, potential_free_surface=None)

Determines the stability of slopes based on the solid fraction.

Parameters: s (numpy.ndarray): A 3D array representing the solid fraction in the system. d (int): The direction in which to roll the array (shift axis). p (object): An object containing the parameter delta_limit which is used to determine stability.

Returns: numpy.ndarray: A 3D boolean array where True indicates stable slopes and False indicates unstable slopes.

Source code in HGD/operators.py
def stable_slope_fast(s, d, p, chi=None, potential_free_surface=None):
    """
    Determines the stability of slopes based on the solid fraction.

    Parameters:
    s (numpy.ndarray): A 3D array representing the solid fraction in the system.
    d (int): The direction in which to roll the array (shift axis).
    p (object): An object containing the parameter `delta_limit` which is used to determine stability.

    Returns:
    numpy.ndarray: A 3D boolean array where `True` indicates stable slopes and `False` indicates unstable slopes.
    """

    nu_here = get_solid_fraction(s)
    nu_dest = np.roll(nu_here, d, axis=0)
    delta_nu = nu_dest - nu_here

    # delta_nu = -dir*np.gradient(nu_here,axis=0)
    if p.inertia:
        get_delta_limit(p, chi)
    else:
        delta_limit = p.delta_limit

    if potential_free_surface is None:
        stable = delta_nu <= delta_limit
    else:
        stable = (delta_nu <= p.delta_limit) & potential_free_surface

    Stable = np.repeat(stable[:, :, np.newaxis], s.shape[2], axis=2)
    return Stable

stable_slope_gradient(s, dir, p, debug=False)

Determines the stability of slopes based on the solid fraction.

Parameters: s (numpy.ndarray): A 3D array representing the solid fraction in the system. dir (int): The direction in which to roll the array (shift axis). p (object): An object containing the parameter delta_limit which is used to determine stability.

Returns: numpy.ndarray: A 3D boolean array where True indicates stable slopes and False indicates unstable slopes.

Source code in HGD/operators.py
def stable_slope_gradient(s, dir, p, debug=False):
    """
    Determines the stability of slopes based on the solid fraction.

    Parameters:
    s (numpy.ndarray): A 3D array representing the solid fraction in the system.
    dir (int): The direction in which to roll the array (shift axis).
    p (object): An object containing the parameter `delta_limit` which is used to determine stability.

    Returns:
    numpy.ndarray: A 3D boolean array where `True` indicates stable slopes and `False` indicates unstable slopes.
    """

    nu = get_solid_fraction(s)
    dnu_dx, dnu_dy = np.gradient(nu)
    nu_mag = np.sqrt(dnu_dx**2 + dnu_dy**2)
    with np.errstate(divide="ignore", invalid="ignore"):
        slope = np.nan_to_num(dnu_dy / dnu_dx, posinf=0, neginf=0)
    stable = (np.abs(slope) < p.mu) & (nu_mag > 0.2)

    if debug:
        import matplotlib.pyplot as plt

        plt.ion()
        plt.figure(98)
        plt.subplot(2, 2, 1)
        plt.imshow(nu_mag)
        plt.colorbar()
        plt.subplot(2, 2, 2)
        plt.imshow(slope)
        plt.colorbar()
        plt.subplot(2, 2, 3)
        plt.imshow(stable)
        plt.colorbar()
        plt.pause(1e-1)

    Stable = np.repeat(stable[:, :, np.newaxis], s.shape[2], axis=2)
    return Stable

stable_slope_stress(s, p, last_swap, debug=False)

Determines the stability of slopes based on the stress.

Parameters: s (numpy.ndarray): A 3D array representing the solid fraction in the system. p (object): An object containing the parameter delta_limit which is used to determine stability. last_swap (numpy.ndarray): A 3D array representing the last swap in the system.

Returns: numpy.ndarray: A 3D boolean array where True indicates stable slopes and False indicates unstable slopes.

Source code in HGD/operators.py
def stable_slope_stress(s, p, last_swap, debug=False):
    """
    Determines the stability of slopes based on the stress.

    Parameters:
    s (numpy.ndarray): A 3D array representing the solid fraction in the system.
    p (object): An object containing the parameter `delta_limit` which is used to determine stability.
    last_swap (numpy.ndarray): A 3D array representing the last swap in the system.

    Returns:
    numpy.ndarray: A 3D boolean array where `True` indicates stable slopes and `False` indicates unstable slopes.
    """

    sigma = stress.calculate_stress(s, last_swap, p)
    mobilised_friction_angle = stress.get_friction_angle(sigma, p, last_swap)

    stable = mobilised_friction_angle <= p.repose_angle

    if debug and np.random.rand() < 0.01:
        import matplotlib.pyplot as plt

        plt.close("all")
        plt.figure(98)
        plt.ion()
        plt.subplot(211)
        plt.pcolormesh(p.x, p.y, mobilised_friction_angle.T)
        plt.colorbar()
        plt.subplot(212)
        plt.pcolormesh(p.x, p.y, stable.T)
        plt.colorbar()
        plt.pause(1e-5)

    Stable = np.repeat(stable[:, :, np.newaxis], s.shape[2], axis=2)
    return Stable

Boundary conditions

close_voids(p, s, u, v, c, T, last_swap, chi, sigma, outlet)

Not implemented. Do not use.

Source code in HGD/boundary.py
def close_voids(p, s, u, v, c, T, last_swap, chi, sigma, outlet):
    """
    Not implemented. Do not use.
    """
    for i in range(p.nx):
        for j in np.arange(p.ny - 1, -1, -1):  # go from top to bottom
            for k in range(p.nm):
                if np.isnan(s[i, j, k]):
                    pass
                    # if np.random.rand() < 5e-2 * dt / dy:  # FIXME
                    #     v[i, j:] -= 1
                    #     s[i, j:, k] = np.roll(s[i, j:, k], -1)
    return s, u, v, c, T, last_swap, chi, sigma, outlet

update(p, state)

Add voids to the system. This function is called at each time step. Loop through all functions defined here and call them if they are in the list of boundary methods.

Source code in HGD/boundary.py
def update(p, state):
    """
    Add voids to the system. This function is called at each time step.
    Loop through all functions defined here and call them if they are in the list of boundary methods.
    """

    boundary_methods = [name for name, obj in globals().items() if callable(obj)]

    for method in p.boundaries:
        if method in boundary_methods:
            state = globals()[method](p, *state)

    if p.close_voids:
        state = close_voids(*state)

    if p.wall_motion:
        if p.t % p.save_wall == 0:
            s = state[0]
            s_mean = np.nanmean(s, axis=2)

            start_sim = np.min(np.argwhere(s_mean > 0), axis=0)[
                0
            ]  # gives the start position of column in x-direction
            end_sim = np.max(np.argwhere(s_mean > 0), axis=0)[
                0
            ]  # gives the end position of column in x-direction

            if start_sim > 1 and end_sim + 1 < p.nx - 1:
                # s[start_sim-2:start_sim-1,:,:] = np.nan
                s[end_sim + 2 : end_sim + 3, :, :] = np.nan

            state[0] = s

    return state

Initial conditions

IC(p)

Sets up the initial value of the grain size and/or void distribution everywhere.

Parameters:

Name Type Description Default
p

Parameters class. In particular, the gsd_mode and IC_mode should be set to determine the grain size distribution (gsd) and the initial condition (IC).

required

Returns:

Type Description

The array of grain sizes. Values of NaN are voids.

Source code in HGD/initial.py
def IC(p):
    """
    Sets up the initial value of the grain size and/or void distribution everywhere.

    Args:
        p: Parameters class. In particular, the `gsd_mode` and `IC_mode` should be set to determine the grain size distribution (gsd) and the initial condition (IC).

    Returns:
        The array of grain sizes. Values of `NaN` are voids.
    """
    rng = np.random.default_rng()

    # First step: Generate BOTH the grain size distribution and the void distribution

    # pick a grain size distribution
    if p.gsd_mode == "mono":
        s = np.nan * np.ones([p.nx, p.ny, p.nm])  # monodisperse
        for i in range(p.nx):
            for j in range(p.ny):
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                s[i, j, fill] = p.s_m
        p.s_M = p.s_m

    elif p.gsd_mode == "half_half":
        in_ny = int(np.ceil(p.ny * 0.4))  # the height can be controlled here if required
        s = np.nan * np.ones([p.nx, p.ny, p.nm])  # monodisperse
        for i in range(p.nx):
            for j in range(in_ny // 2):
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                s[i, j, fill] = p.s_M
            for k in range(in_ny // 2, in_ny):
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                s[i, k, fill] = p.s_m

    elif p.gsd_mode == "four_layers":
        s = np.nan * np.ones([p.nx, p.ny, p.nm])  # monodisperse
        layers = 4
        d_pts = p.ny * 0.6
        lay_breaks = p.ny / layers
        lay_breaks = d_pts / layers
        y_ordinates = []
        for i in range(layers):
            y_ordinates.append(np.arange(0 + lay_breaks * i, lay_breaks * (i + 1), 1))
        y_ordinates = np.array(y_ordinates).astype(int)

        for i in range(p.nx):
            for j in y_ordinates[0]:
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                s[i, j, fill] = p.s_M
            for k in y_ordinates[1]:
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                s[i, k, fill] = p.s_m
            for l in y_ordinates[2]:
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                s[i, l, fill] = p.s_M
            for m in y_ordinates[3]:
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                s[i, m, fill] = p.s_m

    elif p.gsd_mode == "bi" or p.gsd_mode == "fbi":  # bidisperse
        if (p.nm * p.large_concentration * p.nu_fill) < 2:
            s = np.random.choice([p.s_m, p.s_M], size=[p.nx, p.ny, p.nm])
        else:
            s = np.nan * np.ones([p.nx, p.ny, p.nm])
            for i in range(p.nx):
                for j in range(p.ny):
                    large = rng.choice(
                        p.nm, size=int(p.nm * p.large_concentration * p.nu_fill), replace=False
                    )
                    s[i, j, large] = p.s_M
                    remaining = np.where(np.isnan(s[i, j, :]))[0]
                    small = rng.choice(
                        remaining, size=int(p.nm * (1 - p.large_concentration) * p.nu_fill), replace=False
                    )
                    s[i, j, small] = p.s_m

    elif p.gsd_mode == "power_law":
        # CGSD(d) = ( d**(3-alpha) - d_m**(3-alpha) ) / ( d_M**(3-alpha) - d_m**(3-alpha) )
        # d = ( CGSD(d) * ( d_M**(3-alpha) - d_m**(3-alpha) ) + d_m**(3-alpha) )**(1/(3-alpha))
        s = np.nan * np.ones([p.nx, p.ny, p.nm])
        for i in range(p.nx):
            for j in range(p.ny):
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                F = rng.uniform(low=0, high=1, size=len(fill))
                s[i, j, fill] = (
                    F * (p.s_M ** (3.0 - p.power_law_alpha) - p.s_m ** (3.0 - p.power_law_alpha))
                    + p.s_m ** (3.0 - p.power_law_alpha)
                ) ** (1.0 / (3.0 - p.power_law_alpha))

    elif p.gsd_mode == "uniform":  # polydisperse
        s = rng.uniform(p.s_m, p.s_M, size=[p.nx, p.ny, p.nm])

        mask = rng.uniform(size=[p.nx, p.ny, p.nm]) > p.nu_fill
        s[mask] = np.nan

    elif p.gsd_mode == "weibull":
        # Using a weird parameterisation of the Weibull distribution to have d_50 as a parameter
        # F(d) = 1−exp(−(d/d_50)^k)*ln(2))
        # Rearranging for d: d = d_50 * (-ln(1-F)/ln(2))**(1/k)
        s = np.nan * np.ones([p.nx, p.ny, p.nm])
        for i in range(p.nx):
            for j in range(p.ny):
                fill = rng.choice(p.nm, size=int(p.nm * p.nu_fill), replace=False)
                F = rng.uniform(low=0, high=1, size=len(fill))
                s[i, j, fill] = p.d_50 * (-np.log(1 - F) / np.log(2)) ** (1 / p.k)
    else:
        raise ValueError(f"Unrecognised gsd_mode: {p.gsd_mode}")

    # Second step: Mask out regions in space

    if p.IC_mode == "random":  # voids everywhere randomly, this has been done above in the first step
        # mask = np.random.rand(p.nx, p.ny, p.nm) > p.nu_fill
        return s
    elif p.IC_mode == "top":  # material at the top
        mask = np.zeros([p.nx, p.ny, p.nm], dtype=bool)
        mask[:, : int((1 - p.fill_ratio) * p.ny), :] = True
    elif p.IC_mode == "bottom":  # material at the bottom
        mask = np.zeros([p.nx, p.ny, p.nm], dtype=bool)
        mask[:, int(p.fill_ratio * p.ny) :, :] = True
    elif p.IC_mode == "full":  # completely full
        return s
    elif p.IC_mode == "column":  # just middle full to top
        mask = np.ones([p.nx, p.ny, p.nm], dtype=bool)
        mask[
            p.nx // 2 - int(p.fill_ratio / 4 * p.nx) : p.nx // 2 + int(p.fill_ratio / 4 * p.nx), :, :
        ] = False

        mask[
            :, -1, :
        ] = True  # top row can't be filled for algorithmic reasons - could solve this if we need to

    elif p.IC_mode == "empty":  # completely empty
        mask = np.ones([p.nx, p.ny, p.nm], dtype=bool)

    elif p.IC_mode == "left_column":  # just middle full to top
        mask = np.ones([p.nx, p.ny, p.nm], dtype=bool)
        mask[: int(p.fill_ratio * p.nx), :, :] = False

        mask[
            :, -1, :
        ] = True  # top row can't be filled for algorithmic reasons - could solve this if we need to
    # create alternating layers of two densities
    elif p.IC_mode == "layers":
        layer_thickness = 20
        mask = np.zeros([p.nx, p.ny, p.nm], dtype=bool)
        current_layer = 0
        extra_num_masked_1 = (1 - p.nu_fill_1) * p.nm
        extra_num_masked_2 = (1 - p.nu_fill_2) * p.nm
        for j in range(p.ny):
            if j % layer_thickness == 0:
                current_layer = 1 - current_layer
            for i in range(p.nx):
                if current_layer == 0:
                    indices = rng.choice(p.nm, size=int(extra_num_masked_1), replace=False)
                    mask[i, j, indices] = True
                else:
                    indices = rng.choice(p.nm, size=int(extra_num_masked_2), replace=False)
                    mask[i, j, indices] = True

    # create a wedge at the angle of repose
    elif p.IC_mode == "wedge":
        mask = np.ones([p.nx, p.ny, p.nm], dtype=bool)
        H = p.W / 2 * np.tan(np.radians(p.repose_angle))  # height of the wedge

        for i in range(p.nx):
            for j in range(p.ny):
                if p.y[j] - H < -np.abs(p.x[i]) * np.tan(np.radians(p.repose_angle)):
                    mask[i, j, :] = False

    elif p.IC_mode == "slope":
        x = np.arange(0, p.nx)
        y = np.arange(0, p.ny)
        X, Y = np.meshgrid(x, y, indexing="ij")
        top = p.cyclic_BC_y_offset + p.fill_ratio * p.ny
        mask = Y > (-p.cyclic_BC_y_offset / p.nx * X + top)

    else:
        raise ValueError(f"Unrecognised IC_mode: {p.IC_mode}")
    s[mask] = np.nan

    if p.wall_motion:
        nu = 1.0 - np.mean(np.isnan(s[:, :, :]), axis=2)
        start_sim = np.min(np.argwhere(nu > 0), axis=0)[
            0
        ]  # gives the start position of column in x-direction
        end_sim = np.max(np.argwhere(nu > 0), axis=0)[0]  # gives the end position of column in x-direction

        s[0 : start_sim - 1, :, :] = 0  # Depending upon these values, make changes in void_migration.py
        s[end_sim + 2 :, :, :] = 0

    return s

inclination(p, s)

This function is used when the bottom of silo is to be constructed with certain angle with horizontal. The variable used is form_angle which is by default set to 0 in defaults.json5

Source code in HGD/initial.py
def inclination(p, s):
    """
    This function is used when the bottom of silo is to be constructed with
    certain angle with horizontal. The variable used is form_angle which
    is by default set to 0 in defaults.json5
    """
    if p.form_angle == 0:
        pass
    elif p.form_angle > 0:
        req_r = np.arange(p.nx // 2 + p.half_width + 1, p.nx, 1) - (p.nx // 2 + p.half_width + 1)
        req_r_y = []
        for i in req_r:
            if (
                round(np.tan(np.radians(p.form_angle)) * i) < 2
            ):  # to avoid larger outlet when angle becomes less
                req_r_y.append(2)
            else:
                req_r_y.append(round(np.tan(np.radians(p.form_angle)) * i))

        req_l = np.arange(0, p.nx // 2 - p.half_width, 1)
        req_l_y = []
        for i in req_l:
            if round(np.tan(np.radians(p.form_angle)) * i) < 2:
                req_l_y.append(2)
            else:
                req_l_y.append(round(np.tan(np.radians(p.form_angle)) * i))

        x_vals = np.concatenate((req_l, (p.nx // 2 + p.half_width + 1) + req_r))
        y_vals = np.concatenate((req_l_y[::-1], req_r_y))
        print(x_vals)

        for i in range(len(x_vals)):
            for j in range(y_vals[i]):
                s[x_vals[i], j, :] = 0

        s = np.ma.masked_where(s == 0, s)

    return s

Output

array_to_png_buffer(array, colormap, vmin=None, vmax=None)

Convert a NumPy array to a PNG image buffer using a colormap.

Parameters: array (numpy.ndarray): The input array with shape (height, width). colormap (function): A function that maps array values to RGB tuples.

Returns: io.BytesIO: A buffer containing the PNG image.

Source code in HGD/plotter.py
def array_to_png_buffer(array, colormap, vmin=None, vmax=None):
    """
    Convert a NumPy array to a PNG image buffer using a colormap.

    Parameters:
    array (numpy.ndarray): The input array with shape (height, width).
    colormap (function): A function that maps array values to RGB tuples.

    Returns:
    io.BytesIO: A buffer containing the PNG image.
    """

    # Normalize the array to the range [0, 1]
    if vmin is None:
        vmin = np.nanmin(array)
    if vmax is None:
        vmax = np.nanmax(array)
    norm_array = (array - vmin) / (vmax - vmin)

    # Apply the colormap to the normalized array to get RGBA values
    rgba_image = colormap(norm_array)

    # Convert the RGBA image to RGB (ignoring the alpha channel)
    rgb_image = np.flipud((rgba_image * 255).astype(np.uint8).transpose(1, 0, 2))

    buffer = io.BytesIO()

    # Create a PIL image from the RGB array
    img = Image.fromarray(rgb_image)
    img.save(buffer, format="PNG")
    buffer.seek(0)  # Reset buffer position to the beginning

    return buffer

plot_h(s, p)

Show the relative 'height' of the grains in each cell. Used for diagnostic purposes only, otherwise not that useful.

Source code in HGD/plotter.py
def plot_h(s, p):
    """
    Show the relative 'height' of the grains in each cell. Used for diagnostic purposes only, otherwise not that useful.
    """
    plt.figure(fig)
    nu = 1 - np.mean(np.isnan(s), axis=2)
    h = nu / p.nu_cs

    plt.clf()
    ax = plt.gca()

    # Loop through each value in the 2D array and create a rectangle
    for i in range(h.shape[0]):
        for j in range(h.shape[1]):
            # Create a rectangle
            rect = plt.Rectangle((i, j), 1, h[i, j], color="k", alpha=1)

            # Add the rectangle to the plot
            ax.add_patch(rect)

    # Set the limits of the plot to fit all rectangles
    ax.set_xlim(0, h.shape[0])
    ax.set_ylim(0, h.shape[1])

    # Display the plot
    # plt.gca().invert_yaxis() # Optional: to invert the y-axis to match array indexing
    # plt.show()

    plt.axis("off")
    plt.xlim(0, h.shape[0])
    plt.ylim(0, h.shape[1])
    plt.subplots_adjust(left=0, right=1, bottom=0, top=1)
    plt.savefig(p.folderName + "h_" + str(p.tstep).zfill(6) + ".png", dpi=100)

plot_permeability(s, p)

Calculate and save the permeability of the domain at time t.

Source code in HGD/plotter.py
def plot_permeability(s, p):
    """
    Calculate and save the permeability of the domain at time t.
    """
    permeability = kozeny_carman(s)

    plt.figure(fig)
    plt.clf()
    plt.pcolormesh(p.x, p.y, permeability.T, cmap=inferno)
    plt.axis("off")
    plt.xlim(p.x[0], p.x[-1])
    plt.ylim(p.y[0], p.y[-1])
    plt.subplots_adjust(left=0, right=1, bottom=0, top=1)
    plt.savefig(p.folderName + "permeability_" + str(p.tstep).zfill(6) + ".png")

replace_strings(text, replacements)

Replaces substrings in a given text based on a dictionary of replacements.

Parameters: - text (str): The original string in which substrings will be replaced. - replacements (dict): A dictionary where keys are substrings to be replaced and values are the new substrings.

Returns: - str: The modified string with all specified replacements applied.

This function iterates over the key-value pairs in the replacements dictionary. For each pair, it replaces all occurrences of the key (old substring) in the text with the corresponding value (new substring). The function returns the modified text after all replacements have been made.

Source code in HGD/plotter.py
def replace_strings(text, replacements):
    """
    Replaces substrings in a given text based on a dictionary of replacements.

    Parameters:
    - text (str): The original string in which substrings will be replaced.
    - replacements (dict): A dictionary where keys are substrings to be replaced and values are the new substrings.

    Returns:
    - str: The modified string with all specified replacements applied.

    This function iterates over the key-value pairs in the replacements dictionary. For each pair, it replaces all occurrences of the key (old substring) in the text with the corresponding value (new substring). The function returns the modified text after all replacements have been made.
    """
    for old, new in replacements.items():
        text = text.replace(old, new)
    return text

Parameters

dict_to_class

A convenience class to store the information from the parameters dictionary. Used because I prefer using p.variable to p['variable'].

Source code in HGD/params.py
class dict_to_class:
    """
    A convenience class to store the information from the parameters dictionary. Used because I prefer using p.variable to p['variable'].
    """

    def __init__(self, dict: dict):
        list_keys: List[str] = []  # noqa
        lists: List[List] = []  # noqa
        for key in dict:
            setattr(self, key, dict[key])
            if isinstance(dict[key], list) and key not in [
                "save",
                "plot",
                "videos",
                "T_cycles",
                "boundaries",
                "masks",
                "cycles",
            ]:
                list_keys.append(key)
                lists.append(dict[key])
        setattr(self, "lists", lists)
        setattr(self, "list_keys", list_keys)

    def print_optimal_resolution(p):
        # NOTE: Well defined for monodisperse conditions only
        # Optimal resolution is when P_adv = 1 and P_diff=0.5.
        # Under these conditions we have that
        # dx = 2*alpha*s_bar

        # So for any given s_bar and alpha there is an optimal spatial resolution
        s_bar = (p.s_m + p.s_M) / 2.0  # mean diameter (m)
        dx = 2 * p.alpha * s_bar
        # dt = p.dx / np.sqrt(p.g * s_bar)

        ny = int(p.H / dx)
        print(f"Optimal condition: ny = {ny}.")

    def set_defaults(self):
        # get current directory
        current_dir = os.path.dirname(os.path.realpath(__file__))
        ## Now go up one directory and then into the json directory
        json_dir = os.path.join(current_dir, "..", "json")
        defaults_path = resource_path(f"{json_dir}/defaults.json5")
        with open(defaults_path, "r") as f:
            defaults_dict = json5.loads(f.read())

        for key in defaults_dict:
            if not hasattr(self, key):
                setattr(self, key, defaults_dict[key])

        if hasattr(self, "aspect_ratio_y"):
            if isinstance(self.aspect_ratio_y, list):
                self.ny = [int(self.nx * ar) for ar in self.aspect_ratio_y]
            elif isinstance(self.nx, list):
                self.ny = [int(nx * self.aspect_ratio_y) for nx in self.nx]
            else:
                self.ny = int(self.nx * self.aspect_ratio_y)
        elif hasattr(self, "aspect_ratio_x"):
            if isinstance(self.aspect_ratio_x, list):
                self.nx = [int(self.ny * ar) for ar in self.aspect_ratio_x]
            elif isinstance(self.ny, list):
                self.nx = [int(ny * self.aspect_ratio_x) for ny in self.ny]
            else:
                self.nx = int(self.ny * self.aspect_ratio_x)

        if hasattr(self, "aspect_ratio_m"):
            if isinstance(self.aspect_ratio_m, list):
                self.nm = [int(self.nx * ar) for ar in self.aspect_ratio_m]
            elif isinstance(self.nx, list):
                self.nm = [int(nx * self.aspect_ratio_m) for nx in self.nx]
            else:
                self.nm = int(self.nx * self.aspect_ratio_m)

        if self.gsd_mode == "optimal":
            self.y = np.linspace(0, self.H, self.ny)
            self.dy = self.y[1] - self.y[0]
            self.s_m = self.dy / self.alpha
            self.s_M = self.dy / self.alpha
            self.gsd_mode = "mono"

        if self.gsd_mode == "mono":
            if hasattr(self, "s_m") and hasattr(self, "s_M"):
                if not self.s_m == self.s_M:
                    print("WARNING: s_m and s_M must be equal for monodisperse grains. Setting both to s_m.")
                    self.s_M = self.s_m
            if hasattr(self, "s_m"):
                self.s_M = self.s_m
            if hasattr(self, "s_M"):
                self.s_m = self.s_M
            else:
                self.s_m = 1
                self.s_M = 1

        if hasattr(self, "cyclic_BC_y_angle"):
            self.cyclic_BC_y_offset = int(np.tan(np.radians(self.cyclic_BC_y_angle)) * self.nx)

        # user can define mu or repose_angle. If both defined, mu takes precedence.
        if hasattr(self, "mu"):
            self.repose_angle = np.degrees(np.arctan(self.mu))
        if hasattr(self, "repose_angle"):
            self.mu = np.tan(np.radians(self.repose_angle))

        with np.errstate(divide="ignore", invalid="ignore"):
            inv_mu = np.nan_to_num(1.0 / self.mu, nan=0.0, posinf=1e30, neginf=0.0)
        self.delta_limit = self.nu_cs / (inv_mu + 1)

        if len(self.cycles) > 0:
            for cycle in self.cycles:
                cycle["completed"] = False
            self.n_cycles = len(self.cycles)

    def update_before_time_march(self, cycles):
        self.y = np.linspace(0, self.H, self.ny)
        self.dy = self.y[1] - self.y[0]
        self.x = np.arange(
            -(self.nx - 0.5) / 2 * self.dy, (self.nx - 0.5) / 2 * self.dy, self.dy
        )  # force equal grid spacing
        self.dx = self.x[1] - self.x[0]
        if not np.isclose(self.dx, self.dy):
            sys.exit(f"Fatal error: dx != dy. dx = {self.dx}, dy = {self.dy}")
        self.W = self.nx * self.dx
        self.X, self.Y = np.meshgrid(self.x, self.y, indexing="ij")

        if hasattr(self, "freefall_time"):
            self.t_f = 1.2 * self.H / np.sqrt(self.g * self.dy)

        self.y += self.dy / 2.0
        self.t = 0  # time (s)
        self.tstep = 0  # time step counter

        self.max_chi = 1  # Max chi is when all voids are swapping
        self.min_chi = 1.0 / (self.nx * self.ny * self.nm)  # Minimum chi (less than one cell moving)

        if hasattr(self, "outlet_width"):
            self.half_width = int(self.outlet_width / self.dx / 2)

        # self.t_p = self.s_m / np.sqrt(self.g * self.H)  # smallest confinement timescale (at bottom) (s)
        s_bar = (self.s_m + self.s_M) / 2.0  # mean diameter (m)
        if self.advection_model == "average_size":
            self.free_fall_velocity = np.sqrt(self.g * s_bar)  # typical speed to fall one mean diameter (m/s)
        elif self.advection_model == "freefall":
            self.free_fall_velocity = np.sqrt(
                self.g * self.dy
            )  # typical speed to fall one grid spacing (m/s)
        elif self.advection_model == "stress":
            max_pressure = self.solid_density * self.g * self.H
            self.free_fall_velocity = np.sqrt(
                2 * max_pressure / self.solid_density
            )  # confinement velocity (m/s)

        # if self.advection_model == "average_size":
        #     self.diffusivity = self.alpha * self.free_fall_velocity * s_bar  # diffusivity (m^2/s)
        # elif self.advection_model == "freefall":
        #     self.diffusivity = self.alpha * np.sqrt(2 * self.g * self.dy**3)

        safe = False
        stability = 0.5
        self.P_diff_weighting = 6 / (
            self.max_diff_swap_length * (self.max_diff_swap_length + 1) * (2 * self.max_diff_swap_length + 1)
        )

        if self.max_diff_swap_length > 1:
            print(f"Options for max swap length up to {self.max_diff_swap_length}")
            for n in range(1, self.max_diff_swap_length + 1):
                P_n = 6 / (n * (n + 1) * (2 * n + 1))
                P_ratio = self.alpha * self.s_M * P_n / self.dy
                print(f"n = {n}, n*P_diff/P_adv = {n*P_ratio}")

        while not safe:
            self.P_adv_ref = stability
            self.dt = self.P_adv_ref * self.dy / self.free_fall_velocity

            # self.P_diff_ref = self.alpha * self.P_adv_ref
            self.P_diff_max = (
                self.alpha
                * self.s_M
                * self.free_fall_velocity
                * self.dt
                / self.dy
                / self.dy
                * self.P_diff_weighting
            )

            self.P_adv_max = self.P_adv_ref * (self.s_M / self.s_m)
            # self.P_diff_max = self.P_diff_ref * (self.s_M / self.s_m)

            if self.P_adv_max + (2 * self.max_diff_swap_length) * self.P_diff_max <= self.P_stab:
                safe = True
            else:
                stability *= 0.95

        if self.t_f is None:
            self.nt = 0
        else:
            self.nt = int(np.ceil(self.t_f / self.dt))

        if hasattr(self, "saves"):
            if self.nt == 0:
                sys.exit("Fatal error: nt = 0. Cannot use `saves` parameter.")
            else:
                self.save_inc = int(self.nt / self.saves)

    def update_every_time_step(self, state):
        (
            s,
            u,
            v,
            c,
            T,
            last_swap,
            chi,
            sigma,
            outlet,
        ) = state

        if self.nu_cs_mode == "dynamic":
            # nu = operators.get_solid_fraction(s)
            sigma = stress.calculate_stress(s, None, self)
            pressure = stress.get_pressure(sigma, self)
            pressure_kPa = pressure / 1000
            nu_static = self.nu_1 * pressure_kPa ** (1 / self.lambda_nu)  # in kPa!
            # self.nu_cs = nu_static - chi * nu_static  # FIXME --- this is totally made up
            self.nu_cs = nu_static
            self.nu_cs[np.isnan(self.nu_cs)] = self.nu_1
            self.nu_cs[self.nu_cs < self.nu_1] = self.nu_1
        else:
            # self.nu_cs = np.full([self.nx, self.ny], self.nu_1)
            self.nu_cs = self.nu_1

    def process_charge_discharge_csv(self, array):
        self.cycles = []
        for row in array:
            cycle = {}
            cycle["mode"] = row[0]
            cycle["-45"] = float(row[1])
            cycle["-53"] = float(row[2])
            cycle["-76"] = float(row[3])
            cycle["-106"] = float(row[4])
            cycle["+150"] = float(row[5])
            cycle["mass"] = float(row[6])
            cycle["completed"] = False
            self.cycles.append(cycle)

resource_path(relative_path)

Get the absolute path to the resource, works for both development and PyInstaller bundle.

Source code in HGD/params.py
def resource_path(relative_path):
    """Get the absolute path to the resource, works for both development and PyInstaller bundle."""
    if hasattr(sys, "_MEIPASS"):
        return os.path.join(sys._MEIPASS, relative_path)
    return os.path.join(os.path.abspath("."), relative_path)

Stresses

solve_tridiagonal(a, b, c, d)

Solves a tridiagonal system using the Thomas algorithm. a: subdiagonal (length n-1) b: main diagonal (length n) c: superdiagonal (length n-1) d: right-hand side (length n)

Source code in HGD/stress.py
def solve_tridiagonal(a, b, c, d):
    """
    Solves a tridiagonal system using the Thomas algorithm.
    a: subdiagonal (length n-1)
    b: main diagonal (length n)
    c: superdiagonal (length n-1)
    d: right-hand side (length n)
    """
    n = len(d)
    c_prime = np.zeros(n - 1)
    d_prime = np.zeros(n)

    # Forward elimination
    c_prime[0] = c[0] / b[0]
    d_prime[0] = d[0] / b[0]
    for i in range(1, n):
        denom = b[i] - a[i - 1] * c_prime[i - 1]
        c_prime[i - 1] = c[i - 1] / denom
        d_prime[i] = (d[i] - a[i - 1] * d_prime[i - 1]) / denom

    # Back substitution
    x = np.zeros(n)
    x[-1] = d_prime[-1]
    for i in range(n - 2, -1, -1):
        x[i] = d_prime[i] - c_prime[i] * x[i + 1]

    return x

Thermal

update_temperature(s, T, p)

Used for modelling the diffusion of heat into the body. Still not functional. Do not use.

Source code in HGD/thermal.py
def update_temperature(s, T, p):
    """
    Used for modelling the diffusion of heat into the body. Still not functional. Do not use.
    """
    T[np.isnan(s)] = p.temperature["inlet_temperature"]  # HACK
    T[p.boundary] = p.temperature["boundary_temperature"]
    T_inc = np.zeros_like(T)
    T_inc[1:-1, 1:-1] = 1e-3 * (T[2:, 1:-1] + T[:-2, 1:-1] + T[1:-1, 2:] + T[1:-1, :-2] - 4 * T[1:-1, 1:-1])
    return T + T_inc