22 """!Runs the hwrf.gribtask.GRIBTask on one thread."""
23 jlogger.info(hwrf_expt.conf.strinterp(
'config',
24 '{stormlabel}: starting regribbing job for {out_prefix}'))
25 with
NamedDir(hwrf_expt.WORKhwrf,logger=logging.getLogger())
as t:
26 hwrf_expt.gribber.uncomplete()
28 hwrf_expt.gribber.run()
29 jlogger.info(hwrf_expt.conf.strinterp(
'config',
30 '{stormlabel}: completed regribbing job for {out_prefix}'))
33 """!Runs the hwrf.tracker.TrackerTask on one thread
34 @param n the domain of interest: 1 2 or 3"""
35 jlogger.info(hwrf_expt.conf.strinterp(
'config',
36 '{stormlabel}: starting domain {dom} tracker job for {out_prefix}',
38 with
NamedDir(hwrf_expt.WORKhwrf,logger=logging.getLogger())
as t:
40 hwrf_expt.tracker.run()
42 hwrf_expt.trackerd02.run()
44 hwrf_expt.trackerd01.run()
45 jlogger.info(hwrf_expt.conf.strinterp(
46 'config',
'{stormlabel}: completed domain {dom} tracker job '
47 'for {out_prefix}',dom=n))
50 """!Runs the hwrf.copywrf.WRFCopyTask to copy native WRF input and
51 output files to COM, and then runs the gribber(). Does this on one thread."""
52 post_runs_copier=hwrf_expt.conf.getbool(
53 'config',
'post_runs_wrfcopier',
False)
54 if not post_runs_copier:
55 jlogger.info(hwrf_expt.conf.strinterp(
'config',
56 '{stormlabel}: starting wrfcopier job for {out_prefix}'))
57 with
NamedDir(hwrf_expt.WORKhwrf,logger=logging.getLogger())
as t:
58 hwrf_expt.wrfcopier.run(check_all=
True)
59 jlogger.info(hwrf_expt.conf.strinterp(
'config',
60 '{stormlabel}: completed wrfcopier job for {out_prefix}'))
62 jlogger.info(
'Products job will not run wrfcopier, post will do it.')
66 """!Runs the hwrf.nhc_products.NHCProducts on multiple threads."""
67 jlogger.info(hwrf_expt.conf.strinterp(
'config',
68 '{stormlabel}: starting nhc_products job for {out_prefix}'))
69 with
NamedDir(hwrf_expt.WORKhwrf,logger=logging.getLogger())
as t:
71 jlogger.info(hwrf_expt.conf.strinterp(
'config',
72 '{stormlabel}: completed nhc_products job for {out_prefix}'))
75 wave_flag=hwrf_expt.conf.getbool(
'config',
'run_wave')
76 wave=hwrf_expt.conf.getstr(
'config',
'wave_model')
77 if not wave_flag
or wave!=
'WW3':
78 jlogger.info(hwrf_expt.conf.strinterp(
'config',
79 '{stormlabel}: no wave model; skip wave_post job for {out_prefix}'))
81 jlogger.info(hwrf_expt.conf.strinterp(
'config',
82 '{stormlabel}: starting wave_post job for {out_prefix}'))
83 hwrf_expt.ww3post.run()
84 jlogger.info(hwrf_expt.conf.strinterp(
'config',
85 '{stormlabel}: completed wave_post job for {out_prefix}'))
88 """!Main program for subprocesses. Calls tracker() for the last
89 one or three threads (depending on whether extra_trackers=yes in
90 the [config] section). Calls copier() for the last non-tracker
91 rank. Calls the gribber() on all other ranks."""
93 myrank=int(os.environ[
'SCR_COMM_RANK'])
94 count=int(os.environ[
'SCR_COMM_SIZE'])
95 logger=conf.log(
'exhwrf_products')
96 extra_trackers=conf.getbool(
'config',
'extra_trackers',
False)
100 for rank
in xrange(count):
102 logger.info(
'Rank %d runs d03 tracker'%rank)
106 elif rank==1
and extra_trackers:
107 logger.info(
'Rank %d runs d02 tracker'%rank)
111 elif rank==2
and extra_trackers:
112 logger.info(
'Rank %d runs d01 tracker'%rank)
117 logger.info(
'Rank %d runs wrfcopier'%rank)
121 whoami=
'copier%d'%ncopiers
123 logger.info(
'Rank %d runs gribber'%rank)
127 whoami=
'gribber%d'%ngribbers
128 if ncopiers<1
or ngribbers<2:
130 if extra_trackers: need+=2
131 msg=
'Cannot run products job with %d processors with these settings.'\
132 ' I require at least %d.'%(count,need)
141 """!This is run multiple times in parallel, once in each
142 subprocess. It sets up the environment and logging settings and
143 then runs the starter() function."""
144 rank=int(os.environ[
'SCR_COMM_RANK'])
145 count=int(os.environ[
'SCR_COMM_SIZE'])
146 print 'MPI communicator: rank=%d size=%d'%(rank,count)
150 subdict={
'RANK':rank,
'COUNT':count,
'WHO':
'regribber',
152 'WORKhwrf':hwrf_expt.conf.getdir(
'WORKhwrf') }
155 subdict[
'THREAD_WHOAMI']=whoami
157 if whoami.find(
'tracker')>=0:
159 if 'TRACKER_LOGS' in os.environ:
160 r=os.environ.get(
'TRACKER_LOGS')
162 r=hwrf_expt.conf.strinterp(
163 'config',
'%(WORKhwrf)s/%(jobid)s-%(THREAD_WHOAMI)s.log')
164 rstdout=r % dict(subdict, WHO=
'tracker', STREAM=
'out')
165 rstderr=r % dict(subdict, WHO=
'tracker', STREAM=
'err')
167 threadname=
'tracker')
170 if 'REGRIBBER_LOGS' in os.environ:
171 r=os.environ[
'REGRIBBER_LOGS']
173 r=hwrf_expt.conf.strinterp(
175 '%(WORKhwrf)s/%(jobid)s-%(THREAD_WHOAMI)s.log',
177 rstdout=r % dict(subdict, WHO=
'regribber', STREAM=
'out')
178 rstderr=r % dict(subdict, WHO=
'regribber', STREAM=
'err')
179 logging.getLogger(
'hwrf').warning(
180 'Redirecting regribber %d to: stderr=%s stdout=%s'%
181 ( rank, rstderr, rstdout ))
183 threadname=
'regrib%d'%(rank,))
187 """!Launches an MPI program that will call this script in multiple
188 threads using the mpiserial program."""
190 os.environ[
'LAUNCH_SELF']=
'no'
193 checkrun(mpirun(mpi(hwrf_expt.conf.getexe(
'mpiserial',
'mpiserial'))
194 [os.path.realpath(__file__)],allranks=
True))
199 """!Main entry point. Slave processes (launched by mpiserial)
200 just call slave_main to pass control on to tracker(), gribber() or
201 copier(). The main process (which calls mpiserial) will wait for
202 mpiserial to exit, and then run the products() function."""
204 if 'SCR_COMM_RANK' not in os.environ \
205 and os.environ.get(
'LAUNCH_SELF',
'yes')==
'yes':
211 logger=logging.getLogger(
'exhwrf_products')
213 logger.info(
'Ensure incomplete products are marked as such...')
214 hwrf_expt.gribber.uncomplete()
215 logger.info(
'Add alerts and delveries...')
219 logger.warning(
'''Rerunning dbn_alert for prior jobs' posted files.''')
220 hwrf_expt.gribber.call_completed_callbacks()
224 logger.warning(
'---------------------------------------------------')
225 logger.warning(
'LAUNCH PARALLEL PORTION OF SCRIPT------------------')
226 logger.warning(
'---------------------------------------------------')
228 logger.warning(
'---------------------------------------------------')
229 logger.warning(
'PARALLEL PORTION OF SCRIPT HAS ENDED---------------')
230 logger.warning(
'---------------------------------------------------')
233 if hwrf_expt.fcstlen == 126:
236 logger.info(
'Forecast length is: %d ; Not running the products job.'%hwrf_expt.fcstlen)
Change directory, handle temporary directories.
Contains setup(), which initializes the produtil package.
def gribber()
Runs the hwrf.gribtask.GRIBTask on one thread.
def starter(dryrun)
Main program for subprocesses.
Sets up signal handlers to ensure a clean exit.
def tracker(n)
Runs the hwrf.tracker.TrackerTask on one thread.
def doit()
Main entry point.
def init_module
Initializes the HWRF object structure.
def copier()
Runs the hwrf.copywrf.WRFCopyTask to copy native WRF input and output files to COM, and then runs the gribber().
A shell-like syntax for running serial, MPI and OpenMP programs.
def setup(ignore_hup=False, dbnalert_logger=None, jobname=None, cluster=None, send_dbn=None, thread_logger=False, thread_stack=2**24, **kwargs)
Initializes the produtil package.
def set_vars_for_products(logger)
Sets variables to speed up the products job on WCOSS.
def launchself()
Launches an MPI program that will call this script in multiple threads using the mpiserial program.
This subclass of TempDir takes a directory name, instead of generating one automatically.
def add_wave_alerts()
DBN ALERTS.
def add_regrib_alerts()
Adds dbn alerts for GRIB products by adding DBNAlert objects to the hwrf_expt.gribber.
def jobid
Get the batch job ID.
def add_tracker_alerts()
Adds dbn alerts for the tracker and requests delivery of the tracker to NHC deck locations.
def mpi_redirect
Used to split to multiple logging streams.
def add_nhc_alerts()
Adds dbn alerts for the hwrf_nhc_products program's output by adding DBNAlert objects to the hwrf_exp...
def slave_main()
This is run multiple times in parallel, once in each subprocess.
def products()
Runs the hwrf.nhc_products.NHCProducts on multiple threads.