Package invocations :: Package cu0 :: Module FinaliseFlatFileIngest
[hide private]

Source Code for Module invocations.cu0.FinaliseFlatFileIngest

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  # $Id: FinaliseFlatFileIngest.py 10129 2013-11-06 14:15:22Z NicholasCross $ 
  4  """ 
  5     Prepare database for CU4 ingests following a completed batch of CU3 ingests 
  6     (any arbitrary set, say, a month or semester of data). It combines the 
  7     following tasks: 
  8        - Copy the illumination tables from CASU to WFAU. 
  9        - helpers/UpdateIllumEntries.py (optionally, normally done by CU4) 
 10        - helpers/ProvenanceFiller.py 
 11        - helpers/SetupProgramme.py 
 12   
 13     @author: Eckhard Sutorius 
 14     @org:    WFAU, IfA, University of Edinburgh 
 15  """ 
 16  #------------------------------------------------------------------------------ 
 17  from   __future__  import division, print_function 
 18  from   collections import defaultdict, namedtuple 
 19  import multiprocessing 
 20  from   operator    import attrgetter 
 21  from   itertools            import groupby 
 22  import os 
 23  import time 
 24   
 25  from   invocations.cu1.cu1Transfer    import CASUQueries 
 26  from   wsatools.CLI                   import CLI 
 27  import wsatools.DbConnect.CommonQueries   as queries 
 28  from   wsatools.DbConnect.CuSession   import CuSession 
 29  from   wsatools.DbConnect.DbSession   import DbSession, Join, SelectSQL 
 30  import wsatools.FitsUtils                 as fits 
 31  from   wsatools.Logger                import Logger, ForLoopMonitor 
 32  from   wsatools.ProgrammeBuilder      import ProgrammeBuilder 
 33  from   wsatools.FitsToDb              import ProvenanceFiller 
 34  from   wsatools.SystemConstants       import DepCodes, SystemConstants 
 35  import wsatools.Utilities                 as utils 
 36  #------------------------------------------------------------------------------ 
 37   
