2D Transient simulation – checkpoint your work¶
Learning Goals¶
Write numerical state out to an npz file for later use
Write function and class definitions out to a python module for later import
Context¶
The 2D transient simulation is approach a level of complexity that makes keeping it all in a single notebook problematic. Here is a demonstration of how we can modify the original notebook to save our work for later use. We will export all of our functions and classes to a python module called build_2D_matrix.py, and export all of our numeric data (boundary conditions, initial concentration, etc.) to a numpy.npz file called savestate.npz
We will read these back into the 2D_assignment_restart.ipynb
notebook
[1]:
import matplotlib.cm as cmap
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import AxesGrid
from numpy.testing import assert_allclose
from scipy import special
Our functions¶
[2]:
# this function deals with harmonic averaging when diffusion is not the same everywhere.
# It doesn't change anything when diffusion is homogeneous but you can try to see how it affects the behavior.
def avg(Di, Dj):
"""
Computes the harmonic average between two values Di and Dj
Returns 0 if either of them is zero
"""
if (Di * Dj) == 0:
return 0
else:
return 2 / (1 / Di + 1 / Dj)
[3]:
def index_to_row_col(ind, nrows, ncol):
"""
in a 2D array, returns the row and column value
associated with a 1D index
Bottom left is index is zero (0-th row, 0-th column)
while index one is the 0-th row and 1st column
"""
if ind > nrows * ncol - 1:
return 0
row = int(np.floor(ind / ncol))
col = int(ind - row * ncol)
return row, col
[4]:
def build_2D_matrix(bc_dict, problem, D_matrix, Qsource):
"""
Constructs a coefficient matrix A and an array b corresponding to the system Ac = b
This system corresponds either to a 1D or 2D problem
Parameters
----------
bc_dict: dict
dictionary with Boundary_Def objects defining the boundary conditions
D_matrix: (float vector if 1D or matrix if 2D)
values of the diffusion coefficient at each grid point(dm^2/day)
if 2D, dimension is [problem.ny, problem.nx]
width_x: (float)
x-extent of the domain (dm)
width_y: (float)
y-extent of the domain (dm)
poro (float)
porosity value
Qsource: (float array)
volumetric source term (mg/L/day)
Returns
-------
the_matrix, rhs: tuple
where the_matrix=A and rhs =b
in the discretized diffusion problem
Ax=b
"""
number_of_rows = problem.ny
number_of_cols = problem.nx
n = problem.nx * problem.ny
is1D = False
if number_of_rows == 1 or number_of_cols == 1:
is1D = True
number_of_cols = n
the_matrix = np.zeros((n, n))
rhs = np.zeros(n)
if is1D:
dx = max(problem.wx, problem.wy) / (max(problem.ny, problem.nx) - 1)
coef_x = problem.poro / dx / dx
else:
dx = problem.wx / (problem.ny - 1)
dy = problem.wy / (problem.nx - 1)
coef_x = problem.poro / dx / dx
coef_y = problem.poro / dy / dy
for ind in range(n):
if is1D:
j = ind
i = -1
else:
i, j = index_to_row_col(ind, number_of_rows, number_of_cols)
if j == 0: # WEST BOUNDARY
if bc_dict["west"].btype == "const":
rhs[ind] = bc_dict["west"].val
the_matrix[ind, ind] = 1
else: # flux boundary condition
the_matrix[ind, ind] = 1
the_matrix[ind, ind + 1] = -1
rhs[ind] = bc_dict["west"].val / dx
elif j == number_of_cols - 1: # EAST BOUNDARY
if bc_dict["east"].btype == "const":
rhs[ind] = bc_dict["east"].val
the_matrix[ind, ind] = 1
else: # flux boundary condition
the_matrix[ind, ind] = 1
the_matrix[ind, ind - 1] = -1
rhs[ind] = bc_dict["east"].val / dx
elif i == 0 and problem.ny > 1: # SOUTH BOUNDARY
if bc_dict["south"].btype == "const":
rhs[ind] = bc_dict["south"].val
the_matrix[ind, ind] = 1
else: # flux boundary condition
the_matrix[ind, ind] = 1
the_matrix[ind, ind + number_of_cols] = -1
rhs[ind] = bc_dict["south"].val / dy
elif i == number_of_rows - 1 and problem.ny > 1: # NORTH BOUNDARY
if bc_dict["north"].btype == "const":
rhs[ind] = bc_dict["west"].val
the_matrix[ind, ind] = 1
else: # flux boundary condition
the_matrix[ind, ind] = 1
the_matrix[ind, ind - number_of_cols] = -1
rhs[ind] = bc_dict["north"].val / dy
else:
if is1D:
north = 0
south = 0
rhs[ind] = Qsource[ind]
east = coef_x * avg(D_matrix[ind + 1], D_matrix[ind])
west = coef_x * avg(D_matrix[ind - 1], D_matrix[ind])
else:
north = coef_y * avg(D_matrix[i, j], D_matrix[i + 1, j])
south = coef_y * avg(D_matrix[i, j], D_matrix[i - 1, j])
east = coef_x * avg(D_matrix[i, j], D_matrix[i, j + 1])
west = coef_x * avg(D_matrix[i, j], D_matrix[i, j - 1])
the_matrix[ind, ind + number_of_cols] = -north
the_matrix[ind, ind - number_of_cols] = -south
rhs[ind] = Qsource[i, j]
the_matrix[ind, ind] = east + west + north + south
the_matrix[ind, ind + 1] = -east
the_matrix[ind, ind - 1] = -west
return the_matrix, rhs
The boundary/problem classes¶
[5]:
class Boundary_Def:
"""
this class holds the boundary type btype ('flux' or 'const')
and the value of the boundary condition (derivitive of the concentration if 'flux'
value of the concentration if 'const')
"""
btype: str
val: float
def __init__(self, btype, val):
self.btype = btype
self.val = val
[6]:
class Problem_Def:
"""
this class holds the specifcation for the domain,
including the value of the porosity
"""
nx: int
ny: int
poro: float
wx: float
wy: float
def __init__(self, nx, ny, poro, wx, wy):
self.nx = nx
self.ny = ny
self.poro = poro
self.wx = wx
self.wy = wy
Create the boundary conditions¶
[7]:
# Here we create 4 boundaries, west has a constant concentration at c0, east has a constant boundary at 0;
c0 = 1 # mg/L
west = Boundary_Def("const", val=c0)
east = Boundary_Def("const", val=0)
# For 1D problem, the used boundaries are west and east.
# The other south and north boundaries have a zero flux (impermeable)
north = Boundary_Def("flux", val=0)
south = Boundary_Def("flux", val=0)
[8]:
bc_dict = {"west": west, "north": north, "east": east, "south": south}
# The latter array bc_dict will be sent to the different functions
Build the 2D matrix¶
[9]:
n_x = 51
n_y = 1 # 1D -- x only
Diff = 2e-9 * 100 * 24 * 3600 # dm²/day
width_x = 10 # dm
width_y = 0
n = n_x * n_y
x = np.linspace(0, width_x, n_x)
c_init = np.zeros(n_x)
c_init[0] = c0
D_matrix = Diff * np.ones(n)
poro = 0.4
prob = Problem_Def(n_x, n_y, poro, width_x, width_y)
Qsource = np.zeros(n)
A, b = build_2D_matrix(bc_dict, prob, D_matrix, Qsource)
Make a checkpoint¶
Save everything you need to reconstruct A and B outside of this notebook
use https://docs.scipy.org/doc/numpy/reference/generated/numpy.savez.html to dump the state to a file named “savestate.npz”
Save our work in an npz file¶
[10]:
out_name= 'savestate.npz'
np.savez(out_name,n_x=n_x,n_y=n_y,poro=poro,width_x=width_x,
width_y=width_y,Qsource=Qsource, c0=c0, c_init=c_init,Diff=Diff,
save_date="March 3, 2019",
case_name="1D simulation",
comment="simple x-only domain",
history="written by 2D_Assignment_Transient_Error_checkpoint.ipynb",
units=["Diff: dm²/day","length: dm","concentration: mg/L"])
print(f"writing the savez file {out_name}")
writing the savez file savestate.npz
Write functions and classes to a module file¶
The cell below saves the function and class definitions into a new module call build_2D_matrix.py
. It uses the special IPython “magic” function %%writefile build_2D_matrix.py
[11]:
%%writefile build_2D_matrix.py
"""
functions and classes for 2D matrix construction
"""
def index_to_row_col(ind, nrows, ncol):
"""
in a 2D array, returns the row and column value
associated with a 1D index
Bottom left is index is zero (0-th row, 0-th column)
while index one is the 0-th row and 1st column
"""
if ind > nrows * ncol - 1:
return 0
row = int(np.floor(ind / ncol))
col = int(ind - row * ncol)
return row, col
def avg(Di, Dj):
"""
Computes the harmonic average between two values Di and Dj
Returns 0 if either of them is zero
"""
if (Di * Dj) == 0:
return 0
else:
return 2 / (1 / Di + 1 / Dj)
class Boundary_Def:
"""
this class holds the boundary type btype ('flux' or 'const')
and the value of the boundary condition (derivitive of the concentration if 'flux'
value of the concentration if 'const')
"""
btype: str
val: float
def __init__(self, btype, val):
self.btype = btype
self.val = val
class Problem_Def:
"""
this class holds the specifcation for the domain,
including the value of the porosity
"""
nx: int
ny: int
poro: float
wx: float
wy: float
def __init__(self, nx, ny, poro, wx, wy):
self.nx = nx
self.ny = ny
self.poro = poro
self.wx = wx
self.wy = wy
def build_2D_matrix(bc_dict, problem, D_matrix, Qsource):
"""
Constructs a coefficient matrix A and an array b corresponding to the system Ac = b
This system corresponds either to a 1D or 2D problem
Parameters
----------
bc_dict: dict
dictionary with Boundary_Def objects defining the boundary conditions
D_matrix: (float vector if 1D or matrix if 2D)
values of the diffusion coefficient at each grid point(dm^2/day)
if 2D, dimension is [problem.ny, problem.nx]
width_x: (float)
x-extent of the domain (dm)
width_y: (float)
y-extent of the domain (dm)
poro (float)
porosity value
Qsource: (float array)
volumetric source term (mg/L/day)
Returns
-------
the_matrix, rhs: tuple
where the_matrix=A and rhs =b
in the discretized diffusion problem
Ax=b
"""
import numpy as np
number_of_rows = problem.ny
number_of_cols = problem.nx
n = problem.nx * problem.ny
is1D = False
if number_of_rows == 1 or number_of_cols == 1:
is1D = True
number_of_cols = n
the_matrix = np.zeros((n, n))
rhs = np.zeros(n)
if is1D:
dx = max(problem.wx, problem.wy) / (max(problem.ny, problem.nx) - 1)
coef_x = problem.poro / dx / dx
else:
dx = problem.wx / (problem.ny - 1)
dy = problem.wy / (problem.nx - 1)
coef_x = problem.poro / dx / dx
coef_y = problem.poro / dy / dy
for ind in range(n):
if is1D:
j = ind
i = -1
else:
i, j = index_to_row_col(ind, number_of_rows, number_of_cols)
if j == 0: # WEST BOUNDARY
if bc_dict["west"].btype == "const":
rhs[ind] = bc_dict["west"].val
the_matrix[ind, ind] = 1
else: # flux boundary condition
the_matrix[ind, ind] = 1
the_matrix[ind, ind + 1] = -1
rhs[ind] = bc_dict["west"].val / dx
elif j == number_of_cols - 1: # EAST BOUNDARY
if bc_dict["east"].btype == "const":
rhs[ind] = bc_dict["east"].val
the_matrix[ind, ind] = 1
else: # flux boundary condition
the_matrix[ind, ind] = 1
the_matrix[ind, ind - 1] = -1
rhs[ind] = bc_dict["east"].val / dx
elif i == 0 and problem.ny > 1: # SOUTH BOUNDARY
if bc_dict["south"].btype == "const":
rhs[ind] = bc_dict["south"].val
the_matrix[ind, ind] = 1
else: # flux boundary condition
the_matrix[ind, ind] = 1
the_matrix[ind, ind + number_of_cols] = -1
rhs[ind] = bc_dict["south"].val / dy
elif i == number_of_rows - 1 and problem.ny > 1: # NORTH BOUNDARY
if bc_dict["north"].btype == "const":
rhs[ind] = bc_dict["west"].val
the_matrix[ind, ind] = 1
else: # flux boundary condition
the_matrix[ind, ind] = 1
the_matrix[ind, ind - number_of_cols] = -1
rhs[ind] = bc_dict["north"].val / dy
else:
if is1D:
north = 0
south = 0
rhs[ind] = Qsource[ind]
east = coef_x * avg(D_matrix[ind + 1], D_matrix[ind])
west = coef_x * avg(D_matrix[ind - 1], D_matrix[ind])
else:
north = coef_y * avg(D_matrix[i, j], D_matrix[i + 1, j])
south = coef_y * avg(D_matrix[i, j], D_matrix[i - 1, j])
east = coef_x * avg(D_matrix[i, j], D_matrix[i, j + 1])
west = coef_x * avg(D_matrix[i, j], D_matrix[i, j - 1])
the_matrix[ind, ind + number_of_cols] = -north
the_matrix[ind, ind - number_of_cols] = -south
rhs[ind] = Qsource[i, j]
the_matrix[ind, ind] = east + west + north + south
the_matrix[ind, ind + 1] = -east
the_matrix[ind, ind - 1] = -west
return the_matrix, rhs
Overwriting build_2D_matrix.py