Package helpers :: Module UpdateFitsFileNames
[hide private]

Source Code for Module helpers.UpdateFitsFileNames

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  #$Id: UpdateFitsFileNames.py 9564 2012-12-12 14:25:29Z RossCollins $ 
  4  """ 
  5     Updates fileName and catName in the given databases from the DB 'disk??' 
  6     value to the given disk. 
  7   
  8     @author: E. Sutorius 
  9     @org:    WFAU, IfA, University of Edinburgh 
 10  """ 
 11  #------------------------------------------------------------------------------ 
 12  from   collections import defaultdict 
 13  import os 
 14  import time 
 15   
 16  from   wsatools.CLI                 import CLI 
 17  import wsatools.CSV                     as csv 
 18  from   wsatools.DbConnect.DbSession import DbSession, Join 
 19  from   wsatools.File                import File 
 20  from   wsatools.Logger              import Logger, ForLoopMonitor 
 21  import wsatools.DbConnect.CommonQueries as queries 
 22  from   wsatools.SystemConstants     import DepCodes, SystemConstants 
 23  #------------------------------------------------------------------------------ 
 24   
class UpdateFits(object):
    """
    Updates fileName and catName in the given databases.

    The new paths are derived, in order of precedence, from a file of
    'old,new' path pairs (-f), from a master database (-m), or from the
    command-line replacement masks (new disk, sub-directory, version and
    pixel-file availability).
    """
    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    dateVersStr = '20050101'          #: Date and version dirname of the data.
    deprecateFiles = False            #: Set the deprecated flag.
    isTrialRun = False                #: Don't update just print statements?
    newDisk = ''                      #: Disk where data has been moved to.
    outDir = SystemConstants.fitsDir  #: Sub-directory containing the new data.
    subDir = SystemConstants.fitsDir  #: Sub-directory containing the old data.
    updateFileName = None             #: File containing 'old,new' path pairs.
    updateVersion = ''                #: Version change 'old=>new'
    userName = DbSession.userName     #: Database user name.

    #--------------------------------------------------------------------------
    # Private member variables

    _databases = []       #: List of full-paths to databases to update.
    _updateFileDict = {}  #: Dictionary containing file info.
    #: Template of path replacements: each value is None (no change) or an
    #: [old, new] pair. __init__ works on a per-instance copy.
    _updateMask = {'disk': None, 'subdir': None, 'version': None,
                   "avail": None}

    #--------------------------------------------------------------------------

    def __init__(self, cli):
        """
        Initialise class from command-line options.

        @param cli: Command-line arguments.
        @type  cli: CLI

        @raise Exception: If -A is given without a date, if neither -M nor -d
                          is given, or if -d is not of the form SERVER.DB.

        """
        # Per-instance copy: never modify the class-level template, which
        # would otherwise be shared by every UpdateFits instance.
        self._updateMask = dict(UpdateFits._updateMask)

        # version: either 'N' or 'N=>M'
        versNum = cli.getOpt("version")
        if '=>' in versNum:
            self.updateVersion = versNum.replace(' ', '').partition("=>")[::2]
            self._updateMask['version'] = ['_v%s' % self.updateVersion[0],
                                           '_v%s' % self.updateVersion[1]]
        else:
            self.updateVersion = (versNum, versNum)

        # date: a single night (digits) or a semester name
        self.dateReq = cli.getArg("date")
        self.obsCal = SystemConstants(cli.getArg("archive")).obsCal
        if self.dateReq.isdigit():
            # The class default acts as "no date given"; compare by value
            # (the previous 'is not' identity test on strings was fragile).
            self.dateVersStr = (
                "%s_v%s" % (self.dateReq, self.updateVersion[0])
                if self.dateReq != UpdateFits.dateVersStr
                else None)
            self.dateVersStrList = [self.dateVersStr]
        else:
            semDays = self.obsCal.getSemDayList(self.dateReq)
            self.dateVersStrList = ["%s_v%s" % (x, self.updateVersion[0])
                                    for x in semDays]
            self.dateVersStr = self.dateVersStrList[0]

        # disk ("diskNN" is the CLI default meaning "unchanged")
        if cli.getOpt("newdisk") != "diskNN":
            self.newDisk = os.sep + os.path.join(cli.getOpt("newdisk"),
                                                 cli.getArg("archive").lower())
            # The old disk (index 0) is filled in per file in updater().
            self._updateMask['disk'] = ['', self.newDisk]

        # subdir
        self.subDir = cli.getOpt("subdir")
        self.outDir = cli.getOpt("outdir")
        if self.subDir != self.outDir:
            self._updateMask['subdir'] = [self.subDir, self.outDir]

        # deprecation
        self.deprecateFiles = cli.getOpt("deprecate")
        # availability
        self.checkPixAvailable = cli.getOpt("available")
        if self.checkPixAvailable and not self.dateVersStr:
            raise Exception("Must choose date with -A option")
        # check against master DB
        self.checkDB = cli.getOpt("masterdb")
        # filename
        self.updateFileName = cli.getOpt("filename")
        # database options
        self.isTrialRun = cli.getOpt("test")
        self.userName = cli.getOpt("user")

        # Prepare list of available databases from user-supplied options
        if cli.getOpt("multidbs"):
            server, dbs = cli.getOpt("multidbs").split("::")
            databases = set(dbs.split(',') if dbs != "all" else [])
        elif cli.getOpt("database"):
            server, _sep, db = cli.getOpt("database").partition('.')
            if not db:
                raise Exception("Database must be given as SERVER.DATABASE")
            databases = set([db])
        else:
            raise Exception("Must choose either -M or -d options")

        self._databases = getAvailDBs(cli.getArg("archive"), server, databases)

    #--------------------------------------------------------------------------

    def getFileNames(self, archive):
        """
        Get a list of all MultiframeIds, filenames, and catnames.

        If an update file has been read, only frames whose last two path
        components (date directory and file name) appear in it are returned.

        @param archive: Connection to database to query.
        @type  archive: DbSession

        @return: Dictionary giving [fileName, catName, dateVersStr] for every
                 MfID, with the pixel server host name removed from paths.
        @rtype:  defaultdict(int:list(str))

        """
        # A set gives O(1) membership tests in the loop below.
        compareSet = set('/'.join(entry.rsplit('/', 2)[-2:])
                         for entry in self._updateFileDict)

        whereClause = ''
        if self.dateVersStr:
            whereClause = "dateVersStr in ('%s')" % "','".join(
                self.dateVersStrList)

        mffflu = archive.query(selectStr="name",
                               fromStr="sysobjects",
                               whereStr=' OR '.join([
                                   "name like 'Multiframe'",
                                   "name like 'FlatFileLookUp'"]))

        if 'FlatFileLookUp' in mffflu:
            results = archive.query(
                selectStr="Multiframe.multiframeID, Multiframe.fileName, "
                          "catName, dateVersStr",
                fromStr=Join(["Multiframe", "FlatFileLookUp"],
                             ["multiframeID"]),
                whereStr=whereClause)

        elif 'Multiframe' in mffflu:
            # Without FlatFileLookUp the date can only be matched within the
            # file path. With no date requested select all frames, rather
            # than matching the literal text 'None'.
            fileNameClause = ''
            if self.dateVersStr:
                fileNameClause = " OR ".join("fileName like '%%%s%%'" % x
                                             for x in self.dateVersStrList)
            results = archive.query(
                selectStr="multiframeID, fileName, catName, '%s'"
                          % self.dateVersStr,
                fromStr="Multiframe",
                whereStr=fileNameClause)
        else:
            Logger.addMessage("<ERROR> %s is missing the Multiframe and"
                              " FlatFileLookUp table!" % archive.database)
            results = []

        hostName = archive.sysc.pixelServerHostName
        mfd = defaultdict(list)
        Logger.addMessage("Creating mfID dict...")
        for mfID, fN, cN, dvStr in results:
            # Fall back to the date directory of the path when the selected
            # dateVersStr does not occur in the file name.
            dvStr = (dvStr if dvStr in fN else fN.rsplit('/', 2)[-2])
            fileName = fN.replace(hostName, '')
            catName = cN.replace(hostName, '')
            if not compareSet \
              or '/'.join(fileName.rsplit('/', 2)[-2:]) in compareSet:
                mfd[mfID] += [fileName, catName, dvStr]
        Logger.addMessage("%d files found." % len(mfd))
        return mfd

    #--------------------------------------------------------------------------

    def run(self):
        """
        Update the prepared list of databases.

        @raise SystemExit: If neither an update file nor a dateVersStr is set.

        """
        if self.updateFileName:
            Logger.addMessage("Updating from file: %s" % self.updateFileName)
            self.updateFromFile()
        elif self.dateVersStr:
            Logger.addMessage("Updating from arguments...")
        else:
            raise SystemExit(
                "<ERROR> You must name either a fileName (-f) or a dateVersStr.")

        if self.checkDB:
            Logger.addMessage("Connecting to master DB: %s" % self.checkDB)
            masterDb = DbSession(self.checkDB, autoCommit=True,
                                 isTrialRun=self.isTrialRun,
                                 userName=self.userName)
            # get a dict of entries from the master DB
            Logger.addMessage("Getting file names...")
            self.masterMfidDict = self.getFileNames(masterDb)

        for db in self._databases:
            self.updater(db)

    #--------------------------------------------------------------------------

    def updateFromArgs(self, oldFileName, dateVersStr, updateMask):
        """
        Update the given file name from arguments.

        Each active mask entry is applied as a plain string replacement. For
        'version', the replacement is built from dateVersStr and
        self.updateVersion instead of the mask values.

        @param oldFileName: Old file name.
        @type  oldFileName: str
        @param dateVersStr: Date-version string.
        @type  dateVersStr: str
        @param updateMask:  Update mask to use.
        @type  updateMask:  dict(str:list(str))

        @return: New file name.
        @rtype:  str

        """
        newFileName = oldFileName
        for flag in updateMask:
            if updateMask[flag]:
                if flag == "version":
                    oldEntry = dateVersStr
                    newEntry = dateVersStr.replace(
                        '_v%s' % self.updateVersion[0],
                        '_v%s' % self.updateVersion[1])
                else:
                    oldEntry = updateMask[flag][0]
                    newEntry = updateMask[flag][1]
                newFileName = newFileName.replace(oldEntry, newEntry)
        return newFileName

    #--------------------------------------------------------------------------

    def updateFromFile(self):
        """
        Create the dictionary containing old/new path relations from a file.

        Also sets dateVersStr (and dateVersStrList) from the single date
        directory that the listed files must share.

        @raise SystemExit: If the file does not exist, or the listed files
                           span zero or more than one date directory.

        """
        if not os.path.exists(self.updateFileName):
            raise SystemExit(
                "<ERROR> %s does not exist!" % self.updateFileName)

        Logger.addMessage("Updating from " + self.updateFileName)
        self._updateFileDict = dict(csv.File(self.updateFileName))

        datedirSet = set()
        for entry in self._updateFileDict:
            updFile = File(entry)
            datedirSet.add(updFile.subdir)

        if len(datedirSet) > 1:
            raise SystemExit("<ERROR> More than one day in the given list, "
                             "please split it up!")
        elif len(datedirSet) == 1:
            self.dateVersStr = datedirSet.pop()
            self.dateVersStrList = [self.dateVersStr]
        else:
            raise SystemExit("<ERROR> No date found in the list!")

    #--------------------------------------------------------------------------

    def updater(self, database):
        """
        Update the given database.

        New fileName/catName values are taken, in order of precedence, from
        the update file, the master DB, or the command-line masks. They are
        written via updateDBFileNames(). For databases whose name starts with
        UKIDSS, WORLD or TRANSIT, the updates go to an SQL file instead.

        @param database: Database to update.
        @type  database: str

        """
        Logger.addMessage("Connecting to %s" % database)
        archive = DbSession(database, autoCommit=True,
                            isTrialRun=self.isTrialRun, userName=self.userName)

        # get a dict of entries from the DB
        Logger.addMessage("Getting file names...")
        mfidDict = self.getFileNames(archive)

        # build a dictionary of data to update
        newMfidDict = defaultdict(list)

        fitsDirWarning = False
        filePathWarning = False
        pfnlaStr = "PixelFileNoLongerAvailable:"
        # Copy the [old, new] lists as well: the 'disk' entry is modified per
        # file below and must not leak back into self._updateMask.
        updateMask = dict((flag, list(pair) if pair else pair)
                          for flag, pair in self._updateMask.items())
        # Sort once, not once per loop iteration.
        sortedMfIDs = sorted(mfidDict)
        Logger.addMessage("Checking %s files..." % len(mfidDict))
        progress = ForLoopMonitor(sortedMfIDs)
        for mfID in sortedMfIDs:
            oldDbFileName, oldDbCatName, dateVersStr = mfidDict[mfID]
            # update from file
            if self.updateFileName:
                try:
                    newFileName = self._updateFileDict[oldDbFileName]
                    if "empty_catalogue.fits" in oldDbCatName \
                      or oldDbCatName.upper() == "NONE":
                        newCatName = oldDbCatName
                    else:
                        newCatName = self._updateFileDict[oldDbCatName]
                    newMfidDict[mfID].extend([newFileName, newCatName])
                except KeyError:
                    Logger.addMessage(
                        "%s in DB but not in provided list." % oldDbFileName)
            # update from master DB
            elif self.checkDB:
                # .get() avoids creating empty defaultdict entries, and the
                # unpacking error they caused for frames missing from master.
                masterEntry = self.masterMfidDict.get(mfID)
                if not masterEntry:
                    Logger.addMessage(
                        "%s in DB but not in master DB." % oldDbFileName)
                else:
                    masterDbFileName, masterDbCatName, masterDateVersStr = \
                        masterEntry
                    if masterDateVersStr == dateVersStr:
                        newFileName = (masterDbFileName
                                       if oldDbFileName != masterDbFileName
                                       else None)
                        newCatName = (masterDbCatName
                                      if oldDbCatName != masterDbCatName
                                      and not (oldDbCatName == "NONE"
                                               and "empty" in masterDbCatName)
                                      else None)
                        if newFileName or newCatName:
                            newMfidDict[mfID].extend([newFileName,
                                                      newCatName])
            # update from arguments
            else:
                # get old file name properties
                oldFile = File(oldDbFileName.replace(pfnlaStr, ''))
                oldDisk = oldFile.diskdir
                oldSubDir = oldFile.commondir
                if updateMask['disk']:
                    updateMask['disk'][0] = oldDisk

                # Log update info (once, for the first frame)
                if oldSubDir == self.subDir:
                    if mfID == sortedMfIDs[0]:
                        text = []
                        if updateMask['disk']:
                            text.append('=>'.join([oldDisk, self.newDisk]))
                        if updateMask['subdir']:
                            text.append('=>'.join([self.subDir, self.outDir]))
                        if updateMask['version']:
                            text.append('=>'.join([
                                'version:' + self.updateVersion[0],
                                self.updateVersion[1]]))
                        if text:
                            Logger.addMessage("Updating %s" % ', '.join(text))
                else:
                    if not fitsDirWarning:
                        Logger.addMessage("<WARNING> DB file(s) in different "
                                          "fits directory: %s (%s)" % (
                                              oldSubDir, self.subDir))
                        fitsDirWarning = True

                if self.checkPixAvailable:
                    if os.path.exists(oldFile.name):
                        # file is back: drop the "not available" prefix
                        updateMask["avail"] = [pfnlaStr, '']
                    else:
                        if not pfnlaStr in oldDbFileName:
                            prefix = "/disk"
                            updateMask["avail"] = [
                                prefix, pfnlaStr + prefix]
                        elif pfnlaStr + pfnlaStr in oldDbFileName:
                            # collapse an accidentally doubled prefix
                            updateMask["avail"] = [pfnlaStr + pfnlaStr,
                                                   pfnlaStr]
                        else:
                            updateMask["avail"] = ['', '']

                if oldFile.subdir.rpartition('_v')[2] == self.updateVersion[0] \
                  and dateVersStr in oldFile.name \
                  and oldFile.commondir == self.subDir:
                    newDbFileName = self.updateFromArgs(
                        oldDbFileName, dateVersStr, updateMask)
                    if os.path.exists(newDbFileName.rpartition(':')[-1]):
                        newDbFileName = newDbFileName.replace(pfnlaStr, '')
                    if "empty_catalogue.fits" in oldDbCatName \
                      or oldDbCatName.upper() == "NONE":
                        newDbCatName = oldDbCatName
                    else:
                        newDbCatName = self.updateFromArgs(
                            oldDbCatName, dateVersStr, updateMask)
                        if os.path.exists(newDbCatName.rpartition(':')[-1]):
                            newDbCatName = newDbCatName.replace(pfnlaStr, '')
                else:
                    newDbFileName = oldDbFileName
                    newDbCatName = oldDbCatName

                newFilePath = newDbFileName.rpartition(':')[-1]
                if newDbFileName != oldDbFileName:
                    if not os.path.exists(newFilePath) \
                      and pfnlaStr not in newDbFileName:
                        # pfnlaStr already ends in ':'; joining with ':'
                        # produced a doubled colon.
                        newDbFileName = pfnlaStr + newFilePath
                        Logger.addMessage(
                            "<WARNING> Image File missing: %s" % newFilePath)
                        filePathWarning = True
                    newMfidDict[mfID].extend([newDbFileName, newDbCatName])

                # Check the path itself, without any availability prefix.
                newCatPath = newDbCatName.rpartition(':')[-1]
                if not (newDbCatName.endswith("empty_catalogue.fits")
                        or newDbCatName.upper() == "NONE") \
                  and not os.path.exists(newCatPath):
                    Logger.addMessage(
                        "<WARNING> Catalogue File missing: %s" % newDbCatName)
                    filePathWarning = True
            progress.testForOutput()

        if filePathWarning or fitsDirWarning:
            Logger.addMessage("<WARNING> Check if version is correct or "
                              "new file path exists.")
        # update the DB
        if newMfidDict:
            Logger.addMessage("Updating %s entries in %s..." %
                              (len(newMfidDict), database))

            # Write to file if database is not the loadDB
            isPublicDb = archive.database.upper().startswith(
                ("UKIDSS", "WORLD", "TRANSIT"))
            updateFile = None
            if isPublicDb:
                path = os.path.join(
                    "/mnt", archive.sysc.loadServer,
                    "SqlUpdates", "FITS_PubDB_%s.%s_%sv%s_%s.sql"
                    % (archive.server, archive.database,
                       self.dateReq, self.updateVersion[1],
                       time.strftime('%Y%m%d_%H%M')))

                updateFile = File(path)
                updateFile.aopen()

            try:
                updateDBFileNames(archive, newMfidDict, updateFile)
            finally:
                # always release the SQL file, even if the update fails
                if updateFile is not None:
                    updateFile.close()

            if self.deprecateFiles:
                if archive.server == archive.sysc.loadServer:
                    deprecateFileNames(archive, sorted(newMfidDict))
                else:
                    Logger.addMessage("You're not allowed to deprecate data "
                                      "in a public database.")
        else:
            Logger.addMessage(database + " is up-to-date.")