class FileHistoryDataFiller(CuSession):
    """ Utility class for calculating properties from the Provenance.

        Traces OB stacks/tiles back through their component exposures
        (via the Provenance table) to recompute MJD and total exposure
        time values, writing the results both to the database and to the
        FITS catalogue headers (see run(), updateDb(), updateFitsP()).
    """
    #--------------------------------------------------------------------------
    # Define class constants (access as FileHistoryDataFiller.varName)

    # Per-multiframe observation metadata as selected from the DB in run().
    FrameInfo = namedtuple("FrameInfo", "mjdObs detNDit detDit frameType")

    #--------------------------------------------------------------------------
    # Define private member variable default values (access prohibited)

    #: Verbose logging flag, propagated to Logger in run().
    isVerbose = False
    #: DbSession connection to database.
    archive = None
    #: Redo existing data
    reDo = False
    #: Update cats for existing updated DB values
    updateCats = False

    #--------------------------------------------------------------------------
    def run(self, archive, semesterList=None, version=None, updateCats=False,
            numThreads=1):
        """ Updates OB pawprints and MetaData tables with data calculated
            from tracing back the history of the file through all normal obs.

            @param archive:      Open DbSession connection to the (VSA)
                                 database.
            @param semesterList: Semesters bracketing the date range; only
                                 the first and last entries are used.
            @param version:      Version number of the data release.
            @param updateCats:   If True, first re-write FITS catalogues for
                                 frames whose DB values are already updated.
            @param numThreads:   Number of processes for parallel FITS
                                 catalogue updates.
        """
        self.archive = archive

        self.updateCats = updateCats
        # The detNDit/detDit-based time arithmetic below only exists for VSA.
        if not self.archive.sysc.isVSA():
            raise FileHistoryDataFiller.CuError(
                "This should only be run on the VSA")
        Logger.isVerbose = self.isVerbose
        startDate, endDate = self.archive.sysc.obsCal.getDatesFromInput(
            semesterList[0], semesterList[-1])
        # First correct MultiframeDetector.totalExpTime over the date range.
        self.updateTotalExpTime(startDate, endDate)

        frameTypeSel = queries.getFrameSelection('stack', noDeeps=True)

        if self.updateCats:
            Logger.addMessage("Updating FITS files where the DB "
                              "has already been updated.")
            records = self.selectExistingDbData(version, startDate,
                                                endDate)
            if records:
                Logger.addMessage("Updating catalogue files...")
                self.updateFitsP(records, numThreads)

        # Update pawprints
        Logger.addMessage("Selecting OB pawprints that haven't already been "
                          "processed...")
        # mjdMean<0 is the "not yet processed" default unless a redo is forced.
        redoSel = ('' if self.reDo else "mjdMean<0 AND ")
        extInfo = self.archive.query(
            selectStr="D.multiframeID, D.extNum, M.fileName, M.frameType, "
                      "M.confID, "
                      "M.mjdObs, E.detNDit, E.detDit, D.deprecated",
            fromStr="Multiframe AS M, MultiframeDetector AS D, "
                    "FlatFileLookUp AS F, MultiframeEsoKeys AS E",
            whereStr="D.multiframeID=M.multiframeID AND "
                     "F.multiframeID=M.multiframeID AND "
                     "E.multiframeID=M.multiframeID AND "
                     "dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND "
                     "%s D.%s AND (%s OR "
                     "frameType LIKE 'normal' OR frameType LIKE '%%conf') " % (
                         startDate, version, endDate, version,
                         redoSel, DepCodes.selectNonDeprecated, frameTypeSel))
        recordDict = defaultdict(list)
        frameInfoDict = defaultdict(list)
        depInfoDict = defaultdict(list)
        for entry in extInfo:
            # remove 'conf' part later
            # dict of all stacks
            if "stack" in entry.frameType and not 'conf' in entry.frameType:
                recordDict[(entry.multiframeID, entry.confID, entry.fileName)
                           ].append((entry.extNum, entry.deprecated))

            # frame info for all mfIDs
            if entry.multiframeID not in frameInfoDict:
                frameInfoDict[entry.multiframeID] = \
                    FileHistoryDataFiller.FrameInfo(
                        entry.mjdObs, entry.detNDit, entry.detDit,
                        entry.frameType)

            # deprecation info for all mfiID/extension pairs
            depInfoDict[(entry.multiframeID, entry.extNum)] = \
                entry.deprecated
        # NOTE(review): frameInfoDict and depInfoDict are populated above but
        # never read again in this method -- candidates for removal.
        self._mjdDict = {}
        noNewFound = len(recordDict)
        if recordDict:
            Logger.addMessage("Started calculating new MJD values "
                              "for stacks...")
        else:
            Logger.addMessage("No new stacks found...")
        progress = ForLoopMonitor(recordDict)

        frameTypeSel = "m.frameType='stack'"
        fromStr = "Multiframe as m"

        whereStr = ("%s and m.multiframeID=vn.combiframeID"
                    % (frameTypeSel))
        # mjdObs, mjdEnd: per combiframe, earliest component start and latest
        # component end (exposure length from ESO DIT*NDIT on VSA).
        mjdObsDict = dict((mfID, (mjdObs, mjdEnd))
                          for mfID, mjdObs, mjdEnd in self.archive.query(
            selectStr="m.multiframeID,MIN(mn.mjdObs) as mnMjdObs,"
                      "MAX(mn.mjdObs+(%s/(24.*3600.))) as mxMjdEnd"
                      % (("e.detNdit*e.detDit")
                         if self.sysc.isVSA() else ("mn.expTime")),
            fromStr=fromStr + ",Provenance as vn,Multiframe as mn%s,"
                    "FlatFileLookup as f" %
                    (",MultiframeEsoKeys as e" if self.sysc.isVSA() else ""),
            whereStr=whereStr + " and vn.multiframeID=mn.multiframeID%s "
                     " AND f.dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' "
                     "AND m.%s AND "
                     "f.multiframeID=m.multiframeID" %
                     (" and mn.multiframeID=e.multiframeID"
                      if self.sysc.isVSA() else "",
                      startDate, version, endDate, version,
                      DepCodes.selectNonDeprecated),
            groupBy="m.multiframeID"))
        # mjdMean
        mjdMeanDict = {}
        # check mjdMeanDict, _meanMjdDict, updateDB
        # NOTE(review): "AND d.%s m.%s" only forms valid SQL because redoSel
        # ends with "AND "; if self.reDo is True redoSel is empty and this
        # clause degenerates to "d. m...." -- confirm before enabling reDo.
        for mfID, group in groupby(self.archive.query(
                selectStr="d.multiframeID,d.extNum,"
                          "AVG(mn.mjdObs+(0.5*%s/(24.*3600.))) as meanMjd"
                          % (("e.detNdit*e.detDit")
                             if self.sysc.isVSA() else ("mn.expTime")),
                fromStr=fromStr + ",Provenance as vn,Multiframe as mn%s,"
                        "MultiframeDetector as d,FlatFileLookup as f,"
                        "MultiframeDetector as dn" %
                        (",MultiframeEsoKeys as e"
                         if self.sysc.isVSA() else ""),
                whereStr=whereStr + " and vn.multiframeID=mn.multiframeID%s"
                         " AND d.multiframeID=m.multiframeID and "
                         "dn.multiframeID=mn.multiframeID "
                         " AND dn.extNum=d.extNum AND dn.%s"
                         " AND f.dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' "
                         "AND d.%s m.%s AND "
                         "f.multiframeID=m.multiframeID" %
                         (" and mn.multiframeID=e.multiframeID"
                          if self.sysc.isVSA() else "",
                          DepCodes.selectNonDeprecated, startDate, version,
                          endDate, version, redoSel,
                          DepCodes.selectNonDeprecated),
                groupBy="d.multiframeID,d.extNum", orderBy="d.multiframeID"),
                key=attrgetter("multiframeID")):
            mjdMeanDict[mfID] = dict((extNum, mjdMean)
                                     for _mID, extNum, mjdMean in group)

        for (mfID, cfID, filePathName) in recordDict:
            # Use ProvenanceUtils to calculate new values
            Logger.addMessage("Calculating new values for %d from Provenance "
                              "information" % mfID, alwaysLog=False)

            mjdObs = mjdObsDict[mfID][0] if mfID in mjdObsDict else None
            mjdEnd = mjdObsDict[mfID][1] if mfID in mjdObsDict else None
            extDict = mjdMeanDict[mfID] if mfID in mjdMeanDict else None
            if mjdObs:
                self._mjdDict[mfID] = (mjdObs, mjdEnd, None, extDict)
            # update database
            Logger.addMessage("Updating DB and FITS files for mfID %d, "
                              "fileName %s " % (mfID, filePathName),
                              alwaysLog=False)
            self.updateDb(mfID, cfID)
            progress.testForOutput()

        # Update FITS catalogues
        if recordDict:
            Logger.addMessage("Updating catalogue files...")
            self.updateFitsP(recordDict.keys(), numThreads)
        # Update tiles
        Logger.addMessage("Selecting OB tiles that haven't already been "
                          "processed...")
        frameTypeSel = queries.getFrameSelection('tile', noDeeps=True)
        redoSel = ('' if self.reDo else "totalExpTimeSum<0 AND ")
        recordList = self.archive.query(
            selectStr="M.multiframeID, M.confID, M.fileName",
            fromStr="Multiframe AS M, FlatFileLookUp AS F",
            whereStr="F.multiframeID=M.multiframeID AND "
                     "dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND "
                     "%s AND %s %s " % (
                         startDate, version, endDate, version,
                         frameTypeSel, redoSel, DepCodes.selectNonDeprecated))

        self._mjdDict = {}
        noNewFound += len(recordList)

        if recordList:
            Logger.addMessage("Started calculating new MJD/totalExpTime values"
                              " for tiles...")
        else:
            Logger.addMessage("No new tiles found...")
        # For tiles the component pawprints are one more Provenance hop away.
        frameTypeSel = "m.frameType='tilestack'"
        fromStr = "Multiframe as m, Provenance as v"

        whereStr = ("%s and m.multiframeID=v.combiframeID and v.multiframeID="
                    % (frameTypeSel))
        # mjdObs, mjdEnd
        mjdObsDict = dict((mfID, (mjdObs, mjdEnd))
                          for mfID, mjdObs, mjdEnd in self.archive.query(
            selectStr="m.multiframeID,MIN(mn.mjdObs) as mnMjdObs,"
                      "MAX(mn.mjdObs+(%s/(24.*3600.))) as mxMjdEnd"
                      % (("e.detNdit*e.detDit")
                         if self.sysc.isVSA() else ("mn.expTime")),
            fromStr=fromStr + ",Provenance as vn,Multiframe as mn%s,"
                    "FlatFileLookup as f" %
                    (",MultiframeEsoKeys as e" if self.sysc.isVSA() else ""),
            whereStr=whereStr + "vn.combiframeID and "
                     "vn.multiframeID=mn.multiframeID%s "
                     " AND f.dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' "
                     "AND m.%s AND "
                     "f.multiframeID=m.multiframeID" %
                     (" and mn.multiframeID=e.multiframeID"
                      if self.sysc.isVSA() else "",
                      startDate, version, endDate, version,
                      DepCodes.selectNonDeprecated),
            groupBy="m.multiframeID"))

        # totalExpTimeSum
        # NOTE(review): there is no space between "m.%s" and "group by" in the
        # concatenated where-string below; this is only valid SQL if
        # DepCodes.selectNonDeprecated ends with whitespace -- verify.
        totExpTimeSumDict = dict(self.archive.query(
            "md.multiframeID,SUM(mdo.totalExpTime) as tExpSum",
            "Multiframe as m,MultiframeDetector as md,Provenance as v, "
            "Multiframe as mo,MultiframeDetector as mdo, "
            "CurrentAstrometry as c",
            "%s and m.multiframeID=v.combiframeID and md.multiframeID="
            "c.multiframeID and md.extNum=c.extNum and "
            "v.multiframeID=mo.multiframeID and mo.multiframeID="
            "mdo.multiframeID and m.multiframeID=md.multiframeID and "
            "mdo.%s and m.%s"
            "group by md.multiframeID"
            % (frameTypeSel, DepCodes.selectNonDeprecated,
               DepCodes.selectNonDeprecated)))
        progress = ForLoopMonitor(recordList)
        for mfID, cfID, filePathName in recordList:
            # Use ProvenanceUtils to calculate new values
            mjdObs = mjdObsDict[mfID][0] if mfID in mjdObsDict else None
            mjdEnd = mjdObsDict[mfID][1] if mfID in mjdObsDict else None
            totalExpTimeSum = totExpTimeSumDict[mfID] \
                if mfID in totExpTimeSumDict else None
            # Tiles get no per-extension mean MJDs, only frame-level values.
            extDict = {}
            if mjdObs:
                self._mjdDict[mfID] = (mjdObs, mjdEnd, totalExpTimeSum,
                                       extDict)

            Logger.addMessage("Updating DB and FITS files for mfID %d, "
                              "fileName %s " % (mfID, filePathName),
                              alwaysLog=False)
            self.updateDb(mfID, cfID)
            progress.testForOutput()
        if recordList:
            Logger.addMessage("Updating catalogue files...")
            self.updateFitsP(recordList, numThreads)
        # Success only when there was nothing left to process.
        if (noNewFound == 0):
            self._success = True
