2D Transient simulation – checkpoint your work

Learning Goals

  • Write numerical state out to an npz file for later use

  • Write function and class definitions out to a python module for later import

Context

The 2D transient simulation is approach a level of complexity that makes keeping it all in a single notebook problematic. Here is a demonstration of how we can modify the original notebook to save our work for later use. We will export all of our functions and classes to a python module called build_2D_matrix.py, and export all of our numeric data (boundary conditions, initial concentration, etc.) to a numpy.npz file called savestate.npz

We will read these back into the 2D_assignment_restart.ipynb notebook

[1]:
import matplotlib.cm as cmap
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import AxesGrid
from numpy.testing import assert_allclose
from scipy import special

Our functions

[2]:
# this function deals with harmonic averaging when diffusion is not the same everywhere.
# It doesn't change anything when diffusion is homogeneous but you can try to see how it affects the behavior.


def avg(Di, Dj):
    """
    Computes the harmonic average between two values Di and Dj
    Returns 0 if either of them is zero
    """
    if (Di * Dj) == 0:
        return 0
    else:
        return 2 / (1 / Di + 1 / Dj)
[3]:
def index_to_row_col(ind, nrows, ncol):
    """
    in a 2D array, returns the row and column value
    associated with a 1D index
    Bottom left is index is zero (0-th row, 0-th column)
    while index one is the 0-th row and 1st column
    """
    if ind > nrows * ncol - 1:
        return 0

    row = int(np.floor(ind / ncol))
    col = int(ind - row * ncol)
    return row, col
[4]:
def build_2D_matrix(bc_dict, problem, D_matrix, Qsource):
    """
    Constructs a coefficient matrix A and an array b corresponding to the system Ac = b
    This system corresponds either to a 1D or 2D problem

    Parameters
    ----------
    bc_dict: dict
       dictionary with Boundary_Def objects defining the boundary conditions
    D_matrix: (float vector if 1D or matrix if 2D)
        values of the diffusion coefficient at each grid point(dm^2/day)
        if 2D, dimension is [problem.ny, problem.nx]
    width_x: (float)
        x-extent of the domain (dm)
    width_y: (float)
        y-extent of the domain (dm)
    poro (float)
        porosity value
    Qsource: (float array)
      volumetric source term (mg/L/day)

    Returns
    -------

    the_matrix, rhs: tuple
       where the_matrix=A and rhs =b
       in the discretized diffusion problem
       Ax=b
    """
    number_of_rows = problem.ny
    number_of_cols = problem.nx
    n = problem.nx * problem.ny
    is1D = False
    if number_of_rows == 1 or number_of_cols == 1:
        is1D = True
        number_of_cols = n
    the_matrix = np.zeros((n, n))
    rhs = np.zeros(n)

    if is1D:
        dx = max(problem.wx, problem.wy) / (max(problem.ny, problem.nx) - 1)
        coef_x = problem.poro / dx / dx
    else:
        dx = problem.wx / (problem.ny - 1)
        dy = problem.wy / (problem.nx - 1)
        coef_x = problem.poro / dx / dx
        coef_y = problem.poro / dy / dy

    for ind in range(n):
        if is1D:
            j = ind
            i = -1
        else:
            i, j = index_to_row_col(ind, number_of_rows, number_of_cols)
        if j == 0:  # WEST BOUNDARY
            if bc_dict["west"].btype == "const":
                rhs[ind] = bc_dict["west"].val
                the_matrix[ind, ind] = 1
            else:  # flux boundary condition
                the_matrix[ind, ind] = 1
                the_matrix[ind, ind + 1] = -1
                rhs[ind] = bc_dict["west"].val / dx

        elif j == number_of_cols - 1:  # EAST BOUNDARY
            if bc_dict["east"].btype == "const":
                rhs[ind] = bc_dict["east"].val
                the_matrix[ind, ind] = 1
            else:  # flux boundary condition
                the_matrix[ind, ind] = 1
                the_matrix[ind, ind - 1] = -1
                rhs[ind] = bc_dict["east"].val / dx
        elif i == 0 and problem.ny > 1:  # SOUTH BOUNDARY
            if bc_dict["south"].btype == "const":
                rhs[ind] = bc_dict["south"].val
                the_matrix[ind, ind] = 1
            else:  # flux boundary condition
                the_matrix[ind, ind] = 1
                the_matrix[ind, ind + number_of_cols] = -1
                rhs[ind] = bc_dict["south"].val / dy

        elif i == number_of_rows - 1 and problem.ny > 1:  # NORTH BOUNDARY
            if bc_dict["north"].btype == "const":
                rhs[ind] = bc_dict["west"].val
                the_matrix[ind, ind] = 1
            else:  # flux boundary condition
                the_matrix[ind, ind] = 1
                the_matrix[ind, ind - number_of_cols] = -1
                rhs[ind] = bc_dict["north"].val / dy
        else:
            if is1D:
                north = 0
                south = 0
                rhs[ind] = Qsource[ind]
                east = coef_x * avg(D_matrix[ind + 1], D_matrix[ind])
                west = coef_x * avg(D_matrix[ind - 1], D_matrix[ind])
            else:
                north = coef_y * avg(D_matrix[i, j], D_matrix[i + 1, j])
                south = coef_y * avg(D_matrix[i, j], D_matrix[i - 1, j])
                east = coef_x * avg(D_matrix[i, j], D_matrix[i, j + 1])
                west = coef_x * avg(D_matrix[i, j], D_matrix[i, j - 1])
                the_matrix[ind, ind + number_of_cols] = -north
                the_matrix[ind, ind - number_of_cols] = -south
                rhs[ind] = Qsource[i, j]

            the_matrix[ind, ind] = east + west + north + south
            the_matrix[ind, ind + 1] = -east
            the_matrix[ind, ind - 1] = -west

    return the_matrix, rhs

