Package wsatools :: Module FitsToDb
[hide private]

Source Code for Module wsatools.FitsToDb

   1  #------------------------------------------------------------------------------ 
   2  #$Id: FitsToDb.py 9945 2013-07-18 15:10:44Z EckhardSutorius $ 
   3  """ 
   4     Applications to perform various 'post-ingest' updates of database table data 
   5     from the original FITS file header values. 
   6   
   7     Applications 
   8     ============ 
   9   
  10     G{classtree: DbUpdater} 
  11   
  12     @group Applications: AttributeUpdater, CatFileUpdater, ProvenanceFiller 
  13     @group Error Handling: FileError, FileErrorList, MissingKeyword, 
  14                            MissingProvenance, NoProvenance 
  15   
  16     @author: R.S. Collins 
  17     @org:    WFAU, IfA, University of Edinburgh 
  18   
  19     @todo: DbUpdater probably should update ProgrammeCurationHistory. 
  20     @todo: Member variables and documentation need to be updated a bit for 
  21            FitsReader and associated classes. 
  22   
  23     @requires: PyFITS 
  24  """ 
  25  #------------------------------------------------------------------------------ 
  26  from __future__      import division, print_function 
  27  from future_builtins import map, zip 
  28   
  29  from   operator import itemgetter 
  30  import os 
  31  import re 
  32  import warnings 
  33   
  34  from   wsatools.CLI                 import CLI 
  35  from   wsatools.DbConnect.CuSession import CuSession 
  36  from   wsatools.DbConnect.DbSession import odbc 
  37  import wsatools.DbConnect.Schema        as schema 
  38  import wsatools.FitsUtils               as fits 
  39  from   wsatools.Logger              import Logger, ForLoopMonitor 
  40  from   wsatools.SystemConstants     import DepCodes 
  41  import wsatools.Utilities               as utils 
  42  #------------------------------------------------------------------------------ 
# @@TODO: This CLI option will be defined either in individual scripts or by
#         Logger.

# Module-level side effect: registers the -v/--verbose flag on the shared CLI
# so every script importing this module accepts it.
CLI.progOpts += [CLI.Option('v', 'verbose', 'display all warning messages')]
#------------------------------------------------------------------------------

class FileError(object):
    """
    Base class for per-file error logs. Subclass it for each distinct kind of
    problem that may occur while reading a file (see L{MissingKeyword} for a
    concrete example), overriding the methods below. Instances are gathered
    in a L{FileErrorList} container, which collects every log belonging to a
    single file so that only one report is issued per file.

    """
    name = "FileError"  #: Name of this type of file error.

    def __init__(self):
        """ Initialises member data. """
        # Overriding initialisers should also take no arguments.
        pass

    #--------------------------------------------------------------------------

    def __str__(self):
        """ @return: Error message.
            @rtype: str
        """
        # The base class never has anything to report.
        return ''

    #--------------------------------------------------------------------------

    def append(self):
        """ Adds a new error associated to this object. """
        # Subclasses may redefine this with whatever arguments they require.
        pass

    #--------------------------------------------------------------------------

    def errorOccurred(self):
        """
        @return: True if an error has been logged via append() for this file.
        @rtype: bool

        """
        # No errors can ever be recorded against the base class itself.
        return False

    #--------------------------------------------------------------------------

    def getFinalAnalysis(self, _numErrors):
        """
        @param _numErrors: The total number of files with this error.
        @type _numErrors: int

        @return: The final summary report for all errors of this type.
        @rtype: str
        """
        return ''
#------------------------------------------------------------------------------

class MissingKeyword(FileError):
    """
    Records every keyword found to be absent from a single FITS file.

    """
    name = "MissingKeyword"  #: Name of this type of file error.
    #: Maps each missing keyword to a string listing every extension header
    #: from which it was found to be absent.
    _missingKeys = None

    def __init__(self):
        """ Initialises member data. """
        self._missingKeys = {}

    #--------------------------------------------------------------------------

    def __str__(self):
        """ @return: Error message.
            @rtype: str
        """
        intro = "has no keyword named "
        return intro + utils.joinDict(self._missingKeys, " in extension(s) ",
                                      " and " + intro)

    #--------------------------------------------------------------------------

    def append(self, fitsKey, fitsHeader):
        """
        Logs that the given FITS keyword is absent from the given HDU.

        @param fitsKey: The FITS keyword that is missing from the file.
        @type fitsKey: str
        @param fitsHeader: The HDU number where the keyword is missing.
        @type fitsHeader: int

        """
        if fitsKey in self._missingKeys:
            self._missingKeys[fitsKey] += ", %s" % fitsHeader
        else:
            self._missingKeys[fitsKey] = str(fitsHeader)

    #--------------------------------------------------------------------------

    def errorOccurred(self):
        """ @return: True if an error has been logged for this file.
            @rtype: bool
        """
        return bool(self._missingKeys)

    #--------------------------------------------------------------------------

    def getFinalAnalysis(self, numErrors):
        """
        @param numErrors: The total number of files with this error.
        @type numErrors: int

        @return: The final summary report giving the number of files with
                 missing keywords.
        @rtype: str

        """
        return "%s updates were incomplete." % numErrors