269 #-------------------------------------------------------------------------- 270
    def updateTotalExpTime(self, startDate, endDate):
        """ Update total exposure time of OB images.

            Recomputes MultiframeDetector.totalExpTime from the component
            exposures in Provenance (DIT*NDIT sums on VSA, expTime sums on
            WSA/OSA; tiles are divided by 3 for the pawprint overlap) and
            overwrites stored values that differ by more than EPS.

            @param startDate: First UT date of the range to correct.
            @param endDate:   Last UT date of the range to correct.
        """

        eqnDict = defaultdict(dict)

        # SQL expressions for the recomputed exposure time, per archive.
        eqnDictVSA = {
            'stack':"SUM(e.detDit * e.detNdit)",
            'tile':"SUM(e.detDit*e.detNdit)/3."}
        eqnDictWSA = {
            'stack':"SUM(m.expTime)"
            }
        eqnDict['WSA'] = eqnDictWSA
        eqnDict['OSA'] = eqnDictWSA
        eqnDict['VSA'] = eqnDictVSA

        # Frame types with a totalExpTime to fix; only VSA has tiles.
        frameTypeDict = defaultdict(list)
        frameTypeDict['VSA'] = ['stack', 'tile']
        frameTypeDict['WSA'] = ['stack']
        frameTypeDict['OSA'] = ['stack']
        # Tolerance: only rows whose stored value exceeds the recomputed one
        # by more than EPS are rewritten (see HAVING clause below).
        EPS = 0.0001

        for frameType in frameTypeDict[self.sysc.loadDatabase]:
            frameTypeSel = queries.getFrameSelection(
                frameType, noDeeps=True, alias='m', selType='%stack')
            selectStr = "d.multiframeID, d.extNum, %s as tExp" \
                % eqnDict[self.sysc.loadDatabase][frameType]
            fromStr = "MultiframeDetector as d, Multiframe as m, " \
                      "Provenance as v,"
            fromStr += "Multiframe as mn"
            groupBy = ("d.multiframeID, d.extNum, totalExpTime having "
                       "(d.totalExpTime-%s)>%s"
                       % (eqnDict[self.sysc.loadDatabase][frameType], EPS))
            wbcls = "v"
            if frameType == 'tile':
                # Tiles need a second Provenance hop: tile -> stack -> normal.
                fromStr += ", Provenance as v2"
                wbcls = "v.multiframeID=v2.combiframeID and v2"
            ekcls = ""
            if self.sysc.loadDatabase == 'VSA':
                # VSA exposure times come from the ESO DIT/NDIT keywords.
                fromStr += ", MultiframeEsokeys as e"
                ekcls = " and mn.multiframeID=e.multiframeID"
            whereStr = ("%s and m.multiframeID=d.multiframeID and "
                        "m.multiframeID=v.combiframeID and "
                        "%s.multiframeID=mn.multiframeID and "
                        "m.utDate BETWEEN '%s' AND '%s'%s"
                        % (frameTypeSel, wbcls, startDate, endDate, ekcls))

            corrTexpSel = SelectSQL(selectStr, fromStr, whereStr,
                                    groupBy=groupBy)

            # Apply the corrected values via an UPDATE ... FROM (subquery).
            total = self.archive.update("MultiframeDetector",
                entryList=[("totalExpTime", "T.tExp")],
                fromTables="(%s) AS T" % corrTexpSel,
                where="T.multiframeID=MultiframeDetector.multiframeID "
                      "and T.extNum=MultiframeDetector.extNum")
            self.archive.commitTransaction()
            Logger.addMessage(
                "Updated %s %s totalExpTimes in MultiframeDetector"
                % (total, frameType))
