Package invocations :: Package cu5 :: Module cu5
[hide private]

Source Code for Module invocations.cu5.cu5

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  #$Id: cu5.py 9352 2012-08-06 12:53:14Z RossCollins $ 
  4  """ 
  5     Invoke CU5. Create library H2-K difference image frame products. 
  6   
  7     @author: E. Sutorius 
  8     @org:    WFAU, IfA, University of Edinburgh 
  9   
 10     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
 11     @contributors: R.S. Collins 
 12  """ 
 13  #------------------------------------------------------------------------------ 
 14  from   collections import defaultdict 
 15  import dircache 
 16  import mx.DateTime     as mxdt 
 17  import os 
 18  import shutil 
 19   
 20  import wsatools.Astrometry                as astro 
 21  from   wsatools.CLI                   import CLI 
 22  from   wsatools.DbConnect.CuSession   import CuSession 
 23  import wsatools.DbConnect.DbConstants     as dbc 
 24  from   wsatools.DbConnect.DbSession   import Join 
 25  import wsatools.ExternalProcess           as extp 
 26  from   wsatools.File                  import File 
 27  import wsatools.FitsUtils                 as fits 
 28  from   wsatools.Logger                import Logger, ForLoopMonitor 
 29  import wsatools.Utilities                 as utils 
 30  #------------------------------------------------------------------------------ 
 31   
