#!/usr/bin/python
import numpy as np
import pyopencl as cl
from .. import common as PPU
# ========== Globals
cl_program = None
oclu = None
# fmt: off
DEFAULT_dTip = np.array( [ 0.0 , 0.0 , -0.1 , 0.0 ], dtype=np.float32 )
DEFAULT_stiffness = np.array( [-0.03,-0.03, -0.03,-1.0 ], dtype=np.float32 )
DEFAULT_dpos0 = np.array( [ 0.0 , 0.0 , -4.0 , 4.0 ], dtype=np.float32 )
DEFAULT_relax_params = np.array( [ 0.5 , 0.1 , 0.02, 0.5 ], dtype=np.float32 )
# fmt: on
verbose = 0
# ========== Functions
[docs]
def init(env):
global cl_program
global oclu
cl_program = env.loadProgram(env.CL_PATH / "relax.cl")
oclu = env
[docs]
def mat3x3to4f(M):
a = np.zeros(4, dtype=np.float32)
a[0:3] = M[0]
b = np.zeros(4, dtype=np.float32)
b[0:3] = M[1]
c = np.zeros(4, dtype=np.float32)
c[0:3] = M[2]
return (a, b, c)
[docs]
def getInvCell(lvec):
cell = lvec[1:4, 0:3]
invCell = np.transpose(np.linalg.inv(cell))
if verbose > 0:
print(invCell)
return mat3x3to4f(invCell)
[docs]
def preparePoss(scan_dim, z0, start=(0.0, 0.0), end=(10.0, 10.0)):
ys = np.linspace(start[0], end[0], scan_dim[0])
xs = np.linspace(start[1], end[1], scan_dim[1])
Xs, Ys = np.meshgrid(xs, ys)
poss = np.zeros(Xs.shape + (4,), dtype=np.float32)
poss[:, :, 0] = Ys
poss[:, :, 1] = Xs
poss[:, :, 2] = z0
return poss
[docs]
def preparePossRot(scan_dim, pos0, avec, bvec, start=(-5.0, -5.0), end=(5.0, 5.0)):
xs = np.linspace(start[0], end[0], scan_dim[0])
ys = np.linspace(start[1], end[1], scan_dim[1])
As, Bs = np.meshgrid(xs, ys, indexing="ij")
poss = np.zeros(As.shape + (4,), dtype=np.float32)
poss[:, :, 0] = pos0[0] + As * avec[0] + Bs * bvec[0]
poss[:, :, 1] = pos0[1] + As * avec[1] + Bs * bvec[1]
poss[:, :, 2] = pos0[2] + As * avec[2] + Bs * bvec[2]
return poss
[docs]
def rotTip(rot, zstep, tipR0=[0.0, 0.0, 4.0]):
dTip = np.zeros(4, dtype=np.float32)
dTip[:3] = rot[2] * -zstep
tipRot = mat3x3to4f(rot)
tipRot[2][3] = -zstep
dpos0Tip = np.zeros(4, dtype=np.float32)
dpos0Tip[0] = tipR0[0]
dpos0Tip[1] = tipR0[1]
dpos0Tip[2] = -np.sqrt(tipR0[2] ** 2 - tipR0[0] ** 2 - tipR0[1] ** 2)
dpos0Tip[3] = tipR0[2]
dpos0 = np.zeros(4, dtype=np.float32)
dpos0[:3] = np.dot(rot.transpose(), dpos0Tip[:3])
dpos0[3] = tipR0[2]
return dTip, tipRot, dpos0Tip, dpos0
## ============= Relax Class:
[docs]
class RelaxedScanner:
verbose = 0
def __init__(self):
self.queue = oclu.queue
self.ctx = oclu.ctx
self.stiffness = DEFAULT_stiffness.copy()
self.relax_params = DEFAULT_relax_params.copy()
self.zstep = 0.1 # step of tip approach in scan [Angstroem]
self.start = (-5.0, -5.0) # scan region start
self.end = (5.0, 5.0) # scan region end
self.tipR0 = 4.0 # equlibirum distance of ProbeParticle form ancoring point
self.surfFF = np.zeros(4, dtype=np.float32)
self.cl_atoms = None
self.cl_zMap = None
self.cl_feMap = None
[docs]
def updateFEin(self, FEin_cl, bFinish=False):
if verbose > 0:
print(" updateFEin ", FEin_cl, self.cl_ImgIn, self.FEin_shape)
if bFinish:
self.queue.finish()
cl.enqueue_copy(queue=self.queue, src=FEin_cl, dest=self.cl_ImgIn, offset=0, origin=(0, 0, 0), region=self.FEin_shape[:3])
if bFinish:
self.queue.finish()
self.FEin_cl = FEin_cl
[docs]
def updateAtoms(self, atoms):
if self.cl_atoms:
self.cl_atoms.release()
self.nAtoms = np.int32(len(atoms))
self.cl_atoms = cl.Buffer(self.ctx, cl.mem_flags.READ_ONLY | cl.mem_flags.COPY_HOST_PTR, hostbuf=atoms)
[docs]
def prepareAuxMapBuffers(self, bZMap=False, bFEmap=False, atoms=None):
nbytes = 0
mf = cl.mem_flags
fsize = np.dtype(np.float32).itemsize
nxy = self.scan_dim[0] * self.scan_dim[1]
if bZMap:
if self.cl_zMap:
self.cl_zMap.release()
self.cl_zMap = cl.Buffer(self.ctx, mf.WRITE_ONLY, nxy * fsize)
nbytes += nxy * fsize
if bFEmap:
if self.cl_feMap:
self.cl_feMap.release()
self.cl_feMap = cl.Buffer(self.ctx, mf.WRITE_ONLY, nxy * fsize * 4)
nbytes += nxy * fsize * 4
if atoms is not None:
if self.cl_atoms:
self.cl_atoms.release()
self.nAtoms = np.int32(len(atoms))
self.cl_atoms = cl.Buffer(self.ctx, cl.mem_flags.READ_ONLY | cl.mem_flags.COPY_HOST_PTR, hostbuf=atoms)
nbytes += atoms.nbytes
if self.verbose > 0:
print("prepareAuxMapBuffers.nbytes: ", nbytes)
[docs]
def prepareBuffers(self, FEin_np=None, lvec=None, FEin_cl=None, FEin_shape=None, scan_dim=None, nDimConv=None, nDimConvOut=None, bZMap=False, bFEmap=False, atoms=None):
nbytes = 0
mf = cl.mem_flags
if lvec is not None:
self.lvec = lvec
self.invCell = getInvCell(lvec)
if FEin_np is not None:
self.cl_ImgIn = cl.image_from_array(self.ctx, FEin_np, num_channels=4, mode="r")
nbytes += FEin_np.nbytes # TODO make this re-uploadable
if self.verbose > 0:
print("prepareBuffers made self.cl_ImgIn ", self.cl_ImgIn)
else:
if FEin_shape is not None:
self.FEin_shape = FEin_shape
self.image_format = cl.ImageFormat(cl.channel_order.RGBA, cl.channel_type.FLOAT)
self.cl_ImgIn = cl.Image(self.ctx, mf.READ_ONLY, self.image_format, shape=FEin_shape[:3], pitches=None, hostbuf=None, is_array=False, buffer=None)
if self.verbose > 0:
print("prepareBuffers made self.cl_ImgIn ", self.cl_ImgIn)
if FEin_cl is not None:
self.updateFEin(FEin_cl)
self.FEin_cl = FEin_cl
# see: https://stackoverflow.com/questions/39533635/pyopencl-3d-rgba-image-from-numpy-array
if scan_dim is not None:
self.scan_dim = tuple(scan_dim)
fsize = np.dtype(np.float32).itemsize
f4size = fsize * 4
nxy = self.scan_dim[0] * self.scan_dim[1]
bsz = f4size * nxy
self.cl_poss = cl.Buffer(self.ctx, mf.READ_ONLY, bsz)
nbytes += bsz # float4
self.cl_FEout = cl.Buffer(self.ctx, mf.READ_WRITE, bsz * self.scan_dim[2])
nbytes += bsz * self.scan_dim[2]
self.cl_paths = cl.Buffer(self.ctx, mf.READ_WRITE, bsz * self.scan_dim[2])
nbytes += bsz * self.scan_dim[2]
if nDimConv is not None:
self.nDimConv = nDimConv
self.nDimConvOut = nDimConvOut
self.cl_FEconv = cl.Buffer(self.ctx, mf.WRITE_ONLY, bsz * self.nDimConvOut)
nbytes += bsz * self.nDimConvOut
self.cl_WZconv = cl.Buffer(self.ctx, mf.READ_ONLY, fsize * self.nDimConv)
nbytes += fsize * self.nDimConv
self.FEconv = self.prepareFEConv()
if bZMap:
self.cl_zMap = cl.Buffer(self.ctx, mf.WRITE_ONLY, nxy * fsize)
nbytes += nxy * fsize
if bFEmap:
self.cl_feMap = cl.Buffer(self.ctx, mf.WRITE_ONLY, nxy * fsize * 4)
nbytes += nxy * fsize * 4
if atoms is not None:
self.updateAtoms(atoms)
nbytes += atoms.nbytes
if self.verbose > 0:
print("prepareBuffers.nbytes: ", nbytes)
[docs]
def releaseBuffers(self):
if self.verbose > 0:
print("tryReleaseBuffers self.cl_ImgIn ", self.cl_ImgIn)
self.cl_ImgIn.release()
self.cl_poss.release()
self.cl_FEout.release()
if self.cl_zMap is not None:
self.cl_zMap.release()
if self.cl_feMap is not None:
self.cl_feMap.release()
if self.cl_atoms is not None:
self.cl_atoms.release()
[docs]
def tryReleaseBuffers(self):
if self.verbose > 0:
print("tryReleaseBuffers self.cl_ImgIn ", self.cl_ImgIn)
try:
self.cl_ImgIn.release()
except:
pass
try:
self.cl_poss.release()
except:
pass
try:
self.cl_FEout.release()
except:
pass
try:
self.cl_zMap.release()
except:
pass
try:
self.cl_feMap.release()
except:
pass
try:
self.cl_atoms.release()
except:
pass
[docs]
def prepareFEConv(self):
return np.empty(self.scan_dim[:2] + (self.nDimConvOut, 4), dtype=np.float32)
[docs]
def preparePosBasis(self, start=(-5.0, -5.0), end=(5.0, 5.0)):
self.start = start
self.end = end
self.xs = np.linspace(start[0], end[0], self.scan_dim[0])
self.ys = np.linspace(start[1], end[1], self.scan_dim[1])
self.As, self.Bs = np.meshgrid(self.xs, self.ys, indexing="ij")
self.poss = np.zeros(self.As.shape + (4,), dtype=np.float32)
[docs]
def preparePossRot(self, pos0, avec, bvec):
self.poss[:, :, 0] = pos0[0] + self.As * avec[0] + self.Bs * bvec[0]
self.poss[:, :, 1] = pos0[1] + self.As * avec[1] + self.Bs * bvec[1]
self.poss[:, :, 2] = pos0[2] + self.As * avec[2] + self.Bs * bvec[2]
return self.poss
[docs]
def setScanRot(self, pos0, rot=None, zstep=None, tipR0=[0.0, 0.0, 4.0]):
if rot is None:
rot = np.eye(3)
if zstep:
self.zstep = zstep
self.dTip, self.tipRot, self.dpos0Tip, self.dpos0 = rotTip(rot, self.zstep, tipR0)
poss = self.preparePossRot(pos0, rot[0], rot[1])
cl.enqueue_copy(self.queue, self.cl_poss, poss)
return poss
[docs]
def updateBuffers(self, FEin=None, lvec=None, WZconv=None):
if lvec is not None:
self.invCell = getInvCell(lvec)
if FEin is not None:
region = FEin.shape[:3]
region = region[::-1]
if self.verbose > 0:
print("region : ", region)
cl.enqueue_copy(self.queue, self.cl_ImgIn, FEin, origin=(0, 0, 0), region=region)
if WZconv is not None:
cl.enqueue_copy(self.queue, self.cl_WZconv, WZconv)
[docs]
def downloadPaths(self):
"""
Get probe particle path array from device.
Returns:
paths: np.ndarray of shape scan_dim + (3,). xyz positions of probe particle at all scan points.
"""
# Make numpy array. Last axis is bigger by one because OCL aligns to multiples of 4 floats.
paths = np.empty(self.scan_dim + (4,), dtype=np.float32, order="C")
if self.verbose:
print("paths.shape ", paths.shape)
# Copy from device to host
cl.enqueue_copy(self.queue, paths, self.cl_paths)
self.queue.finish()
# Get rid of extra column
paths = paths[:, :, :, :3]
# Add origin because the OCL kernel does not take it into account
paths += self.lvec[0]
return paths
[docs]
def run(self, FEout=None, FEin=None, lvec=None, nz=None):
"""
calculate force on relaxing probe particle approaching from top (z-direction)
"""
if nz is None:
nz = self.scan_dim[2]
self.updateBuffers(FEin=FEin, lvec=lvec)
if FEout is None:
FEout = np.empty(self.scan_dim + (4,), dtype=np.float32)
# fmt: off
cl_program.relaxStrokes(self.queue, ( int(self.scan_dim[0]*self.scan_dim[1]),), None,
self.cl_ImgIn,
self.cl_poss,
self.cl_FEout,
self.invCell[0],
self.invCell[1],
self.invCell[2],
self.dTip,
self.stiffness,
self.dpos0,
self.relax_params,
np.int32(nz)
)
# fmt: on
cl.enqueue_copy(self.queue, FEout, self.cl_FEout)
self.queue.finish()
return FEout
[docs]
def run_relaxStrokesTilted(self, FEout=None, FEin=None, lvec=None, nz=None, bCopy=True, bFinish=True):
"""
calculate force on relaxing probe particle approaching from particular direction
"""
if nz is None:
nz = self.scan_dim[2]
if bCopy and (FEout is None):
FEout = np.empty(self.scan_dim + (4,), dtype=np.float32)
self.updateBuffers(FEin=FEin, lvec=lvec)
# fmt: off
cl_program.relaxStrokesTilted(self.queue, ( int(self.scan_dim[0]*self.scan_dim[1]),), None,
self.cl_ImgIn,
self.cl_poss,
self.cl_FEout,
self.cl_paths,
self.invCell[0],
self.invCell[1],
self.invCell[2],
self.tipRot[0],
self.tipRot[1],
self.tipRot[2],
self.stiffness,
self.dpos0Tip,
self.relax_params,
self.surfFF,
np.int32(nz)
)
# fmt: on
if bCopy:
cl.enqueue_copy(self.queue, FEout, self.cl_FEout)
if bFinish:
self.queue.finish()
return FEout
[docs]
def run_relaxStrokesTilted_convZ(self, FEconv=None, FEin=None, lvec=None, nz=None):
"""
calculate force on relaxing probe particle approaching from particular direction
"""
if nz is None:
nz = self.scan_dim[2]
if FEconv is None:
if self.FEconv is not None:
FEconv = self.FEconv
else:
FEconv = self.prepareFEConv()
self.updateBuffers(FEin=FEin, lvec=lvec)
# fmt: off
cl_program.relaxStrokesTilted_convZ(self.queue, ( int(self.scan_dim[0]*self.scan_dim[1]),), None,
self.cl_ImgIn,
self.cl_poss,
self.cl_WZconv,
self.cl_FEconv,
self.invCell[0],
self.invCell[1],
self.invCell[2],
self.tipRot[0],
self.tipRot[1],
self.tipRot[2],
self.stiffness,
self.dpos0Tip,
self.relax_params,
self.surfFF,
np.int32(nz), np.int32(self.nDimConvOut),
)
# fmt: on
cl.enqueue_copy(self.queue, FEconv, self.cl_FEconv)
self.queue.finish()
return FEconv
[docs]
def run_getFEinStrokes(self, FEout=None, FEconv=None, FEin=None, lvec=None, nz=None, WZconv=None, bDoConv=False):
"""
un-relaxed sampling of FE values from input Force-field (cl_ImgIn) store to cl_FEout
"""
if nz is None:
nz = self.scan_dim[2]
self.updateBuffers(FEin=FEin, lvec=lvec, WZconv=WZconv)
# fmt: off
cl_program.getFEinStrokes(self.queue, ( int(self.scan_dim[0]*self.scan_dim[1]),), None,
self.cl_ImgIn,
self.cl_poss,
self.cl_FEout,
self.invCell[0],
self.invCell[1],
self.invCell[2],
self.dTip,
self.dpos0,
np.int32(nz)
)
# fmt: on
if bDoConv:
# This function is missing. Maybe this should be self.run_convolveZ?
FEout = runZConv(self, FEconv=FEconv, nz=nz)
else:
if FEout is None:
FEout = np.empty(self.scan_dim + (4,), dtype=np.float32)
cl.enqueue_copy(self.queue, FEout, self.cl_FEout)
self.queue.finish()
return FEout
[docs]
def run_getFEinStrokesTilted(self, FEout=None, FEin=None, lvec=None, nz=None):
"""
un-relaxed sampling of FE values from input Force-field (cl_ImgIn) store to cl_FEout
operates in coordinates rotated by tipRot
"""
if nz is None:
nz = self.scan_dim[2]
self.updateBuffers(FEin=FEin, lvec=lvec)
# fmt: off
cl_program.getFEinStrokesTilted(self.queue, ( int(self.scan_dim[0]*self.scan_dim[1]),), None,
self.cl_ImgIn,
self.cl_poss,
self.cl_FEout,
self.invCell[0],
self.invCell[1],
self.invCell[2],
self.tipRot[0],
self.tipRot[1],
self.tipRot[2],
self.dTip,
self.dpos0,
np.int32(nz)
)
# fmt: on
cl.enqueue_copy(self.queue, FEout, self.cl_FEout)
self.queue.finish()
return FEout
[docs]
def run_convolveZ(self, FEconv=None, nz=None):
"""
convolve 3D forcefield in FEout with 1D weight mask WZconv
"""
if nz is None:
nz = self.scan_dim[2]
if FEconv is None:
if self.FEconv is not None:
FEconv = self.FEconv
else:
FEconv = self.prepareFEConv()
# fmt: off
cl_program.convolveZ(self.queue, ( int(self.scan_dim[0]*self.scan_dim[1]),), None,
self.cl_FEout,
self.cl_FEconv,
self.cl_WZconv,
np.int32(nz), np.int32(self.nDimConvOut)
)
# fmt: on
cl.enqueue_copy(self.queue, FEconv, self.cl_FEconv)
self.queue.finish()
return FEconv
[docs]
def run_izoZ(self, zMap=None, iso=0.0, nz=None):
"""
get isosurface of input 3D field from top (z)
used to generate HeightMap
if cl_FEout is Forcefield it takes "z" where ( F(z) > iso )
"""
if nz is None:
nz = self.scan_dim[2]
if zMap is None:
zMap = np.empty(self.scan_dim[:2], dtype=np.float32)
# fmt: off
cl_program.izoZ( self.queue, ( int(self.scan_dim[0]*self.scan_dim[1]),), None,
self.cl_FEout,
self.cl_zMap,
np.int32(nz), np.float32(iso)
)
# fmt: on
cl.enqueue_copy(self.queue, zMap, self.cl_zMap)
self.queue.finish()
return zMap
[docs]
def run_getZisoTilted(self, zMap=None, iso=0.0, nz=None):
"""
get isosurface of input 3D field from given direction
used to generate HeightMap
operates in coordinates rotated by tipRot
"""
if nz is None:
nz = self.scan_dim[2]
if zMap is None:
zMap = np.empty(self.scan_dim[:2], dtype=np.float32)
local_size = (1,)
# fmt: off
cl_program.getZisoTilted(self.queue, ( int(self.scan_dim[0]*self.scan_dim[1]),), local_size,
self.cl_ImgIn,
self.cl_poss,
self.cl_zMap,
self.invCell[0],
self.invCell[1],
self.invCell[2],
self.tipRot[0],
self.tipRot[1],
self.tipRot[2],
self.dTip,
self.dpos0,
np.int32(nz), np.float32( iso )
)
# fmt: on
cl.enqueue_copy(self.queue, zMap, self.cl_zMap)
self.queue.finish()
return zMap
[docs]
def run_getZisoFETilted(self, zMap=None, feMap=None, iso=0.0, nz=None):
"""
get isosurface of input 3D field from given direction
get map of 3D volume FE (e.g. electrostatic field) maped on 2D isosurface
used to generate ElectrostaticMap
returns zMap, feMap
operates in coordinates rotated by tipRot
"""
if self.cl_atoms is None:
raise ValueError("Atoms must be set before calculating the Electrostatic Map")
if nz is None:
nz = self.scan_dim[2]
if zMap is None:
zMap = np.empty(self.scan_dim[:2], dtype=np.float32)
if feMap is None:
feMap = np.empty(self.scan_dim[:2] + (4,), dtype=np.float32)
local_size = (1,)
# fmt: off
cl_program.getZisoFETilted(self.queue, ( np.int32(self.scan_dim[0]*self.scan_dim[1]),), local_size,
self.cl_ImgIn,
self.cl_poss,
self.cl_zMap,
self.cl_feMap,
self.nAtoms,
self.cl_atoms,
self.invCell[0], self.invCell[1], self.invCell[2],
self.tipRot[0], self.tipRot[1], self.tipRot[2],
self.dTip,
self.dpos0,
np.int32(nz), np.float32( iso )
)
# fmt: on
cl.enqueue_copy(self.queue, zMap, self.cl_zMap)
cl.enqueue_copy(self.queue, feMap, self.cl_feMap)
self.queue.finish()
return zMap, feMap