324 325 #-------------------------------------------------------------------------- 326 327
    def selectExistingDbData(self, version, startDate, endDate):
        """
        Select data from database where non-default values already exist.

        Populates self._mjdDict from stacks whose mjdMean is already set
        (mjdMean>0), so the FITS catalogues can be re-written from the DB.

        @param version:   Version number of the data release.
        @type  version:   str
        @param startDate: First date of the range (formatted for
                          dateVersStr comparison).
        @type  startDate: str
        @param endDate:   Last date of the range.
        @type  endDate:   str

        @return: Dictionary keyed on (multiframeID, confID, fileName) with
                 lists of (extNum, mjdMean) values.
        """

        # @Only stacks - not tiles , not conf
        frameTypeSel = queries.getFrameSelection('stack', noDeeps=True)
        extInfo = self.archive.query(
            selectStr="D.multiframeID, D.extNum, M.fileName,"
                      "M.confID, M.mjdObs, M.mjdEnd,D.mjdMean",
            fromStr="Multiframe AS M, MultiframeDetector AS D, "
                    "FlatFileLookUp AS F, MultiframeEsoKeys AS E",
            whereStr="D.multiframeID=M.multiframeID AND "
                     "F.multiframeID=M.multiframeID AND "
                     "E.multiframeID=M.multiframeID AND "
                     "dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND "
                     "mjdMean>0 AND D.%s AND %s ORDER BY "
                     "D.multiframeID, D.extNum" % (
                         startDate, version, endDate, version,
                         DepCodes.selectNonDeprecated, frameTypeSel))

        # @TODO: is this the best way to do it....
        recordDict = defaultdict(list)
        mjdTimeDict = defaultdict(tuple)
        for entry in extInfo:
            recordDict[(entry.multiframeID, entry.confID, entry.fileName)
                       ].append((entry.extNum, entry.mjdMean))
            mjdTimeDict[entry.multiframeID] = (entry.mjdObs, entry.mjdEnd)
        # Rebuild the cache used by updateDb()/updateFitsP():
        # mfID -> (mjdObs, mjdEnd, totalExpTimeSum, {extNum: mjdMean}).
        self._mjdDict = {}
        for (mfID, cnfID, fileName) in recordDict:
            extDict = defaultdict()
            for extNum, mjdMean in recordDict[(mfID, cnfID, fileName)]:
                extDict[extNum] = mjdMean
            mjdObs, mjdEnd = mjdTimeDict[mfID]
            self._mjdDict[mfID] = (mjdObs, mjdEnd, None, extDict)
        return recordDict