The boundary/problem classes

[5]:
class Boundary_Def:
    """
    this class holds the boundary type btype ('flux' or 'const')
    and the value of the boundary condition (derivitive of the concentration if 'flux'
    value of the concentration if 'const')
    """

    btype: str
    val: float

    def __init__(self, btype, val):
        self.btype = btype
        self.val = val
[6]:
class Problem_Def:
    """
    this class holds the specifcation for the domain,
    including the value of the porosity
    """

    nx: int
    ny: int
    poro: float
    wx: float
    wy: float

    def __init__(self, nx, ny, poro, wx, wy):
        self.nx = nx
        self.ny = ny
        self.poro = poro
        self.wx = wx
        self.wy = wy

Create the boundary conditions

[7]:
# Here we create 4 boundaries, west has a constant concentration at c0, east has a constant boundary at 0;
c0 = 1  # mg/L
west = Boundary_Def("const", val=c0)
east = Boundary_Def("const", val=0)

# For 1D problem, the used boundaries are west and east.

# The other south and north boundaries have a zero flux (impermeable)

north = Boundary_Def("flux", val=0)
south = Boundary_Def("flux", val=0)
[8]:
bc_dict = {"west": west, "north": north, "east": east, "south": south}
# The latter array bc_dict will be sent to the different functions

Build the 2D matrix

[9]:
n_x = 51
n_y = 1 # 1D -- x only
Diff = 2e-9 * 100 * 24 * 3600  # dm²/day
width_x = 10  # dm
width_y = 0
n = n_x * n_y
x = np.linspace(0, width_x, n_x)
c_init = np.zeros(n_x)
c_init[0] = c0
D_matrix = Diff * np.ones(n)
poro = 0.4
prob = Problem_Def(n_x, n_y, poro, width_x, width_y)
Qsource = np.zeros(n)
A, b = build_2D_matrix(bc_dict, prob, D_matrix, Qsource)

Make a checkpoint

Save everything you need to reconstruct A and B outside of this notebook

use https://docs.scipy.org/doc/numpy/reference/generated/numpy.savez.html to dump the state to a file named “savestate.npz”

Save our work in an npz file

[10]:
out_name= 'savestate.npz'
np.savez(out_name,n_x=n_x,n_y=n_y,poro=poro,width_x=width_x,
        width_y=width_y,Qsource=Qsource, c0=c0, c_init=c_init,Diff=Diff,
        save_date="March 3, 2019",
        case_name="1D simulation",
        comment="simple x-only domain",
        history="written by 2D_Assignment_Transient_Error_checkpoint.ipynb",
        units=["Diff: dm²/day","length: dm","concentration: mg/L"])
print(f"writing the savez file {out_name}")
writing the savez file savestate.npz

Write functions and classes to a module file

The cell below saves the function and class definitions into a new module call build_2D_matrix.py. It uses the special IPython “magic” function %%writefile build_2D_matrix.py

[11]:
%%writefile build_2D_matrix.py
"""
functions and classes for 2D matrix construction
"""
def index_to_row_col(ind, nrows, ncol):
    """
    in a 2D array, returns the row and column value
    associated with a 1D index
    Bottom left is index is zero (0-th row, 0-th column)
    while index one is the 0-th row and 1st column
    """
    if ind > nrows * ncol - 1:
        return 0
    row = int(np.floor(ind / ncol))
    col = int(ind - row * ncol)
    return row, col

def avg(Di, Dj):
    """
    Computes the harmonic average between two values Di and Dj
    Returns 0 if either of them is zero
    """
    if (Di * Dj) == 0:
        return 0
    else:
        return 2 / (1 / Di + 1 / Dj)

class Boundary_Def:
    """
    this class holds the boundary type btype ('flux' or 'const')
    and the value of the boundary condition (derivitive of the concentration if 'flux'
    value of the concentration if 'const')
    """

    btype: str
    val: float

    def __init__(self, btype, val):
        self.btype = btype
        self.val = val