class Cu5(CuSession):
    """
    Create library H2-K difference image frame products for given database
    release number.

    The task is driven by the RequiredDiffImage table: for every programme
    listed there the new H2 and K stacks are selected, paired on the sky,
    subtracted with the external C{fitsio_subtract} tool and the resulting
    files are written to an ingest list.
    """
    #--------------------------------------------------------------------------
    # Class parameters - should not be altered
    cuNum = 5
    _autoCommit = True
    _reqCuEventID = True
    _useWorkDir = True

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    #: Observation date range to include.
    dateRange = CuSession.sysc.obsCal.dateRange()
    #: Database release version number.
    releaseNum = None
    #: Ignore the last run CUEventID.
    ignoreLastRun = False
    #: cuEventID of a previous run that has to be resumed.
    resumeID = 0

    #--------------------------------------------------------------------------

    def _onRun(self):
        """
        Do CU5.

        For every row of RequiredDiffImage: select the new H2 and K images,
        pair them, run C{fitsio_subtract} on each pair, move products whose
        output reports a NaN statistic into the "nan" directory, prepare
        headers of and compress the good products, and finally write the
        ingest list (plus a list of broken files, if any).

        When L{resumeID} is set, pairs whose provenance is already recorded
        in the products of that earlier run are skipped.
        """
        # Resume a broken run?
        if self.resumeID > 0:
            Logger.addMessage("Determining provenance info for CU5 run %s..."
                              % self.resumeID)

            provInfoDict, timeStamp, outputList, counterStart = \
                self._getPrevCU5Info()
        else:
            timeStamp = mxdt.utc().strftime("%Y%m%d")
            outputList = []
            counterStart = 0

        # Determine product directories
        diffDir = (self.sysc.testOutputPath() if not self.archive.isRealRun
                   else utils.getNextDisk(self.sysc, preAllocMem=500))

        diffDir = os.path.join(diffDir, self.sysc.diffDir)
        outFilePath = \
            os.path.join(diffDir, '%s_v%s' % (timeStamp, self.releaseNum))

        nanFilePath = os.path.join(os.path.dirname(outFilePath), "nan")

        Logger.addMessage(
            "Difference images will be created in: " + outFilePath)

        Logger.addMessage(
            "Broken files with NaN errors will go here: " + nanFilePath)

        lastRun = (self._queryLastRuns() if not self.ignoreLastRun else dict())

        # This CU is driven by the RequiredDiffImage database table
        for progID, h2ID, kID, wcs, method in self.archive.query(
                "programmeID, filter1, filter2, wcs, method",
                "RequiredDiffImage"):

            self.programmeID.add(progID)
            self.programme.setCurRow(programmeID=progID)
            acronym = self.programme.getAcronym(progID).upper()

            Logger.addMessage("Determining list of new images for programme %s"
                              "..." % acronym)

            # query DB to determine list of new images available since last run
            h2Images = self._getImageList(h2ID, progID, lastRun.get(progID))
            kImages = self._getImageList(kID, progID, lastRun.get(progID))
            Logger.addMessage("found %s H2 and %s K images." %
                              (len(h2Images), len(kImages)))

            if not h2Images or not kImages:
                Logger.addMessage("<Info> No difference images require "
                                  "curation for programme "
                                  + acronym)
                continue

            Logger.addMessage("Pairing images...")
            pairDict, numPairs = self._pairImages(h2Images, kImages)
            Logger.addMessage("found %s pairs in total, using %s "
                              "time-wise nearest pairs."
                              % (numPairs, len(pairDict)))

            Logger.addMessage("Creating difference images...")

            if self.resumeID > 0:
                # Drop every pair whose provenance is already recorded in a
                # difference image of the run that is being resumed.
                donePairs = provInfoDict.values()
                newDict = {}
                for h2Entry in pairDict.itervalues():
                    h2Img, _h2Cat, _h2Conf, kImg, _kCat, _kConf = h2Entry[0]
                    h2ImgFile = File(h2Img)
                    kImgFile = File(kImg)
                    provPair = [
                        os.path.join(h2ImgFile.subdir, h2ImgFile.base),
                        os.path.join(kImgFile.subdir, kImgFile.base)]

                    if provPair not in donePairs:
                        newDict[h2Img] = h2Entry

                pairDict = newDict
                Logger.addMessage("Rerun on %s pairs." % len(pairDict))

            utils.ensureDirExist(diffDir)
            utils.ensureDirExist(outFilePath)
            utils.ensureDirExist(nanFilePath)

            # 1. create difference image
            brokenFiles = []
            progress = ForLoopMonitor(pairDict)
            for counter, h2Entry in enumerate(pairDict.itervalues()):
                h2Img, h2Cat, h2Conf, kImg, kCat, kConf = h2Entry[0]
                h2ImgID = os.path.basename(h2Img).split('_')[1]
                kImgID = os.path.basename(kImg).split('_')[1]
                outputPathName = os.path.join(outFilePath,
                    'e%s_%05u_%s_%s_diff.fit' %
                    (timeStamp, counterStart + counter + 1, h2ImgID, kImgID))

                imgFiles = '%s,%s' % (h2Img, kImg)
                catFiles = '%s,%s' % (h2Cat, kCat)
                if self.sysc.emptyFitsCatalogueFileName in catFiles.lower():
                    catFiles = "nocats"

                # Are there also associated confidence images?
                confMapFiles, confPathName = (
                    ('%s,%s' % (h2Conf, kConf),
                     fits.getConfMap(outputPathName))
                    if h2Conf and kConf else ('', ''))

                cmd = "fitsio_subtract %s %s wcs=%s %s %s %s %s" % (imgFiles,
                    catFiles, wcs, outputPathName, method, confMapFiles,
                    confPathName)

                # @@NOTE: fitsio_subtract sends non-error output to stderr
                progOutput = extp.run(cmd, raiseOnError=False,
                                      cwd=self.sysc.tempWorkPath())

                Logger.addMessage('\n'.join(progOutput), alwaysLog=False)
                if any(("Simple method statistic" in line and "nan" in line)
                       for line in progOutput):
                    # Product is unusable: set it aside, do not ingest it.
                    brokenFiles.append(outputPathName)
                    shutil.move(outputPathName, nanFilePath)
                    if confPathName:
                        shutil.move(confPathName, nanFilePath)
                else:
                    components = [fits.FileList(h2Img, h2Conf, cat=None),
                                  fits.FileList(kImg, kConf, cat=None)]

                    fits.prepHeaders(outputPathName, confPathName, components)
                    newPairs = [outputPathName]
                    if confPathName:
                        newPairs.append(confPathName)

                    fits.compressFits(newPairs)
                    outputList += newPairs

                progress.testForOutput()

            if not outputList and not brokenFiles:
                Logger.addMessage("<WARNING> No difference images created for "
                                  "programme " + acronym)
                continue

            # 2. insert difference image into filesystem
            ingestListPath = (self.archive.sharePath('ingestLists')
                              if self.archive.isRealRun else
                              os.path.join(self.sysc.testOutputPath(),
                                           self.sysc.productLogDir))

            utils.ensureDirExist(ingestListPath)
            prefix = "cu%02did%d_%s_%s" % (self.cuNum, self.cuEventID,
                os.getenv('HOST'), self.archive.database)

            listPath = os.path.join(ingestListPath, prefix + "_xfer.list")
            # Context manager guarantees the list is flushed and closed.
            with open(listPath, 'w') as listFile:
                listFile.write('\n'.join(outputList) + '\n')

            Logger.addMessage("Wrote ingest list to " + listPath)

            # numPairs is an integer count (taking len() of it raises a
            # TypeError); the number of pairs actually processed is the size
            # of pairDict, each of which should yield an image and a
            # confidence map.
            numExpected = 2 * len(pairDict)
            if len(outputList) < numExpected:
                Logger.addMessage("<WARNING> The number of output files (%d)"
                                  " differs from twice the number of"
                                  " pairs (%d)." % (len(outputList),
                                                    numExpected))
            if brokenFiles:
                brokenPath = os.path.join(os.curdir, prefix + "_broken.list")
                Logger.addMessage("<Info> %s Files are broken. Listed in %s" %
                                  (len(brokenFiles), brokenPath))
                with open(brokenPath, 'w') as brokenFile:
                    brokenFile.write('\n'.join(brokenFiles) + '\n')

    #--------------------------------------------------------------------------

    def _getImageList(self, filterID, progID, lastRun=None):
        """
        Obtains the list of attributes for new images of a certain filter for
        a certain programme since the last run.

        @param filterID: Select images of this filter ID.
        @type  filterID: int
        @param progID:   Select images from this programme ID.
        @type  progID:   int
        @param lastRun:  Select images created since this cuEventID. If not
                         given (or zero) no cuEventID restriction is applied.
        @type  lastRun:  int

        @return: List of image attributes defined in query.
        @rtype:  list(tuple)

        """
        attrs = "raBase, decBase, fileName, catName, confID, mjdObs, object"
        dateRange = "utDate BETWEEN '%s' AND '%s'" % self.dateRange

        # Only restrict on cuEventID when a previous run is known
        lastRunClause = (("Multiframe.cuEventID > %s AND " % lastRun)
                         if lastRun else "")

        selection = ("frameType='leavstack' AND "
                     "(Multiframe.deprecated=0 OR Multiframe.deprecated=81) "
                     "AND (MultiframeDetector.deprecated=0"
                     " OR MultiframeDetector.deprecated=81) "
                     "AND Multiframe.filterID=%s AND programmeID=%s AND %s "
                     "GROUP BY %s HAVING COUNT(*)=4 ORDER BY decBase ASC"
                     % (filterID, progID, dateRange, attrs))

        return self.archive.query(attrs,
            fromStr=Join(["Multiframe", "ProgrammeFrame",
                          "MultiframeDetector"], ["multiframeID"]),
            whereStr=lastRunClause + selection)

    #--------------------------------------------------------------------------

    def _getConfPathName(self, confID):
        """
        Looks up the file of the given confidence image in table Multiframe.

        @param confID: multiframeID of the confidence image.
        @type  confID: int

        @return: Part of the stored fileName following the last ':', or an
                 empty string if the query finds no non-default file name.
        @rtype:  str

        """
        confName = self.archive.query("fileName", "Multiframe",
            "multiframeID=%s AND fileName!='%s'" %
            (confID, dbc.charDefault()), firstOnly=True, default='')

        return confName.split(':')[-1]

    #--------------------------------------------------------------------------

    def _pairImages(self, h2Images, kImages):
        """
        Produces a list of paired images.

        Every H2 frame is matched against the K frames via
        astro.matchFrames() using the programme's frameSetTolerance; only
        K frames with the same (stripped) object name count as a pair, and
        of those the one nearest in mjdObs is kept.

        @param h2Images: List of H2 filter images.
        @type  h2Images: list(tuple)
        @param kImages:  List of K filter images.
        @type  kImages:  list(tuple)

        @return: Dictionary, keyed on H2 file path, of single-element lists
                 holding [h2 image, h2 catalogue, h2 confidence, k image,
                 k catalogue, k confidence] paths; and the total number of
                 H2-K matches found.
        @rtype:  defaultdict, int

        """
        raIndex = 0
        tol = self.programme.getAttr('frameSetTolerance')
        pairedDict = defaultdict(list)
        numPairs = 0
        # loop over all h2 frames
        for h2Ra, h2Dec, h2FileName, h2CatName, h2ConfID, h2MjdObs, h2Object \
                in h2Images:

            h2ConfPathName = self._getConfPathName(h2ConfID)
            h2FilePathName = h2FileName.split(':')[-1]
            h2CatPathName = h2CatName.split(':')[-1]

            # find matching k frames
            kDict = defaultdict(list)
            h2kRatings = utils.Ratings()
            for _kRa, _kDec, kFileName, kCatName, kConfID, kMjdObs, kObject \
                    in astro.matchFrames(kImages, raIndex, (h2Ra, h2Dec), tol):

                kConfPathName = self._getConfPathName(kConfID)
                kFilePathName = kFileName.split(':')[-1]
                kCatPathName = kCatName.split(':')[-1]

                if kObject.strip() == h2Object.strip():
                    # Assign rather than extend, so that a K file matched
                    # more than once still yields exactly three entries.
                    kDict[kFilePathName] = [kFilePathName, kCatPathName,
                                            kConfPathName]
                    numPairs += 1
                    h2kRatings[kFilePathName] = abs(h2MjdObs - kMjdObs)

            # update pairedDict with the timewise nearest k frame
            if kDict:
                h2Entry = [h2FilePathName, h2CatPathName, h2ConfPathName]
                h2Entry += kDict[h2kRatings.getKeyByRating(0)]
                pairedDict[h2FilePathName].append(h2Entry)

        return pairedDict, numPairs

    #--------------------------------------------------------------------------

    def _getPrevCU5Info(self):
        """
        Collects the state of the earlier CU5 run given by L{resumeID}.

        The run's timeStamp is read from ArchiveCurationHistory, the
        matching "<date>_v<releaseNum>" product directories are searched on
        the available RAID file systems, and the PROV0001/PROV0002 keywords
        are read from extension 1 of every difference image found there.

        @return: Dictionary of [PROV0001, PROV0002] values referenced by
                 difference image path; date string of the previous run;
                 list of all multi-extension FITS files found in its product
                 directories; highest counter found in the difference image
                 file names.
        @rtype:  dict(str:list), str, list(str), int

        @raise IndexError: If ArchiveCurationHistory has no entry for the
                           run that should be resumed.

        """
        timeStamps = self.archive.query(
            selectStr="timeStamp",
            fromStr="ArchiveCurationHistory",
            whereStr="cuID=%s and cuEventID=%s" % (self.cuNum, self.resumeID))

        if not timeStamps:
            raise IndexError("No ArchiveCurationHistory entry for CU%s run "
                             "with cuEventID=%s" % (self.cuNum, self.resumeID))

        # Keep the text before the first blank and strip the dashes
        prevCu5TimeStamp = \
            str(timeStamps[0]).partition(' ')[0].replace('-', '')

        prevCu5DateVersStr = "%s_v%s" % (prevCu5TimeStamp, self.releaseNum)
        disks = self.sysc.availableRaidFileSystem()
        fileList = []
        for dirName, dateDirs in fits.getAllPaths(disks, [self.sysc.diffDir]):
            for diffDir in dateDirs:
                if diffDir == prevCu5DateVersStr:
                    dirPath = os.path.join(dirName, diffDir)
                    # sorted(os.listdir()) gives the same sorted listing as
                    # the deprecated dircache.listdir()
                    fileList.extend(os.path.join(dirPath, fileName)
                                    for fileName in sorted(os.listdir(dirPath))
                                    if fileName.endswith(self.sysc.mefType))

        diffFileList = [x for x in fileList
            if x.endswith(self.sysc.diffSuffix + self.sysc.mefType)]

        # get Provenance info
        provInfoDict = {}
        maxCounter = 0
        for fileName in diffFileList:
            hdulist = fits.open(fileName)
            try:
                shdr = hdulist[1].header
                h2ImgProv, kImgProv = shdr["PROV0001"], shdr["PROV0002"]
                provInfoDict.setdefault(fileName, []).extend(
                    [h2ImgProv, kImgProv])
            except IndexError:
                Logger.addMessage("Extension missing in %s" % (fileName))
            finally:
                # Release the file even if a header lookup fails
                hdulist.close()

            maxCounter = max(maxCounter,
                int(os.path.basename(fileName).split('_', 2)[1]))

        return provInfoDict, prevCu5TimeStamp, fileList, maxCounter

    #--------------------------------------------------------------------------

    def _queryLastRuns(self):
        """
        @return: A dictionary containing the cuEventIDs of the last run of this
                 CU for every programme it has been run on, referenced by
                 programmeID.
        @rtype:  dict(int:int)

        """
        Logger.addMessage("Determining when CU5 was last run for each "
                          "programme...")

        return dict(self.archive.query(
            selectStr="programmeID, MAX(ArchiveCurationHistory.cuEventID)",
            fromStr=Join(["ArchiveCurationHistory",
                          "ProgrammeCurationHistory"], ["cuEventID"]),
            whereStr="cuID=%s AND status=1 GROUP BY programmeID" % self.cuNum))