371 372 #-------------------------------------------------------------------------- 373
374 - def updateDb(self, mfID, cfID):
375 """ 376 Updates DB with MEANMJD values 377 378 @param mfID: MultiframeID of combiframe for which provenan. 379 @type mfID: int 380 @param cfID: MultiframeID of confidence frame. 381 @type cfID: int 382 383 """ 384 if mfID in self._mjdDict: 385 mjdObs, mjdEnd, tExpSum, extDict = self._mjdDict[mfID] 386 # Update Multiframe 387 entryList = [("mjdObs", mjdObs), ("mjdEnd", mjdEnd)] 388 if tExpSum: 389 entryList += [("totalExpTimeSum", tExpSum)] 390 self.archive.update("Multiframe", 391 entryList, 392 where="multiframeID IN (%d,%d)" % (mfID, cfID)) 393 394 for extNum in extDict: 395 meanMjd = extDict[extNum] 396 # Update MultiframeDetector 397 self.archive.update("MultiframeDetector", 398 [("mjdMean", meanMjd)], 399 where="multiframeID=%d AND extNum=%d" % 400 (mfID, extNum))
401 402 #-------------------------------------------------------------------------- 403
404 - def updateFits(self, mfID, filePathName):
405 """ 406 Updates DB and catalogue headers with MEANMJD values 407 @todo: when is this used 408 409 @param mfID: MultiframeID of combiframe for which provenan. 410 @type mfID: int 411 @param filePathName: Full path to the FITS file. 412 @type filePathName: str 413 414 """ 415 if mfID in self._mjdDict: 416 _mjdObs, _mjdEnd, _tExpSum, extDict = self._mjdDict[mfID] 417 418 catFileName = fits.getCatalogue(filePathName) 419 # get file creation date 420 modtime = time.strptime(time.ctime(os.stat(catFileName).st_mtime)) 421 fileTimestamp = time.strftime("%y%m%d%H%M%S", modtime) 422 423 # update catalogue 424 catFile = fits.open(catFileName, 'update') 425 for extNum in extDict: 426 meanMjd = extDict[extNum] 427 # Update catalogue header 428 # Catalogue needs updating for CU4 to get info. 429 catFile[extNum - 1].header.update("MEANMJD", meanMjd, 430 "Mean MJDobs of extension") 431 catFile.close() 432 433 # reset file timestamp 434 timetpl = time.strptime(fileTimestamp, '%y%m%d%H%M%S') 435 modtime = time.mktime(timetpl) 436 os.utime(catFileName, (time.time(), modtime))
437 438 #-------------------------------------------------------------------------- 439
440 - def updateFitsP(self, records, numThreads):
441 """ 442 Updates DB and catalogue headers with MEANMJD values 443 444 @param records: List of mfID, cfID, filePathName 445 @type records: list 446 @param numThreads: Number of threads used in multiprocessing 447 @type numThreads: int 448 449 """ 450 inputs = [] 451 for mfID, _cfID, filePathName in records: 452 if mfID in self._mjdDict: 453 _mjdObs, _mjdEnd, _tExpSum, extDict = self._mjdDict[mfID] 454 catFileName = fits.getCatalogue(filePathName) 455 if os.path.exists(catFileName): 456 inputs.append((catFileName, extDict)) 457 # pool the input list 458 UpdateFits.numprocesses = numThreads 459 c = UpdateFits() 460 c.pooling(inputs)
461 462 #------------------------------------------------------------------------------ 463
class KeyboardInterruptError(Exception):
    """ Picklable stand-in for KeyboardInterrupt, raised by pool workers
        so a ^C can propagate out of multiprocessing.
    """
466 467 #------------------------------------------------------------------------------ 468
class PoolIndexError(Exception):
    """ Raised by UpdateFits.pooling when a worker hit an IndexError,
        after the pool has been terminated.
    """
