
Source Code for Module invocations.cu3.cu3

#! /usr/bin/env python
#------------------------------------------------------------------------------
#$Id: cu3.py 10247 2014-03-13 16:26:53Z EckhardSutorius $
"""
   Invoke CU3. Ingest details of image products.

   @author: E. Sutorius
   @org:    WFAU, IfA, University of Edinburgh

   @newfield contributors: Contributors, Contributors (Alphabetical Order)
   @contributors: I. Bond, J. Bryant, R.S. Collins, N.C. Hambly
"""
#------------------------------------------------------------------------------
from   collections import defaultdict
import inspect
import os
import math
import pyfits
import time
import warnings

from   wsatools.CLI                    import CLI
import wsatools.DataFactory                as df
import wsatools.DbConnect.DbConstants      as dbc
from   wsatools.DbConnect.DbSession    import DbSession
from   wsatools.Logger                 import ForLoopMonitor
from   wsatools.DbConnect.IngCuSession import IngCuSession
from   wsatools.DbConnect.IngIngester  import IngestLogger as Logger, \
                                              IngestLogFile
import wsatools.DbConnect.Schema           as schema
from   wsatools.File                   import File
import wsatools.FitsUtils                  as fits
import wsatools.Utilities                  as utils
import wsatools.WfcamsrcInterface          as wfcamsrc
#------------------------------------------------------------------------------

class ModifyMfIDs(object):
    """Class for updating MfIDs and doing pre-ingest checks.
    """
    corruptFileDict = defaultdict(list)
    maxHDUsDef = {"WSA":5, "VSA":17, "OSA":33}
    maxHDUs = None

    #--------------------------------------------------------------------------

    @staticmethod
    def checkExtensions(hduList):
        """Check if the FITS file has the correct number of extensions.
        @param hduList: The list of all HDUs.
        @type hduList: PyFITS list object.
        @return: 1 if the number of HDUs is correct, -1 if extensions are
                 missing, -2 if there are too many.
        @rtype: int
        """
        if len(hduList) < ModifyMfIDs.maxHDUs:
            return -1
        elif len(hduList) > ModifyMfIDs.maxHDUs:
            return -2
        else:
            return 1

    #--------------------------------------------------------------------------

    @staticmethod
    def writeNewMfIDs(fileMfIDDict, fileList, redo, forceComp=False):
        """
        Write new multiframeIDs and wsaTimestamps into the files.
        @param fileMfIDDict: Look-up table of multiframeIDs for file names.
        @type fileMfIDDict: dict(str: int)
        @param fileList: List of tuples (pixelFile, catalogueFile)
        @type fileList: list((str,str))
        @param redo: If True, overwrite existing MfID.
        @type redo: bool
        @param forceComp: Compress uncompressed FITS files during MfID
                          writing.
        @type forceComp: bool

        """
        sysc = IngCuSession.sysc
        progress = ForLoopMonitor(fileList)
        for fileNames in fileList:
            # get file creation date
            dirName = os.path.split(os.path.dirname(fileNames[0]))[1][2:8]
            modtime = time.strptime(time.ctime(os.stat(fileNames[0]).st_mtime))
            wsaTimestamp = dirName + time.strftime("%y%m%d%H%M%S", modtime)

            # get multiframeID
            if fileNames[0] in fileMfIDDict:
                fileMfID = fileMfIDDict[fileNames[0]]
            else:
                Logger.addMessage("<ERROR> No multiframeID available.")
                raise SystemExit

            # file modified?
            modif = [0, 0, 0]

            # open file in update mode
            if sysc.mosaicSuffix in fileNames[0]:
                ModifyMfIDs.maxHDUs = 2
            elif sysc.tileSuffix in fileNames[0]:
                ModifyMfIDs.maxHDUs = 2
            else:
                ModifyMfIDs.maxHDUs = ModifyMfIDs.maxHDUsDef[sysc.loadDatabase]

            # in tiles all keywords are in the extension
            extNum = 1 if sysc.tileSuffix in fileNames[0] else 0

            catUpdate = False
            try:
                with warnings.catch_warnings(record=True) as w:
                    # Cause all warnings to always be triggered.
                    warnings.simplefilter("always")
                    fimgptr = fits.open(fileNames[0], "update")
                    # Check for truncated warning
                    if len(w) and "truncated" in str(w[-1].message):
                        Logger.addMessage(
                            "<Warning> %s: %s" % (str(w[-1].message),
                                                  fileNames[0]))
                        ModifyMfIDs.corruptFileDict["truncFits"].append(
                            fileNames)

                if sysc.tileSuffix in fileNames[0] \
                        and fimgptr[0].header["NAXIS"] > 0:
                    isExtOK = -3
                else:
                    isExtOK = ModifyMfIDs.checkExtensions(fimgptr)

                if isExtOK == -3 and forceComp:
                    fimgptr.close()
                    fits.compressFits([fileNames[0]])
                    fimgptr = fits.open(fileNames[0], "update")
                    isExtOK = ModifyMfIDs.checkExtensions(fimgptr)

                if isExtOK > -2:
                    # write timestamp into pixel file
                    modif[0] = fits.writeToFitsHdu(fimgptr, (extNum,),
                        sysc.loadDatabase + "_TIME", wsaTimestamp,
                        sysc.loadDatabase + " time stamp", redo)

                    # write camnum into OSA files
                    if sysc.isOSA():
                        for hdunum in range(1, ModifyMfIDs.maxHDUs):
                            camNum = sysc.hduNumDict[int(fimgptr[hdunum].header[
                                "HIERARCH ESO DET CHIP ID"].partition('#')[2])]
                            modif[2] = fits.writeToFitsHdu(
                                fimgptr, (hdunum,), "CAMNUM", camNum,
                                "Number of OMEGACAM detector (1-32)", True)

                    # write timestamp into catalogue
                    if "empty" not in fileNames[1]:
                        cattime = time.strptime(time.ctime(
                            os.stat(fileNames[1]).st_mtime))
                        catTimestamp = dirName + time.strftime("%y%m%d%H%M%S",
                                                               cattime)
                        fcatptr = fits.open(fileNames[1], "update")
                        fits.writeToFitsHdu(fcatptr, (0,),
                            sysc.loadDatabase + "_TIME", catTimestamp,
                            sysc.loadDatabase + " time stamp", redo,
                            verify='silentfix')
                        fcatptr.close()
                        timetpl = time.strptime(catTimestamp[len(dirName):],
                                                '%y%m%d%H%M%S')
                        modtime = time.mktime(timetpl)
                        os.utime(fileNames[1], (time.time(), modtime))

                    # write MultiframeId
                    modif[1] = fits.writeToFitsHdu(fimgptr, (extNum,),
                        sysc.loadDatabase + "_MFID", fileMfID,
                        sysc.loadDatabase + " Multiframe ID", redo)

                    # update CIR_CPM for filtered files
                    if "two" in fileNames[0] and "conf" not in fileNames[0]:
                        for hdunum in range(1, ModifyMfIDs.maxHDUs):
                            circpm = fimgptr[hdunum].header[
                                "CIR_CPM"].partition('_st')
                            circpmnew = ''.join(circpm[:2]) \
                                + "_two_conf.fit" \
                                + '[%d]' % hdunum

                            fits.writeToFitsHdu(fimgptr, (hdunum,), "CIR_CPM",
                                circpmnew, "Confidence Map", True)

                        # update cat as well
                        catUpdate = True
                        modif[0] = 1

                elif isExtOK == -1:
                    ModifyMfIDs.corruptFileDict["missingExt"].append(fileNames)
                    Logger.addMessage(
                        "<Warning> missing extension in " + fileNames[0])

                elif isExtOK == -2:
                    ModifyMfIDs.corruptFileDict["tooManyExt"].append(fileNames)
                    Logger.addMessage(
                        "<Warning> too many extensions in " + fileNames[0])

                elif isExtOK == -3:
                    ModifyMfIDs.corruptFileDict["uncompFits"].append(fileNames)
                    Logger.addMessage(
                        "<Warning> uncompressed FITS file: " + fileNames[0])

                # close file
                fimgptr.close()
            except AttributeError as error:
                if "SystemConstants" in repr(error):
                    ModifyMfIDs.corruptFileDict["sysConError"].append(fileNames)
                    Logger.addMessage("<ERROR>:%s in %s"
                                      % (error[0], fileNames[0]))
                else:
                    raise
            except pyfits.VerifyError as error:
                Logger.addMessage("<ERROR>:%s in %s" % (error[0], fileNames[0]))
                if "not FITS standard" in repr(error):
                    fimgptr.close(output_verify='fix')
                else:
                    fimgptr.close()
            except Exception as error:
                print(error)
                Logger.addMessage("<ERROR>:%s in %s" % (error[0], fileNames[0]))

            try:
                del fimgptr
            except Exception as error:
                print(error)
                Logger.addMessage("<ERROR>:%s in %s" % (error[0], fileNames[0]))

            if catUpdate:
                modtime = time.strptime(time.ctime(
                    os.stat(fileNames[1]).st_mtime))
                wsaTimestamp = dirName + time.strftime("%y%m%d%H%M%S", modtime)

                fimgptr2 = fits.open(fileNames[1], "update")
                for hdunum in range(1, ModifyMfIDs.maxHDUs):
                    circpm = fimgptr2[hdunum].header[
                        "CIR_CPM"].partition('_st')
                    circpmnew = ''.join(circpm[:2]) \
                        + "_two_conf.fit" \
                        + '[%d]' % hdunum
                    fits.fixExponentBug(fimgptr2[hdunum].header)
                    fits.writeToFitsHdu(fimgptr2, (hdunum,), "CIR_CPM",
                        circpmnew, "Confidence Map", True, 'ignore')

                fimgptr2.close()
                timetpl = time.strptime(wsaTimestamp[len(dirName):],
                                        '%y%m%d%H%M%S')
                modtime = time.mktime(timetpl)
                os.utime(fileNames[1], (time.time(), modtime))

            # if file was modified, set the modification date back
            # to the original one
            if sum(modif) > 0:
                timetpl = time.strptime(wsaTimestamp[len(dirName):],
                                        '%y%m%d%H%M%S')
                modtime = time.mktime(timetpl)
                os.utime(fileNames[0], (time.time(), modtime))

            progress.testForOutput()

        Logger.addMessage("finished.")
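
    # Note (illustrative): the wsaTimestamp written above is built from the
    # date directory's name (characters [2:8], which carry the observation
    # date) concatenated with the file's modification time formatted as
    # "%y%m%d%H%M%S", e.g. a hypothetical mtime of 2005-01-13 22:15:30
    # contributes "050113221530". writeZeroMfIDs below builds the same
    # timestamp before resetting the file's modification date.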

    #--------------------------------------------------------------------------

    @staticmethod
    def writeZeroMfIDs(fileList):
        """
        Write a zero (0) multiframeID into the files.
        @param fileList: List of file names
        @type fileList: list(str)
        """
        sysc = IngCuSession.sysc
        archive = sysc.loadDatabase
        for fileName in fileList:
            # get file creation date
            dirName = os.path.split(os.path.dirname(fileName))[1][2:8]
            modtime = time.strptime(time.ctime(os.stat(fileName).st_mtime))
            wsaTimestamp = dirName + time.strftime("%y%m%d%H%M%S", modtime)
            # in tiles all keywords are in the extension
            extNum = 1 if sysc.tileSuffix in fileName else 0

            # open file in update mode
            fimgptr = fits.open(fileName, "update")
            # write MultiframeId
            fits.writeToFitsHdu(fimgptr, (extNum,), "%s_MFID" % archive, 0,
                                "%s Multiframe ID" % archive, True)
            # close file
            fimgptr.close()

            # set the modification date back to the original one
            modtime = time.mktime(time.strptime(wsaTimestamp[len(dirName):],
                                                '%y%m%d%H%M%S'))
            os.utime(fileName, (time.time(), modtime))

            # add warning to log file
            Logger.addMessage("<Warning> Set MfID to 0 in " + fileName)
        Logger.addMessage("finished.")

#------------------------------------------------------------------------------

class Cu3(IngCuSession):
296 """ 297 Invokes frame metadata ingestion for a given log of files transferred from 298 the data processing centre. 299 300 method: Establish locked DB connectivity; initiate curation objects; 301 cycle though the transfer log, accumulating individual table metadata in 302 CSV files. The database bulk ingest from those CSV files and the update of 303 the curation logs will be done at a later stage. 304 305 """
    class NoFilesError(Exception):
        """ A curation error instead of a programming error. """
        pass

    #--------------------------------------------------------------------------
    cuedFileExists = False  # Does CU3ED file exist?

    #--------------------------------------------------------------------------

    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 csvSharePath=None,
                 programmeList=CLI.getOptDef("programmes"),
                 noMfID=CLI.getOptDef("nomfid"),
                 ReDo=CLI.getOptDef("redo"),
                 onlyMfID=CLI.getOptDef("onlymfid"),
                 excludeFiles=CLI.getOptDef("exclude"),
                 isTrialRun=DbSession.isTrialRun,
                 keepWorkDir=CLI.getOptDef("keepwork"),
                 forceComp=CLI.getOptDef("forcecomp"),
                 isWfauProduct=CLI.getOptDef("iswfauprod"),
                 xferlog=CLI.getArgDef("xferlog"),
                 comment=CLI.getArgDef("comment")):
330 """ 331 @param curator: Name of curator. 332 @type curator: str 333 @param comment: Descriptive comment as to why curation task 334 is being performed. 335 @type comment: str 336 @param csvSharePath: Directory where data is written to. 337 @type csvSharePath: str 338 @param database: Name of the database to connect to. 339 @type database: str 340 @param excludeFiles: Excluded files from processing. 341 @type excludeFiles: list(str) 342 @param forceComp: Compress uncompressed FITS files during MfID 343 writing. 344 @type forceComp: bool 345 @param isTrialRun: If True, do not perform database modifications. 346 @type isTrialRun: bool 347 @param isWfauProduct: If True, files (e20*) are in a products subdir. 348 @type isWfauProduct: bool 349 @param keepWorkDir: Don't remove working directory. 350 @type keepWorkDir: bool 351 @param noMfID: If True, don't write new MfID into FITS file. 352 @type noMfID: bool 353 @param onlyMfID: If True, only write new MfID into FITS file. 354 @type onlyMfID: bool 355 @param programmeList: Only process data for given programmes (accepts 356 keywords 'all', 'ns' (non-survey), 357 'ukidss' (all 5 main surveys)). 358 @type programmeList: list(str) 359 @param ReDo: If True, overwrite existing MfID. 360 @type ReDo: bool 361 @param xferlog: Logfile containing files to be ingested. 362 @type xferlog: str 363 364 """ 365 typeTranslation = {"curator":str, 366 "database":str, 367 "csvSharePath":str, 368 "programmeList":list, 369 "noMfID":bool, 370 "ReDo":bool, 371 "onlyMfID":bool, 372 "excludeFiles":list, 373 "isTrialRun":bool, 374 "xferlog":str, 375 "keepWorkDir":bool, 376 "forceComp":bool, 377 "isWfauProduct":bool, 378 "comment":str} 379 380 super(Cu3, self).attributesFromArguments( 381 inspect.getargspec(Cu3.__init__)[0], locals(), 382 types=typeTranslation) 383 384 # Initialize parent class 385 super(Cu3, self).__init__(cuNum=3, 386 curator=self.curator, 387 comment=self.comment, 388 reqWorkDir=True, 389 keepWorkDir=self.keepWorkDir, 390 database=self.database, 391 autoCommit=False, 392 isTrialRun=self.isTrialRun) 393 394 # Write list of table names and schemas to the file used by exmeta 395 self._tableListPath = \ 396 os.path.join(self._workPath, "metadata_schema.dat") 397 398 lines = ("%s %s\n" % (self.sysc.metadataSchema(), table) 399 for table in self.sysc.metadataTables) 400 401 file(self._tableListPath, 'w').writelines(lines) 402 403 # Other constants used within this script: 404 self.createFileList() 405 406 self.procFileName = os.path.join(self._workPath, 'procfilelist.dat') 407 408 # exclude all items containing the names given via exclude option 409 tmpList = [] 410 if self.excludeFiles: 411 Logger.addMessage("Excluding %s" % ','.join(self.excludeFiles)) 412 for excl in self.excludeFiles: 413 for entry in self._fileList: 414 if not (excl in entry[0] or excl in entry[1]): 415 tmpList.append(entry) 416 self._fileList = tmpList[:] 417 418 self._initFromDB() 419 420 self.csvSharePath = self.csvSharePath or self.archive.sharePath() 421 utils.ensureDirExist(self.csvSharePath)

    #--------------------------------------------------------------------------

    def _initFromDB(self):
        """Initialize constants from the DB.
        """
        # Connect to database
        self._connectToDb()

        try:
            if not self.isTrialRun:
                self._cuEventID = self._getNextCuEventID()

            if self.cuNum != dbc.smallIntDefault():
                try:
                    self._cuTable = df.CurationTaskTable(self.archive,
                                                         self.cuNum)
                    Logger.addMessage("Initialising Curation Use case:")
                    Logger.addMessage(self._cuTable.getDescription())
                    Logger.addMessage("on list: " + self.xferlog)
                except ValueError as error:
                    Logger.addMessage("<Warning> " + str(error))
                    self._cuTable = None

            # if local products are ingested get progIDs first
            if self.isWfauProduct:
                self.fileProgIdDict = self.getProgIDs()

            # create the multiframeID lookup dict
            self.prepareMfIDlookup()

            # write multiframe IDs and Timestamps into fits headers
            if not self.noMfID:
                self.writeMfIDs()
                if not self._fileList:
                    raise Cu3.NoFilesError

            if self.onlyMfID:
                Logger.addMessage(
                    "Finished writing multiframeIDs. If you want "
                    "to process the data run CU3 with the '-n' flag.")
            else:
                # create the programmeID lookup dict
                if not self.isWfauProduct:
                    self.fileProgIdDict = self.getProgIDs()

                # create the programmes.csv file
                self.csvFileName = os.path.join(self.sysc.srcPath,
                    "wfcamsrc/programmes.csv.%d" % self._cuEventID)
                self.createProgrammeCsv(progIDofFile=self.fileProgIdDict)
        finally:
            self._disconnectFromDb()

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Do CU3.
        """
        Logger.addMessage(
            "Started CU%s on list: %s" % (self.cuNum, self.xferlog))

        # Create a log file to hold the verbose record for this invocation
        log = Logger(self.createLogFileName())

        try:
            # Only invoke ingestion if needed:
            if not self._fileList:
                Logger.addMessage("No new metadata are required to be loaded.")
                return

            # output file prefix
            self._outPrefix = self.createFilePrefix(
                self.cuNum, self._cuEventID, self._hostName,
                self.database.rpartition('.')[2])

            # Create ingest log file
            ingestLogFile = IngestLogFile(os.path.join(
                self.dbMntPath, self._outPrefix + ".log"))
            Logger.addMessage(
                "CUEvent %s: No. of files containing metadata to be ingested: %s"
                % (self._cuEventID, len(self._fileList)))

            # Flag to rewrite the file list if files with more than
            # the number of allowed extensions are found (set by writeMfIDs).
            rewriteFileList = getattr(self, "rewriteFileList", False)

            if not self.onlyMfID:
                self.setTableVariables()

                for table in self._tableDict:
                    csvFileName = os.path.join(
                        self.csvSharePath, self._tableDict[table]["ingfile"])
                    if os.path.exists(csvFileName):
                        os.remove(csvFileName)

                # split the production frames from the general filelist;
                # rewrite the general filelist if necessary;
                # do a dry run without ingest first to cross reference
                # all mfIDs correctly

                prodFileName, prodFileNum = self.splitFileList(rewriteFileList)

                # move in the programmes.csv file for this cuEventID

                os.rename(self.csvFileName,
                          ''.join(self.csvFileName.rpartition('.csv')[:2]))

                fitsForIngList = self.selectFilesByProg()

                # first run 'metadata' on files in prodFileName
                if prodFileNum > 0:
                    Logger.addMessage(
                        "Running exmeta on production frames first...")
                    # @@TODO: These variables are subsequently overwritten, if
                    #         not a problem then don't save the return result.
                    notIngList, wcsNaNFlag = wfcamsrc.extractMetadata(
                        self._tableListPath, prodFileName, self._cuEventID,
                        self._workPath, self._outPrefix, log.pathName,
                        self.sysc)

                # then run 'metadata' on all the selected files for real
                Logger.addMessage("Running exmeta on %d/%d frames..."
                                  % (len(fitsForIngList),
                                     len(self._fileList)))
                notIngList, wcsNaNFlag = wfcamsrc.extractMetadata(
                    self._tableListPath, self.procFileName, self._cuEventID,
                    self.csvSharePath, self._outPrefix, log.pathName,
                    self.sysc)

                if notIngList:
                    self.writeErrorFiles(notIngList)

                if self.sysc.isWSA():
                    fitsList = [fileNames[0] for fileNames in fitsForIngList]
                    pathList = [os.path.dirname(fileNames[0])
                                for fileNames in fitsForIngList]
                    satList = self.checkForSatData(
                        os.path.commonprefix(pathList))
                    if satList:
                        satCsv = self.parseSatData(satList, fitsList)
                        self.updateTableDict()

                # write ingest info into the ingest log file
                timestamp = utils.makeMssqlTimeStamp()
                histories = {"DCH": [], "ACH": [], "PCH": []}
                # include processed dates
                histories["DCH"].extend(self._dateList)
                # for the update of the curation event history
                histories["ACH"].append(
                    [self._cuEventID, self.cuNum, log.pathName,
                     dbc.charDefault(), timestamp, self.curator,
                     self.comment, dbc.yes()])

                isNaN = wcsNaNFlag is None
                preComment = ("WCSERROR: " if isNaN else
                              self._historyCommentPrefix(self._success,
                                                         dbc.yes()))

                row = (self._cuEventID, self.cuNum, log.pathName,
                       dbc.charDefault(), timestamp, self.curator,
                       preComment + self.comment, dbc.yes())

                self._connectToDb()
                self._updateArchiveCurationHistory(row)
                self.archive.commitTransaction()

                # for the update of the programme curation history
                for programmeID in self._progOrderRatedDict:
                    histories["PCH"].append(
                        [programmeID, self._cuEventID, timestamp, -1])

                # check if CU3ED files exist
                for dateDirPath in histories["DCH"]:
                    cuedFileName = "CU%02dED_%s" % (
                        self.cuNum, self.database.rpartition('.')[2])
                    if os.path.exists(os.path.join(dateDirPath, cuedFileName)):
                        self.cuedFileExists = True

                # handle NaN error files
                if isNaN:
                    utils.ensureDirExist(
                        self.sysc.wcsErrorFilesPath(self.archive.sharePath()))

                    if os.path.exists(ingestLogFile.name):
                        newName = os.path.join(
                            self.sysc.wcsErrorFilesPath(
                                self.archive.sharePath()),
                            os.path.basename(ingestLogFile.name))
                        os.rename(ingestLogFile.name, newName + ".wcserr")

                    for tableName in self._tableDict:
                        csvFileName = self.archive.sharePath(
                            self._tableDict[tableName]["ingfile"])
                        newName = os.path.join(
                            self.sysc.wcsErrorFilesPath(
                                self.archive.sharePath()),
                            self._tableDict[tableName]["ingfile"])
                        os.rename(csvFileName, newName + ".wcserr")

                    Logger.addMessage(
                        "<WCSERROR> All created files have been moved to %s." %
                        self.sysc.wcsErrorFilesPath(self.archive.sharePath()))
                else:
                    ingestLogFile.pickleWrite(
                        self._ingestOrder, self._tableDict, histories)
                    Logger.addMessage(
                        "Curation History info written to "
                        + ingestLogFile.name)
                self._disconnectFromDb()

                Logger.addMessage(
                    "Processing of %s files finished." % len(fitsForIngList))

                forceFlag = (" -F" if self.cuedFileExists else '')
                Logger.addMessage("Please, run ./IngestCUFiles.py -i -r %s%s %s"
                                  % (ingestLogFile.name, forceFlag,
                                     self.database))

        except Cu3.NoFilesError:
            Logger.addMessage("No metadata are required to be loaded.")
        except Exception as error:
            Logger.addExceptionDetails(error)
        finally:
            Logger.addMessage("Log written to " + log.pathName)
            Logger.dump(file(log.pathName, 'a'))
            Logger.reset()
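
    # Illustrative outcome (hypothetical log name): on success _onRun prints
    # the command for the separate ingest stage, e.g.
    #
    #     Please, run ./IngestCUFiles.py -i -r /dbmnt/cu03...log WSA
    #
    # where the .log file is the pickled ingest log written above and the
    # trailing argument is the database given on the command line.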

    #--------------------------------------------------------------------------

    def checkForSatData(self, commonPath):
        """ Return any satellite data directories below the given path. """
        dirList = [os.path.join(commonPath, x) for x in os.listdir(commonPath)
                   if x.startswith("satellites")]
        return dirList

    #--------------------------------------------------------------------------

    def parseSatData(self, satList, fitsList):
        """ Parse the satellite detection data and write them to a CSV file
            for ingest. """
        inclList = []
        for fitsFileName in fitsList:
            fitsFile = File(fitsFileName)
            inclList.append('%s_%s' % (fitsFile.sdate, fitsFile.runno))
            del fitsFile
        print inclList

        for satFileName in satList:
            satFile = File(satFileName)
            satFile.ropen()
            colNames = satFile.readline().split()
            print colNames
            satellites = satFile.readlines(omitEmptyLines=True,
                                           findValues=inclList)
            satDict = defaultdict(lambda : defaultdict())

            satCount = 1
            for sat in satellites:
                satAttr = []
                satTmp = sat.split()
                print satTmp
                satAttr.append(satTmp[0])
                # convert RA from sexagesimal hours to decimal degrees
                satAttr.append(str(15 * (int(satTmp[1]) \
                    + (int(satTmp[2]) + float(satTmp[3])/60.)/60.)))
                # convert Dec from sexagesimal to signed decimal degrees
                satAttr.append(str(self.signum(int(satTmp[4])) * (
                    abs(int(satTmp[4])) + (int(satTmp[5]) \
                    + float(satTmp[6])/60.)/60.)))
                for i in range(7, len(satTmp)):
                    satAttr.append(satTmp[i])
                for i, col in enumerate(colNames):
                    if satAttr[0] in satDict:
                        satCount += 1
                    else:
                        satCount = 1
                    if i > 0:
                        satDict[(satAttr[0], satCount)][col] = satAttr[i]
            satFile.close()
            del satFile

        mfidDict = defaultdict()
        for date in self.mfIDofFileOfDate:
            for fileName, satCount in satDict:
                for mfidFile in self.mfIDofFileOfDate[date]:
                    if fileName.rpartition('_')[0] in mfidFile:
                        mfidDict[fileName] = self.mfIDofFileOfDate[date][
                            mfidFile]

        outFileName = os.path.join(self.csvSharePath,
            "%s_u13alm01SatelliteDetection.csv" % self._outPrefix)
        outFile = File(outFileName)
        outFile.wopen()
        counter = 0
        for entry in satDict:
            counter += 1
            ell = (15 + float(satDict[entry]["ell'"])) / 16
            convFaktor = math.pi / 180
            delta = self.signum(float(satDict[entry]["PA'"])) * 90
            pa = math.atan(math.tan(convFaktor*(
                float(satDict[entry]["PA'"]) + 90)) / 16) / convFaktor + delta
            # the detection counter is stored as the second element of the
            # dict key (fileName, count)
            line = ",".join([str(mfidDict[entry[0]]), satDict[entry]["ext"],
                             str(self._cuEventID), str(entry[1]), '3',
                             satDict[entry]["MJD"],
                             satDict[entry]["X"], str(dbc.realDefault()),
                             satDict[entry]["Y"], str(dbc.realDefault()),
                             satDict[entry]["RA"], satDict[entry]["Dec"],
                             satDict[entry]["HA"], satDict[entry]["npix"],
                             satDict[entry]["ell'"], str(ell),
                             satDict[entry]["PA'"], str(pa),
                             str(dbc.realDefault()), str(dbc.realDefault()),
                             satDict[entry]["J"], satDict[entry]["Jerr"],
                             satDict[entry]["PkHt"], str(dbc.realDefault()),
                             '0', '0', str(counter)])
            outFile.writetheline(line)
        outFile.close()
        return outFileName
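
    # Worked example of the coordinate conversion above (illustrative
    # numbers): a trail position listed as 12h 30m 36s, -05d 15m 00s becomes
    #
    #     RA  = 15 * (12 + (30 + 36/60.)/60.) = 187.65 degrees
    #     Dec = -1 * (5 + (15 + 0/60.)/60.)   = -5.25 degrees
    #
    # i.e. hours are scaled by 15 deg/hour and the sign of the declination
    # degrees, obtained via signum(), is applied to the whole value.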

    #--------------------------------------------------------------------------

    def updateTableDict(self):
        """ Add the satellite detection table to the ingest order and the
            table dictionary. """
        table = "u13alm01SatelliteDetection"
        # local name chosen so as not to shadow the imported schema module
        schemaFileName = "NonSurvey/WSANSu13alm01_u13alm01Schema.sql"
        self._ingestOrder.append(table)
        self._tableDict[table] = dict(schema=schemaFileName,
            ingfile="%s_%s.csv" % (self._outPrefix, table))

    #--------------------------------------------------------------------------

    def signum(self, integ):
        """ Return -1 for negative values, else 1. """
        if integ < 0:
            return -1
        else:
            return 1

    #--------------------------------------------------------------------------

    def addNonSurveyProgramme(self, newProgs):
        """
        Insert new programme ID entries into the Programme table.

        @param newProgs: A set of new project DFS ID strings, for which
                         programme IDs must be assigned.
        @type newProgs: set(str)

        """
        Logger.addMessage("Identifying new programmes...")
        maxPIDNS = max(10000,
                       self.archive.queryAttrMax("programmeID", "Programme",
                                                 "programmeID<11000"))
        maxPIDSat = max(11000,
                        self.archive.queryAttrMax("programmeID", "Programme"))

        tableSchema = \
            schema.parseTables(self.sysc.curationSchema(), ["Programme"])[0]

        for dfsIDString in newProgs:
            # advance the relevant counter so that consecutive new
            # programmes receive unique programme IDs
            if self.sysc.isWSA() and "LM" in dfsIDString.rpartition('/')[2]:
                maxPIDSat += 1
                maxPID = maxPIDSat
            else:
                maxPIDNS += 1
                maxPID = maxPIDNS
            row = []
            if self.sysc.isVSA() or self.sysc.isOSA():
                dfsIDString = dfsIDString.partition('(')[0]
            for column in tableSchema.columns:
                if column.name == "programmeID":
                    row.append(maxPID)
                elif column.name == "dfsIDString":
                    row.append(dfsIDString)
                elif column.name == "title":
                    if self.sysc.isWSA():
                        if "LM" in dfsIDString.rpartition('/')[2]:
                            row.append("LM satellite %s" % dfsIDString)
                        else:
                            row.append("Non-survey programme %s" % dfsIDString)
                    else:
                        if dfsIDString.startswith('2'):
                            title = "DDT programme %s" % dfsIDString
                        elif dfsIDString.startswith('3'):
                            title = "Short programme %s" % dfsIDString
                        else:
                            title = "ESO programme %s" % dfsIDString
                        row.append(title)
                elif column.name == "propPeriod":
                    row.append(dbc.nonSurveyProprietaryPeriod())
                elif column.name == "pairingCriterion":
                    if self.sysc.isWSA() \
                            and "LM" in dfsIDString.rpartition('/')[2]:
                        row.append(column.getDefaultValue())
                    else:
                        row.append(dbc.neighbourJoinCriterion())
                elif column.name == "adjacentFrameTolerance":
                    if self.sysc.isWSA() \
                            and "LM" in dfsIDString.rpartition('/')[2]:
                        row.append(column.getDefaultValue())
                    else:
                        row.append(dbc.adjacentFrameTolerance())
                else:
                    row.append(column.getDefaultValue())

            self.archive.insertData(tableSchema.name, row)
            Logger.addMessage("Updated Programme table for %s: %s" %
                              (dfsIDString, maxPID))

    #--------------------------------------------------------------------------

    def createProgrammeCsv(self, progIDofFile=None):
        """ Query Programme table and write programmes.csv file for C++ code.

        @todo: It might make more sense to write the contents of
               self._progIDofName to the programmes.csv, which includes the
               merging of different types of programmes. This would save
               some code duplication in exmeta.
        """
        progs = self.archive.query("dfsIDString, programmeID", "Programme")

        if self.sysc.isWSA():
            file(self.csvFileName, 'w').write('\n'.join("%s, %s" % prog
                                                        for prog in progs))
        else:
            progDict = dict(progs)
            csvFile = File(self.csvFileName)
            csvFile.wopen()

            for fileName in progIDofFile:
                csvFile.writetheline("%s, %s" % (
                    fileName, progIDofFile[fileName][1]))
            for progID in self.knownSurveyTranslations:
                csvFile.writetheline("%s, %s" % (
                    progID, progDict[self.knownSurveyTranslations[progID]]))
            for prog in progs:
                csvFile.writetheline("%s, %s" % prog)

            csvFile.close()

    #--------------------------------------------------------------------------

    def getProgIDs(self):
        """
        Reads FITS files to determine the set of programme IDs for the metadata
        that will be ingested in this curation event. Any new programmes are
        identified and the Programme table is updated.

        """
        Logger.addMessage("Getting all available programme IDs...")
        self._createProgrammeTranslation()

        progOfFile, newProgOfFile = self.getProgramIDs(self._allFileName)

        progIDs = set(progOfFile[fileName][0] for fileName in progOfFile)
        newProgs = set(newProgOfFile.values()).difference(
            ("Unknown", "UNKNOWN", "None", "NONE"))

        if newProgs:
            # update the Programme table with new programmeIDs
            self.addNonSurveyProgramme(newProgs)

            # update translation and progID list
            Logger.addMessage("Updating list of programme IDs...")
            self._createProgrammeTranslation()
            progIDs.update(self._progIDofName[name] for name in newProgs)

        self._progIDs = sorted(progIDs)
        Logger.addMessage(
            "Got all programme IDs: " + ', '.join(map(str, self._progIDs)))
        progOfFile, newProgOfFile = self.getProgramIDs(self._allFileName)
        return progOfFile

    #--------------------------------------------------------------------------

    def selectFilesByProg(self):
        """
        Select files by programme list.
        """
        progOfFile = self.fileProgIdDict.copy()
        # use a set so programme IDs can be merged/discarded below
        progIDs = set(self._progIDs)
        # Merge together any programme IDs that are the same programme.
        for entry in self.programmeList:
            if '==' in entry:
                inProg, _eql, outProg = entry.rpartition('==')
                inPID = self._progIDofName[inProg.upper()]
                if self.sysc.isWSA():
                    outPID = self._progIDofName[
                        outProg.upper() if '/' in outProg
                        else "U/UKIDSS/%s" % outProg.upper()]
                else:
                    outPID = self._progIDofName[outProg.upper()]
                for fileName in progOfFile:
                    # the programme ID is the first element of each entry
                    if progOfFile[fileName][0] == inPID:
                        progOfFile[fileName] = \
                            (outPID,) + tuple(progOfFile[fileName][1:])
                progIDs.discard(inPID)
                progIDs.add(outPID)

        # get the programmes used in this cu run
        self._progOrderRatedDict = self.getProcessOrder(
            progIDs, include=self.programmeList)

        # Create a programme ID -> fits file dictionary
        fitsForIng = []
        for imgName, catName in sorted(self._fileList):
            if progOfFile[imgName][0] in self._progOrderRatedDict:
                fitsForIng.append([imgName, catName])
        # rewrite process file list
        file(self.procFileName, 'w').writelines(
            ' '.join(line) + ' %s\n' % i
            for i, line in enumerate(fitsForIng))

        return fitsForIng

    #--------------------------------------------------------------------------

    def prepareMfIDlookup(self):
        """ Get Multiframe IDs for processed dates.
        """
        Logger.addMessage(
            "Preparing the file name to multiframe ID look-up table...")

        dateList = sorted(set(os.path.basename(os.path.dirname(x[0]))
                              for x in self._fileList))

        self.mfIDofFileOfDate = {}
        for date in dateList:
            self.mfIDofFileOfDate[date] = self.getMultiframeIDs(date)

            for fileNames in self._fileList:
                if fileNames[0] not in self.mfIDofFileOfDate[date]:
                    mfidName = dbc.multiframeUIDAttributeName()
                    if os.path.basename(fileNames[0]).startswith("e20") and \
                            self.sysc.isVSA() and \
                            self.fileProgIdDict[fileNames[0]][0] == 120:
                        fileMfID = max(self._getNextID(
                            mfidName, "FlatFileLookUp",
                            where="%s>1000000000000" % mfidName),
                            1000000000001)
                    else:
                        fileMfID = self._getNextID(
                            mfidName, "FlatFileLookUp",
                            where="%s<1000000000000" % mfidName)

                    self.archive.insertData("FlatFileLookUp",
                        (fileMfID, self._cuEventID, date, '%s%s' %
                         (self.sysc.pixelServerHostName, fileNames[0])))
                    self.archive.commitTransaction()

            #@@TODO: This could be optimised with a check to see if necessary.
            self.mfIDofFileOfDate[date] = self.getMultiframeIDs(date)

    #--------------------------------------------------------------------------

    def setTableVariables(self):
        """Set the variables dependent on the tables used in this CU.
        """
        # Extract the frame metadata from the list of ingestion files:
        tableList = zip(utils.extractColumn(self._tableListPath, 1),
                        utils.extractColumn(self._tableListPath, 0))

        self._ingestOrder = []
        self._tableDict = {}
        for table, schemaFileName in tableList:
            self._ingestOrder.append(table)
            self._tableDict[table] = dict(schema=schemaFileName,
                ingfile="%s_%s.csv" % (self._outPrefix, table))

    #--------------------------------------------------------------------------

    def splitFileList(self, update=False):
        """Split the production frames from the general filelist. If update
           is True, re-write the general file list as well.
        """
        fileNameList = []
        prodFileName = os.path.join(self._workPath, "prodfilelist.dat")
        fileNameList.append(prodFileName)
        prodFile = File(prodFileName)
        prodFile.wopen()
        sciFileName = os.path.join(self._workPath, "scifilelist.dat")
        fileNameList.append(sciFileName)
        sciFile = File(sciFileName)
        sciFile.wopen()
        sfnum = 0
        pfnum = 0
        for entry in self._fileList:
            outline = ' '.join(entry) + ' '
            filebase = os.path.basename(entry[0])
            if filebase.startswith(
                    (self.sysc.casuPrefix, self.sysc.wfauPrefix)):
                sciFile.writetheline(outline + str(sfnum))
                sfnum += 1
            else:
                prodFile.writetheline(outline + str(pfnum))
                pfnum += 1
        prodFile.close()
        sciFile.close()

        if update:
            file(self._allFileName, 'w').writelines(
                ' '.join(line) + ' %s\n' % i
                for i, line in enumerate(self._fileList))

        return prodFileName, pfnum

    #--------------------------------------------------------------------------

    def writeMfIDs(self):
        """Write the multiframeIDs into the files.
        """
        Logger.addMessage(
            "Writing multiframe IDs and Timestamps into FITS headers.")
        try:
            for dateVersStr in self.mfIDofFileOfDate:
                ModifyMfIDs.corruptFileDict = defaultdict(list)
                ModifyMfIDs.writeNewMfIDs(
                    self.mfIDofFileOfDate[dateVersStr], self._fileList,
                    self.ReDo, forceComp=self.forceComp)
                self.archive.commitTransaction()

        except IOError:
            # @@TODO: Raise a CuError instead?
            raise SystemExit("You don't have write permissions to the "
                             "FITS files. Please re-run with the -n option.")

        # handle files with less/more than 4 extensions
        if ModifyMfIDs.corruptFileDict:
            corruptFileList = \
                ModifyMfIDs.corruptFileDict["missingExt"]\
                + ModifyMfIDs.corruptFileDict["tooManyExt"]\
                + ModifyMfIDs.corruptFileDict["uncompFits"]\
                + ModifyMfIDs.corruptFileDict["sysConError"]\
                + ModifyMfIDs.corruptFileDict["truncFits"]
            Logger.addMessage("Corrupted files: %s" % ','.join(
                utils.unpackList(corruptFileList)))
            # flag the rewrite for _onRun, which passes it to splitFileList
            self.rewriteFileList = True
            self._fileList = sorted(set(self._fileList).difference(
                corruptFileList))

            errorFilePath = os.path.join(self.sysc.corruptFilesPath(),
                self.getErrorFileName("missingExtensions"))
            file(errorFilePath, 'w').writelines(
                ' '.join(f) + '\n' for f in corruptFileList)

#------------------------------------------------------------------------------
# Entry point for CU3

if __name__ == '__main__':
    # Transfer log with 1 filename per line
    CLI.progArgs.append(CLI.Argument("xferlog", "xferlog.log"))
    CLI.progOpts += [
        CLI.Option('f', "forcecomp",
                   "compress uncompressed FITS files during MfID writing"),

        CLI.Option('m', "onlymfid",
                   "only write MfIDs"),

        CLI.Option('n', "nomfid",
                   "don't write new MfID into FITS file"),

        CLI.Option('o', "outpath",
                   "directory where the csv file is written to",
                   "PATH"),

        CLI.Option('r', "redo",
                   "enable overwriting of existing MfIDs"),

        CLI.Option('s', "iswfauprod",
                   "subdir is in products"),

        CLI.Option('K', "keepwork",
                   "Don't remove working dir."),

        CLI.Option('P', "programmes",
                   "only process data for given programmes (accepts keywords "
                   "'all', 'ns' (non-survey), 'ukidss' (all 5 main surveys); "
                   "one programme suffixed with 'x-' excludes this programme.)",
                   "LIST", "all"),

        CLI.Option('X', "exclude",
                   "exclude given files/file patterns from processing",
                   "LIST", '')]

    cli = CLI(Cu3, "$Revision: 10247 $")
    Logger.addMessage(cli.getProgDetails())

    cu3 = Cu3(cli.getOpt("curator"),
              cli.getArg("database"),
              cli.getOpt("outpath"),
              cli.getOpt("programmes"),
              cli.getOpt("nomfid"),
              cli.getOpt("redo"),
              cli.getOpt("onlymfid"),
              cli.getOpt("exclude"),
              cli.getOpt("test"),
              cli.getOpt("keepwork"),
              cli.getOpt("forcecomp"),
              cli.getOpt("iswfauprod"),
              cli.getArg("xferlog"),
              cli.getArg("comment"))
    cu3.run()

#------------------------------------------------------------------------------
# Change log:
#
#  8-Apr-2003, NCH: Added in statements to implement deprecation of
#                   reingested frames (not tested!); added in this revision
#                   history stuff.
# 30-Jul-2004, NCH: Refactoring in the light of changes to the curation
#                   software data model and other procedures.
#  2-Aug-2004, NCH: Changes, tidying up and finishing off.
#  3-Aug-2004, NCH: Small refactoring in the light of testing.
#  4-Aug-2004, NCH: Still testing... small changes.
# 10-Aug-2004, NCH: Added share access violation work-around.
# 11-Aug-2004, NCH: Added code to invoke a comprehensive constraint check
#                   after ingest: checks all FK references and aborts
#                   the ingest if a data integrity violation occurs.
# 18-Jan-2005, ETWS: Edited some minor errors in the description.
# 21-Feb-2005, ETWS: Included new MultiframeID creation and production
#                    of an error log of non-ingestable files.
#  1-Mar-2005, NCH: Very minor logging changes, and reinstated cleanup at
#                   the end.
#  8-Mar-2005, ETWS: Non-ingestable files log only written if there are any.
# 11-Apr-2005, ETWS: Included creation of the file timestamp from the
#                    UN*X file system's modification time;
#                    fixed the NextCuEventId DB update.
# 13-Apr-2005, ETWS: Move FITS headers which belong in the extensions
#                    but are in the primary HDU into the right place.
# 14-Apr-2005, ETWS: Split up error list into not ingested list and
#                    partially ingested list.
# 23-May-2005, ETWS: Fixed problem with new version directory names.
# 25-May-2005, ETWS: Included cuNum in getNextCurationEventID.
# 13-Jul-2005, ETWS: Included flag to bypass multiframeID writing if it has
#                    been written to the FITS files before, but the files
#                    need reprocessing.
#  2-Aug-2005, ETWS: Extended not and partly ingested file names with "_cu3"
#                    to distinguish them more easily from CU4's.
# 10-Aug-2005, ETWS: Included redo possibility for re-writing MFID and
#                    TIMESTAMP to FITS files.
# 19-Sep-2005, ETWS: Included possibility to specify the used database;
#                    included parameters to redo multiframe IDs.
#  4-Oct-2005, ETWS: Edited in-file documentation.
# 28-Nov-2005, NCH: Added dropping of table indexes prior to bulk load for
#                   speedier operation.
#  5-Dec-2005, JB: Commented out dropping the indexes prior to bulk load.
# 20-Jan-2006, JB: Added code so that the name of the database being used
#                  is prepended to the logfile filename.
# 20-Jan-2006, ETWS: Reset the ReDo parameter to False.
# 30-Mar-2006, ETWS: Refactored to fix problem of associating correct
#                    mfIDs of darks, skys, etc.
#  2-May-2006, ETWS: Included mainDb and cuEventId in partly and not ingested
#                    file names.
#  3-May-2006, ETWS: Fixed indentation.
# 11-May-2006, ETWS: Added more error messages.
# 16-May-2006, RSC: Replaced Metadata.makeTableList() with
#                   utils.extractColumn().
#  4-Jul-2006, RSC: utils.seqSum() replaced by new Python 2.3 sum() function.
# 13-Sep-2006, JB: Fixed a number of import bugs.
# 30-Nov-2006, ETWS: Refactored to use OO framework and parallelisation; this
#                    script now produces data files and an ingest log file,
#                    ingest has to be done separately with IngestCUFiles.py.
# 31-Jan-2007, ETWS: Bug fixes and improvements.
# 14-Mar-2007, ETWS: Fixed typo.
# 21-Mar-2007, ETWS: Added some more screen output.
#  1-May-2007, ETWS: Included WCS error handling.
# 16-May-2007, ETWS: Updated log output.
# 20-Jun-2007, ETWS: Fixed bugs.
# 22-Jun-2007, ETWS: Included default value for PCH entry in pickled file.
#                    Included exclusion of files without 4 extensions.
# 10-Jul-2007, ETWS: Updated for use with test cases.
#  5-Feb-2008, ETWS: Included usage of FlatFileLookUp for better parallel
#                    performance.
#  6-Feb-2008, ETWS: Fixed problem with non-existing entries in FlatFileLookUp.
# 14-Feb-2008, RSC: Updated for new DbSession interface, and corrected log
#                   file name setting.
# 21-Feb-2008, ETWS: Fixed single file ingest multiframeID writing.