#------------------------------------------------------------------------------
# Entry point for script.

# Allow module to be imported as well as executed from the command line
if __name__ == '__main__':
    # @@TODO: Can remove this line once IngCuSession is united with CuSession
    CLI.progArgs.remove("comment")

    # Positional arguments: observation date range and release number
    cu5Args = [
        CLI.Argument("begin_date", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("end_date", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("releaseNum", '1', isValOK=lambda x: x.isdigit())]

    CLI.progArgs += cu5Args

    # Options specific to this curation task
    cu5Opts = [
        CLI.Option('r', 'resume',
                   'resume CU5 run with given cuEventID', "INT", '0',
                   isValOK=lambda x: x.isdigit()),
        CLI.Option('i', 'ignorelast', 'ignore check when CU5 was last run.'),
        CLI.Option('v', 'verbose',
                   'display details of every fitsio_subtract run')]

    CLI.progOpts += cu5Opts

    cli = CLI(Cu5, "$Revision: 9352 $")
    Logger.isVerbose = cli.getOpt('verbose')
    Logger.addMessage(cli.getProgDetails())

    cu = Cu5(cli=cli)
    try:
        beginDate = cli.getArg("begin_date")
        endDate = cli.getArg("end_date")
        cu.dateRange = cu.sysc.obsCal.dateRange(beginDate, endDate)
    except Exception as error:
        # Log the problem and point the user at the log file
        eType = "Invalid Option"
        Logger.addExceptionMessage(error, eType)
        raise SystemExit(eType + ": see log " + cu._log.pathName)

    # Transfer the remaining command-line settings to the task and run it
    cu.ignoreLastRun = int(cli.getOpt("ignorelast"))
    cu.resumeID = int(cli.getOpt("resume"))
    cu.releaseNum = cli.getArg("releaseNum")
    cu.run()

#------------------------------------------------------------------------------
# Change log:
#
# 18-Nov-2004, ETWS: First Version
# 12-May-2005, ETWS: Major refactoring
# 20-Jan-2006, JB: Added code so that the name of the database being used
#                  is prepended to the logfile filename and added the
#                  variable "mainDb" for the above and the DB Connection.
# 7-Jun-2006, RSC: Replaced utils.sortList() with fast, new Python 2.4 method
# 5-Jul-2006, RSC: Upgraded to use new CSV.py module.
# 2-Aug-2006, RSC: Updated to new ExternalProcess.py design.
# 9-Nov-2006, RSC: Upgraded to use OO curation task framework.
# 16-Mar-2007, ETWS: Refactored for DR2.
# 2-Apr-2007, ETWS: Included OBJECT keyword, which defines H2-K-sets.
# 5-Nov-2007, ETWS: Included repeated frames in file selection.
# 16-Nov-2007, ETWS: Included possibility to resume a broken run.
# 30-Sep-2008, ETWS: Included possibility to ignore last run check.