471 472 #------------------------------------------------------------------------------ 473
class UpdateFits(object):
    """ Worker-pool helper that writes MEANMJD values into FITS catalogue
        headers while preserving each file's modification time.
    """

    #--------------------------------------------------------------------------
    # Define class constants (access as UpdateFits.varName)
    #numprocesses = multiprocessing.cpu_count() - 2
    # Default pool size: 60% of the CPUs; callers may override it (e.g.
    # FileHistoryDataFiller.updateFitsP sets the CLI thread count).
    numprocesses = int(0.6 * multiprocessing.cpu_count())

    #--------------------------------------------------------------------------

    def pooling(self, inputs):
        """
        Create a pool using all CPUs and farm out the calculations.

        @param inputs: List containing argument tuples for file update.
        @type inputs: list(tuple(str))

        """
        status = 0
        # Fall back to a single process for small input lists or a bad
        # process count.  NOTE(review): this mutates the *class* attribute,
        # so the reduced pool size persists across subsequent calls.
        if UpdateFits.numprocesses < 1 \
                or len(inputs) <= UpdateFits.numprocesses:
            UpdateFits.numprocesses = 1
        # Chunk size is capped at 101 items per task dispatch.
        chunksize = 1 + max(0, min(100,
                                   int(len(inputs) / UpdateFits.numprocesses)))
        Logger.addMessage("Updating %d files using %d threads..." % (
            len(inputs), UpdateFits.numprocesses))
        pool = multiprocessing.Pool(processes=UpdateFits.numprocesses)
        try:
            # calculatestar must be a module-level function to be picklable.
            pool_outputs = pool.map(calculatestar, inputs, chunksize)
            for entry in pool_outputs:
                # NOTE(review): workers return "<func>(<args>) = <result>"
                # strings and updateCatalogueFile returns None, so "False"
                # can never appear here -- this check looks dead; confirm.
                if "False" in entry:
                    Logger.addMessage(entry)
        except IndexError:
            # Defer terminate/raise so both happen after the try block.
            status = 1
        except KeyboardInterrupt:
            Logger.addMessage(
                "Got ^C while pool mapping, terminating the pool.")
            pool.terminate()
            Logger.addMessage("pool is terminated.")
        if status == 1:
            pool.terminate()
            raise PoolIndexError()

    #--------------------------------------------------------------------------

    def calculate(self, args):
        """
        Update catalogues.
        @param args: Arguments for updateCatalogueFile.
        @type args: args

        @return: Printable "<func>(<args>) = <result>" summary string so
                 the parent process can log the outcome.
        """
        result = self.updateCatalogueFile(*args)
        return '%s(%s) = %s' % (self.updateCatalogueFile.__name__, args,
                                result)

    #--------------------------------------------------------------------------

    def updateCatalogueFile(self, catFileName, extDict):
        """
        Update the catalogue file in place with MEANMJD header values,
        restoring the original file modification time afterwards.

        @param catFileName: Full path to the FITS file.
        @type catFileName: str
        @param extDict: Dictionary of extension numbers and mean MJDs.
        @type extDict: dict

        """

        # get file creation date (actually the modification time), saved so
        # it can be restored after the in-place header edit below
        modtime = time.strptime(time.ctime(os.stat(catFileName).st_mtime))
        fileTimestamp = time.strftime("%y%m%d%H%M%S", modtime)

        # update catalogue
        catFile = fits.open(catFileName, 'update')
        for extNum in extDict:
            meanMjd = extDict[extNum]
            # Update catalogue header
            # Catalogue needs updating for CU4 to get info.
            # NOTE(review): extNum - 1 assumes DB extension numbers are
            # offset by one from the catalogue HDU indices -- confirm.
            catFile[extNum - 1].header.update("MEANMJD", meanMjd,
                                              "Mean MJDobs of extension")
        catFile.close()

        # reset file timestamp (atime := now, mtime := original value)
        timetpl = time.strptime(fileTimestamp, '%y%m%d%H%M%S')
        modtime = time.mktime(timetpl)
        os.utime(catFileName, (time.time(), modtime))
559 560 561 #------------------------------------------------------------------------------ 562
def calculatestar(args):
    """ Module-level multiprocessing entry point: run one catalogue update
        in a fresh UpdateFits worker, converting ^C into the picklable
        KeyboardInterruptError so it survives the pool round-trip.
    """
    worker = UpdateFits()
    try:
        return worker.calculate(args)
    except KeyboardInterrupt:
        raise KeyboardInterruptError()
568 569 #------------------------------------------------------------------------------ 570
def removeIncorrectProvenance(archive):
    """
    Removes incorrect rows from Provenance due to reprocessing deprecations.

    @param archive: Open DbSession connection to the database.

    """
    # Rows whose component frame was deprecated by CASU reprocessing while
    # the combined frame itself is still current.
    staleRowSel = SelectSQL("Old.multiframeID",
        table="Provenance AS P, Multiframe AS Old, Multiframe AS New",
        where="P.multiframeID=Old.multiframeID AND "
              "P.combiframeID=New.multiframeID AND Old.deprecated >= %s"
              " AND New.%s" % (DepCodes.reprocCASU,
                               DepCodes.selectNonDeprecated))

    numDeleted = archive.delete("Provenance",
                                "multiframeID IN (%s)" % staleRowSel)
    Logger.addMessage("Deleted %s deprecated rows in Provenance" % numDeleted)
584 585 #------------------------------------------------------------------------------ 586
def transferIllumTables(sysc, isUKLight=True, isTrialRun=False):
    """
    Transfer new illumination tables.

    @param sysc: Initialised SystemConstants object.
    @type sysc: SystemConstants
    @param isUKLight: Use UKLight instead of JANET.
    @type isUKLight: bool
    @param isTrialRun: If True, don't transfer files, just list what is
                       available.
    @type isTrialRun: bool

    """
    CASUQueries.sysc = sysc
    Logger.addMessage("Checking CASU for new illumination tables...")
    # list CASU illum files: the ls options and the glob pattern differ
    # between the WSA and VSA directory layouts at CASU.
    lsOptions = ("--indicator-style=none" if CASUQueries.sysc.isWSA()
                 else "-E")
    globPattern = ("illum_*.table" if sysc.isWSA()
                   else "???20??/illum_*.table")
    remoteCmd = CASUQueries.sshCmd(isUKLight) + " 'ls %s %s'" % (
        lsOptions, os.path.join(sysc.casuIllumDir, globPattern))
    remoteTables = set(os.path.basename(line.rstrip())
                       for line in os.popen(remoteCmd))

    # list illum files already available locally
    localTables = set(name for name in os.listdir(sysc.illuminationDir())
                      if name.endswith('.table'))

    newTables = sorted(remoteTables - localTables)

    if not newTables:
        Logger.addMessage("No new tables available.")
        return
    if isTrialRun:
        Logger.addMessage("Files to transfer:" + ", ".join(newTables))
        return

    Logger.addMessage("Transferring...")
    for tableName in newTables:
        nameParts = tableName.split('_')
        # Non-WSA tables live in <month><year> sub-directories -- presumably
        # e.g. "jan2010", matching the "???20??" glob above.
        subDir = '' if sysc.isWSA() else nameParts[2] + nameParts[1]
        srcPath = os.path.join(sysc.casuIllumDir, subDir, tableName)
        CASUQueries.copyFile(srcPath, sysc.illuminationDir(), isUKLight)
        Logger.addMessage("%s transferred." % tableName)