436 437 #------------------------------------------------------------------------------ 438
def deprecateFileNames(archive, mfIDList):
    """
    Deprecate the frames in the multiframeID list.

    Adds DepCodes.reprocCASU to the deprecated flag of every listed frame
    that is not already flagged at or above that value, in both Multiframe
    and MultiframeDetector.

    @param archive:  Connection to database to update.
    @type  archive:  DbSession
    @param mfIDList: List of MultiframeIDs.
    @type  mfIDList: list

    """
    if not mfIDList:
        # "multiframeID IN ()" would be invalid SQL; nothing to deprecate.
        Logger.addMessage("No frames to deprecate.")
        return

    entries = "deprecated=deprecated+%s" % DepCodes.reprocCASU

    rows = "deprecated<%s AND multiframeID IN (%s)" % \
        (DepCodes.reprocCASU, ','.join(map(str, mfIDList)))

    Logger.addMessage("%s deprecated in Multiframe." %
                      archive.update("Multiframe", entries, rows))

    Logger.addMessage("%s deprecated in MultiframeDetector." %
                      archive.update("MultiframeDetector", entries, rows))
459 460 #------------------------------------------------------------------------------ 461
def getAvailDBs(archive, mainServer, databases=None):
    """
    Get all or selected DBs from given server.

    @param archive:    The archive to be updated, e.g. WSA.
    @type  archive:    str
    @param mainServer: The server to query for databases, can be the name or
                       the synonym, e.g. dbload or dbpub
                       (for all public servers).
    @type  mainServer: str
    @param databases:  Optionally request specific database names. Only for
                       'dbpub' are all databases returned when none are
                       requested. For 'dbload' or a named server, an empty
                       list is returned.
    @type  databases:  sequence(str)

    @return: List of full paths to databases on selected server.
    @rtype:  list(str)

    @raise SystemExit: If mainServer is not a known server or synonym.

    """
    sysc = SystemConstants(archive)
    # Accept None (the default) as "no specific databases requested".
    requested = set(databases or [])

    # get all public databases
    if mainServer == "dbpub":
        availDatabases = []
        for server in sysc.publicServers:
            pubDBs = set(queries.getAllDBs(server))
            if sysc.isVSA():
                pubDBs = set(db for db in pubDBs
                             if not db.startswith(("UKIDSS", "TRANSIT",
                                                   "WORLD")))
            else:
                pubDBs = set(db for db in pubDBs
                             if not (db.startswith("V")
                                     or db.endswith("SIAP")))
            if requested:
                pubDBs.intersection_update(requested)
            availDatabases.extend(server + '.' + db for db in pubDBs)

    # get the requested load server databases
    elif mainServer == "dbload":
        availDatabases = [sysc.loadServer + '.' + db for db in requested]

    # get other specified databases
    elif mainServer in list(sysc.publicServers) + [sysc.loadServer]:
        availDatabases = [mainServer + '.' + db for db in requested]
    else:
        raise SystemExit("<ERROR> %s not known." % mainServer)

    if not availDatabases:
        Logger.addMessage("<WARNING> No databases selected on %s."
                          % mainServer)

    return sorted(availDatabases)