class Problem_Def:
    """
    this class holds the specifcation for the domain,
    including the value of the porosity
    """

    nx: int
    ny: int
    poro: float
    wx: float
    wy: float

    def __init__(self, nx, ny, poro, wx, wy):
        self.nx = nx
        self.ny = ny
        self.poro = poro
        self.wx = wx
        self.wy = wy

def build_2D_matrix(bc_dict, problem, D_matrix, Qsource):
    """
    Constructs a coefficient matrix A and an array b corresponding to the system Ac = b
    This system corresponds either to a 1D or 2D problem

    Parameters
    ----------
    bc_dict: dict
       dictionary with Boundary_Def objects defining the boundary conditions
    D_matrix: (float vector if 1D or matrix if 2D)
        values of the diffusion coefficient at each grid point(dm^2/day)
        if 2D, dimension is [problem.ny, problem.nx]
    width_x: (float)
        x-extent of the domain (dm)
    width_y: (float)
        y-extent of the domain (dm)
    poro (float)
        porosity value
    Qsource: (float array)
      volumetric source term (mg/L/day)

    Returns
    -------

    the_matrix, rhs: tuple
       where the_matrix=A and rhs =b
       in the discretized diffusion problem
       Ax=b
    """
    import numpy as np
    number_of_rows = problem.ny
    number_of_cols = problem.nx
    n = problem.nx * problem.ny
    is1D = False
    if number_of_rows == 1 or number_of_cols == 1:
        is1D = True
        number_of_cols = n
    the_matrix = np.zeros((n, n))
    rhs = np.zeros(n)

    if is1D:
        dx = max(problem.wx, problem.wy) / (max(problem.ny, problem.nx) - 1)
        coef_x = problem.poro / dx / dx
    else:
        dx = problem.wx / (problem.ny - 1)
        dy = problem.wy / (problem.nx - 1)
        coef_x = problem.poro / dx / dx
        coef_y = problem.poro / dy / dy

    for ind in range(n):
        if is1D:
            j = ind
            i = -1
        else:
            i, j = index_to_row_col(ind, number_of_rows, number_of_cols)
        if j == 0:  # WEST BOUNDARY
            if bc_dict["west"].btype == "const":
                rhs[ind] = bc_dict["west"].val
                the_matrix[ind, ind] = 1
            else:  # flux boundary condition
                the_matrix[ind, ind] = 1
                the_matrix[ind, ind + 1] = -1
                rhs[ind] = bc_dict["west"].val / dx

        elif j == number_of_cols - 1:  # EAST BOUNDARY
            if bc_dict["east"].btype == "const":
                rhs[ind] = bc_dict["east"].val
                the_matrix[ind, ind] = 1
            else:  # flux boundary condition
                the_matrix[ind, ind] = 1
                the_matrix[ind, ind - 1] = -1
                rhs[ind] = bc_dict["east"].val / dx
        elif i == 0 and problem.ny > 1:  # SOUTH BOUNDARY
            if bc_dict["south"].btype == "const":
                rhs[ind] = bc_dict["south"].val
                the_matrix[ind, ind] = 1
            else:  # flux boundary condition
                the_matrix[ind, ind] = 1
                the_matrix[ind, ind + number_of_cols] = -1
                rhs[ind] = bc_dict["south"].val / dy

        elif i == number_of_rows - 1 and problem.ny > 1:  # NORTH BOUNDARY
            if bc_dict["north"].btype == "const":
                rhs[ind] = bc_dict["west"].val
                the_matrix[ind, ind] = 1
            else:  # flux boundary condition
                the_matrix[ind, ind] = 1
                the_matrix[ind, ind - number_of_cols] = -1
                rhs[ind] = bc_dict["north"].val / dy
        else:
            if is1D:
                north = 0
                south = 0
                rhs[ind] = Qsource[ind]
                east = coef_x * avg(D_matrix[ind + 1], D_matrix[ind])
                west = coef_x * avg(D_matrix[ind - 1], D_matrix[ind])
            else:
                north = coef_y * avg(D_matrix[i, j], D_matrix[i + 1, j])
                south = coef_y * avg(D_matrix[i, j], D_matrix[i - 1, j])
                east = coef_x * avg(D_matrix[i, j], D_matrix[i, j + 1])
                west = coef_x * avg(D_matrix[i, j], D_matrix[i, j - 1])
                the_matrix[ind, ind + number_of_cols] = -north
                the_matrix[ind, ind - number_of_cols] = -south
                rhs[ind] = Qsource[i, j]

            the_matrix[ind, ind] = east + west + north + south
            the_matrix[ind, ind + 1] = -east
            the_matrix[ind, ind - 1] = -west

    return the_matrix, rhs
Overwriting build_2D_matrix.py