628 629 #------------------------------------------------------------------------------ 630
def updateIllumEntries(archive, sysc, cal, dateStr, versStr):
    """
    Update the database entries for illumFile.
    @param archive: The DB connection.
    @type archive: DBSession object
    @param sysc: Initialised SystemConstants object.
    @type sysc: SystemConstants
    @param cal: The observation calendar.
    @type cal: ObsCalendar object
    @param dateStr: The month or day for which data is updated.
    @type dateStr: str
    @param versStr: The version for which data is updated.
    @type versStr: str

    @return: Set of illumination table base names that were expected but
             not found on disk.

    """
    # Filters for which illumination tables exist, per archive; OSA has none.
    filterDict = {"WSA": ['J', 'H', 'K', 'Y', 'Z'],
                  "VSA": ['J', 'H', 'Ks', 'Y', 'Z'],
                  "OSA": []}
    # Frames with a catalogue but no illumination table assigned yet.
    illumList = archive.query(
        selectStr="Multiframe.multiframeID, filterName",
        fromStr=Join(["Multiframe", "FlatFileLookUp"], ["multiframeID"]),
        whereStr=''.join(["illumFile like 'NONE'",
                          " AND catName not like '%empty_catalogue.fits'",
                          " AND catName not like 'NONE'",
                          " AND dateVersStr like '%s_v%s'" % (dateStr,
                                                              versStr)]))
    counter = 0
    missingIllumFiles = set()
    for mfID, filterName in illumList:
        if filterName in filterDict[sysc.loadDatabase]:
            # Table names are "illum_<sem|year>_<month>_<filter>": WSA keys
            # on semester (last 3 chars of checkDate), VSA on the year.
            illumDate = (cal.checkDate(dateStr)[-3:].lower() if sysc.isWSA()
                         else dateStr[:4])
            illumFileName = '_'.join([
                "illum", illumDate, cal.getMonth(dateStr).lower(),
                filterName])
            # Only point the DB at tables that actually exist on disk.
            if os.path.exists(os.path.join(sysc.illuminationDir(),
                                           "%s.table" % illumFileName)):
                counter += archive.update(
                    "Multiframe", "illumFile='%s.table'" % illumFileName,
                    where="multiframeID=%s" % mfID)
            else:
                missingIllumFiles.add(illumFileName)
    Logger.addMessage("%s entries updated." % counter)
    return missingIllumFiles
#------------------------------------------------------------------------------
# Entry point for script.