509 510 #------------------------------------------------------------------------------ 511
def updateDBFileNames(archive, mfDict, updateFile):
    """
    Update the database entries for fileName and catName.

    The pixel server host name is re-inserted before "/disk" in each path.
    Updates for databases whose name starts with UKIDSS, WORLD or TRANSIT
    are written as SQL statements to updateFile. All others are applied
    directly through archive.update().

    @param archive:    Connection to database to update.
    @type  archive:    DbSession
    @param mfDict:     Dictionary giving fileName, catName tuples for every
                       MfID. A false (e.g. None) entry means "leave as is".
    @type  mfDict:     dict(int:list(str))
    @param updateFile: Write to this file for read only DBs.
    @type  updateFile: File obj

    """
    hostDiskPrefix = archive.sysc.pixelServerHostName + "/disk"
    isPublicDb = archive.database.upper().startswith(
        ("UKIDSS", "WORLD", "TRANSIT"))

    numRows = 0
    for mfID in sorted(mfDict):
        updSql = []
        fileName, catName = mfDict[mfID][:2]
        if fileName:
            fileName = fileName.replace("/disk", hostDiskPrefix)
            # Double any single quotes so the value stays one SQL literal.
            updSql.append("fileName='%s'" % fileName.replace("'", "''"))

        if catName:
            catName = catName.replace("/disk", hostDiskPrefix)
            updSql.append("catName='%s'" % catName.replace("'", "''"))

        if not updSql:
            # Nothing to change; an empty SET clause would be invalid SQL.
            continue

        # Write to file on dbShare for public DBs
        if isPublicDb:
            numRows += 1
            updateFile.writetheline(
                "UPDATE Multiframe SET %s WHERE multiframeID=%s" % (
                    ','.join(updSql), mfID))
        else:
            numRows += archive.update("Multiframe", ','.join(updSql),
                                      "multiframeID=%s" % mfID)

    Logger.addMessage("%s fileNames affected in Multiframe." % numRows)
