from Energy import rhfenergy, penalty_inverse
from scipy.optimize import optimize as opt
from Dipole import dipolemoment
from Minimize import minimize
from Molecule import Getbasis,Getgeom,System_mol
import numpy as np
import time
import algopy
from algopy import UTPM, zeros
'''
This module contain manages all tasks:
-Single point calculations.
-Optimizations.
-Gradients.
'''
[docs]class Tasks(object):
'''This class manage the implemented tasks over a system included
in DiffiQult.
Attributes:
sys : System_mol object
Contains the basis functions, geometry and information of the a molecular system.
name : string
An id of the task, it is used as prefix for outputs.
verbose: bool
It defines the output and screen options.
True it prints in screen all details and a file "name.out".
status: bool
Keep track of the success of the different task.
True the SCF and/or the optimization converged.
'''
[docs] def __init__(self,mol,name,verbose=False):
'''
Initialize task object that contains the inital parameters.
Parameters:
mol : System_mol object
Contains the basis functions, geometry and information of the a molecular system.
name : string
An id of the task, it is used as prefix for outputs.
Options:
verbose : bool
It defines the output and screen options
True it prints in screen all details and a file "name.out"
status: bool
Keep track of the success of the different task.
True the SCF and/or the optimization converged.
'''
self.name = name
self.sys = mol
self.verbose = verbose
self.status = True
if verbose:
self.tape = open(name+'.out',"w")
self._printheader()
self._select_task ={
"Energy": self.energy,
"Opt": self.optimization,
"Grad": self._energy_gradss,
}
self.select_method ={
'BFGS': self._BFGS,
}
self.ntask = 0
return
def _printheader(self):
"""This function prints the header of the outputfile"""
self.tape.write(' *************************************************\n')
self.tape.write(' DiffiQult Ago 2017\n')
self.tape.write(' Author: Teresa Tamayo Mendoza \n')
self.tape.write(' *************************************************\n\n')
localtime = time.asctime( time.localtime(time.time()) )
self.tape.write(" Starting at %s\n"%localtime)
def _printtail(self):
localtime = time.asctime( time.localtime(time.time()) )
self.tape.write(" Finishing at %s \n"%localtime)
self.tape.write(' *************************************************\n\n')
return
def _energy_gradss(self,argnum,max_scf=301,max_d=300,printguess=None,name='Output.molden',output=False,order='first'):
"""This function returns the gradient of args"""
## For the moment it retuns a value at a time
## This is used only by testing functions.
eigen = True
rguess = False
args=[np.log(self.sys.alpha),self.sys.coef,self.sys.xyz,self.sys.l,self.sys.charges,self.sys.atom,self.sys.natoms,self.sys.nbasis,
self.sys.list_contr,self.sys.ne,
max_scf,max_d,log,eigen,None,None,
name,output,self.sys.alpha] # Last term is only used for Algopy
if self.verbose:
self.tape.write(' \n Grad point ...\n')
self.tape.write(' ---Start--- \n')
self.tape.write(' Initial parameters \n')
self.tape.write(' Maximum number of SCF: %d\n'%max_scf)
self.tape.write(' Default SCF tolerance: %f\n'%1e-8)
self.tape.write(' Initial density matrix: %s\n'%str(rguess))
self.sys.printcurrentgeombasis(self.tape)
grad_fun =[]
for i in argnum:
var = UTPM.init_jacobian(args[i])
diff_args = list(args) # We are making a copy of args
diff_args[i] = var
diff_args[-1]= var
t0 = time.clock()
grad = UTPM.extract_jacobian(rhfenergy(*(diff_args)))
timer = time.clock() - t0
self.sys.grad = grad
self.tape.write(' ---End--- \n')
self.tape.write(' Time %3.7f :\n'%timer)
return grad
def _singlepoint(self,max_scf=300,max_d=300,printcoef=False,name='Output.molden',output=False):
"""
This function calculates a single point energy
max_scf -> Maximum number of SCF cycles
max_d -> Maximum cycles of iterations if cannonical purification
"""
log = True # We are not using logarithms of alphas
eigen = True # We are using diagonalizations
readguess = False #By now, we are stating the densisty matrix from scratch
rguess = None
if printcoef:
pguess = name+'.npy'
else:
pguess = None
args=[np.log(self.sys.alpha),self.sys.coef,self.sys.xyz,self.sys.l,self.sys.charges,self.sys.atom,self.sys.natoms,self.sys.nbasis,
self.sys.list_contr,self.sys.ne,
max_scf,max_d,log,eigen,pguess,rguess,
name,output,self.sys.alpha] # Last term is only used for Algopy
if self.verbose:
self.tape.write(' \n Single point ...\n')
self.tape.write(' ---Start--- \n')
self.tape.write(' Initial parameters \n')
self.tape.write(' Maximum number of SCF: %d\n'%max_scf)
self.tape.write(' Default SCF tolerance: %f\n'%1e-8)
self.tape.write(' Initial density matrix: %s\n'%str(rguess))
self.sys.printcurrentgeombasis(self.tape)
# Function
t0 = time.clock()
ene = rhfenergy(*(args))
timer = time.clock() - t0
self.energy = ene
if self.verbose:
self.tape.write(' ---End--- \n')
self.tape.write(' Time %3.7f :\n'%timer)
if (ene == 99999):
if self.verbose:
self.tape.write(' SCF did not converged :( !!\n')
print(' SCF did not converged :( !! %s\n')
self.status = False
else:
if self.verbose:
self.tape.write(' SCF converged!!\n')
self.tape.write(' Energy: %3.7f \n'%ene)
if pguess != None:
self.tape.write(' Coefficients in file: %s\n'%pguess)
print(' SCF converged!!')
print(' Energy: %3.7f'%ene)
if output:
if self.verbose:
self.tape.write(' Result in file: %s\n'%name)
else:
print(' Result in file: %s\n'%name)
return ene
def _BFGS(self,ene_function,grad_fun,args,argnums,log,name,**kwargs):
""" This function use the BFGS method implemented intialially in scipy to perform the optimization """
print('Minimizing BFGS ...')
G = False
var = [args[i] for i in argnums] ## Arguments to optimize
for i in reversed(argnums):
del(args[i]) ## Getting the rest of them
if log and 0 in argnums:
tol = 1e-05 #It basically depends if we are using log or not for alpha
else:
tol = 1e-07
terms = minimize(ene_function,var,
args=tuple(args),
argnum=argnums,
method='BFGS',jac=grad_fun,gtol=tol,name=name,options={'disp': True},**kwargs)
return terms
def _algo_gradfun(self,function,args,argnum):
"""This function returns a list with functions that extracts the gradient of the values
defined by args, args has the position of the inputs of energy
"""
grad_fun =[]
def function_builder(narg):
def algo_jaco(*args, **kwargs):
var = UTPM.init_jacobian(args[narg])
diff_args = list(args) # We are making a copy of args
diff_args[narg] = var
diff_args[-1]= var
diff_args = tuple(diff_args)
return UTPM.extract_jacobian(rhfenergy(*(diff_args)))
return algo_jaco
for i in argnum:
grad_fun.append(function_builder(i))
return grad_fun
def _algo_hessfun(self,function,args,argnum):
'''This function returns a list with functions that extracts the hessian of the values
defined by args, args has the position of the inputs of energy '''
grad_fun =[]
def function_builder(narg):
def algo_jaco(*args, **kwargs):
var = UTPM.init_hessian(args[narg])
diff_args = list(args) # We are making a copy of args
diff_args[narg] = var
diff_args[-1]= var
diff_args = tuple(diff_args)
return UTPM.extract_hessian(rhfenergy(*(diff_args)))
return algo_jaco
for i in argnum:
grad_fun.append(function_builder(i))
return grad_fun
def _optupdateparam(self,argnum,x):
### HARD CODED (ONLY WORKS WITH ALPHA AND XYZ)
cont = 0
for i in argnum:
if i == 0:
self.sys.alpha = np.exp(x[cont:cont+len(self.sys.alpha)])
cont += len(self.sys.alpha)
elif i == 2:
self.sys.xyz = x[cont:cont+self.sys.nbasis*3].reshape(self.sys.nbasis,3)
cont += self.sys.nbasis*3
elif i == 1:
self.sys.coef = x[cont:cont+len(self.sys.alpha)]
cont += self.sys.alpha
else:
raise NotImplementedError("Optimization is just recticted to contraction coefficients, exponents and Gaussian centers ")
return
def _optprintres(self,res,timer):
self.tape.write(' ---End--- \n')
self.tape.write(' Time %3.7f :\n'%timer)
self.tape.write(' Message: %s\n'%res.message)
self.tape.write(' Current energy: %f\n'%res.fun)
self.tape.write(' Current gradient % f \n'%np.linalg.norm(res.jac,ord=np.inf))
self.tape.write(' Number of iterations %d \n'%res.nit)
self.sys.printcurrentgeombasis(self.tape)
def _optprintparam(self,max_scf,rguess):
self.tape.write(' Initial parameters \n')
self.tape.write(' Maximum number of SCF: %d\n'%max_scf)
self.tape.write(' Default SCF tolerance: %f\n'%1e-8)
self.tape.write(' Initial density matrix: %s\n'%str(rguess))
self.tape.write(' Maximum number of optimization steps: (FIX ME)%d\n'%30)
self.tape.write(' Tolerance in jac infinity norm (Hardcoded): %f \n'%1e-1)
self.tape.write(' Tolerance in energy (Hardcoded): %f \n'%1e-5)
self.sys.printcurrentgeombasis(self.tape)
def _optimization(self,max_scf=100,log=True,scf=True,readguess=None,argnum=[0],taskname='Output', method='BFGS',penalize=None,**otherargs):
name=taskname
record = False
maxsteps = 100
rhf_old = 1000
grad_fun = []
lbda = 1.0
rguess = None
if readguess:
pguess = name +'.npy'
out = False
ene = self._singlepoint(maxsteps,maxsteps,pguess,name,False)
if ene == 99999:
raise NameError('SCF did not converved')
rguess= pguess
if self.verbose:
self.tape.write(' \n Optimization ...\n')
self.tape.write(' ---Start--- \n')
self._optprintparam(max_scf,rguess)
ene_function = rhfenergy
pguess = None
record = False
args=[np.log(self.sys.alpha),self.sys.coef,self.sys.xyz,self.sys.l,self.sys.charges,self.sys.atom,self.sys.natoms,self.sys.nbasis,
self.sys.list_contr,self.sys.ne,
max_scf,max_scf,log,True,pguess,rguess,
name,record,self.sys.alpha] # Last term is only used for Algopy
grad_fun = self._algo_gradfun(ene_function,args,argnum) ## This defines de gradient function of autograd
opt = self.select_method.get(method,lambda: self._BFGS) ## This selects the opt method
t0 = time.clock()
res = opt(ene_function,grad_fun,args,argnum,log,name,**otherargs)
timer = time.clock()-t0
self._optupdateparam(argnum,res.x)
self.status = bool(res.status == 0 or res.status==1)
return res,timer
def dipole(self,coef_file=None,max_scf=100,name='Output',**kwargs):
# Pending unit tests
if coef_file == None:
coef_file = name
ene = self._singlepoint(max_scf,max_scf,coef_file,name,False)
dipolemoment(self.sys,coef_file+'.npy')
return
[docs] def optimization(self,max_scf=100,log=True,scf=True,name='Output',readguess=None,output=False,argnum=[0],**kwargs):
'''
This function handdles the optimization procedure
Options:
argnum : list of integers
Parameter to optimize
0 widths
1 contraction coefficients
2 Gaussian centers
e.g. [0,1] to optimized widhts and contraction coefficients
max_scf : integer
Maximum number of scf steps, default 30
log : bool
If we are not optimizing the log of exponent, we highly recoment leave it as True, the default.
name : str
Output file name default Output
readguess : str
File path to a npy file in case on predefined initial guess of the density matrix
output : bool
True if it will print a molden file in case of success
'''
name = self.name+'-task-'+str(self.ntask)
if self.verbose:
self.tape.write(' Outputfiles prefix: %s'%name)
res,timer = self._optimization(max_scf,log,scf,readguess,argnum,taskname=name,**kwargs)
if self.verbose:
self._optprintres(res,timer)
return
[docs] def energy(self,max_scf=30,max_d=300,printguess=None,output=False,**kwargs):
'''
This function handdles the single point calculations
Options:
max_scf : integer
Maximum number of scf steps, default 30.
printguess : str
File path if it is requiered to prepare an inital guess for the molecular orbital coefficients.
output : bool
True if it will print a molden file in case of success.
'''
name = self.name+'-'+str(self.ntask)
if self.verbose:
self.tape.write(' Output molden file: %s'%name)
self._singlepoint(max_scf,max_d,printguess,name,output)
return
[docs] def runtask(self,task,**kwargs):
'''
This method run a given task and if it has success, it uptates system with the most recent energy value and basis function
Parameters:
task : str
If defines the task:
'Energy' is a single point calculation.
'Opt' an optimization of a given parameter.
Options:
Check documentation for each task
Returns:
success : bool
True if task ended successfully.
'''
print(' Task: %s'%task)
self.ntask += 1
if self.verbose:
self.tape.write(' -------------------------------------------------------- \n')
self.tape.write(' Task: %s \n'%task)
function = self._select_task.get(task,lambda: self.tape.write(' This task is not implemented\n'))
if self.verbose:
self.tape.write('\n')
function(**kwargs)
return self.status
def end(self):
if self.verbose:
self._printtail()
self.tape.close()
return
def main():
from Basis import basis_set_3G_STO as basis
d = -1.64601435
mol = [(1,(0.0,0.0,0.20165898)),(1,(0.0,0.0,d))]
ne = 2
system = System_mol(mol, ## Geometry
basis, ## Basis set (if shifted it should have the coordinates too)
ne, ## Number of electrons
shifted=False, ## If the basis is going to be on the atoms coordinates
angs=False, ## Units -> Bohr
mol_name='agua') ## Units -> Bohr
manager = Tasks(system,
name='../testfiles/h2_sto_3g', ## Prefix for all optput files
verbose=True) ## If there is going to be an output
manager.runtask('Energy',
max_scf=50,
printcoef=True,
name='../testfiles/Output.molden',
output=True)
manager.runtask('Opt',
max_scf=50,
printcoef=False,
argnum=[2],
output=True)
manager.runtask('Opt',
max_scf=50,
printcoef=False,
argnum=[0],
output=True)
manager.runtask('Opt',
max_scf=50,
printcoef=False,
argnum=[0],
output=True)
manager.end()
return
if __name__ == "__main__":
main()