# Allow module to be imported as well as executed from the command line
if __name__ == "__main__":
    # Command-line options; with no task option given, all tasks run (doAll).
    CLI.progOpts += [
        CLI.Option('S', "semester",
                   "Run illum update on this semester list (or 'all')",
                   "LIST"),
        CLI.Option('U', "update_illum",
                   "Update illum table entries"),
        CLI.Option('V', "version", "version number of the data", "STR"),
        CLI.Option('b', "updateCats", "Update the mean mjd in FITS catalogues"
                   " only"),
        CLI.Option('f', "set_flag",
                   "Set a flag that this code has been run."),
        CLI.Option('j', "janet",
                   "Use JANET instead of UKLight"),
        CLI.Option('m', "updateMeanMjd",
                   "Update the mean MJD of OB pawprints"),
        CLI.Option('p', "provenance",
                   "Update provenance"),
        CLI.Option('s', "setup",
                   "Setup the programme requirements"),
        CLI.Option('x', "transfer_illum",
                   "Transfer illum tables"),
        CLI.Option('y', "threads",
                   "Number of threads used in mean MJD udate",
                   "INT", int(0.6 * multiprocessing.cpu_count()))
        ]

    cli = CLI("FinaliseFlatFileIngest", "$Revision: 10129 $", __doc__)
    database = cli.getArg("database")
    Logger.isVerbose = False
    log = Logger("%s_FFLI_%s.log" % (database.split('.')[-1],
                 utils.makeTimeStamp().replace(' ', '_')))

    Logger.addMessage(cli.getProgDetails())
    sysc = SystemConstants(database.split('.')[-1])
    obsCal = sysc.obsCal

    # Default to the three most recent semesters if none were given.
    if not cli.getOpt("semester"):
        semesterList = obsCal.getSemList()[-3:]
    elif cli.getOpt("semester").lower() == "all":
        semesterList = obsCal.getSemList()
    else:
        semesterList = cli.getOpt("semester").split(',')

    # Fall back to the maximum version available for the first semester.
    version = (cli.getOpt("version") or
               str(obsCal.maxVersOfDate(obsCal.getDates(semesterList[0],
                                                        "%Y%m%d")[0])))

    # With no individual task selected, run every task below.
    doAll = not any([cli.getOpt("setup"), cli.getOpt("provenance"),
                     cli.getOpt("transfer_illum"), cli.getOpt("update_illum"),
                     cli.getOpt("set_flag"), cli.getOpt("updateMeanMjd")])

    # Task: copy illumination tables from CASU (not applicable to OSA and
    # only possible from the host with the CASU connection).
    if (doAll or cli.getOpt("transfer_illum")) and not sysc.isOSA():
        if os.getenv('HOST') == 'menkaure':
            transferIllumTables(sysc, isUKLight=not cli.getOpt("janet"),
                                isTrialRun=cli.getOpt("test"))
        else:
            Logger.addMessage("<WARNING> CASU connection is only available on"
                              " menkaure!")

    # The date-directory lookup is needed by both of the tasks below.
    if doAll or cli.getOpt("update_illum") or cli.getOpt("set_flag"):
        fitsDirs = fits.FitsList(sysc, prefix="cu0_")
        fitsDirs.createFitsDateDict()

    # Task: point Multiframe.illumFile at the transferred tables.
    if (doAll or cli.getOpt("update_illum")) and not sysc.isOSA():
        missingIllumFiles = set()
        try:
            # create a DB connection
            archive = DbSession(database, False, cli.getOpt("test"),
                                cli.getOpt("user"))
            # check dates
            for dateVersStr in sorted(fitsDirs.invFitsDateDict):
                dateStr, versStr = dateVersStr.partition("_v")[::2]
                usedVersion = cli.getOpt("version") or \
                    str(obsCal.maxVersOfDate(dateStr))
                if obsCal.getDates(semesterList[0],
                                   "%Y%m%d")[0] <= dateStr <= \
                        obsCal.getDates(semesterList[-1], "%Y%m%d")[1] \
                        and usedVersion == versStr:
                    Logger.addMessage(
                        "Updating illumination table entries"
                        " in %s for %s (%s v.%s)" % (
                            database, obsCal.formatDate(
                                dateStr, outFormat="%Y-%m-%d"),
                            obsCal.checkDate(dateStr), versStr))
                    missingIllumFiles.update(updateIllumEntries(
                        archive, sysc, obsCal, dateStr, versStr))
                #else:
                #    Logger.addMessage(
                #        "No illumination table updated since "
                #        "%s not in given semester range (%s)." % (
                #            dateStr, ','.join(semesterList)))
            if missingIllumFiles:
                Logger.addMessage(
                    "<WARNING> The following illumination tables do not "
                    "exist, please upload from CASU: %s" %
                    ", ".join(sorted(missingIllumFiles)))
            del archive
        # NOTE(review): Python 2 except syntax -- needs "as" for Python 3.
        except Exception, details:
            Logger.addExceptionMessage(details)
            raise

    # Task: clean and refill Provenance.
    if doAll or cli.getOpt("provenance"):
        removeIncorrectProvenance(DbSession(autoCommit=True, cli=cli))
        cli.resetOptArg("comment", CuSession.comment)
        Logger.addMessage(cli.getProgDetails("ProvenanceFiller"))
        ProvenanceFiller(cli=cli).run()

    # Task: set up programme requirements for CU4.
    if doAll or cli.getOpt("setup"):
        cli.resetOptArg("comment", CuSession.comment)
        Logger.addMessage(cli.getProgDetails("ProgrammeBuilder"))
        ProgrammeBuilder(cli=cli).run()

    # Task: drop "FFFIED" marker files next to the CU3 "CU03ED" markers.
    if doAll or cli.getOpt("set_flag"):
        Logger.addMessage("Setting flags that this program has run...")
        for dateVersStr in sorted(fitsDirs.invFitsDateDict):
            try:
                dateDirPath = os.path.join(
                    fitsDirs.invFitsDateDict[dateVersStr], dateVersStr)
            except AttributeError:
                # A non-string entry presumably means data for one date is
                # spread over several directories; report and bail out.
                Logger.addMessage("<ERROR> for %s in %r" % (
                    dateVersStr, fitsDirs.invFitsDateDict[dateVersStr]))
                for dateVStr in fitsDirs.invFitsDateDict:
                    if len(fitsDirs.invFitsDateDict[dateVStr]) > 1:
                        Logger.addMessage(
                            "The data for %s is in more than 1 directory %r"
                            % (dateVStr,
                               fitsDirs.invFitsDateDict[dateVStr]))
                raise SystemExit
            # temporary change for VSA_v1_3
            #dbName = ("VSA" if "VSA" in database.split('.')[-1]
            #          else database.split('.')[-1])
            dbName = database.split('.')[-1]
            cu03edFlag = os.path.join(dateDirPath, "CU03ED_%s" % dbName)
            fffiedFlag = os.path.join(dateDirPath, "FFFIED_%s" % dbName)
            if os.path.exists(cu03edFlag) and not os.path.exists(fffiedFlag):
                # NOTE(review): logged before the write actually happens, and
                # file() is the Python 2 builtin (use open() for Python 3).
                Logger.addMessage("wrote %s" % fffiedFlag)
                file(fffiedFlag, 'w').write(utils.makeTimeStamp() + '\n')

    # Task (VSA only): recompute mean MJDs of OB pawprints and tiles.
    if sysc.isVSA() and (doAll or cli.getOpt("updateMeanMjd")):
        cli.resetOptArg("comment", CuSession.comment)
        # Now update new multiframes with meanMjdObs
        Logger.addMessage(cli.getProgDetails("FileHistoryDataFiller"))
        # @TODO: See if above line needed when CU-ise code.
        # create a DB connection
        archive = DbSession(database, False, cli.getOpt("test"),
                            cli.getOpt("user"))
        archive.enableDirtyRead()
        FileHistoryDataFiller(cli=cli).run(archive, semesterList, version,
            cli.getOpt("updateCats"), int(cli.getOpt("threads")))

#------------------------------------------------------------------------------
# Change log:
#
# 18-Jul-2008, ETWS: First Version
#  2-Oct-2008, ETWS: Reversed to simple run options and fixed illumination
#                    table handling.