#------------------------------------------------------------------------------
# Entry point for script.

# Allow module to be imported as well as executed from the command line
if __name__ == '__main__':

    # Define command-line interface settings for UpdateFitsFileNames.
    # The generic "database" argument is replaced by the -d / -M options.
    CLI.progArgs.remove("database")
    CLI.progArgs += [
        CLI.Argument("archive", "WSA",
                     isValOK=lambda x: x.lower() in ['wsa', 'vsa', 'osa']),
        CLI.Argument("date", UpdateFits.dateVersStr, isOptional=True,
                     isValOK=CLI.isDateOK)
    ]
    CLI.progOpts += [
        CLI.Option('d', 'database',
                   "update individual database",
                   "NAME", isValOK=lambda x: x.count('.') <= 1),
        CLI.Option('A', 'available',
                   "check if the pixel file is available and rename filename "
                   "accordingly in Multiframe"),
        CLI.Option('D', 'deprecate',
                   "set the deprecated flag in Multiframe and "
                   "MultiframeDetector"),
        CLI.Option('M', 'multidbs',
                   "'server::database' where server can be 'dbload' or 'dbpub' "
                   "and database a comma-separated list of DBs or 'all'",
                   "LIST", isValOK=lambda x: "::" in x),
        CLI.Option('f', 'filename', "File containing old/new file path pairs",
                   "FILE"),
        CLI.Option('k', 'newdisk', "new disk containing the data", "DISK",
                   "diskNN", isValOK=lambda x: "disk" in x and x[4:].isdigit()),
        CLI.Option('m', 'masterdb',
                   "check against this master DB.",
                   "DB", None),
        CLI.Option('o', 'outdir',
                   "new destination subdirectory containing FITS date "
                   "directories", "DIR", UpdateFits.outDir),
        CLI.Option('s', 'subdir',
                   "subdirectory containing FITS date directories",
                   "DIR", UpdateFits.subDir),
        CLI.Option('v', 'version', "version number or update from version N "
                   "to version M ('N=>M', partitions on '=>')", "VERSION",
                   '1', isValOK=lambda x: x.replace('.', '').isdigit()
                                          or "=>" in x)
    ]

    cli = CLI(UpdateFits, "$Revision: 9564 $")
    Logger.isVerbose = False
    Logger.addMessage(cli.getProgDetails())
    logFileName = "updateFitsFileNames_%s.log" % (time.strftime('%Y%m%d_%H%M'))
    # Keep a reference to the Logger instance for the duration of the run.
    log = Logger(logFileName, path=os.curdir)

    UpdateFits(cli).run()

#------------------------------------------------------------------------------
# Change log:
#
# 12-Mar-2007, ETWS: First Version
# 13-Mar-2007, ETWS: Fixed load server list.
# 12-Sep-2007, ETWS: Included possibility to update from a log file containing
#                    old name, new name pairs.
# 14-Sep-2007, ETWS: Included functionality to deprecate updated files in DB.
# 10-Jan-2007, ETWS: Fixed update from given dateVersStr and new disk name.
# 11-Jan-2008, ETWS: Fixed bug in dataBasesDef.
# 14-Jan-2008, ETWS: Fixed update for files that weren't available before.
#  6-Mar-2008, ETWS: Enhanced documentation.
# 19-May-2008, ETWS: Fixed bug in file selection.
# 30-Jul-2008, ETWS: Enhanced for new deprecation handling.
# 31-Jul-2008, ETWS: Fixed update from list.
#  4-Feb-2009, ETWS: Rewritten; included version number update.