"""
Interface for BatchSystem objects. New BatchSystems can be implemented by constructing a new class that inherits from
BatchSystem and implements its abstract methods.
"""
import abc
import subprocess
import time
class BatchSystem(abc.ABC):
    """
    Abstract base class for HPC cluster batch systems.
    Implements methods for all of the batch system-specific tasks that ATESA might need.
    """

    @abc.abstractmethod
    def get_status(self, jobid, settings):  # todo: this should MAYBE be moved to a method of TaskManager
        """
        Query batch system for a status string for the given job.

        Parameters
        ----------
        jobid : str
            The jobid to query
        settings : argparse.Namespace
            Settings namespace object

        Returns
        -------
        status : str
            A one-character status string. Options are: 'R'unning, 'Q'ueued, and 'C'omplete/'C'anceled.
        """
        pass

    @abc.abstractmethod
    def cancel_job(self, jobid, settings):  # todo: this should DEFINITELY be moved to a method of TaskManager
        """
        Cancel the job given by jobid.

        Parameters
        ----------
        jobid : str
            The jobid to cancel
        settings : argparse.Namespace
            Settings namespace object

        Returns
        -------
        output : str
            Raw output string from batch system, if any
        """
        pass

    @abc.abstractmethod
    def get_submit_command(self):
        """
        Return the appropriate terminal command for submitting a batch job, with '{file}' where the
        file to submit should be indicated.

        Parameters
        ----------
        None

        Returns
        -------
        output : str
            Appropriate command including '{file}' substring
        """
        pass
class AdaptSlurm(BatchSystem):
    """
    Adapter class for Slurm BatchSystem.
    """

    def get_status(self, jobid, settings):
        """
        Query Slurm via ``squeue`` for a one-character status string for jobid.

        Transient Slurm communication errors are retried up to five times, with a
        30-second pause between attempts.

        Parameters
        ----------
        jobid : str
            The jobid to query
        settings : argparse.Namespace
            Settings namespace object; if settings.DEBUG is truthy, 'C' is
            returned immediately without contacting Slurm

        Returns
        -------
        status : str
            One of 'R' (running), 'Q' (queued), or 'C' (complete/canceled)

        Raises
        ------
        ValueError
            If squeue returns something that is neither a recognized status
            table nor an 'Invalid job id' message
        """
        if settings.DEBUG:
            return 'C'

        max_tries = 5
        errors = ['slurm_load_jobs', 'slurm_receive_msg', 'send/recv']  # error messages to retry on
        command = 'squeue -o %t --job ' + str(jobid)
        output = ''
        for attempt in range(max_tries):
            if attempt > 0:
                time.sleep(30)  # wait 30 seconds before trying again
            process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT, close_fds=True, shell=True)
            # communicate() decodes nothing itself but does reap the child, avoiding zombies
            output = process.communicate()[0].decode()
            if not any(error in output for error in errors):
                break

        # A healthy reply starts with the squeue header 'ST'; 'Invalid job id' is
        # what Slurm reports once a job has aged out of the queue.
        if not output.split('\n')[0] == 'ST' and not 'Invalid job id' in output:
            raise ValueError('Queried Slurm system status for jobid ' + str(jobid) + ' but got unexpected status: ' + output)
        try:
            output = output.split('\n')[1]  # get status code; if invalid job id, this is an empty string
        except IndexError:
            pass  # leave output untouched so the error message below stays useful

        if output in ['PD', 'CF']:
            return 'Q'
        elif output in ['CG', 'ST'] or 'Invalid job id' in output or not output:
            # 'Invalid job id' or empty output returned when job is finished in Slurm
            return 'C'
        elif output == 'R':
            return 'R'
        else:
            raise ValueError('Queried Slurm system status for jobid ' + str(jobid) + ' but got unexpected status: ' + output)

    def cancel_job(self, jobid, settings):
        """
        Cancel the job given by jobid using ``scancel``.

        Parameters
        ----------
        jobid : str
            The jobid to cancel
        settings : argparse.Namespace
            Settings namespace object (unused here; kept for interface parity)

        Returns
        -------
        output : str
            Raw output string from scancel, if any
        """
        command = 'scancel ' + str(jobid)
        process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT, close_fds=True, shell=True)
        return process.communicate()[0].decode()  # decode converts from bytes-like to string

    def get_submit_command(self):
        """
        Return the Slurm submission command with a '{file}' placeholder.

        Returns
        -------
        output : str
            'sbatch {file}'
        """
        return 'sbatch {file}'
class AdaptPBS(BatchSystem):
    """
    Adapter class for PBS/Torque BatchSystem.
    """

    def get_status(self, jobid, settings):
        """
        Query PBS via ``qselect`` for a one-character status string for jobid.

        Parameters
        ----------
        jobid : str
            The jobid to query
        settings : argparse.Namespace
            Settings namespace object; if settings.DEBUG is truthy, 'C' is
            returned immediately without contacting PBS

        Returns
        -------
        status : str
            One of 'R' (running), 'Q' (queued/held/waiting), or 'C' (complete/exiting)

        Raises
        ------
        RuntimeError
            If jobid does not appear in any qselect listing
        """
        if settings.DEBUG:
            return 'C'
        # Iterate through each status three times for robustness in case the status
        # changes while being checked
        for status in ['CE', 'QHW', 'R'] * 3:
            command = 'qselect -u $USER -s ' + status
            output = self._run_pbs_command(command)
            # Handle common transient PBS errors by waiting and reissuing the query once
            if 'Bad UID for job execution MSG=user does not exist in server password file' in output or \
                    'This stream has already been closed. End of File.' in output:
                time.sleep(60)
                output = self._run_pbs_command(command)
            if str(jobid) in output:
                if status == 'CE':      # 'Complete' or 'Exiting'
                    return 'C'
                elif status == 'QHW':   # 'Queued', 'Waiting', or 'Hold'
                    return 'Q'
                elif status == 'R':     # 'Running'
                    return 'R'
        # If this code is reached, the jobid did not appear in any of the above
        # searches; include the full qstat listing to make the error actionable.
        output = self._run_pbs_command('qstat -u $USER')
        raise RuntimeError('unexpected PBS status for jobid: ' + str(jobid) + '\nFull status string: ' + output)

    def cancel_job(self, jobid, settings):
        """
        Cancel the job given by jobid using ``qdel``.

        Parameters
        ----------
        jobid : str
            The jobid to cancel
        settings : argparse.Namespace
            Settings namespace object (unused here; kept for interface parity)

        Returns
        -------
        output : str
            Raw output string from qdel, if any
        """
        return self._run_pbs_command('qdel ' + str(jobid))

    def get_submit_command(self):
        """
        Return the PBS submission command with a '{file}' placeholder.

        Returns
        -------
        output : str
            'qsub {file}'
        """
        return 'qsub {file}'

    @staticmethod
    def _run_pbs_command(command):
        """
        Run a shell command, retrying while the PBS server reports itself too busy.

        Always returns decoded text (the original implementation skipped .decode()
        on retry paths, which made `str(jobid) in output` raise TypeError on bytes)
        and uses communicate() so the child process is reaped.

        Parameters
        ----------
        command : str
            Shell command to execute

        Returns
        -------
        output : str
            Combined stdout/stderr of the command, decoded
        """
        busy_msg = 'Pbs Server is currently too busy to service this request. Please retry this request.'
        while True:
            process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT, close_fds=True, shell=True)
            output = process.communicate()[0].decode()
            if busy_msg not in output:
                return output
            time.sleep(10)  # brief pause so we don't hammer an already-busy server