"""
Utility functions that don't properly fit anywhere else. These are operations that aren't specific to any particular
interface or script.
"""
import pytraj
import mdtraj
import shutil
import fileinput
import re
import os
import sys
import math
import numpy
import pickle
import copy
import subprocess
import warnings
import itertools
from multiprocess import Pool
from statsmodels.tsa import stattools
[docs]def check_commit(filename, settings):
"""
Check commitment of coordinate file to basins defined by settings.commit_fwd and settings.commit_bwd.
Raises a RuntimeError if the supplied file is not suitable for checking for commitment.
Parameters
----------
filename : str
Name of coordinate or trajectory file to be checked. If the file has more than one frame, only the last frame
will be loaded.
settings : argparse.Namespace
Settings namespace object
Returns
-------
commit_flag : str
Either 'fwd' or 'bwd' if the coordinates are in the corresponding basin, or '' if in neither
"""
# todo: consider adding support for angles and dihedrals
try:
traj = mdtraj.load(filename, top=settings.topology)[-1]
# traj = pytraj.load(filename, settings.topology, frame_indices=[-1])
if traj.n_frames == 0:
raise RuntimeError('Attempted to check commitment for file: ' + filename + ' but it has no frames.')
except ValueError as e:
raise RuntimeError('Unable to load file: ' + filename + ' for checking commitment with the specified topology '
'file.\npytraj returned error: ' + str(e))
commit_flag = '' # initialize
for i in range(len(settings.commit_fwd[2])):
if settings.commit_fwd[3][i] == 'lt':
if mdtraj.compute_distances(traj, numpy.array([[settings.commit_fwd[0][i] - 1, settings.commit_fwd[1][i] - 1]]))[0][0] * 10 <= settings.commit_fwd[2][i]:
# if pytraj.distance(traj, '@' + str(settings.commit_fwd[0][i]) + ' @' + str(settings.commit_fwd[1][i]),
# n_frames=1)[0] <= settings.commit_fwd[2][i]:
commit_flag = 'fwd' # if a committor test is passed, testing moves on to the next one.
else:
commit_flag = ''
break # if a committor test is not passed, all testing in this direction fails
elif settings.commit_fwd[3][i] == 'gt':
if mdtraj.compute_distances(traj, numpy.array([[settings.commit_fwd[0][i] - 1, settings.commit_fwd[1][i] - 1]]))[0][0] * 10 >= settings.commit_fwd[2][i]:
# if pytraj.distance(traj, '@' + str(settings.commit_fwd[0][i]) + ' @' + str(settings.commit_fwd[1][i]),
# n_frames=1)[0] >= settings.commit_fwd[2][i]:
commit_flag = 'fwd'
else:
commit_flag = ''
break
else:
raise ValueError('An incorrect committor definition \"' + settings.commit_fwd[3][i] + '\" was given for '
'index ' + str(i) + ' in the \'fwd\' direction.')
if commit_flag == '': # only bother checking for bwd commitment if not fwd committed
for i in range(len(settings.commit_bwd[2])):
if settings.commit_bwd[3][i] == 'lt':
if mdtraj.compute_distances(traj, numpy.array([[settings.commit_bwd[0][i] - 1, settings.commit_bwd[1][i] - 1]]))[0][0] * 10 <= settings.commit_bwd[2][i]:
# if pytraj.distance(traj, '@' + str(settings.commit_bwd[0][i]) + ' @' + str(settings.commit_bwd[1][i]),
# n_frames=1)[0] <= settings.commit_bwd[2][i]:
commit_flag = 'bwd' # if a committor test is passed, testing moves on to the next one.
else:
commit_flag = ''
break # if a committor test is not passed, all testing in this direction fails
elif settings.commit_bwd[3][i] == 'gt':
if mdtraj.compute_distances(traj, numpy.array([[settings.commit_bwd[0][i] - 1, settings.commit_bwd[1][i] - 1]]))[0][0] * 10 >= settings.commit_bwd[2][i]:
# if pytraj.distance(traj, '@' + str(settings.commit_bwd[0][i]) + ' @' + str(settings.commit_bwd[1][i]),
# n_frames=1)[0] >= settings.commit_bwd[2][i]:
commit_flag = 'bwd'
else:
commit_flag = ''
break
else:
raise ValueError('An incorrect committor definition \"' + settings.commit_bwd[3][i] + '\" was given for'
' index ' + str(i) + ' in the \'bwd\' direction.')
return commit_flag
[docs]def get_cvs(filename, settings, reduce=False, frame=0, only_in_rc=False):
"""
Get CV values for a coordinate file given by filename, as well as rates of change if settings.include_qdot = True.
If reduce = True, the returned CVs will be reduced to between 0 and 1 based on the minimum and maximum values of
that CV in as.out, which is assumed to exist at settings.as_out_file.
If frame is > 0 or 'all', filename will be interpreted as a trajectory file instead of a .rst7 coordinate file.
This option is incompatible with settings.include_qdot = True.
Parameters
----------
filename : str
Name of .rst7-formatted coordinate file to be checked (or trajectory file if frame > 0 or frame == 'all')
settings : argparse.Namespace
Settings namespace object
reduce : bool
Boolean determining whether to reduce the CV values for use in evaluating an RC value that uses reduced values
frame : int or str
Frame of trajectory filename to use (0-indexed, negatives read from end); or, 'all' returns the CVs for each
frame of the input trajectory, separated by newlines. 'all' is incompatible with settings.include_qdot = True
only_in_rc : bool
If this is True, then do not evaluate and return 0 for all CVs not present in the RC defined by
settings.rc_definition. This is useful to speed up evaluation when all CVs are not necessary. Applies also to
corresponding qdot values as appropriate.
Returns
-------
output : str
Space-separated list of CV values for the given coordinate file
"""
def increment_coords():
# Produces a new coordinate file from the existing one by incrementing coordinates by their rates of change
# Returns the name of the newly-created coordinate file
byline = open(filename).readlines()
pattern = re.compile('-*[0-9.]+') # regex to match numbers including decimals and negatives
n_atoms = pattern.findall(byline[1])[0] # number of atoms indicated on second line of .rst file
shutil.copyfile(filename, filename + '_temp.rst7')
for i, line in enumerate(fileinput.input(filename + '_temp.rst7', inplace=1)):
if int(n_atoms)/2 + 2 > i >= 2:
newline = line
coords = pattern.findall(newline) # line of coordinates
try:
vels = pattern.findall(byline[i + int(math.ceil(int(n_atoms)/2))]) # corresponding velocities
except IndexError:
os.remove(filename + '_temp.rst7') # to clean up
os.remove(filename + '_temp.rst7.bak') # to clean up
fileinput.close()
raise IndexError('get_cvs.increment_coords() encountered an IndexError. This is caused '
'by attempting to read qdot values from a coordinate file lacking velocity information, or'
' else by that file being truncated. Ensure that the relevant simulation input file is set'
' to write velocities to output trajectories, as this may not be default behavior. In '
'Amber, this is accomplished by setting ntwv=-1 in the MD input file. Alternatively, if '
'you don\'t want to include velocity terms in the output file, you can set include_qdot = '
'False in the ATESA config file.\n'
'The offending file is: ' + filename)
# Sometimes items in coords or vels 'stick together' at a negative sign (e.g., '-1.8091748-112.6420521')
# This next loop is just to split them up
for index in range(len(coords)):
length = len(coords[index]) # length of string representing this coordinate
replace_string = str(float(coords[index]) + float(vels[index]))[0:length-1]
while len(replace_string) < length:
replace_string += '0'
newline = newline.replace(coords[index], replace_string)
sys.stdout.write(newline)
else:
sys.stdout.write(line)
return filename + '_temp.rst7'
def reduce_cv(unreduced_value, local_index, rc_minmax):
# Returns a reduced value for a CV given an unreduced value and the index within as.out corresponding to that CV
this_min = rc_minmax[0][local_index]
this_max = rc_minmax[1][local_index]
if this_min == this_max:
return unreduced_value # can't reduce when a CV only has a single value across rc_minmax
return (float(unreduced_value) - this_min) / (this_max - this_min)
def cv_is_in_rc(cv_index):
# Helper function checks if the zero-indexed cv_index corresponds to a CV in settings.rc_definition
if settings.rc_definition == '':
raise RuntimeError('utilities.get_cvs was called with only_in_rc=True, but rc_definition was empty. You '
'must provide the RC definition in the configuration file to use this option.')
# Have to be careful here not to interpret, e.g., "CV10" as matching "CV1", so we need to be a bit clever
pattern = re.compile('CV[0-9]+')
cvs_in_rc = pattern.findall(settings.rc_definition)
cv_to_look_for = 'CV' + str(int(cv_index) + 1) # cv_index is zero-indexed, but CVs in RCs are 1-indexed
return cv_to_look_for in cvs_in_rc
if frame == 'all' and settings.include_qdot:
raise RuntimeError('utilities.get_cvs cannot be called with frame=all if settings.include_qdot == True')
if not os.getcwd() == settings.working_directory:
os.chdir(settings.working_directory) # make sure we're in the working directory
# boolean determining whether we need to bother loading pytraj
need_pytraj = any([('traj' in cv and not 'mtraj' in cv) for cv in settings.cvs])
# define variables as they are expected to be interpreted in CV definitions (these are potentially invoked by eval)
delete_traj_name = False # so we don't delete the traj_name file later unless we created it below
if frame in [0, 'all']:
full_mtraj = mdtraj.load(filename, top=settings.topology)
if need_pytraj: # only load pytraj if it's needed
full_traj = pytraj.iterload(filename, settings.topology)
traj_name = filename
else:
if need_pytraj:
if frame < 0:
full_traj = pytraj.iterload(filename, settings.topology)
pframe = full_traj.n_frames + frame
full_traj = pytraj.iterload(filename, settings.topology, frame_slice=[(pframe, pframe + 1)])
else:
full_traj = pytraj.iterload(filename, settings.topology, frame_slice=[(frame, frame + 1)])
full_mtraj = mdtraj.load(filename, top=settings.topology)[frame]
if True in ['traj_name' in cv for cv in settings.cvs]: # if True, need .rst7 formatted file to operate on
mdengine = factory.mdengine_factory(settings.md_engine)
traj_name = mdengine.get_frame(filename, frame, settings)
delete_traj_name = True # to be sure we clean up traj_name later
rc_minmax = [[],[]]
if reduce:
# Try to load from file if available
size = os.path.getsize(settings.as_out_file)
if os.path.exists(str(size) + '_minmax.pkl'):
rc_minmax = pickle.load(open(str(size) + '_minmax.pkl', 'rb'))
else:
# Prepare new cv_minmax list
asout_lines = [[float(item) for item in line.replace('A <- ', '').replace('B <- ', '').replace(' \n', '').replace('\n', '').split(' ')] for line in open(settings.as_out_file, 'r').readlines()]
open(settings.as_out_file, 'r').close()
mapped = list(map(list, zip(*asout_lines)))
rc_minmax = [[numpy.min(item) for item in mapped], [numpy.max(item) for item in mapped]]
pickle.dump(rc_minmax, open(str(size) + '_minmax.pkl', 'wb'), protocol=2)
output = ''
values = []
sigfigs = '%.' + str(settings.sigfigs) + 'f'
if frame == 'all':
n_iter = full_mtraj.n_frames
else:
n_iter = 1
for iter in range(n_iter): # iterate over all frames
local_index = -1
# set traj and mtraj to only the desired frame; has no (important) effect if there's only one frame anyway
mtraj = full_mtraj[iter]
if need_pytraj: # only load traj if it's needed
traj = full_traj[iter:iter + 1:1]
for cv in settings.cvs:
local_index += 1
if (not only_in_rc) or cv_is_in_rc(local_index):
evaluation = eval(cv) # evaluate the CV definition code (potentially using traj, mtraj, and/or traj_name)
if settings.include_qdot: # want to save values for later
values.append(float(evaluation))
if reduce: # reduce values if necessary
evaluation = reduce_cv(evaluation, local_index, rc_minmax)
else:
evaluation = 0
output += sigfigs % float(evaluation) + ' '
if n_iter > 1:
output += '\n'
elif settings.include_qdot and not settings.job_type == 'equilibrium_path_sampling': # if True, then we want to include rate of change for every CV, too
# Strategy here is to write a new temporary .rst7 file by incrementing all the coordinate values by their
# corresponding velocity values, load it as a new iterload object, and then rerun our analysis on that.
incremented_filename = increment_coords()
mtraj = mdtraj.load(incremented_filename, top=settings.topology)
if need_pytraj: # only load traj if it's needed
traj = pytraj.iterload(incremented_filename, settings.topology)
local_index = -1
for cv in settings.cvs:
local_index += 1
if (not only_in_rc) or cv_is_in_rc(local_index):
evaluation = eval(cv) - values[local_index] # Subtract value 1/20.455 ps earlier from value of cv
if reduce:
try:
evaluation = reduce_cv(evaluation, local_index + len(settings.cvs), rc_minmax)
except IndexError:
raise RuntimeError('attempted to obtain a reduced value for the rate-of-change ("qdot") of CV' +
str(local_index) + ', but the corresponding column appears to be missing from '
'the aimless shooting output file: ' + settings.as_out_file + '. If this is the '
'wrong file, please specify the right one with the "as_out_file" option in the '
'configuration file. If this is the right file but it does not contain qdot '
'terms, please add "include_qdot = False" to the configuration file.')
else:
evaluation = 0
output += sigfigs % float(evaluation) + ' '
os.remove(incremented_filename) # clean up temporary file
while output[-1] in [' ', '\n']:
output = output[:-1] # remove trailing space and/or newline on terminating line
output = output.replace(' \n', '\n') # remove trailing spaces on non-terminating lines
if delete_traj_name:
os.remove(traj_name)
return output
[docs]def rev_vels(restart_file):
"""
Reverse all the velocity terms in a restart file and return the name of the new, 'reversed' file.
Parameters
----------
restart_file : str
Filename of the 'fwd' restart file, in .rst7 format
Returns
-------
reversed_file : str
Filename of the newly written 'bwd' restart file, in .rst7 format
"""
byline = open(restart_file).readlines()
open(restart_file).close()
pattern = re.compile(r'[-0-9.]+') # regex to match numbers including decimals and negatives
pattern2 = re.compile(r'\s[-0-9.]+') # regex to match numbers including decimals and negatives, with one space in front
try:
n_atoms = pattern.findall(byline[1])[0] # number of atoms indicated on second line of .rst7 file
except IndexError:
raise RuntimeError('restart file: ' + restart_file + ' exists but is improperly formatted. It must contain the '
'number of atoms as the first integer on its second line.')
offset = 2 # appropriate for n_atoms is odd; offset helps avoid modifying the box line
if int(n_atoms) % 2 == 0: # if n_atoms is even...
offset = 1 # appropriate for n_atoms is even
try:
name = restart_file[:restart_file.rindex('.')] # everything before last '.', to remove file extension
except ValueError:
name = restart_file # if no '.' in the filename
shutil.copyfile(restart_file, name + '_bwd.rst7')
for i, line in enumerate(fileinput.input(name + '_bwd.rst7', inplace=1)):
if int(n_atoms) / 2 + 2 <= i <= int(n_atoms) + offset: # if this line is a velocity line
newline = line
for vel in pattern2.findall(newline):
if '-' in vel:
newline = newline.replace(vel, ' ' + vel[2:], 1) # replace ' -magnitude' with ' magnitude'
else:
newline = newline.replace(vel, '-' + vel[1:], 1) # replace ' magnitude' with '-magnitude'
sys.stdout.write(newline)
else: # if not a velocity line
sys.stdout.write(line)
return name + '_bwd.rst7'
[docs]def evaluate_rc(rc_definition, cv_list):
"""
Evaluate the RC value given by RC definition for the given list of CV values given by cv_list.
Parameters
----------
rc_definition : str
A reaction coordinate definition formatted as a string of python-readable code with "CV[X]" standing in for the
Xth CV value (zero-indexed); e.g., "CV2" has value "4" in the cv_list [1, -2, 4, 6]
cv_list : list
A list of CV values whose indices correspond to the desired values in rc_definition
Returns
-------
rc_value : float
The value of the reaction coordinate given the values in cv_list
"""
# Fill in CV[X] slots with corresponding values from cv_list
for i in reversed(range(len(cv_list))): # reversed so that e.g. CV10 doesn't get interpreted as '[CV1]0'
rc_definition = rc_definition.replace('CV' + str(i + 1), str(cv_list[i]))
# Evaluate the filled-in rc_definition and return the result
return eval(rc_definition)
[docs]def resample(settings, partial=False, full_cvs=False, only_full_cvs=False):
"""
Resample each shooting point in each thread with different CV definitions to produce new output files with extant
aimless shooting data.
This function also assesses decorrelation times and produces one or more decorrelated output files. If and only if
settings.information_error_checking == True, decorrelated files are produced at each settings.information_error_freq
increment. In this case, if partial == True, decorrelation will only be assessed for data lengths absent from the
info_err.out file in the working directory.
Parameters
----------
settings : argparse.Namespace
Settings namespace object
partial : bool
If True, reads the info_err.out file and only builds new decorrelated output files where the corresponding lines
are missing from that file. If partial == False, decorrelation is assessed for every valid data length. Has no
effect if not settings.information_error_checking.
full_cvs : bool
If True, also resamples as_full_cvs.out using every prod trajectory in the working directory.
Returns
-------
None
"""
# todo: test this more thoroughly using a dummy thread and a manual decorrelation time calculation using different software
# This function is sometimes called from outside the working directory, so make sure we're there
os.chdir(settings.working_directory)
# Load in allthreads from restart.pkl
try:
allthreads = pickle.load(open('restart.pkl', 'rb'))
except FileNotFoundError:
raise FileNotFoundError('resample = True requires restart.pkl, but could not find one in working directory: '
+ settings.working_directory)
if not only_full_cvs:
# Remove pre-existing output files if any, initialize new one
open(settings.working_directory + '/as_raw_resample.out', 'w').close()
if settings.information_error_checking:
open(settings.working_directory + '/as_raw_timestamped.out', 'w').close()
# Open files for writing outside loop (much faster than opening/closing for each write)
f1 = open(settings.working_directory + '/as_raw_resample.out', 'a')
if settings.information_error_checking:
f2 = open(settings.working_directory + '/as_raw_timestamped.out', 'a')
# Iterate through each thread's history.init_coords list and obtain CV values as needed
for thread in allthreads:
thread.this_cvs_list = [] # initialize full nested list of CV values for this thread
thread.cvs_for_later = [] # need this one with empty lists for failed moves, for indexing reasons
for step_index in range(len(thread.history.prod_results)):
if thread.history.prod_results[step_index][0] in ['fwd', 'bwd']:
if thread.history.prod_results[step_index][0] == 'fwd':
this_basin = 'B'
else: # 'bwd'
this_basin = 'A'
# Get CVs for this shooting point # todo: a bit sloppy... can I clean this up?
try:
if not os.path.exists(thread.history.init_coords[step_index][0]):
warnings.warn('attempted to resample ' + thread.history.init_coords[step_index][0] + ' but no such '
'file exists in the working directory\nSkipping and continuing', RuntimeWarning)
thread.cvs_for_later.append([])
continue # skip to next step_index
except IndexError: # getting cv's failed (maybe corrupt coordinate file) so consider this step failed
thread.cvs_for_later.append([])
continue # skip to next step_index
try:
this_cvs = get_cvs(thread.history.init_coords[step_index][0], settings)
except IndexError: # getting cv's failed (maybe corrupt coordinate file) so consider this step failed
thread.cvs_for_later.append([])
continue # skip to next step_index
# Write CVs to as_raw_resample.out
f1.write(this_basin + ' <- ' + this_cvs + '\n')
f1.flush()
if settings.information_error_checking:
f2.write(str(thread.history.timestamps[step_index]) + ' ' + this_basin + ' <- ' + this_cvs + '\n')
f2.flush()
# Append this_cvs to running list for evaluating decorrelation time
thread.this_cvs_list.append([[float(item) for item in this_cvs.split(' ')], thread.history.timestamps[step_index]])
thread.cvs_for_later.append([float(item) for item in this_cvs.split(' ')])
else:
thread.cvs_for_later.append([])
# Close files just to be sure
f1.close()
if settings.information_error_checking:
f2.close()
if settings.information_error_checking: # sort timestamped output file
shutil.copy(settings.working_directory + '/as_raw_timestamped.out', settings.working_directory + '/as_raw_timestamped_copy.out')
open(settings.working_directory + '/as_raw_timestamped.out', 'w').close()
with open(settings.working_directory + '/as_raw_timestamped_copy.out', 'r') as f:
for line in sorted(f):
open(settings.working_directory + '/as_raw_timestamped.out', 'a').write(line)
open(settings.working_directory + '/as_raw_timestamped.out', 'a').close()
os.remove(settings.working_directory + '/as_raw_timestamped_copy.out')
# Construct list of data lengths to perform decorrelation for
if settings.information_error_checking:
if not partial:
lengths = [leng for leng in range(settings.information_error_freq, len(open(settings.working_directory + '/as_raw_timestamped.out', 'r').readlines()) + 1, settings.information_error_freq)]
else: # if partial
lengths = [leng for leng in range(settings.information_error_freq, len(open(settings.working_directory + '/as_raw_timestamped.out', 'r').readlines()) + 1, settings.information_error_freq) if not leng in [int(line.split(' ')[0]) for line in open(settings.working_directory + '/info_err.out', 'r').readlines()]]
pattern = re.compile('[0-9]+') # pattern for reading out timestamp from string
else:
lengths = [len(open(settings.working_directory + '/as_raw_resample.out', 'r').readlines())]
pattern = None
# Assess decorrelation and write as_decorr.out
for length in lengths:
if settings.information_error_checking:
suffix = '_' + str(length) # only use-case with multiple lengths, so this keeps them from stepping on one another's toes
cutoff_timestamp = int(pattern.findall(open(settings.working_directory + '/as_raw_timestamped.out', 'r').readlines()[length - 1])[0])
else:
cutoff_timestamp = math.inf
suffix = ''
open(settings.working_directory + '/as_decorr' + suffix + '.out', 'w').close()
f3 = open(settings.working_directory + '/as_decorr' + suffix + '.out', 'a')
for thread in allthreads:
if thread.this_cvs_list: # if there were any 'fwd' or 'bwd' results in this thread
mapped = list(map(list, zip(*[item[0] for item in thread.this_cvs_list if item[1] <= cutoff_timestamp]))) # list of lists of values of each CV
slowest_lag = -1 # initialize running tally of slowest autocorrelation time among CVs in this thread
if settings.include_qdot:
ndims = len(thread.this_cvs_list[0]) / 2 # number of non-rate-of-change CVs
if not ndims % 1 == 0:
raise ValueError('include_qdot = True, but an odd number of dimensions were found in the '
'threads in restart.pkl, so they can\'t contain inertial terms.')
ndims = int(ndims)
else:
ndims = len(thread.this_cvs_list[0])
for dim_index in range(ndims):
slowest_lag = -1
if mapped:
this_cv = mapped[dim_index]
if len(this_cv) > 1:
this_autocorr = stattools.acf(this_cv, nlags=len(this_cv) - 1, fft=True)
for lag in range(len(this_cv) - 1):
corr = this_autocorr[lag]
if abs(corr) <= 1.96 / numpy.sqrt(len(this_cv)):
slowest_lag = lag + 1
break
if slowest_lag > 0: # only proceed to writing decorrelated output file if a slowest_lag was found
# Write the same way as to as_raw_resample.out above, but starting the range at slowest_lag
for step_index in range(slowest_lag, len(thread.history.prod_results)):
if thread.history.prod_results[step_index][0] in ['fwd', 'bwd'] and thread.history.timestamps[step_index] <= cutoff_timestamp:
if thread.history.prod_results[step_index][0] == 'fwd':
this_basin = 'B'
else: # 'bwd'
this_basin = 'A'
# Get CVs for this shooting point and write them to the decorrelated output file
if thread.cvs_for_later[step_index]:
this_cvs = thread.cvs_for_later[step_index] # retrieve CVs from last evaluation
f3.write(this_basin + ' <- ' + ' '.join([str(item) for item in this_cvs]) + '\n')
f3.close()
# Move resample raw output file to take its place as the only raw output file
shutil.move(settings.working_directory + '/as_raw_resample.out', settings.working_directory + '/as_raw.out')
else:
if not full_cvs:
raise RuntimeError('resample called with only_full_cvs = True but full_cvs = False, so this does nothing. '
'Change one of them!')
# Implement full_cvs
if full_cvs:
open(settings.working_directory + '/as_full_cvs.out', 'w').close()
temp_settings = copy.deepcopy(settings)
temp_settings.include_qdot = False # never want to include_qdot in this upcoming call to get_cvs
try:
affinity = len(os.sched_getaffinity(0))
except AttributeError: # os.sched_getaffinity raises AttributeError on non-UNIX systems.
affinity = 1
if affinity == 1:
with open(settings.working_directory + '/as_full_cvs.out', 'a') as f:
for thread in allthreads:
for step_index in range(min([len(thread.history.prod_results), len(thread.history.prod_trajs)])): # just in case one got an extra write in over the other
if thread.history.prod_results[step_index] in [['fwd', 'bwd'], ['bwd', 'fwd']]: # if step accepted
for job_index in range(2):
if os.path.exists(thread.history.prod_trajs[step_index][job_index]):
f.write(get_cvs(thread.history.prod_trajs[step_index][job_index], temp_settings, False, 'all') + '\n')
else: # affinity > 1
# Map partial_full_cvs calls to available processes
with Pool(affinity) as p:
p.starmap(partial_full_cvs, zip(allthreads, ['partial_full_cvs_' + str(thread_index) + '.out' for thread_index in range(len(allthreads))], itertools.repeat(temp_settings)))
# Finally, combine the partial files into the full file
with open(settings.working_directory + '/as_full_cvs.out', 'w') as outfile:
for fname in ['partial_full_cvs_' + str(thread_index) + '.out' for thread_index in range(len(allthreads))]:
with open(fname) as infile:
for line in infile:
if line: # skip blank lines
outfile.write(line)
os.remove(fname)
[docs]def partial_full_cvs(thread, filename, settings):
"""
Write the full CVs list to the file given by filename for each accepted move in each thread in threads.
A helper function for parallelizing get_cvs (has to be defined at top-level for multiprocessing)
Parameters
----------
thread : Thread
Thread object to evaluate
filename : str
Name of the file to write CVs to
settings : argparse.Namespace
Settings namespace object
Returns
-------
None
"""
open(settings.working_directory + '/' + filename, 'w').close()
with open(settings.working_directory + '/' + filename, 'a') as f:
for step_index in range(min([len(thread.history.prod_results), len(thread.history.prod_trajs)])): # just in case one got an extra write in over the other
if thread.history.prod_results[step_index] in [['fwd', 'bwd'], ['bwd', 'fwd']]: # if step accepted
for job_index in range(2):
if os.path.exists(thread.history.prod_trajs[step_index][job_index]):
f.write(get_cvs(thread.history.prod_trajs[step_index][job_index], settings, False, 'all') + '\n')
[docs]def interpret_cv(cv_index, settings):
"""
Read the cv_index'th CV from settings.cvs, identify its type (distance, angle, dihedral, or differance-of-distances)
and the atom indices the define it (one-indexed) and return these.
This function is designed for use in the umbrella_sampling jobtype only. For this reason, it only supports the
aforementioned CV types. If none of these types appears to fit, this function raises a RuntimeError.
Parameters
----------
cv_index : int
The index for the CV to use; e.g., 6 corresponds to CV6. Must be in the range [1, len(settings.cvs)].
settings : argparse.Namespace
Settings namespace object
Returns
-------
atoms : list
A list of 1-indexed atom indices as strings that define the given CV
optype : str
A string (either 'distance', 'angle', 'dihedral', or 'diffdistance') corresponding to the type for this CV.
nat : int
The number of atoms constituting the given CV
"""
if not 1 <= cv_index <= len(settings.cvs):
raise RuntimeError('called interpret_cv with an index outside the range [1, len(settings.cvs)].\n'
'len(settings.cvs) = ' + str(len(settings.cvs)) + ' and cv_index = ' + str(cv_index))
this_cv = settings.cvs[cv_index - 1] # -1 because CVs are 1-indexed
if 'pytraj.dihedral' in this_cv or 'mdtraj.compute_dihedrals' in this_cv:
optype = 'dihedral'
nat = 4
elif 'pytraj.angle' in this_cv or 'mdtraj.compute_angles' in this_cv:
optype = 'angle'
nat = 3
elif 'pytraj.distance' in this_cv or 'mdtraj.compute_distances' in this_cv:
if '-' in this_cv and (this_cv.count('pytraj.distance') == 2 or this_cv.count('mdtraj.compute_distances') == 2 or
('mdtraj.distance' in this_cv and 'pytraj.distance' in this_cv)):
optype = 'diffdistance'
nat = 4
else:
optype = 'distance'
nat = 2
else:
raise RuntimeError('unable to discern CV type for CV' + str(int(cv_index)) + '\nOnly '
'distances, angles, dihedrals, and differences of distances (all defined '
'using either pytraj or mdtraj distance, angle, and/or dihedral functions) '
'are supported in umbrella sampling. The offending CV is defined as: ' +
this_cv)
# Get atom indices as a string, then convert to list
atoms = ''
if not optype == 'diffdistance':
count = 0
for match in re.finditer('[\[\\\']([@0-9]+[,\ ]){' + str(nat - 1) + '}[@0-9]+[\]\\\']',
this_cv.replace(', ', ',')):
atoms += this_cv.replace(', ', ',')[match.start():match.end()] # should be only one match
count += 1
if not count == 1:
raise RuntimeError('failed to identify atoms constituting CV definition: ' + this_cv +
'\nInterpreted as a ' + optype + ' but found ' + str(count) +
' blocks of atom indices with length ' + str(nat) + '(should be one). Is'
' this CV formatted in an unusual way?')
if not atoms:
raise RuntimeError('unable to identify atoms constituting CV definition: ' + this_cv +
'\nIs it formatted in an unusual way?')
atoms = atoms.replace('[', '').replace(']', ',').replace('\'',
'') # included delimeters for safety, but don't want them
if '@' in atoms:
atoms = [item.replace('@', '') for item in atoms.split(' @')] # pytraj style atom indices
else:
atoms = atoms.split(',') # mdtraj style atom indices
atoms = [str(int(item) + 1) for item in atoms if not item == ''] # fix zero-indexing in mdtraj
else:
count = 0
for match in re.finditer('[\[\\\']([@0-9]+[,\ ]){1}[@0-9]+[\]\\\']', this_cv.replace(', ', ',')):
atoms += this_cv.replace(', ', ',')[match.start():match.end()] # should be two matches
count += 1
if not count == 2:
raise RuntimeError('failed to identify atoms constituting CV definition: ' + this_cv +
'\nInterpreted as a difference of distances but found ' + str(count) +
' blocks of atom indices with length 2 (should be two). Is this CV '
'formatted in an unusual way?')
if not atoms:
raise RuntimeError('unable to identify atoms constituting CV definition: ' + this_cv +
'\nIs it formatted in an unusual way?')
atoms = atoms.replace('[', '').replace(']', ',').replace('\'', '') # included delimeters for safety, but don't want them
if '@' in atoms:
atoms = [item.replace('@', '') for item in atoms.replace('\'', ' ').replace('@', ' @').split()]
else:
atoms = atoms.split(',')
atoms = [str(int(item) + 1) for item in atoms if not item == ''] # fix zero-indexing in mdtraj
while '' in atoms:
atoms.remove('') # remove empty list elements if present
return atoms, optype, nat
[docs]def plot_cvs_vs_rc(full_cvs_file, rc_definition, as_output_file):
"""
Produce a plot of the values of each CV in the RC as a function of RC value from either aimless shooting or umbrella
sampling data.
The plot is made in matplotlib but also printed as text to an output file.
Parameters
----------
full_cvs_file : str
Path to full_cvs output file (produced by utilities.resample or resampling.resample_umbrella_sampling)
rc_definition : str
String representing the definition of the reduced reaction coordinate
as_output_file : str
Path to the as_output_file file passed to lmax.py when generating the reaction coordinate
Returns
-------
None
"""
# Identify relevant CVs
pattern = re.compile('CV[0-9]+')
cv_names = pattern.findall(rc_definition) # get list of CV names, e.g., ['CV6', 'CV35', ...]
cvs = [int(cv[2:]) - 1 for cv in cv_names] # get zero-indexed CV indices
# Get values of relevant CVs from each line in full_cvs_file
values = [[] for _ in range(len(cvs))]
for line in open(full_cvs_file, 'r').readlines():
ii = -1
for cv in cvs:
ii += 1
values[ii].append(float(line.split()[cv])) # [[obs1cv1, obs2cv1], [obs1cv2, obs2cv2]]
# Get data from input file, and determine minimum and maximum values for each CV, get reduced CV values
input_data = [[float(item) for item in
line.replace('A <- ', '').replace('B <- ', '').replace(' \n', '').replace('\n', '').split(' ')]
for line in open(as_output_file, 'r').readlines()] # [[obs1cv1, obs1cv2], [obs2cv1, obs2cv2]]
mapped = list(map(list, zip(*input_data))) # [[obs1cv1, obs2cv1], [obs1cv2, obs2cv2]]
minmax = [[numpy.min(item) for item in mapped], [numpy.max(item) for item in mapped]] # [[mincv1, mincv2], [maxcv1, maxcv2]]
n_cvs = len(minmax[0]) # number of CVs total in as_output_file, for use later
minmax = [[minmax[0][ii] for ii in cvs], [minmax[1][ii] for ii in cvs]] # cut down to just the relevant values
reduced_values = [[(values[ii][jj] - minmax[0][ii]) / (minmax[1][ii] - minmax[0][ii]) for ii in range(len(values))] for jj in range(len(values[0]))]
reduced_values = list(map(list, zip(*reduced_values))) # transpose to match shape of values
rcs = [] # list of RC values for each sample
for jj in range(len(reduced_values[0])): # iterate over samples
cvs_list = [0 for ii in range(n_cvs)] # evaluate_rc expects an entry for every CV, not just those in the RC
for kk in range(len(cvs)):
cvs_list[cvs[kk]] = reduced_values[kk][jj]
rcs.append(evaluate_rc(rc_definition, cvs_list))
# Bin each RC value and evaluate average and stdev of each CV value in each bin
bins = numpy.linspace(min(rcs), max(rcs), 20) # todo: should the number of bins be a setting?
assert len(values[0]) == len(rcs) # sanity check
means = [[numpy.mean([values[cv][jj] for jj in range(len(rcs)) if bins[bin_ii] <= rcs[jj] <= bins[bin_ii + 1]]) for
bin_ii in range(len(bins) - 1)] for cv in range(len(values))]
stds = [[numpy.std([values[cv][jj] for jj in range(len(rcs)) if bins[bin_ii] <= rcs[jj] <= bins[bin_ii + 1]]) for
bin_ii in range(len(bins) - 1)] for cv in range(len(values))]
reduced_means = [[numpy.mean([reduced_values[cv][jj] for jj in range(len(rcs)) if bins[bin_ii] <= rcs[jj] <= bins[bin_ii + 1]]) for
bin_ii in range(len(bins) - 1)] for cv in range(len(values))]
reduced_stds = [[numpy.std([reduced_values[cv][jj] for jj in range(len(rcs)) if bins[bin_ii] <= rcs[jj] <= bins[bin_ii + 1]]) for
bin_ii in range(len(bins) - 1)] for cv in range(len(values))]
# Write results to output file and plot in both reduced and non-reduced forms
with open('cvs_vs_rc.out', 'w') as f:
f.write('RC value')
for name in cv_names:
f.write('; ' + name + ' mean')
f.write('; ' + name + ' st.dev.')
for name in cv_names:
f.write('; reduced ' + name + ' mean')
f.write('; reduced ' + name + ' st.dev.')
f.write('\n')
for ii in range(len(bins) - 1):
f.write('%.3f' % numpy.mean([bins[ii], bins[ii + 1]]))
for jj in range(len(means)):
f.write('\t' + '%.3f' % float(means[jj][ii]))
f.write('\t' + '%.3f' % float(stds[jj][ii]))
for jj in range(len(means)):
f.write('\t' + '%.3f' % float(reduced_means[jj][ii]))
f.write('\t' + '%.3f' % float(reduced_stds[jj][ii]))
f.write('\n')
# todo: optionally, finish implementing with matplotlib
# rc_means = [numpy.mean([bins[ii], bins[ii + 1]]) for ii in range(len(bins) - 1)]
# for ii in range(len(bins) - 1):
# plt.plot(rc_means, means)