#------------------------------------------------------------------------------

class MissingProvenance(FileError):
    """
    Accumulates every component file of a combiframe for which no multiframe
    ID could be found in the database.

    """
    name = 'MissingProvenance'  #: Name of this type of file error.

    def __init__(self):
        """ Initialises member data. """
        self._missingProv = []

    def __str__(self):
        """ @return: Error message.
            @rtype: str
        """
        return ('cannot find a Multiframe ID for component files: %s'
                % ', '.join(self._missingProv))

    def append(self, fileName):
        """
        Logs another component file that could not be resolved.

        @param fileName: Name of the component file that cannot be found.
        @type fileName: str

        """
        self._missingProv.append(fileName)

    def errorOccurred(self):
        """ @return: True if an error has been logged for this file.
            @rtype: bool
        """
        return bool(self._missingProv)

    def getFinalAnalysis(self, numErrors):
        """
        @param numErrors: The total number of combiframe files with this error.
        @type numErrors: int

        @return: The final summary report giving the number of combiframes
                 with incomplete Provenance entries.
        @rtype: str

        """
        return ('%s combiframes have incomplete entries in the Provenance '
                'table due to component frames not existing in the database.' %
                numErrors)
#------------------------------------------------------------------------------

class NoProvenance(FileError):
    """ FileError class flagging that a multiframe is not a combiframe. """
    name = 'NoProvenance'

    def __init__(self):
        """ Initialises member data. """
        # Single boolean flag: set once by append(), read by errorOccurred().
        self._errorOccurred = False

    def __str__(self):
        """ @return: Error message.
            @rtype: str
        """
        return 'is not a combiframe'

    def append(self):
        """ Flags this file as not being a combiframe. """
        self._errorOccurred = True

    def errorOccurred(self):
        """ @return: True if an error has been logged for this file.
            @rtype: bool
        """
        return self._errorOccurred

    def getFinalAnalysis(self, numErrors):
        """
        @param numErrors: The total number of files with this error.
        @type numErrors: int

        @return: The final summary report giving the number of combiframe
                 files lacking a provenance list.
        @rtype: str

        """
        return ('%s combiframe files do not provide a provenance list in the '
                'FITS header.' % numErrors)
#------------------------------------------------------------------------------

class FileErrorList(object):
    """
    Collects the FileError logs belonging to a single file. Create one
    instance per file and append each error log to it; when the instance is
    destroyed a single combined warning is emitted for that file, keeping
    the output down to one report per file. Class-level tallies across all
    instances feed the final summary produced by L{logFinalSummary()}.

    """
    # Class-scope member data
    #: Maintains total error count per error type for all files/FileErrorLists.
    _errTypeCount = {}
    _fileErrorHistory = {}  #: Keeps a record of every file error for summary.

    def __init__(self, filePathName):
        """
        Constructs a FileError list for supplied file.

        @param filePathName: Full path to file.
        @type filePathName: str

        """
        self._fileErrorList = []
        self._filePathName = filePathName

    #--------------------------------------------------------------------------

    def __del__(self):
        """
        Outputs a warning if errors occurred, and maintains a count of every
        error for the final summary.

        """
        occurred = [err for err in self._fileErrorList if err.errorOccurred()]
        if occurred:
            # Each error type counts at most once per file in the summary.
            for errorName in {err.name for err in occurred}:
                FileErrorList._errTypeCount[errorName] += 1

            Logger.addMessage('<Warning> File %s ' % self._filePathName
                + '. Also, '.join(map(str, occurred)), alwaysLog=False)

    #--------------------------------------------------------------------------

    def append(self, fileError):
        """
        Associate another error type with this file.

        @param fileError: The error type.
        @type fileError: FileError

        """
        self._fileErrorList.append(fileError)
        # Seed the class-wide tallies the first time each error type appears.
        FileErrorList._errTypeCount.setdefault(fileError.name, 0)
        FileErrorList._fileErrorHistory.setdefault(fileError.name, fileError)

    #--------------------------------------------------------------------------

    @staticmethod
    def logFinalSummary(filePath):
        """
        Logs the total number of files that failed for each type of error
        encountered.

        @param filePath: Full path to the log file where the problem files are
                         listed.
        @type filePath: str

        """
        if FileErrorList._fileErrorHistory:
            Logger.addMessage("Error summary:")
            for errorType in FileErrorList._fileErrorHistory.values():
                total = FileErrorList._errTypeCount.get(errorType.name, 0)
                Logger.addMessage(errorType.getFinalAnalysis(total))

            Logger.addMessage("Problem files listed in "
                + os.path.dirname(filePath))
#------------------------------------------------------------------------------

