"""!A shell-like syntax for running serial, MPI and OpenMP programs.

This module implements a shell-like syntax for launching MPI and
non-MPI programs from Python. It recognizes three types of
executables: MPI, "small serial" (safe for running on a batch node)
and "big serial" (which should be run via aprun if applicable). There
is no difference between "small serial" and "big serial" programs
except on certain architectures (like Cray) where the job script runs
on a heavily-loaded batch node and has compute nodes assigned for
running other programs.

@section progtype Program Types

There are three types of programs: MPI, serial and "big non-MPI." A
"big" executable is one that is either OpenMP, or is a serial program
that cannot safely be run on heavily loaded batch nodes. On Cray
architecture machines, the job script runs on a heavily-populated
"batch" node, with some compute nodes assigned for "large" programs.
In such environments, the "big" executables are run on compute nodes
and the small ones on the batch node.

* mpi('exename') = an executable "exename" that calls MPI_Init and
  MPI_Finalize exactly once each, in that order.
* exe('exename') = a small non-MPI program safe to run on a batch node
* bigexe('exename') = a big non-MPI program that must be run on a
  compute node; it may or may not use other forms of parallelism

You can also make reusable aliases to avoid having to call those
functions over and over (more on that later). Examples:

* Python: wrf=mpi('./wrf.exe')
* Python: lsl=alias(exe('/bin/ls')['-l'].env(LANG='C',LS_COLORS='never'))

Those can then be reused later on as if the code were pasted in.

@section serexs Serial Execution Syntax

Select your serial programs by exe('name') for small serial programs
and bigexe('name') for big serial programs. The return value of those
functions can then be used with a shell-like syntax to specify
redirection and piping. Example:

* shell version: ls -l / | wc -l
* Python version: run(exe('ls')['-l','/'] | exe('wc')['-l'])
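
The shell-like syntax above relies on Python operator overloading. The following is only a minimal sketch of the idea, not this module's implementation: the class `Cmd` and its method `to_shell` are hypothetical names invented for illustration, and the sketch only builds a shell string rather than running anything.

```python
import shlex

class Cmd:
    # Toy stand-in for the object returned by exe(); hypothetical,
    # not the class this module actually uses.
    def __init__(self, name, args=(), upstream=None):
        self.name = name
        self.args = tuple(args)
        self.upstream = upstream  # previous pipeline stage, if any

    def __getitem__(self, new_args):
        # cmd['-l', '/'] passes a tuple; cmd['-l'] passes one string.
        if not isinstance(new_args, tuple):
            new_args = (new_args,)
        return Cmd(self.name, self.args + new_args, self.upstream)

    def __or__(self, other):
        # left | right: the left side becomes the upstream of the
        # first stage of the right side.
        if other.upstream is None:
            return Cmd(other.name, other.args, upstream=self)
        return Cmd(other.name, other.args, upstream=self | other.upstream)

    def to_shell(self):
        words = " ".join(shlex.quote(w) for w in (self.name,) + self.args)
        if self.upstream is None:
            return words
        return self.upstream.to_shell() + " | " + words

pipeline = Cmd('ls')['-l', '/'] | Cmd('wc')['-l']
print(pipeline.to_shell())  # ls -l / | wc -l
```

Because `[]` binds more tightly than `|` in Python, the arguments attach to each command before the pipe is formed, which is why the Python version reads so much like the shell version.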

Redirection syntax is similar to the shell (<, >, >> and << operators):

    run( ( exe('myprogram')['arg1','arg2','...'] < 'infile' ) > 'outfile')

Note the extra set of parentheses: you cannot write "exe('prog') < infile
> outfile" because Python chains comparison operators, evaluating it as
"(exe('prog') < infile) and (infile > outfile)".
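
The need for the extra parentheses can be seen with a small stand-alone sketch. The class `Cmd` below is a hypothetical toy that merely records redirections; it is not this module's Runner. It shows that Python treats `a < b > c` as a chained comparison, so the `>` is applied to the file name rather than to the command.

```python
class Cmd:
    # Toy command that records redirections; hypothetical, for
    # illustration only.
    def __init__(self, name):
        self.name = name
        self.stdin = None
        self.stdout = None

    def __lt__(self, path):   # cmd < 'infile'
        self.stdin = path
        return self

    def __gt__(self, path):   # cmd > 'outfile'
        self.stdout = path
        return self

# With parentheses, both redirections reach the command.
good = (Cmd('prog') < 'infile') > 'outfile'
print(good.stdin, good.stdout)   # infile outfile

# Without parentheses Python evaluates
#   (bad < 'infile') and ('infile' > 'outfile')
# so the second comparison is between two strings.
bad = Cmd('prog')
result = bad < 'infile' > 'outfile'
print(result)        # False
print(bad.stdout)    # None
```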

Appending to a file works with >>:

    run(exe('myprogram')['arg1','arg2','...'] >> 'appendfile')

You can also send strings as input with <<:

    run(exe('myprogram')['arg1','arg2','...'] << 'some input string')

One difference from shells is that < and << always modify the
beginning of the pipeline:

* shell: cat < infile | wc -l
* Python #1: ( exe('cat') < 'infile' ) | exe('wc')['-l']
* Python #2: exe('cat') | ( exe('wc')['-l'] < 'infile' )

Note that the second one, equivalent to `cat|wc -l<infile`, would
NOT work in a shell since you would be giving wc -l two inputs.

@section parexs Parallel Execution Syntax

Use mpi('exename') to select your executable, use [] to set arguments,
use multiplication to set the number of ranks and use addition to
combine different executables together into a multiple program
multiple data (MPMD) MPI program.

Run ten copies of ls -l:

    run(mpirun(mpiserial(exe('ls')['-l'])*10))

Run HyCOM coupled HWRF: one wm3c.exe, 30 hycom.exe and 204 wrf.exe:

    run(mpirun(mpi('wm3c.exe') + mpi('hycom.exe')*30 + mpi('wrf.exe')*204))
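
As a rough illustration of how multiplication and addition can describe an MPMD job, here is a minimal sketch. The names `Ranks`, `toy_mpi`, `nranks` and `to_command` are hypothetical and are not part of this module; the generated command line uses the colon-separated MPMD form that many MPI launchers accept, which may differ from what a given platform needs.

```python
class Ranks:
    # Toy MPMD description: a list of (executable, rank count) groups.
    # Hypothetical; not the classes in produtil.mpiprog.
    def __init__(self, groups):
        self.groups = list(groups)

    def __mul__(self, count):
        # ranks * n: scale the rank count of every group.
        return Ranks((exe, n * count) for exe, n in self.groups)

    def __add__(self, other):
        # a + b: concatenate the groups into one MPMD job.
        return Ranks(self.groups + other.groups)

    def nranks(self):
        return sum(n for _, n in self.groups)

    def to_command(self):
        parts = ["-n %d %s" % (n, exe) for exe, n in self.groups]
        return "mpirun " + " : ".join(parts)

def toy_mpi(exename):
    # One rank of the named executable.
    return Ranks([(exename, 1)])

job = toy_mpi('wm3c.exe') + toy_mpi('hycom.exe') * 30 + toy_mpi('wrf.exe') * 204
print(job.nranks())      # 235
print(job.to_command())  # mpirun -n 1 wm3c.exe : -n 30 hycom.exe : -n 204 wrf.exe
```

Python's usual precedence applies: `*` is evaluated before `+`, so each executable gets its rank count before the groups are combined.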

You can set environment variables, pipe MPI output and handle
redirection using the mpirun() function, which converts MPI programs
into a bigexe()-style object (Runner):

* shell version: result=$( mpirun -n 30 hostname | sort -u | wc -l )
* Python version: result=runstr( mpirun(mpi('hostname')*30) | exe('sort')['-u'] | exe('wc')['-l'] )

@section aliases Aliases

If you find yourself frequently needing the same command, or you need
to store a command for multiple uses, then you should define an
alias. Let's say you want "long output" format Japanese language "ls"
output:

    exe('ls')['-l','/path/to/dir'].env(LANG='JP')

but you find yourself running that on many different directories.
Then you may want to make an alias:

    jplsl=alias(exe('ls')['-l'].env(LANG='JP'))

The return value jplsl can be treated as an exe()-like return value
since it was from exe() originally, but any new arguments will be
appended to the original set:

    run(jplsl['/path/to/dir'])

Note that if we did this:

    badlsl=exe('ls')['-l'].env(LANG='JP')  # Bad! No alias!
    run(badlsl['/'])         # will list /
    run(badlsl['/home'])     # will list / and /home
    run(badlsl['/usr/bin'])  # will list / /home and /usr/bin

    goodlsl=alias(exe('ls')['-l'].env(LANG='JP'))
    run(goodlsl['/'])        # will list /
    run(goodlsl['/home'])    # will list /home
    run(goodlsl['/usr/bin']) # will list /usr/bin

Then the run(badlsl['/home']) would list /home AND / which is NOT what
we want. Why does it do that? It is because badlsl is not an alias
--- it is a regular output from exe(), so every time we call its []
operator, we add an argument to the original command. When we call
alias() it returns a copy-on-write version (goodlsl), where every call
to [] creates a new object.
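
The difference between the mutable and copy-on-write behaviour can be sketched in a few lines. This is an illustrative toy under stated assumptions, not the module's code: `ToyRunner`, `ToyAlias` and `toy_alias` are hypothetical names, and only the `[]` operator is modelled.

```python
class ToyRunner:
    # Toy mutable command: [] appends to this same object.
    def __init__(self, name, args=()):
        self.name = name
        self.args = list(args)

    def __getitem__(self, arg):
        self.args.append(arg)   # modifies self in place
        return self

class ToyAlias(ToyRunner):
    # Toy copy-on-write command: [] returns a modifiable copy and
    # leaves the alias itself untouched.
    def __getitem__(self, arg):
        return ToyRunner(self.name, self.args + [arg])

def toy_alias(runner):
    return ToyAlias(runner.name, runner.args)

bad = ToyRunner('ls')['-l']
bad['/']
bad['/home']
print(bad.args)             # ['-l', '/', '/home']

good = toy_alias(ToyRunner('ls')['-l'])
print(good['/'].args)       # ['-l', '/']
print(good['/home'].args)   # ['-l', '/home']
print(good.args)            # ['-l']
```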

Note that alias() also works with pipelines, but most operations will
only modify the last command in the pipeline (or the first, for
operations that change stdin).
"""

__all__=['alias','exe','run','runstr','mpi','mpiserial','mpirun',
         'runbg','prog','mpiprog','mpiimpl','waitprocs','runsync',
         'InvalidRunArgument','ExitStatusException','checkrun',

module_logger=logging.getLogger('produtil.run')

    """!Raised to indicate that an invalid argument was sent into one
    of the run module functions."""

    """!Raised to indicate that a program generated an invalid return
    code.

    Examine the "returncode" member variable for the returncode value.
    Negative values indicate the program was terminated by a signal
    while zero and positive values indicate the program exited. For a
    pipeline, the highest exit status seen in the pipeline is returned.

    For MPI programs, the exit status is generally unreliable due to
    implementation-dependent issues, but this package attempts to
    return the highest exit status seen. Generally, you can count on
    MPI implementations to return zero if you call MPI_Finalize() and
    exit normally, and non-zero if you call MPI_Abort with a non-zero
    argument. Any other situation will produce unpredictable results."""

    """!ExitStatusException constructor
    @param message a description of what went wrong
    @param status the exit status"""

    """!An alias for self.returncode: the exit status."""

    """!A string description of the error."""

    """!A pythonic description of the error for debugging."""

    """!Attempts to generate an unmodifiable "copy on write" version
    of the argument. The returned copy will generate a modifiable
    duplicate of itself if you attempt to change it.
    @returns a produtil.prog.ImmutableRunner
    @param arg a produtil.prog.Runner or produtil.prog.ImmutableRunner"""
        arg.make_runners_immutable()
        raise InvalidRunArgument(
            'Arguments to alias() must be Runner objects (such as from '
            'exe()) or MPIRanksBase objects (such as from mpi() or '
            'mpiserial()). Got: %s'%(repr(arg),))

    """!Returns a prog.ImmutableRunner object that represents a small
    serial program that can be safely run on a busy batch node.
    @param name the executable name or path
    @param kwargs passed to produtil.prog.Runner.__init__
    @returns a new produtil.prog.ImmutableRunner"""

    """!Returns a prog.ImmutableRunner object that represents a large
    serial program that must be run on a compute node.
    @note This function does NOT search $PATH on Cray. That ensures
    the $PATH will be expanded on the compute node instead. Use
    produtil.fileop.find_exe() if you want to explicitly search the
    PATH before execution.
    @param name the executable name or path
    @param kwargs passed to produtil.prog.Runner.__init__
    @returns a new produtil.prog.ImmutableRunner"""
    return mpiimpl.make_bigexe(str(name),**kwargs)

    """!Alias for exe() for backward compatibility. Use exe() instead."""
    return exe(name,**kwargs)

    """!Converts an MPI program specification into a runnable shell
    program suitable for run(), runstr() or checkrun().

    * allranks=True --- to run on all available MPI ranks. This cannot be
      used if a specific number of ranks (other than 1) was requested.
    * logger=L --- a logging.Logger for log messages
    * Other platform-specific arguments. See produtil.mpi_impl for details.

    @param arg the mpiprog.MPIRanksBase describing the MPI program to
    run. This is the output of the mpi() or mpiserial() function.
    @param kwargs additional arguments to control output.
    @returns a prog.Runner object for the specified
    mpiprog.MPIRanksBase object."""
    return mpiimpl.mpirunner(arg,**kwargs)

    """!This internal implementation function generates a
    prog.PopenCommand object for the specified input, which may be a
    prog.Runner or mpiprog.MPIRanksBase.
    @param arg the produtil.prog.Runner to convert. This is the
    output of exe(), bigexe() or mpirun()
    @param capture if True, capture the stdout into a string
    @param kwargs additional keyword arguments, same as for mpirun()"""
        runner=mpiimpl.mpirunner(arg,**kwargs)
            'Can only run a Runner object (such as from exe()) or an '
            'MPIRanksBase object (such as from mpi() or mpiserial()). '
            'Got: %s'%(repr(arg),))
    if 'logger' in kwargs: logger=kwargs['logger']
    if logger is not None:
        logger.info('Starting: %s'%(repr(arg),))
        if capture: logger.info(' - and will capture output.')
    if logger is not None:
        logger.debug('Pipeline is %s'%(repr(pl),))

def runbg(arg,capture=False,**kwargs):
    """!Not implemented: background execution

    Runs the specified process in the background. Specify
    capture=True to capture the command's output. Returns a
    produtil.prog.PopenCommand. Call poll() to determine process
    completion, and use the stdout_data property to get the output
    after completion, if capture=True was specified.

    @bug produtil.run.runbg() is not implemented

    @warning this is not implemented

    @param arg the produtil.prog.Runner to execute (output of
    exe(), bigexe() or mpirun())
    @param capture if True, capture output
    @param kwargs same as for mpirun()"""

def waitprocs(procs,logger=None,timeout=None,usleep=1000):
    """!Not implemented: background process monitoring

    Waits for one or more backgrounded processes to complete. Logs to
    the specified logger while doing so. If a timeout is specified,
    returns False after the given time if some processes have not
    returned. The usleep argument is the number of microseconds to
    sleep between checks (can be a fraction). The first argument,
    procs, specifies the processes to check. It must be a
    produtil.prog.PopenCommand (return value from runbg) or an iterable
    (list or tuple) of such.

    @bug produtil.run.waitprocs() is untested

    @warning This is not tested and probably does not work.

    @param procs the processes to watch
    @param logger the logging.Logger for log messages
    @param timeout how long to wait before giving up
    @param usleep sleep time between checks"""
    if isinstance(procs,produtil.prog.PopenCommand):
        if logger is not None: logger.info("Wait for: %s",repr(p))
                if logger is not None:
                    logger.info("%s returned %s"%(repr(proc),repr(ret)))
            elif logger is not None and usleep>4.99999e6:
                logger.info("%s is still running"%(repr(proc),))
        if usleep>4.99999e6 and logger is not None:
            logger.info("... sleep %f ..."%(float(usleep/1.e6),))
        time.sleep(usleep/1.e6)
    return False if(p) else True

    """!Runs the "sync" command as an exe()."""
    return mpiimpl.runsync(logger=logger)

def run(arg,logger=None,sleeptime=None,**kwargs):
    """!Executes the specified program and attempts to return its exit
    status. In the case of a pipeline, the highest exit status seen
    is returned. For MPI programs, exit statuses are unreliable and
    generally implementation-dependent, but it is usually safe to
    assume that a program that runs MPI_Finalize() and exits normally
    will return 0, and anything that runs MPI_Abort(MPI_COMM_WORLD)
    will return non-zero. Programs that exit due to a signal will
    return statuses >255 and can be interpreted with WTERMSIG,
    @param arg the produtil.prog.Runner to execute (output of
    exe(), bigexe() or mpirun())
    @param logger a logging.Logger to log messages
    @param sleeptime time to sleep between checks of child process
    @param kwargs ignored"""
    p.communicate(sleeptime=sleeptime)
    if logger is not None:
        logger.info(' - exit status %d'%(int(result),))

    """!This is a simple wrapper around run() that raises
    ExitStatusException if the program exit status is non-zero.

    @param arg the produtil.prog.Runner to execute (output of
    exe(), bigexe() or mpirun())
    @param logger a logging.Logger to log messages
    @param kwargs The optional ret=[] argument can provide a different
    list of acceptable exit statuses."""
    r=run(arg,logger=logger)
    if kwargs is not None and 'ret' in kwargs:
        if r not in kwargs['ret']:

    """!Sets the number of OpenMP threads for the specified program.

    @warning Generally, when using MPI with OpenMP, the batch system
    must be configured correctly to handle this or unexpected errors

    @param arg The "arg" argument must be from mpiserial, mpi, exe or

    @param threads The optional "threads" argument is an integer number of
    threads. If it is not specified, the maximum possible number of
    threads will be used. Note that using threads=None with
    mpirun(...,allranks=True) will generally not work unless the batch
    system has already configured the environment correctly for an
    MPI+OpenMP task with default maximum threads and ranks.
    @returns see run()"""
    return mpiimpl.openmp(arg,threads)

    """!Executes the specified program or pipeline, capturing its
    stdout and returning that as a string.

    If the exit status is non-zero, then ExitStatusException is raised.
    For example:

        runstr(exe('false'),ret=(1,))

    succeeds if "false" returns 1, and raises ExitStatusException otherwise.

    @param arg The "arg" argument must be from mpiserial, mpi, exe or

    @param logger a logging.Logger for logging messages
    @param kwargs You can specify an optional list or tuple "ret" that
    contains an alternative list of valid return codes. Return codes
    of programs that exited normally are zero or positive; negative
    values represent signal-terminated programs (e.g., SIGTERM
    produces -15 and SIGKILL produces -9)."""
    if kwargs is not None and 'ret' in kwargs:
        if r not in kwargs['ret']:

    """!Returns an MPIRank object that represents the specified MPI
    executable.
    @param arg the MPI program to run
    @param kwargs logger=L for a logging.Logger to log messages"""

    """!Generates an mpiprog.MPISerial object that represents an MPI
    rank that executes a serial (non-MPI) program. The given value
    MUST be from bigexe() or exe(), NOT from mpi().
    @param arg the serial program to run, from bigexe() or exe()
    @param kwargs logger=L for a logging.Logger to log messages"""