2 """!Obtains input data needed by various subclasses of
3 hwrf.hwrftask.HWRFTask.
5 This module implements the functionality described in
6 hwrf.hwrftask.HWRFTask.inputiter(). It takes many HWRF tasks, asks
7 them what input is needed and collates that information. It has a
8 list of many possible input sources, and knows which ones are
9 available from which cluster. It goes through available input sources
10 in priority order, obtaining the input data."""
__all__=["DataCatalog", "InputSource", "in_date_range"]

import collections, os, ftplib, tempfile, ConfigParser, urlparse, stat, \
    re, threading, time, datetime, StringIO

from produtil.run import alias, batchexe, checkrun, ExitStatusException, run
from produtil.fileop import deliver_file, isnonempty, make_symlink, makedirs
from hwrf.numerics import to_datetime, to_datetime_rel, to_timedelta
30 """!Is this time in the given time range?
32 @param t A time as a ten digit number. For example, 1830123118 is
33 December 31, 1830 at 18:00 UTC.
34 @param trange A comma-separated list of time ranges such as
36 * 2015081412 --- 12:00 UTC on August 14, 2015
37 * 2015081412-2015082318 --- From 12:00 UTC on August 14, 2015
38 through 18:00 UTC on August 23, 2015
39 * 2015081412-2015082318,2011010100-2011123123 --- From 12:00 UTC
40 on August 14, 2015 through 18:00 UTC on August 23, 2015 and all
42 @returns True if t falls in the range trange, or False otherwise."""
43 epsilon=to_timedelta(
'1800')
45 for tr
in trange.split(
','):
50 if t>=to_datetime_rel(-epsilon,start) \
51 and t<=to_datetime_rel(epsilon,start):
55 start=to_datetime(tr[0:10])
56 end=to_datetime(tr[idash+1:idash+11])
57 if t>=to_datetime_rel(-epsilon,start) \
58 and t<=to_datetime_rel(epsilon,end):
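# A short usage sketch of the range check above; the times are arbitrary
# ten-digit YYYYMMDDHH values:
print(in_date_range('2015081500', '2015081412-2015082318'))   # True: inside the range
print(in_date_range('2015090100', '2015081412-2015082318'))   # False: after the range
print(in_date_range('2011060112',
                    '2015081412-2015082318,2011010100-2011123123'))  # True: in the second range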
65 """!Convenience function that opens a temporary file using
66 tempfile.NamedTemporaryFile."""
68 return tempfile.NamedTemporaryFile(prefix=os.path.basename(f),
69 dir=os.path.dirname(f),
70 mode=m,suffix=
'.tmp',delete=
False)
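# A minimal sketch of how such a temporary file is used elsewhere in this
# module: write into a ".tmp" file beside the destination, then publish it
# with a rename once the transfer is complete.  The destination path below
# is made up for illustration.
dest="./example.dat"
t=tempfile.NamedTemporaryFile(prefix=os.path.basename(dest),
                              dir=os.path.dirname(dest) or ".",
                              mode="wb",suffix=".tmp",delete=False)
try:
    t.write(b"partial download contents")   # stand-in for the real transfer
finally:
    t.close()
os.rename(t.name,dest)   # only expose the file once it is complete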
75 """!Makes a string version of a dataset+item dict as produced by
76 hwrf_expt.inputiter() or hwrf.hwrftask.HWRFTask.inputiter()"""
78 s.write(
"%s(%s"%(d.get(
"dataset",
"(**no*dataset**)"),
79 d.get(
"item",
"(**no*item**)")))
80 for k
in sorted(list(d.iterkeys())):
81 if k==
'dataset' or k==
'item':
continue
83 if isinstance(v,datetime.datetime):
84 s.write(
', %s=%s'%(str(k),v.strftime(
'%Y-%m-%d_%H:%M:%S')))
86 s.write(
', %s=%s'%(str(k),repr(v)))
93 """!Makes a string version of a dataset+item dict as produced by
94 hwrf_expt.inputiter() or hwrf.hwrftask.HWRFTask.inputiter()"""
96 s.write(
"%s(%s"%(d.get(
"dataset",
"(**no*dataset**)"),
97 d.get(
"item",
"(**no*item**)")))
98 for k
in sorted(list(d.iterkeys())):
99 if k==
'dataset' or k==
'item':
continue
101 if isinstance(v,datetime.datetime):
102 s.write(
', %s=%s'%(str(k),v.strftime(
'%Y-%m-%d_%H:%M:%S')))
104 s.write(
', %s=%s'%(str(k),repr(v)))
110 """!Provides the location of a file in an archive, on disk or on a
111 remote server via sftp or ftp.
113 This class is a collection of functions that know how to provide
114 the location of a file in either an archive or a filesystem. It
115 does not know how to actually obtain the file. This serves as the
116 underlying "where is that file" implementation of InputSource.
117 All of this is driven by a section in an hwrf.config.HWRFConfig
120 For example, suppose one set up this configuration file:
123 # WCOSS: Input locations for the production HWRF
124 gfs = /com/gfs/prod/gfs.{aYMD}/
125 gdas1 = /com/gfs/prod/gdas.{aYMD}/
126 gfs_sf = gfs.t{aHH}z.sf{fahr:02d}
127 gfs_sfcanl = gfs.t{aHH}z.sfcanl
128 gdas1_bufr = gdas1.t{aHH}z.{obstype}.tm00.bufr_d
131 In this example, "gfs" is a dataset, while "gfs_sfcanl" is an item
132 in the dataset. The DataCatalog.locate() function can find the
133 location of a gfs_sf file given the inputs required for string
134 expansion by hwrf.config.HWRFConfig.timestrinterp(). In this
135 case, only the analysis time is required for the "{aYMD}" in the
136 dataset location and "{aHH}" in the gfs_sfcanl filename.
138 dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
139 sfcanl=dc.locate("gfs","gfs_sfcanl")
142 That code would print "/com/gfs/prod/gfs.20150818/gfs.t00z.sfcanl"
143 which is the operational output path of the GFS surface analysis
144 file for the analysis time in question.
146 Suppose we wanted the spectral forecast file, "gfs_sf" instead,
147 for forecast hour 54. That also requires the forecast time
148 ("ftime") in order to fill in the "{fahr:02d}" in the filename
151 dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
152 sf48a=dc.locate("gfs","gfs_sf",ftime="2015092006")
153 sf48b=dc.locate("gfs","gfs_sf",ftime=48*3600)
157 That code would print "/com/gfs/prod/gfs.20150818/gfs.t00z.sf54"
158 twice. Note that you can specify the forecast time as an absolute
159 time, or as a number of seconds relative to the analysis time and
160 achieve the same effect either way.
162 If we want the bufr file, we have to provide one more piece of
163 information: the observation type, to fill in "{obstype}".
165 dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
166 gpm=dc.locate("gdas1","gdas1_bufr",obstype="gpm")
169 which prints "/com/gfs/prod/gdas.20150918/gdas1.t00z.gpm.tm00.bufr_d"
172 """!DataCatalog constructor
173 @param conf the configuration object, an hwrf.config.HWRFConfig
174 @param section the section that provides location information
175 @param anltime the default analysis time """
177 if not isinstance(section,basestring):
178 raise TypeError(
'In DataCatalog.__init__, section must be a '
193 """!A string representation of this DataCatalog"""
194 if isinstance(self.
anltime,datetime.datetime):
195 stime=self.anltime.strftime(
'%Y%m%d%H')
198 return "DataCatalog(conf,%s,%s)"%(repr(self.
section), stime)
200 """!Is this dataset updated in real-time?
202 @returns True if this dataset is updated in real-time, False
203 otherwise. By default, this will return True if
204 conf[section,"rt_updated"] is set to "yes" or False otherwise."""
206 return conf.getbool(section,
'rt_updated',
False)
207 except ( ConfigParser.Error,KeyError,TypeError,ValueError )
as e:
def parse(self,string,atime=None,ftime=None,logger=None,dates=None,
          **kwargs):
    """!Internal function that performs string interpolation.

    This is an internal implementation function that you should
    not call directly.  It performs string interpolation using the
    underlying conf object.  This acts exactly like the expansions
    done in the hwrf.conf file: {stuff} is expanded to the
    contents of the "stuff" variable.  Expansions are done in the
    section specified in the constructor.  In addition, various a*
    and f* variables are expanded based on the analysis time
    ("atime") and forecast time ("ftime").  See
    hwrf.config.HWRFConfig.timestrinterp() for details.
    @param string the string being expanded
    @param atime Optional: the analysis time.  The default is self.anltime.
    @param ftime Optional: the forecast time.
    @param logger Optional: a logging.Logger for log messages
    @param dates Optional: dates for which this datasource is valid.
      This is passed to in_date_range() for validation.  This is
      used to implement the InputSource date ranges.
    @param kwargs Additional keyword arguments are passed to the
      hwrf.config.HWRFConfig.timestrinterp() for string replacement.
    @returns The return value from string interpolation or None if
      nothing was found."""
    if atime is None:
        if logger is not None:
            logger.info(
                '{%s}: has no atime.  Will use atime=self.anltime=%s.'%(
                    str(string),repr(atime)))
        atime=self.anltime
    if ftime is None:
        if logger is not None:
            logger.info('{%s}: has no ftime.  Will use ftime=atime=%s.'%(
                    str(string),repr(atime)))
        ftime=atime
    atime=to_datetime(atime)
    ftime=to_datetime_rel(ftime,atime)
    if dates is not None and atime is not None:
        if not in_date_range(atime,dates):
            if logger is not None:
                logger.info('{%s}: atime %s not in %s'%(
                        str(string),str(atime),str(dates)))
            return None
    if logger is not None:
        logger.info(
            'parsing {%s} with ftime=%s atime=%s in section %s'
            %(str(string),repr(ftime),repr(atime),repr(self.section)))
    return self.conf.timestrinterp(
        self.section,"{"+string+"}",ftime,atime,**kwargs)
def locate(self,dataset,item,atime=None,ftime=None,logger=None,
           dates=None,**kwargs):
    """!Find the location of a requested piece of data.

    Locates the specified item for the specified dataset, at the
    given analysis time ("atime") and forecast time ("ftime").  If
    the requested data is known to not exist, returns None.  This
    should be overridden by subclasses.  The present
    implementation just does this: {dataset}/{item}, expanding
    dataset and item with self.parse.  Any kwargs are passed
    along: this allows such things as ensemble ID, or switching
    between GRIB1 or GRIB2 via a keyword argument.
    @param dataset The name of the dataset.
    @param item The name of the item in the dataset.
    @param atime Optional: the analysis time.  The default is self.anltime.
    @param ftime Optional: the forecast time, which can be anything
      accepted by hwrf.numerics.to_datetime_rel() relative to the
      analysis time.
    @param logger Optional: a logging.Logger for log messages.  If this
      is provided, several steps along the way of finding the data
      location are logged.
    @param dates Optional: dates for which this datasource is valid.
      This is passed to in_date_range() for validation.  This is
      used to implement the InputSource date ranges.
    @param kwargs Additional keyword arguments are passed by
      parse() to the hwrf.config.HWRFConfig.timestrinterp() for
      string replacement.
    @return The path to the requested data or None if it is not found."""
    if logger is not None:
        logger.info(
            'locate item=%s atime=%s ftime=%s in dataset=%s'
            %(repr(item),repr(atime),repr(ftime),repr(dataset)))
    ds=self.parse(dataset,atime=atime,ftime=ftime,logger=logger,
                  dates=dates,**kwargs)
    if ds is None: return None
    it=self.parse(item,atime=atime,ftime=ftime,logger=logger,**kwargs)
    # Combine the expanded dataset location and item name into one path:
    result=urlparse.urljoin(ds,it)
    if logger is not None:
        logger.info('result %s %s => %s'%(
                repr(ds),repr(it),repr(result),))
    return result
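# A quick illustration of the {dataset}/{item} combination described above,
# assuming a urljoin-style join of the two expanded strings (the literal
# values are taken from the class docstring example):
print(urlparse.urljoin('/com/gfs/prod/gfs.20150918/','gfs.t00z.sfcanl'))
# prints /com/gfs/prod/gfs.20150918/gfs.t00z.sfcanl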
302 """!Fetch data from multiple sources.
304 This class knows how to fetch data from remote clusters, or the
305 local machine. The data locations are specified by a several
306 DataCatalog sections, each of which is given a priority, a valid
307 set of dates and a file transfer mechanism. Data catalogs are
308 tried in priority order. Files are obtained in multiple threads
309 at once, and several file transfer mechanisms are understood:
311 * file:// --- obtain files on disk
312 * ftp:// --- contact an FTP server
313 * sftp:// --- contact a server over SSH. SSH-based rsync is used.
314 * htar:// --- use the proprietary htar program to get a tape archive
316 However, only one DataCatalog is examined at a time. All threads
317 work on that one DataCatalog until all data that can be obtained
318 from it is done. Then the threads exit, and new ones are spawned
319 to examine the next DataCatalog.
321 For example, suppose you are on the Jet supercomputer running a
322 HISTORY (retrospective) simulation. You set up this configuration
323 section in your hwrf.conf config file:
325 [jet_sources_prod2014]
326 jet_hist_PROD2014%location = file:///
327 jet_hist_PROD2014%histprio=90
328 jet_hist_PROD2014%fcstprio=90
330 prod15_data_sp%location=htar://
331 prod15_data_sp%histprio=59
332 prod15_data_sp%dates=2015011218-2015123118
336 inputroot2014=/lfs3/projects/hwrf-data/hwrf-input
337 gfs={inputroot2014}/HISTORY/GFS.{aYYYY}/{aYMDH}/
338 gfs_sfcanl = gfs.t{aHH}z.sfcanl
341 inputroot=/NCEPPROD/2year/hpssprod/runhistory/rh{aYYYY}/{aYYYY}{aMM}/{aYMD}
343 gfs_sfcanl = {gfs_tar}#./gfs.t{aHH}z.sfcanl
346 inputroot=/pan2/projects/hwrfv3/John.Doe/hwrfdata
347 gfs={inputroot}/hwrf.{aYMDH}/
348 gfs_sfcanl = gfs.t{aHH}z.sfcanl
350 and this is the code:
352 is=InputSource(conf,"jet_sources_prod2014","2015071806")
353 hwrfdata=DataCatalog(conf,"hwrfdata")
355 {"dataset":"gfs", "item":"gfs_sfcanl","atime"="2015071800"},
356 {"dataset":"gfs", "item":"gfs_sfcanl","atime"="2015071806"},
357 {"dataset":"gfs", "item":"gfs_sfcanl","atime"="2015071812"} ],
358 hwrfdata,realtime=False)
361 In this example, the InputSource will look for three GFS surface
362 analysis files. It will search two possible locations for them:
363 the on-disk Jet "PROD2014" history location and the NCO production
364 tape files. The disk location will be searched first because its
365 history priority is 90, while the tape area has a priority of 59.
367 Three files will show up eventually:
369 * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071800/gfs.t00z.sfcanl
370 * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071806/gfs.t06z.sfcanl
371 * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071812/gfs.t12z.sfcanl
373 Each file will come from either here:
375 * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015071800/gfs.t00z.sfcanl
376 * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015071806/gfs.t06z.sfcanl
377 * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015071812/gfs.t12z.sfcanl
381 * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071800gfs.tar ./gfs.t00z.sfcanl
382 * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071806gfs.tar ./gfs.t06z.sfcanl
383 * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071812gfs.tar ./gfs.t12z.sfcanl """
def __init__(self,conf,section,anltime,htar=None,logger=None,hsi=None):
    """!InputSource constructor.
    @param conf the hwrf.config.HWRFConfig to use for configuration
      information
    @param section the section that specifies the list of data catalogs
    @param anltime the default analysis time
    @param htar the produtil.prog.Runner that runs htar
    @param logger a logging.Logger for log messages
    @param hsi the produtil.prog.Runner that runs hsi"""
        return collections.defaultdict(none)
    self.locks=collections.defaultdict(threading.Lock)
    assert(htar is not None)
    assert(hsi is not None)
    self.valid=collections.defaultdict(None)
    sections=[section]
    if conf.has_option(section,'@inc'):
        sections.extend(conf[section,'@inc'].split(','))
    sources=collections.defaultdict(dict)
    for sec in sections:
        for key in conf.keys(sec):
            # Option names look like "source%attribute", for example
            # jet_hist_PROD2014%location:
            c=key.find('%')
            if c<0: continue
            (src,attr)=(key[0:c],key[c+1:])
            try:
                sources[src][attr]=conf.get(sec,key)
            except KeyError as ke:
                if logger is not None:
                    logger.warning("[%s] %s: key error: %s"%(
                            sec,key,str(ke)))
    bad=set()
    for (src,attr) in sources.iteritems():
        if 'location' in attr and ('histprio' in attr or
                                   'fcstprio' in attr):
            dctype=attr.get('type','DataCatalog')
            if dctype=='DataCatalog':
                dc=DataCatalog(conf,src,anltime)
            else:
                raise Exception(
                    'Do not know how to make a DataCatalog of type "%s"'
                    %(dctype,))
            if 'dates' in attr:
                dates=attr['dates']
            else:
                dates='1970010100-2038011818'
            self.add(dc,location=attr['location'],
                     fcstprio=attr.get('fcstprio',None),
                     histprio=attr.get('histprio',None),
                     dates=dates)
        else:
            bad.add(src)
            logger.warning('Bad source %s: must have location and either '
                           'histprio or fcstprio.'%(src,))
    if bad:
        raise Exception(
            'Input sources must have location and either histprio or '
            'fcstprio.  Check options in [%s]: %s and rerun launcher '
            'job.'%(self.section, ', '.join(bad)))
def _rsync_ssh_exe(self,netpart,path=None,checkdir='/',dest=None):
    """!Creates a produtil.prog.Runner for running rsync over ssh.

    Returns a Runner object (as in produtil.run) for executing
    rsync -e ssh.  This subroutine is used to implement
    workarounds for known bugs.
    @param netpart The netpart portion of the sftp URL.
    @param path The path portion of the sftp URL.
    @param dest The destination on the local disk."""
    rsync=self.conf.getexe('rsync','rsync')
    # Depending on which rsync/ssh workaround is needed, one of these two
    # base commands is constructed:
    cmd=alias(batchexe(rsync)['-e','ssh','--protocol','29'])
    cmd=alias(batchexe(rsync)['-e','ssh'])
    if path and dest:
        # Copy one remote file (netpart:path) to the local dest:
        cmd=cmd['-LvptgoD',"%s:%s"%(netpart,path),dest]
    else:
        # No path given: just list checkdir to test access to the server:
        cmd=cmd['-d','%s:%s'%(netpart,checkdir)]
    return cmd
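# For reference, the Runner built above corresponds to an ordinary rsync
# command line.  A rough subprocess-based equivalent (host and paths are
# placeholders, not real locations) would be:
import subprocess
netpart="Some.Username@dtn-zeus.rdhpcs.noaa.gov"   # placeholder login@host
path="/some/remote/dir/gfs.t00z.sfcanl"            # placeholder remote file
dest="./gfs.t00z.sfcanl"                           # placeholder local file
rc=subprocess.call(["rsync","-e","ssh","-LvptgoD",
                    "%s:%s"%(netpart,path),dest])
print("rsync exit status: %d"%rc)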
507 """!Sorts the list of history and forecast DataCatalogs by
508 decreasing priority."""
def add(self,dc,location,fcstprio=None,histprio=None,dates=None):
    """!Adds a DataCatalog to this InputSource.

    Called automatically from the constructor to add a DataCatalog
    to this InputSource.  The list of add() calls is generated
    from the config section specified in the constructor.  You
    should never need to call this function unless you want to
    explicitly add more DataCatalog objects that are not listed in
    the configuration.

    The location parameter is a URL from file, sftp, ftp or htar.
    Examples:

    * local files: file:///lfs3/projects/hwrf-data/hwrf-input/
    * scp: sftp://Some.Username@dtn-zeus.rdhpcs.noaa.gov/
    * ftp: ftp://anonymous@ftpprd.ncep.noaa.gov/
    * htar: htar:///NCEPPROD/1year/hpssprod/runhistory/rh2012/201204/20120418/

    @warning Bad things will happen if you add the same source twice.
    @note If fcstprio and histprio are both None, this call has no
      effect.

    @param dc the DataCatalog object
    @param location the URL of the data source, including the
      username if needed.
    @param fcstprio the priority for using this source in FORECAST
      (real-time) mode.  If missing or None, the source will not
      be used in FORECAST mode.
    @param histprio the priority for using this source in HISTORY
      (retrospective) mode.  If missing or None, the source will
      not be used in HISTORY mode.
    @param dates Dates for which this source is valid.  This is
      passed to the trange argument of in_date_range(t,trange)."""
    if fcstprio is None and histprio is None: return
    if dates is None:
        dates='1970010100-2038011818'
    parsed=urlparse.urlparse(location)
    if fcstprio is not None:
        self.forecast.append( ( float(fcstprio), location, parsed, dc, dates ) )
    if histprio is not None:
        self.history.append( ( float(histprio), location, parsed, dc, dates ) )
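# A minimal usage sketch: register one extra on-disk catalog by hand at
# history priority 75.  The section name "my_extra_data" and the date range
# are made up for illustration; conf and insrc are the HWRFConfig and
# InputSource objects from the class docstring example.
dc=DataCatalog(conf,"my_extra_data","2015071806")
insrc.add(dc,location="file:///",histprio=75,
          dates="2015010100-2015123123")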
557 """!Opens an FTP connection
559 Opens the specified ftp://user@host/... request subject to the
560 specified timeout, logging to the specified logger (if present
562 @param netpart The netpart portion of the URL
563 @param logger the logging.Logger for log messages
564 @param timeout the connection timeout in seconds"""
565 if logger
is None: logger=self.
_logger
566 if logger
is not None:
567 logger.info(
'open_ftp %s'%(netpart,))
568 r=re.search(
'([a-zA-Z0-9_.-]+)+@(.+)',netpart)
570 (user,host)=r.groups()
571 if not user
or not host:
573 'FTP logins must be of the form user@host but you '
574 'gave "%s"'%(netpart))
576 (user,host)=(
'anonymous',netpart)
579 if logger
is not None: logger.info(
'%s@%s: log in'%(user,host))
580 f=ftplib.FTP(host,user,timeout=timeout)
582 assert(f
is not None)
585 valid[
'ftp://'+netpart]=
True
587 except Exception
as e:
588 valid[
'ftp://'+netpart]=
False
591 if logger
is not None:
592 logger.warning(
'In finally block, closing FTP stream.')
595 """!Checks to see if rsync can even access a remote server.
596 @param netpart the netpart portion of the URL
597 @param logger the logging.Logger for log messages
598 @param timeout the connection timeout in seconds
599 @returns True if the server is accessible and False otherwise"""
602 checkrun(cmd,logger=logger)
604 except Exception
as e:
605 if logger
is not None:
606 logger.warning(
'%s: rsync cannot access: %s'
607 %(str(netpart),str(e)))
def fetch_file(self,streams,dc,dsurl,urlmore,dest,logger=None,
               timeout=20,realtime=True):
    """!Internal implementation function that fetches one file.

    You should not call this directly; it is meant to be called
    by "get" and re-implemented in subclasses.  This grabs one
    file, potentially from a remote location.  The URL for the
    base directory of some dataset is in dsurl, while the specific
    file is in urlmore.  The urlmore will be appended to the file
    part of dsurl via urljoin, and the resulting file will be
    delivered to dest.
    @param streams a list used to store opened streams
    @param dc the DataCatalog being obtained
    @param dsurl the URL of the DataCatalog
    @param urlmore additional parts of the URL such as the
      reference or HTTP Get
    @param dest The local disk destination
    @param logger the logging.Logger for log messages
    @param timeout the connection timeout in seconds
    @param realtime True for FORECAST mode, False for HISTORY mode.
    @returns True if successful, False if not"""
    if logger is None: logger=self._logger
    parsed=urlparse.urlparse(dsurl)
    joined=urlparse.urljoin(dsurl,urlmore,allow_fragments=True)
    parsed=urlparse.urlparse(joined)
    if logger is not None:
        logger.info('%s + %s = %s',repr(dsurl),repr(urlmore),repr(joined))
    scheme=parsed.scheme
    path=parsed.path
    netpart=parsed.netloc
    n="%s://%s"%(scheme,netpart)
    if scheme=='file':
        return self._impl_fetch_file(
            parsed,joined,scheme,path,netpart,streams,dc,dsurl,urlmore,dest,
            logger,timeout,realtime)
    elif scheme=='ftp':
        return self._impl_fetch_ftp(
            parsed,joined,scheme,path,netpart,streams,dc,dsurl,urlmore,dest,
            logger,timeout,realtime)
    elif scheme=='sftp':
        return self._impl_fetch_sftp(
            parsed,joined,scheme,path,netpart,streams,dc,dsurl,urlmore,dest,
            logger,timeout,realtime)
    else:
        raise UnsupportedTransfer(
            'Cannot transfer this url: unsupported method (not htar, '
            'ftp, file or sftp): '+joined)
def _impl_fetch_file(self,parsed,joined,scheme,path,netpart,streams,dc,dsurl,
                     urlmore,dest,logger,timeout,realtime):
    """!Fetches a file from local disk by making a symbolic link.
    @param parsed The parsed URL from urlparse.urlparse
    @param joined The joined URL from urlparse.urljoin
    @param scheme The data transfer scheme (ftp, sftp, etc.)
    @param path The URL path
    @param netpart the netpart portion of the URL.
    @param streams the array of transfer streams
    @param dc the DataCatalog for the remote data
    @param dsurl the dataset URL
    @param urlmore section and other parts of the URL
    @param dest the local disk destination
    @param logger the logging.Logger for messages, or None
    @param timeout connection timeout in seconds, ignored
    @param realtime True for FORECAST mode, False if not.  In
      FORECAST mode, the symbolic link is made even if the file
      does not exist, so long as the DataCatalog is marked as
      realtime (DataCatalog.rt_updated() returns True)
    @returns True on success, False if the file was not linked"""
    if logger is not None:
        logger.info('%s: from local file %s'%(dest,joined))
    if ( realtime and dc.rt_updated() ) or os.path.exists(path):
        makedirs(os.path.dirname(dest),logger=logger)
        make_symlink(path,dest,force=True,logger=logger)
    else:
        return False
    return True
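# For readers unfamiliar with produtil, a rough standard-library equivalent
# of make_symlink(path, dest, force=True) looks like this (a sketch, not
# the produtil implementation):
import errno
def force_symlink(src,dest):
    try:
        os.symlink(src,dest)
    except OSError as e:
        if e.errno==errno.EEXIST:
            os.remove(dest)         # replace whatever was already at dest
            os.symlink(src,dest)
        else:
            raise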
def _impl_fetch_sftp(self,parsed,joined,scheme,path,netpart,streams,dc,dsurl,
                     urlmore,dest,logger,timeout,realtime):
    """!Fetches a file via rsync over ssh.
    @param parsed The parsed URL from urlparse.urlparse
    @param joined The joined URL from urlparse.urljoin
    @param scheme The data transfer scheme (ftp, sftp, etc.)
    @param path The URL path
    @param netpart the netpart portion of the URL.
    @param streams the array of transfer streams
    @param dc the DataCatalog for the remote data
    @param dsurl the dataset URL
    @param urlmore section and other parts of the URL
    @param dest the local disk destination
    @param logger the logging.Logger for messages, or None
    @param timeout connection timeout in seconds
    @param realtime True for FORECAST mode, False if not.  Ignored.
    @returns True on success, False if the file was not copied"""
    tempname=None
    try:
        dirpath=os.path.dirname(path)
        if logger is not None:
            logger.info('%s:%s: check access.'%(netpart,dirpath))
        if not self.rsync_check_access(
                netpart,logger=logger,dirpath=dirpath):
            if logger is not None:
                logger.info('%s:%s: skip: directory inaccessible.'%(
                        netpart,dirpath))
            return False
        makedirs(os.path.dirname(dest),logger=logger)
        # cmd (built via _rsync_ssh_exe) copies the remote file into a
        # temporary file, tempname, which is then renamed into place:
        checkrun(cmd,logger=logger)
        os.rename(tempname,dest)
        tempname=None
    except ExitStatusException as e:
        if logger is not None:
            logger.warning("%s: non-zero exit status %s"%(
                    joined,repr(e.returncode)))
        return False
    finally:
        if tempname is not None:
            if logger is not None:
                logger.warning(
                    'In finally block, deleting temp file %s.'%(tempname,))
            os.remove(tempname)
    return True
def _impl_fetch_ftp(self,parsed,joined,scheme,path,netpart,streams,dc,dsurl,
                    urlmore,dest,logger,timeout,realtime):
    """!Fetches a file over FTP.
    @param parsed The parsed URL from urlparse.urlparse
    @param joined The joined URL from urlparse.urljoin
    @param scheme The data transfer scheme (ftp, sftp, etc.)
    @param path The URL path
    @param netpart the netpart portion of the URL.
    @param streams the array of transfer streams
    @param dc the DataCatalog for the remote data
    @param dsurl the dataset URL
    @param urlmore section and other parts of the URL
    @param dest the local disk destination
    @param logger the logging.Logger for messages, or None
    @param timeout connection timeout in seconds
    @param realtime True for FORECAST mode, False if not.  Ignored.
    @returns True on success, False if the file was not copied"""
    n="%s://%s"%(scheme,netpart)
    if n not in streams:
        streams[n]=self.open_ftp(n,logger=logger,timeout=timeout)
    stream=streams[n]
    makedirs(os.path.dirname(dest),logger=logger)
    # The payload is pulled into a temporary file (tempname, opened as f)
    # and renamed to dest only after the size check below passes:
    try:
        if logger is not None:
            logger.info('%s: pull %s => %s'
                        %(n,parsed.path,tempname))
        stream.retrbinary("RETR "+parsed.path,f.write)
        remote_size=stream.size(parsed.path)
        if remote_size is not None:
            local_size=os.path.getsize(tempname)
            if local_size!=remote_size:
                if logger is not None:
                    logger.warning(
                        '%s: wrong size: %d local vs %d remote'
                        %(tempname,local_size,remote_size))
                raise Exception(
                    'Could not transfer full file: only %d of %d '
                    'bytes transferred.'%(local_size,remote_size))
        if logger is not None:
            logger.info('%s: move from %s'%(dest,tempname))
        os.rename(tempname,dest)
        tempname=None
    finally:
        if tempname is not None:
            logger.warning('In finally block, removing temp file %s'%(
                    tempname,))
            os.remove(tempname)
    return True
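# A compact, self-contained sketch of the same pull-to-temp-then-rename
# pattern using only the standard library (the server, remote path and
# local destination below are placeholders, not real data):
host,user="ftpprd.ncep.noaa.gov","anonymous"
remote_path="/pub/data/example_file"
dest="./example_file"
ftp=ftplib.FTP(host,user,timeout=20)
t=tempfile.NamedTemporaryFile(prefix=os.path.basename(dest),dir=".",
                              suffix=".tmp",delete=False)
try:
    ftp.retrbinary("RETR "+remote_path,t.write)   # download into the temp file
    t.close()
    remote_size=ftp.size(remote_path)
    if remote_size is None or remote_size==os.path.getsize(t.name):
        os.rename(t.name,dest)                    # publish only if complete
    else:
        os.remove(t.name)
finally:
    ftp.quit()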
788 """!Returns the list of DataCatalog objects for FORECAST or
790 @param realtime True for FORECAST mode, False for HISTORY
791 @returns self.forecast or self.history
792 @post _sort() has been called, sorting self.forecast and
793 self.history in order of priority"""
def _impl_get_archive(self,archpath,parts,done,prio, loc, parsed, dc,
                      data,target_dc,realtime,logger,skip_existing):
    """!Fetches an archive from HPSS
    @param archpath path to the archive on HPSS
    @param parts list of required archive elements as integer index
      within the done argument
    @param[out] done list of bool, set to True if the part was obtained
    @param prio the priority of this input source
    @param loc,parsed,dc,data,target_dc,realtime,skip_existing Ignored.
    @param logger the logging.Logger for log messages"""
    # The archive contents are extracted into a scratch directory, td,
    # for the duration of the "with" block:
    with produtil.cd.TempDir(keep_on_error=False) as td:
        assert(self.hsi is not None)
        if self.hsi is not None:
            # Dump the archive index first; a failure here usually means
            # htar itself will fail too:
            i=self.hsi['get','-',':',archpath+'.idx']>"/dev/null"
            err=run(i,logger=logger)
            if err!=0:
                logger.warning("%s.idx: exit status %d dumping index "
                               "file. Htar will probably fail."
                               %(archpath,int(err)))
        r=self.htar['-xpf',archpath][ [p for p in parts.iterkeys()] ]
        logger.info('%s: list contents'%(td.dirname,))
        stat=run(r,logger=logger)
        if stat!=0:
            logger.info('non-zero exit status %d from htar; will retry '
                        'in five seconds.'%stat)
            time.sleep(5)
            stat=run(r,logger=logger)
        if stat!=0:
            logger.info('non-zero exit status %d from htar; will keep '
                        'going anyway.'%stat)
        if logger is not None:
            logger.info("%s: pull %d files"
                        %(archpath,len(parts)))
        nope=set()
        yup=set()
        for (filepart,tgti) in parts.iteritems():
            tgt=tgti[0]
            src=os.path.join(td.dirname,filepart)
            logger.debug('%s: check for this at %s'%(tgt,src))
            if os.path.exists(src):
                makedirs(os.path.dirname(tgt),logger=logger)
                deliver_file(src,tgt,keep=False,logger=logger)
                for i in tgti[1:]:
                    logger.debug('%s: add %d'%(tgt,i))
                    done.add(i)
                relfile=os.path.relpath(src,td.dirname)
                relfile=re.sub('^(../)+','',relfile)
                yup.add(relfile)
            else:
                relfile=os.path.relpath(src,td.dirname)
                relfile=re.sub('^(../)+','',relfile)
                nope.add(relfile)
                logger.debug('%s: does not exist'%(src,))
        if nope:
            missing=sorted(list(nope))
            logger.warning('%s: does not have: %s'%(
                    archpath,', '.join(missing)))
            if yup:
                found=sorted(list(yup))
                logger.warning('%s: has files: %s'%(
                        archpath,', '.join(found)))
        else:
            logger.info('%s: gleefully reporting all desired '
                        'files found.'%(archpath,))
def _impl_get_file(self,i,done,src,tgt,prio, loc, parsed, dc,streams,
                   archives,data,target_dc,realtime,logger,skip_existing):
    """!Obtain one or more files.
    @param i The index in done of the file being fetched
    @param done an array of logical flags telling which files are transferred
    @param src the source location
    @param tgt the target location
    @param prio the numerical priority
    @param loc the on-disk destination
    @param parsed the parsed URL as output by urlparse.urlparse
    @param dc the DataCatalog
    @param streams the array of transfer streams
    @param archives a double-nested dict of lists, mapping from
      archive name to file part to index within done of the file
    @param target_dc the DataCatalog of the target locations
    @param realtime True for FORECAST mode, False for HISTORY mode
    @param logger the logging.Logger for log messages
    @param skip_existing if True, do not re-download files that
      already exist on disk (in the target_dc)"""
    archsep=src.find('#')
    if archsep>=0:
        # The source is a member of an htar archive ("archive#member").
        # Record it in archives so the whole archive is pulled at once:
        arch=src[0:archsep]
        filepart=src[archsep+1:]
        if arch in archives and filepart in archives[arch]:
            archives[arch][filepart].append(i)
        else:
            archives[arch][filepart]=[tgt,i]
    else:
        if src[0:5]=='htar:':
            logger.warning("%s: no # in path - skipping this"
                           %(src,))
            return
        try:
            if self.fetch_file(
                    streams,dc,loc,src,tgt,
                    logger=logger,realtime=realtime):
                done.add(i)
        except (EnvironmentError,ExitStatusException) as e:
            if logger is not None:
                logger.warning(
                    'fetching %s=>%s: %s'%(str(src),str(tgt),
                                           str(e)),exc_info=True)
923 """!Generates a string containing a human-readable, prioritized
924 list of data sources.
925 @param dclist The data source list from list_for()
926 @returns A multi-line string containing the table.
929 Prioritized list of data sources:
930 PRIO- LOCATION = SOURCE @ DATES
931 100 - file:/// = DataCatalog(conf,'wcoss_fcst_PROD2014',2015080518) @ '1970010100-2038011818'
932 098 - file:/// = DataCatalog(conf,'wcoss_prepbufrnr_PROD2014',2015080518) @ '1970010100-2038011818'
933 097 - file:// = DataCatalog(conf,'zhan_gyre',2015080518) @ '2011060718-2011111200,2013051800-2013091018'"""
934 s=StringIO.StringIO()
935 s.write(
'Prioritized list of data sources:\nPRIO- LOCATION = SOURCE @ DATES\n')
936 for ( prio, loc, parsed, dc, dates )
in dclist:
937 s.write(
'%03d - %10s = %s @ %s\n'%(
938 int(prio),str(loc),repr(dc),repr(dates)))
def get(self,data,target_dc,realtime=False,logger=None,
        skip_existing=True):
    """!Transfers the specified set of data to the specified
    target.  The "target_dc" is a DataCatalog that specifies the
    destination filenames.  The "realtime" argument is True for
    FORECAST (real-time) mode runs, and False for HISTORY
    (retrospective) mode runs.  The "data" argument should be an
    iterable (list, tuple, etc.) where each element is a dict-like
    object that describes one file to obtain.  Each dict contains:

      dataset - string name of the dataset (gfs, gdas1, gefs, etc.)
      item - string name of the object (e.g. sf, sfcanl, bufr)
      atime - Optional: a datetime.datetime specifying the
        analysis time.  Default is the atime from the
        InputSource's constructor.
      ftime - Optional: a datetime.datetime specifying the
        forecast time.
      ...others... - any other keyword arguments will be sent to
        the .locate() functions of any of this InputSource's
        DataCatalog objects."""
    if logger is None: logger=self._logger
    dclist=self.list_for(realtime)
    done=set()
    streams=dict()
    try:
        for ( prio, loc, parsed, dc, dates ) in dclist:
            assert(loc is not None)
            assert(prio is not None)
            assert(parsed is not None)
            assert(dc is not None)
            assert(dates is not None)
            scheme=parsed.scheme
            netpart=parsed.netloc
            if scheme=='sftp':
                if not self.rsync_check_access(netpart,logger=logger):
                    logger.error('%s: cannot access; will skip'%(netpart,))
                    continue
            elif scheme not in ['ftp','htar','file']:
                logger.error('%s: invalid transfer mode %s; will skip'
                             %(netpart,scheme))
                continue
            archives=collections.defaultdict(dict)
            for (i,d) in enumerate(data):
                if i in done: continue
                assert('dates' not in d)
                tgt=target_dc.locate(**d)
                if tgt is None: continue
                # Targets that another worker thread is already fetching
                # are skipped with this message:
                if logger is not None:
                    logger.info('%s: already processing this'%(tgt,))
                if os.path.exists(tgt) and skip_existing:
                    if logger is not None:
                        logger.info('%s: already exists'%(tgt,))
                    continue
                if logger is not None:
                    logger.debug("%s => %s"%(repr(d),repr(tgt)))
                if logger is not None:
                    logger.debug('search for %s in %s'%(repr(d),repr(dc)))
                try:
                    src=dc.locate(dates=dates,**d)
                except KeyError as k:
                    logger.debug("%s: key error %s"%(src,str(k)))
                    continue
                if src is None: continue
                if logger is not None:
                    logger.info("SRC %s => %s"%(strsrc(d),repr(src)))
                # In the full module this work is handed to a pool of
                # worker threads; it fetches one file or records one
                # archive member for later extraction:
                self._impl_get_file(
                    i,done,src,tgt,prio, loc, parsed, dc,streams,
                    archives,data,target_dc,realtime,logger,
                    skip_existing)
            for (archpath,parts) in archives.iteritems():
                if not parts:
                    if logger is not None:
                        logger.info("%s: nothing to pull; skip"
                                    %(archpath,))
                    continue
                self._impl_get_archive(
                    archpath,parts,done,prio, loc, parsed, dc,
                    data,target_dc,realtime,logger,skip_existing)
    finally:
        if logger is not None:
            logger.warning('In finally block, closing streams.')
        for (key,stream) in streams.iteritems():
            try:
                stream.close()
            except Exception as e:
                if logger is not None:
                    logger.warning(
                        'Exception while closing stream %s: %s'
                        %(key,str(e)),exc_info=True)
    # After all sources have been tried, report anything that never arrived:
    for d in data:
        tgt=target_dc.locate(**d)
        if os.path.exists(tgt):
            continue
        if d.get('optional',False):
            if logger is not None:
                logger.info('missing optional data: %s'%(repr(d),))
        else:
            if logger is not None:
                logger.warning('MISSING INPUT: %s'%(repr(d),))
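# Usage sketch, following the class docstring example: pull three GFS
# surface analyses into the locations described by the [hwrfdata] catalog.
# conf is assumed to be the experiment's hwrf.config.HWRFConfig object.
insrc=InputSource(conf,"jet_sources_prod2014","2015071806")
hwrfdata=DataCatalog(conf,"hwrfdata")
insrc.get([ {"dataset":"gfs","item":"gfs_sfcanl","atime":"2015071800"},
            {"dataset":"gfs","item":"gfs_sfcanl","atime":"2015071806"},
            {"dataset":"gfs","item":"gfs_sfcanl","atime":"2015071812"} ],
          hwrfdata,realtime=False)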
def get_one(self,dataset,item,dest,logger=None,timeout=20,realtime=True,
            **kwargs):
    """!This is a simple wrapper around fetch_file that gets only
    one file.  It will fail if the file requires pulling an
    archive.
    @param dataset the dataset to transfer
    @param item the desired item in the dataset
    @param dest the on-disk destination filename
    @param logger a logging.Logger for log messages
    @param timeout the connection timeout in seconds
    @param realtime True for FORECAST mode, False for HISTORY mode
    @param kwargs extra keyword arguments are passed to DataCatalog.locate()"""
    if logger is None: logger=self._logger
    streams=dict()
    dclist=self.list_for(realtime)
    try:
        for ( prio, loc, parsed, dc, dates ) in dclist:
            src=dc.locate(dataset=dataset,item=item,**kwargs)
            if src is None: continue
            archsep=src.find('#')
            if archsep>=0:
                raise NotImplementedError(
                    'Source is in an archive.  De-archiving is not '
                    'supported by "get_one."  Use "get" instead.')
            elif self.fetch_file(streams,dc,loc,src,dest,logger=logger):
                break
    finally:
        if logger is not None:
            logger.warning('In finally block, closing streams.')
        for (key,stream) in streams.iteritems():
            try:
                stream.close()
            except Exception as e:
                if logger is not None:
                    logger.warning(
                        'Exception while closing stream %s: %s'
                        %(key,str(e)),exc_info=True)
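# Usage sketch: fetch a single file straight to one destination path.  The
# destination below is made up for illustration; conf is assumed to be the
# experiment's hwrf.config.HWRFConfig object.
insrc=InputSource(conf,"jet_sources_prod2014","2015071806")
insrc.get_one("gfs","gfs_sfcanl","/path/to/workdir/gfs.t06z.sfcanl",
              atime="2015071806",realtime=False)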