class FitsReader(object):
    """
    Reads the headers of a selection of FITS files, executing a series of
    specified tasks, as defined by the list of DbUpdater objects supplied on
    initialisation. Invoke processFitsFiles() to initiate the job.

    @todo: Test on more than one task at a time.

    """
    def __init__(self, taskList):
        """
        Compiles a complete list of FITS files for every supplied DbUpdater
        task and notes as to which tasks need to be performed on which files.

        @param taskList: A list of DbUpdater tasks to perform.
        @type taskList: list(DbUpdater)

        """
        self._fileList = []
        self._taskList = taskList

        if not taskList:
            raise Exception("FitsReader: List of tasks to do is empty.")

        self.sysc = self._taskList[0].sysc
        for task in self._taskList:
            self._fileList += task.getFileList()

        # Files requested by several tasks are read once only; order the read
        # sequence by multiframeID (the second element of each tuple).
        self._fileList = sorted(set(self._fileList), key=itemgetter(1))

    #--------------------------------------------------------------------------

    def processFitsFiles(self):
        """
        Goes through the list of FITS files, opens them, and performs the set
        of tasks assigned to it.

        """
        logPath = self.sysc.filesMissingPath()
        Logger.addMessage("Reading FITS files")
        Logger.addMessage("%s files will be read" % len(self._fileList))

        progress = ForLoopMonitor(self._fileList)
        for filePathName, mfid in self._fileList:
            if not os.path.exists(filePathName):
                Logger.addMessage("<Warning> Database points to a file that "
                    "no longer exists: " + filePathName)

                self._taskList[0].logBrokenFile(filePathName, logPath)
            else:
                with warnings.catch_warnings(record=True) as w:
                    # Cause all warnings to always be triggered.
                    warnings.simplefilter("always")
                    fitsFile = fits.open(filePathName)
                    # Check for truncated warning
                    if w and "truncated" in str(w[-1].message):
                        Logger.addMessage(
                            "<Warning> %s: %s" % (str(w[-1].message),
                                                  filePathName))

                fileErrors = FileErrorList(filePathName)
                try:
                    for task in self._taskList:
                        if len(self._taskList) == 1 \
                          or (filePathName, mfid) in task.getFileList():
                            task.updateDb(mfid, fitsFile, filePathName)
                            fileErrors.append(task.getErrorLog())
                finally:
                    # Always release the file handle, even when a task raises,
                    # so a failing update cannot leak open FITS files.
                    fitsFile.close()

            progress.testForOutput()

        # Summarise errors
        fileErrors = None  # ensure last file error is recorded before summary
        FileErrorList.logFinalSummary(logPath)
#------------------------------------------------------------------------------

class DbUpdater(CuSession):
    """
    Base class for generic post-curation database update activities. First
    call L{getFileList()} to obtain the list of files to read / records to
    update, then invoke L{updateDb()} on each file/record to write its entry
    to the database. Subclasses override L{updateDb()}; to alter the set of
    records handled see the documentation of L{_getRecordsList()}, and
    override L{_prepare()} if extra initialisation is required before the
    file list is determined.

    """
    # DbUpdaters absolutely must have minimally logged database updates,
    # otherwise they fall foul of the bug whereby large transactions fail
    # without error.
    _autoCommit = True

    #: Required standard attributes for the list of records to update.
    _recordAttrs = "fileName, multiframeID"
    #: Defines the standard set of records to update.
    _recordSet = "multiframeID > 0 AND fileName != 'none' " + \
      "AND fileName NOT LIKE 'PixelFileNoLongerAvailable:%' " + \
      "ORDER BY multiframeID ASC"
    #: Defines the standard table to search for records to update.
    _recordTable = "Multiframe"

    # Define private member variable default values (access prohibited)
    _errorLog = FileError()
    _recordList = None

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Update db: Prepare file list, and read the FITS files.
        """
        FitsReader([self]).processFitsFiles()

    #--------------------------------------------------------------------------

    def getErrorLog(self):
        """ @return: The log of errors generated by this file.
            @rtype: FileError
        """
        return self._errorLog

    #--------------------------------------------------------------------------

    def getFileList(self):
        """ @return: List of records to be updated consisting of a tuple of
                     attributes values.
            @rtype: list(tuple)
        """
        # Built lazily on first request.
        if not self._recordList:
            self._prepare()

        return self._recordList

    #--------------------------------------------------------------------------

    def updateDb(self, mfid, fitsFile=None, filePathName=None):
        """
        Update database record for this multiframe ID. Subclasses override
        this method to define their own individual database updates.

        @param mfid: MultiframeID of database record to update.
        @type mfid: int
        @param fitsFile: Open FITS file containing the data for this mfid.
        @type fitsFile: pyfits.HDUList
        @param filePathName: Full path to the FITS file.
        @type filePathName: str

        """
        pass

    #--------------------------------------------------------------------------

    def _getRecordsList(self):
        """
        Retrieves a list of records that need updating in the database. The set
        is defined by L{DbUpdater._recordAttrs} and L{DbUpdater._recordSet},
        which returns all multiframeIDs in table Multiframe for which files
        exist on our pixel server. Override this function and/or modify
        self._recordSet to produce different lists of records to update.

        @return: List of records that need updating in the database, in the
                 form (filePathName, multiframeID), with the server name
                 stripped from the database fileName attribute.
        @rtype: list(tuple(str, int))

        """
        rows = self.archive.query(selectStr=DbUpdater._recordAttrs,
                                  fromStr=self._recordTable,
                                  whereStr=self._recordSet)

        return [(self._stripServerName(name), mfid) for name, mfid in rows]

    #--------------------------------------------------------------------------

    def _prepare(self):
        """ Fills the list of records that need to be updated.
        """
        Logger.addMessage("Retrieving list of FITS files from database")
        self._recordList = self._getRecordsList()

    #--------------------------------------------------------------------------

    def _stripServerName(self, filePathName):
        """ Removes server name from file path returned by the database.
            @rtype: str
        """
        # Everything after the last ':' (the whole path when no server prefix).
        return filePathName.rpartition(':')[2]
#------------------------------------------------------------------------------

class AttributeUpdater(DbUpdater):
    """
    Updates all of the values in a set of database columns from the original
    FITS header keyword values using the prescription described in the table
    schema. The set of database columns may be manually supplied and to these
    any new columns, defined in the schema yet not present in the database,
    will be automatically included and created in the database. If there no new
    columns in the schema and no columns are manually supplied then no updates
    are performed.

    This class is used by L{FitsReader} which, when retrieving the list of FITS
    files to be read, invokes L{_prepare()} to determine the list of attributes
    to be updated and uses L{DbUpdater} to prepare the file list. L{FitsReader}
    then calls L{updateDb()} to update each record.

    """
    # Set persistent database connection to on
    _isPersistent = True
    # Overrides CuSession.comment
    comment = 'Updating tables with new schema attributes'

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    #: Only update attributes from stack files?
    doStacksOnly = False
    #: List of named attributes that already exist in the db to update.
    reqAttrs = None
    #: Name of the .sql file defining the schema to update.
    schemaFileName = 'WSA_MultiframeSchema.sql'
    #: List of tables within the schema to be updated.
    tableList = ['Multiframe', 'MultiframeDetector']

    #--------------------------------------------------------------------------
    # Define private member variable default values (access prohibited)

    _attrUpdateList = []  #: List of attribute lists for every table to update.

    #--------------------------------------------------------------------------

    def _onRun(self):
        """
        Run the database updater. Wrapper to catch table not found in schema
        errors, which are most likely to be caused by user input error.

        """
        try:
            super(AttributeUpdater, self)._onRun()
        except schema.Table.NotFoundError as error:
            raise AttributeUpdater.CuError(error)

    #--------------------------------------------------------------------------

    def _prepare(self):
        """
        Determines the list of attributes to be updated and fills the list of
        records that need to be updated.

        """
        if not self.schemaFileName.startswith(self.archive.sysc.loadDatabase):
            raise AttributeUpdater.CuError("Unexpected schema file (%s) for %s"
                " archive. Please supply correct schema file."
                % (self.schemaFileName, self.archive.sysc.loadDatabase))

        # Initialise local copy of the list from the default value
        self._attrUpdateList = AttributeUpdater._attrUpdateList[:]

        # Read in schema attributes from the schema file
        for table in schema.parseTables(self.schemaFileName, self.tableList):

            # Reduce list of attributes to just those to update for this table.
            tableAttrList = self._prepTableAttrList(table.name, table.columns)
            if tableAttrList:
                self._attrUpdateList.append(tableAttrList)
            else:
                # Nothing to do for this table so lose from the list.
                self.tableList.remove(table.name)

        if not self.tableList:
            raise AttributeUpdater.CuError("No tables require updating")

        msg = ("Updating %s with new schema attributes: "
               % ', '.join(self.tableList)
               + ', '.join(str(attr) for attrList in self._attrUpdateList
                           for attr in attrList))

        # Update default comment to reflect what will be done.
        if AttributeUpdater.comment in self.comment:
            self.comment = msg

        if self.doStacksOnly:
            self._recordSet = "frameType LIKE '%stack' AND " + self._recordSet

        # If any attributes to update from FITS files then produce a file list.
        if self._attrUpdateList \
          and any(attr.fitsKeyword or attr.tag.get('--/Q')
                  for attrList in self._attrUpdateList for attr in attrList):
            Logger.addMessage(msg)
            DbUpdater._prepare(self)
        else:
            self._recordList = []

        # Any names left in reqAttrs were never matched against the schema.
        if self.reqAttrs:
            Logger.addMessage(
                "<Warning> Attribute(s) %s not found in table(s) %s"
                % (', '.join(self.reqAttrs), ', '.join(self.tableList)))

    #--------------------------------------------------------------------------

    def _prepTableAttrList(self, tableName, attrList):
        """
        Reduce the list of attributes for a given table to just those that need
        to be updated. Compares the schema described by the .sql file with that
        in the current database to see if there are any new attributes. If so,
        new columns are added to the database for these attributes with default
        values.

        @param tableName: Database table to update.
        @type tableName: str
        @param attrList: List of all attributes in this table.
        @type attrList: list(schema.Attribute)

        @return: The list of attributes that need to be updated for this table.
        @rtype: list(schema.Attribute)

        """
        # Obtain list of attributes in present db table
        dbAttrs = self.archive.queryColumnNames(tableName)

        # Determine list of new schema attributes to enter the db
        newAttrs = [attr for attr in attrList
                    if attr.name not in dbAttrs]

        if not newAttrs:
            Logger.addMessage("Database matches schema for table %s - "
                              "no new columns to add" % tableName)
        else:
            # Add columns to the database for the new attributes
            for attr in newAttrs:
                Logger.addMessage("Adding column %s to database table %s" %
                                  (attr, tableName))
                try:
                    self.archive.addColumn(tableName, attr.name, attr.dataType,
                                           attr.defaultStr)
                except Exception as error:
                    # On error - revert to previous db state
                    for doneAttr in newAttrs[:newAttrs.index(attr)]:
                        try:
                            self.archive.dropColumn(tableName, doneAttr.name)
                        except odbc.ProgrammingError:
                            Logger.addMessage(
                                "<Warning> Couldn't drop column %s from table %s"
                                " - either it doesn't exist or the constraint "
                                "can't be dropped." % (doneAttr, tableName))
                    raise AttributeUpdater.CuError(error)

        # Add manually supplied specific attributes to update values from FITS
        # NOTE: the pop() call deliberately removes each matched name from
        # self.reqAttrs as a side effect (its truthy return value keeps the
        # filter passing), so _prepare() can warn about any leftovers.
        if self.reqAttrs:
            newAttrs += [attr for attr in attrList
                         if attr.name in self.reqAttrs and
                         self.reqAttrs.pop(self.reqAttrs.index(attr.name))
                         and attr not in newAttrs]

        catAttrs = [attr.name for attr in newAttrs if attr.isCatFile]
        if catAttrs and type(self) is AttributeUpdater:
            Logger.addMessage("<Warning> The values from some attributes must"
                " come from catalogue files. Afterward please run:\n"
                "UpdateCatData.py -a %s" % ','.join(catAttrs))

            return [attr for attr in newAttrs if not attr.isCatFile]

        return newAttrs

    #--------------------------------------------------------------------------

    def updateDb(self, mfid, fitsFile=None, filePathName=None):
        """
        Update entries for previously selected list of attributes in the
        database record for this multiframe ID.

        @param mfid: MultiframeID of database record to update.
        @type mfid: int
        @param fitsFile: Open FITS file containing the data for this mfid.
        @type fitsFile: pyfits.HDUList
        @param filePathName: Full path to the FITS file.
        @type filePathName: str

        """
        self._errorLog = MissingKeyword()
        stdRecordIndex = {'multiframeID': mfid}

        for table, newAttrs in zip(self.tableList, self._attrUpdateList):
            # MultiframeDetector entries require an extNum value as well as a
            # multiframeID to define a unique record
            if table in ['MultiframeDetector', 'CurrentAstrometry']:
                extNums = range(2, len(fitsFile) + 1)
            else:
                extNums = [None]

            for extNum in extNums:
                # Copy the standard index: assigning the shared dict here
                # would let an 'extNum' key added for one table leak into the
                # record index of a later extension-less table.
                recordIndex = dict(stdRecordIndex)
                if extNum:
                    recordIndex['extNum'] = extNum

                entriesList = []
                for attr in newAttrs:
                    # A basic check to see if keyword exists and is sensible
                    if not (attr.fitsHDU >= 0 or attr.isNotFitsKey()):
                        raise AttributeUpdater.CuError(
                            "--/K tag for attribute %s is missing or malformed."
                            % attr)

                    if attr.name == 'catName':
                        value = self.sysc.pixelServerHostName + filePathName
                    elif attr.name == 'totalExpTime':
                        hduIndex = extNum - 1
                        value = fitsFile[hduIndex].header.get("TEXPTIME")
                        if value is None:
                            # Derive the total exposure time from the archive
                            # specific primary-header keywords.
                            if self.sysc.isWSA():
                                try:
                                    njitter = fitsFile[0].header["NJITTER"]
                                    nustep = fitsFile[0].header["NUSTEP"]
                                    exptime = fitsFile[0].header["EXP_TIME"]
                                except KeyError:
                                    pass
                                else:
                                    value = njitter * nustep * exptime
                            elif self.sysc.isVSA():
                                try:
                                    njitter = fitsFile[0].header["NJITTER"]
                                    nustep = fitsFile[0].header["NUSTEP"]
                                    detDit = fitsFile[0].header["HIERARCH ESO DET DIT"]
                                    detNdit = fitsFile[0].header["HIERARCH ESO DET NDIT"]
                                except KeyError:
                                    pass
                                else:
                                    value = njitter * nustep * detDit * detNdit
                            elif self.sysc.isOSA():
                                try:
                                    ncombine = fitsFile[hduIndex].header["NICOMB"]
                                    exptime = fitsFile[0].header["EXPTIME"]
                                except KeyError:
                                    pass
                                else:
                                    value = ncombine * exptime
                    else:
                        hduIndex = attr.fitsHDU
                        if extNum:
                            hduIndex += extNum - 2
                        elif self.sysc.tileSuffix in filePathName:
                            # @@NOTE: Tile primary header is in 1st extension
                            hduIndex = hduIndex or 1

                        value = fitsFile[hduIndex].header.get(attr.fitsKeyword)

                    if value is None:
                        # @@TODO: Could put in default value from --/N keyword
                        # here, instead of leaving preset default.
                        self._errorLog.append(attr.fitsKeyword, hduIndex)
                    else:
                        if attr.isParsedFitsKey():
                            value = value.split()[-1]

                        # Reformat value to be in SQL statement
                        if attr.isString():
                            value = self.archive._sqlString(value)
                        elif attr.dataType == 'tinyint':  # bool->tinyint
                            value = int(value)
                        elif attr.dataType in ['float', 'real']:
                            try:
                                value = float(value)
                            except ValueError:
                                value = attr.getDefaultValue()

                        entriesList.append((attr.name, value))

                if entriesList:
                    self.archive.updateEntries(table, entriesList,
                                               recordIndex.items())
#------------------------------------------------------------------------------

class CatFileUpdater(AttributeUpdater):
    """
    Updates metadata table values in the database from catalogue file headers
    for new attributes, given attributes or where there is missing/dodgy data
    for a given date. Essentially the same as AttributeUpdater, except only
    catalogue files are selected, and all catalogue attributes are
    automatically determined from WSA_MultiframeSchema.sql.

    """
    #: Optionally just update this night's catalogue data, eg '2005-12-11'.
    date = None
    #: Only update entries without catalogue files specified?
    doMissingOnly = False
    # Overrides AttributeUpdater.doStacksOnly
    doStacksOnly = True

    #--------------------------------------------------------------------------

    def _getRecordsList(self):
        """
        Retrieves a list of records that need updating in the database. As
        DbUpdater but with self._recordSet modified to just select frames with
        catalogues (i.e. stacked frames) from the given date, and optionally
        only those with missing catalogue name entries. File names returned are
        those of stacks' catalogue files.

        @return: List of records that need updating in the database, in the
                 form (catName, multiframeID), with the server name stripped
                 from the database catName attribute.
        @rtype: list(tuple(str, int))

        """
        if self.date:
            self._recordSet = "utDate=%r AND " % self.date + self._recordSet

        if self.doMissingOnly:
            self._recordSet = "(catName LIKE '%empty%' OR " \
              "catName LIKE '%none%') AND " + self._recordSet

        records = DbUpdater._getRecordsList(self)

        # Swap each image file name for its catalogue counterpart.
        return [(name.replace('.fit', '_cat.fits'), mfid)
                for name, mfid in records]

    #--------------------------------------------------------------------------

    def _prepTableAttrList(self, tableName, attrList):
        """
        Finds all catalogue attributes and produces a list of database entries
        to update.

        @param tableName: Database table to update.
        @type tableName: str
        @param attrList: List of all attributes in this table.
        @type attrList: list(Schema.Attribute)

        @return: The list of attributes that need to be updated for this table.
        @rtype: list(Schema.Attribute)

        """
        catAttrs = [attr for attr in attrList
                    if attr.isCatFile or attr.name == 'catName']

        # Create new columns where needed etc. from just catalogue attributes
        updateAttrs = super(CatFileUpdater,
                            self)._prepTableAttrList(tableName, catAttrs)

        # Without a manual attribute list, update every catalogue attribute.
        return catAttrs if self.reqAttrs is None else updateAttrs
#------------------------------------------------------------------------------

class ProvenanceFiller(DbUpdater):
    """
    Updates the Provenance table. Retrieves a list of non-deprecated
    combiframe IDs that are new or have incomplete or indeterminable (if the
    -q/--quick option is not selected) provenance records to be updated. The
    header of the FITS file for each combiframe is read to determine the list
    of provenance files, which are converted to multiframe IDs (to insert into
    the Provenance table) via a look-up table. This table lists all frames that
    are not deprecated due to reprocessing, referenced by file name (which is
    combined with the night to distinguish between common file names across
    nights for confidence images etc.) and so should provide a unique list of
    the latest versions of each of the component files. If this list is not
    unique an error is thrown, as this suggests a database inconsistency. Since
    only non-deprecated combiframes have their provenance updated, we only need
    to consider the latest reprocessed version.

    The final statistics report the number of broken combiframe files that list
    no provenance, as well as the number of combiframe files where a component
    file is missing from our database. A re-run in -q/--quick mode will recheck
    these files as well as files that are broken due to providing incomplete
    provenance lists, due to e.g. an interrupted observation run (these will
    normally become deprecated by quality control later on, so will not be
    rechecked on subsequent runs).

    @note: If a component file does not yet exist in the database it cannot
           be used for provenance information.
    @note: Sometimes a combiframe file does not list the full provenance in its
           FITS header, due to an interrupted observation run, or some other
           FU. These are normally deprecated by quality control, so won't be
           re-checked on subsequent runs.
    @note: UDS mosaics list provenance in their catalogue files not their image
           files, but the relevant multiframeID is still the image file for the
           Provenance table.
    @note: UDS mosaic conf images should have provenance entries, but don't.
           So they aren't checked.
    """
    # Curation task number identifying this application (see CuSession).
    cuNum = 23
    #--------------------------------------------------------------------------
    # Define class constants (access as ProvenanceFiller.varName)

    #: Maps the parameter that defines the expected number of component frames
    #: ('undef' when the count is indeterminable, '2' for difference images)
    #: to the list of combiframe frameType patterns it applies to.
    combiFrameTypes = {'nJitter': ['stack', 'leavstack', 'leavstackconf',
                                   'filtleavstack', 'filtleavstackconf'],
                       'nOffsets': ['tile%'],
                       'nuStep': ['leav', 'leavconf'],
                       'undef': ['stackconf', 'deep%', 'mosaic%'],
                       '2': ['diff', 'diffconf']}

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    isQuickRun = False
    """ Don't re-check for missing provenance of frames that are already in the
        Provenance table, where we don't know how many components they should
        have (i.e. just stackconf, %deep%, dark, sky).
    """
    recheckDate = None
    """ Only re-check stackconf and deep frames observed after this date.
        Expects string, e.g. '2005-05-05' or '20050505' or '05A'. For semesters
        it takes the first night of the semester.
    """
    #--------------------------------------------------------------------------
    # Define private member variable default values (access prohibited)

    #: Dictionary of multiframeIDs referenced just by file name.
    _guessDict = None
    #: Dictionary of multiframeIDs referenced by file name and night.
    _mfidDict = None

    #--------------------------------------------------------------------------
965 - def _onRun(self):
966 """ Update db: Prepare file list, and read the FITS files. 967 """ 968 # ProvenanceFiller requires CuEventID as table entry even for trial run 969 if not self.cuEventID: 970 self.cuEventID = self._getNextCuEventID() 971 super(ProvenanceFiller, self)._onRun()
972 973 #-------------------------------------------------------------------------- 974
975 - def _prepare(self):
976 """ 977 Retrieve list of records to be updated and prepare the mfid<->filename 978 look-up table. 979 980 """ 981 # Create the record list - this invokes ProvFiller._getRecordsList() 982 DbUpdater._prepare(self) 983 984 Logger.addMessage('Producing mfid<->filename look-up table') 985 self._mfidDict = {} 986 987 # So that we don't mix-up between reprocessing revision versions we 988 # only place the most recent version in the look up table, and we no 989 # longer care if the pixel file still exists, as we are just interested 990 # in the multiframe ID. We don't update the provenance of deprecated 991 # combiframes, so we don't need to worry about those. Hopefully, 992 # earlier deprecated reprocessing revisions will not have been favoured 993 # over new ones in the combined frame! 994 self._recordSet = DbUpdater._recordSet.replace( 995 "AND fileName NOT LIKE 'PixelFileNoLongerAvailable:%'", 996 "AND deprecated < %s" % DepCodes.reprocCASU) 997 self._recordTable = DbUpdater._recordTable 998 999 duplicatedFiles = [] 1000 for filePathName, mfid in DbUpdater._getRecordsList(self): 1001 filePath, fileName = os.path.split(filePathName) 1002 night = os.path.basename(filePath).rsplit('_', 1)[0] 1003 if self._mfidDict.has_key((night, fileName)): 1004 duplicatedFiles.append((night, fileName)) 1005 else: 1006 self._mfidDict[(night, fileName)] = mfid 1007 1008 if duplicatedFiles: 1009 dupePath = self.sysc.filesDuplicatedPath() 1010 for fileID in duplicatedFiles: 1011 self.logBrokenFile('%s:%s' % fileID, dupePath) 1012 1013 raise ProvenanceFiller.CuError( 1014 "%s duplicate non-deprecated multiframes in database, logged in " 1015 "file: %s" % (len(duplicatedFiles), dupePath)) 1016 1017 # Make a copy of the dictionary referenced just by file name 1018 self._guessDict = {} 1019 for (night, fileName), mfID in self._mfidDict.iteritems(): 1020 self._guessDict[fileName] = mfID
1021 1022 #-------------------------------------------------------------------------- 1023
1024 - def _getRecordsList(self):
1025 """ 1026 Retrieve a list of records that need updating in the database. As 1027 DbUpdater but with self._recordSet modified to just select frames with 1028 provenance information that is not already in the Provenance table. 1029 1030 @return: List of records that need updating in the database, in the 1031 form (filePathName, multiframeID), with the server name 1032 stripped from the database fileName attribute. 1033 @rtype: list(tuple(str, int)) 1034 1035 """ 1036 frameType = (ProvenanceFiller.combiFrameTypes 1037 if not self.sysc.isOSA() else {'undef': 'stack%'}) 1038 1039 # VISTA stackconfs currently have no provenance info 1040 if self.sysc.isVSA() and 'stackconf' in frameType['undef']: 1041 frameType['undef'].remove('stackconf') 1042 1043 # WSA doesn't have nOffsets in its schema 1044 if self.sysc.isWSA() and 'nOffsets' in frameType: 1045 del frameType['nOffsets'] 1046 1047 # ** Find those frames without *any* provenance info already in db ** 1048 1049 allFramesList = utils.unpackList(frameType.values()) 1050 1051 whereNoProv = self._frameTypeLike(allFramesList) + " AND " + \ 1052 "(multiframeID NOT IN (SELECT combiframeID FROM Provenance)) AND " 1053 1054 frameTypeQueries = [('Multiframe', whereNoProv)] 1055 1056 # ** Find those frames with incomplete provenance info in db ** 1057 1058 joinStr = "Multiframe AS M, (SELECT combiframeID,count(*) AS nProv" + \ 1059 " FROM Provenance GROUP BY combiframeID) AS T" 1060 1061 for numProvInd in frameType: 1062 whereMissingProv = self._frameTypeLike(frameType[numProvInd]) + \ 1063 " AND T.combiframeID=M.multiframeID AND " 1064 1065 if numProvInd != 'undef': 1066 whereMissingProv += numProvInd + " != T.nProv AND" 1067 elif self.recheckDate: 1068 date = self.sysc.obsCal.dateRange(self.recheckDate).begin 1069 whereMissingProv += " M.utDate > '%s' AND" % date 1070 1071 if numProvInd != 'undef' or not self.isQuickRun: 1072 frameTypeQueries.append((joinStr, whereMissingProv)) 1073 1074 recordList = [] 1075 for fromStr, 
whereStr in frameTypeQueries: 1076 self._recordTable = fromStr 1077 # Don't bother with deprecated combiframes as they won't be 1078 # released and probably don't have correct provenance anyway. Same 1079 # with three certain older projects where we know the provenance 1080 # is broken and they won't be re-released. 1081 self._recordSet = whereStr + " " + " AND ".join([ 1082 DepCodes.selectNonDeprecated, "project != 'U/EC/10'", 1083 "project != 'U/UKIDSS/LAS_SV'", "project != 'U/UKIDSS/GPS_SV'", 1084 DbUpdater._recordSet]) 1085 1086 # Swap mosaic images for catalogues, as this contains the 1087 # provenance list, though it's still the image mfID that should be 1088 # updated. 1089 for filePathName, mfID in DbUpdater._getRecordsList(self): 1090 if self.sysc.mosaicSuffix not in filePathName: 1091 recordList.append((filePathName, mfID)) 1092 else: 1093 recordList.append((fits.getCatalogue(filePathName), mfID)) 1094 1095 return recordList
1096 1097 #-------------------------------------------------------------------------- 1098
1099 - def updateDb(self, mfid, fitsFile, filePathName):
1100 """ 1101 Inserts provenance info for given combiframe into table Provenance if 1102 its FITS file contains the PROV000x keywords. 1103 1104 @param mfid: MultiframeID of combiframe for which provenance 1105 info is to be inserted into table Provenance. 1106 @type mfid: int 1107 @param fitsFile: Open FITS file containing the data for this mfid. 1108 @type fitsFile: pyfits.HDUList 1109 @param filePathName: Full path to the FITS file. 1110 @type filePathName: str 1111 1112 """ 1113 # just looking in first extension for now - they should all be the same 1114 fitsHdr = fitsFile[1].header 1115 1116 if not fitsHdr.has_key('PROV0000'): 1117 self._errorLog = NoProvenance() 1118 self._errorLog.append() 1119 self.logBrokenFile(filePathName, self.sysc.filesBrokenPath()) 1120 return 1121 else: 1122 self._errorLog = MissingProvenance() 1123 1124 # find complete set of Provenance keys 1125 provKeys = [key for key, _value in fitsHdr.items() 1126 if key.startswith('PROV') and not key.endswith('0000')] 1127 prevID = -1 1128 for key in provKeys: 1129 # trim off extension info at the end of file name if exists, and 1130 # for deep stacks we just want the filename. 
1131 compFrameFileName = \ 1132 os.path.basename(fitsHdr[key].rsplit('[', 1)[0]) 1133 try: # Determine the correct night from the file name 1134 night = re.findall("20[0-9]{6}", compFrameFileName)[0] 1135 except IndexError: 1136 # If night is not in file name assume it is the same as for the 1137 # combined frame 1138 fileDir = os.path.basename(os.path.dirname(filePathName)) 1139 night = os.path.basename(fileDir).rsplit('_', 1)[0] 1140 # translate filename to mfid value in database 1141 compFrameID = self._mfidDict.get((night, compFrameFileName)) 1142 if not compFrameID: # find any file with the same name 1143 compFrameID = self._guessDict.get(compFrameFileName) 1144 if not compFrameID: # no component frame in the database (yet) 1145 self._errorLog.append(compFrameFileName) 1146 self.logBrokenFile("Component %s of ingested stack %s" 1147 % (compFrameFileName, filePathName), 1148 self.sysc.filesNotIngestedPath()) 1149 # Check for duplicate entries 1150 elif compFrameID != prevID and not self.archive.queryEntriesExist( 1151 "Provenance", where="combiframeID=%s AND multiframeID=%s" 1152 % (mfid, compFrameID)): 1153 prevID = compFrameID 1154 self.archive.insertData("Provenance", 1155 rowData=[mfid, compFrameID, self.cuEventID])
1156 1157 #-------------------------------------------------------------------------- 1158
1159 - def _frameTypeLike(self, frameTypes):
1160 """ 1161 @param frameTypes: A list of frameTypes to be included. 1162 @type frameTypes: list(str) 1163 1164 @return: A SQL WHERE string of selected frameTypes. 1165 @rtype: str 1166 1167 """ 1168 predicates = ("frameType LIKE " + self.archive._sqlString(frame) 1169 for frame in frameTypes) 1170 return "(%s)" % " OR ".join(predicates)
1171 1172 #------------------------------------------------------------------------------ 1173 # Change log: 1174 # 1175 # 4-Jun-2007, RSC: Created from the merger of DbConnect/DbUpdater.py, 1176 # FitsUtils.py, and the extraction of helper script classes 1177 # from ProvenanceFiller.py, UpdateSchema.py, UpdateMoon.py, 1178 # UpdateCatData.py. 1179