Package invocations :: Package monitoring :: Module CheckJpgs
[hide private]

Source Code for Module invocations.monitoring.CheckJpgs

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  #$Id: CheckJpgs.py 8674 2011-09-16 17:12:03Z RossCollins $ 
  4  """ 
  5     Compares the entries in the database with the JPG paths and updates 
  6     them accordingly. 
  7   
  8     @author: E. Sutorius 
  9     @org:    WFAU, IfA, University of Edinburgh 
 10  """ 
 11  #------------------------------------------------------------------------------ 
 12  from __future__      import print_function 
 13  from   collections import defaultdict 
 14  import mx.DateTime     as mxTime 
 15  import os 
 16   
 17  from   wsatools.CLI                 import CLI 
 18  from   wsatools.DbConnect.DbSession import DbSession 
 19  from   wsatools.File                import File 
 20  import wsatools.FitsUtils               as fits 
 21  from   wsatools.Logger              import Logger, ForLoopMonitor 
 22  import wsatools.DbConnect.CommonQueries as queries 
 23  from   wsatools.SystemConstants     import SystemConstants 
 24  #------------------------------------------------------------------------------ 
class CheckJpgs(object):
    """Checks JPG paths in DB and updates them.
    """
    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options
    isTrialRun = False              #: Don't update just print statements?
    userName = DbSession.userName   #: Database user name.

    #--------------------------------------------------------------------------
    # Private member variables
    #
    # The class-level values below are declarations only: __init__ gives every
    # instance its own containers, so that results collected by one instance
    # are never visible through another.

    _databases = []                 #: List of full-paths to databases to update.
    _missingJpgs = set()            #: Set of FITS files without JPGs.
    _updateEntries = defaultdict()  #: Dict of entries that need to be updated.

    #: Database name prefixes for which updates are written to an SQL file.
    _publicDbPrefixes = ("UKIDSS", "WORLD", "TRANSIT")

    #--------------------------------------------------------------------------

    def __init__(self,
                 database=DbSession.database,
                 beginDate=CLI.getArgDef("startdate"),
                 endDate=CLI.getArgDef("enddate"),
                 versionStr=CLI.getArgDef("version"),
                 checkDB=CLI.getOptDef("check"),
                 updateRelDBs=CLI.getOptDef("updatedbs"),
                 outPath=CLI.getOptDef("outpath")
                 ):
        """
        Resolve the date range, connect to the master DB and read its entries.

        @param beginDate:    First date to process, eg. "20050101"; passed to
                             obsCal.getDatesFromInput() for resolution.
        @type  beginDate:    str
        @param checkDB:      Check the DB for wrong entries of compFile.
        @type  checkDB:      bool
        @param database:     Name of the database to connect to.
        @type  database:     str
        @param endDate:      Last date to process, eg. "20050131"; passed to
                             obsCal.getDatesFromInput() for resolution.
        @type  endDate:      str
        @param outPath:      Path to directory for produced files
        @type  outPath:      str
        @param updateRelDBs: Release DBs to update, as "server::db1,db2" or
                             "server::all".
        @type  updateRelDBs: str
        @param versionStr:   Version number of the data.
        @type  versionStr:   str

        """
        # Per-instance result containers (see the class-level declarations).
        self._databases = []
        self._missingJpgs = set()
        self._updateEntries = {}

        self.database = database
        dbName = self.database.rpartition('.')[2]
        self.sysc = SystemConstants(dbName)

        # Keep the request as typed; it is used to label the SQL update files.
        if beginDate == endDate:
            self.dateReq = beginDate
        else:
            self.dateReq = '-'.join([beginDate, endDate])

        beginDate, endDate = \
            self.sysc.obsCal.getDatesFromInput(beginDate, endDate)

        self.startDateStr = beginDate
        self.endDateStr = endDate
        self.versionStr = versionStr
        self.startMJD = int(mxTime.strptime(self.startDateStr, "%Y%m%d").mjd)
        self.endMJD = int(mxTime.strptime(self.endDateStr, "%Y%m%d").mjd)
        self.checkDB = checkDB
        self.outPath = outPath
        self.updateRelDBs = updateRelDBs

        if self.updateRelDBs:
            server, dbs = self.updateRelDBs.split("::")
            databases = set(dbs.split(',') if dbs != "all" else [])
            # getAvailDBs() documents its first argument as the archive name,
            # so pass the bare name exactly as given to SystemConstants above.
            self._databases = getAvailDBs(dbName, server, databases)

        Logger.addMessage("Connecting to master DB: %s" % dbName)
        self.masterDb = DbSession(self.database, autoCommit=True,
                                  isTrialRun=self.isTrialRun,
                                  userName=self.userName)
        Logger.addMessage("Getting jpg names...")
        self.masterDbData = self.getMasterData()

    #--------------------------------------------------------------------------

    def run(self):
        """
        Check the master DB against disk (if requested), write out the list
        of files lacking JPGs, then update the selected release DBs.
        """
        dbName = self.database.rpartition('.')[2]
        outFileSuffix = '_'.join(["", dbName, self.startDateStr,
                                  self.endDateStr, self.versionStr])

        if self.checkDB:
            jpgDiskData = self.getJpegPaths()
            Logger.addMessage("Checking master DB: %s..." % dbName)
            self.checkMasterDb(jpgDiskData)

            if self._updateEntries:
                Logger.addMessage("Updating %s..." % dbName)
                updateDBFileNames(self.masterDb, self._updateEntries,
                                  isTest=self.isTrialRun)

            if self._missingJpgs:
                reprocJpgsFile = File(os.path.join(
                    self.outPath, "reprocJPGs%s.list" % outFileSuffix))
                Logger.addMessage(' '.join([str(len(self._missingJpgs)),
                                            "files with no JPGs are in",
                                            reprocJpgsFile.name]))
                reprocJpgsFile.wopen()
                try:
                    reprocJpgsFile.writelines(sorted(self._missingJpgs))
                finally:
                    # Release the handle even if the write fails.
                    reprocJpgsFile.close()

        if self.updateRelDBs:
            # NOTE(review): updater() compares against self.masterDbData as
            # read in __init__, i.e. before any correction made above --
            # confirm whether release DBs should receive the corrected paths.
            for db in self._databases:
                self.updater(db)

    #--------------------------------------------------------------------------

    def checkMasterDb(self, diskData):
        """
        Check the DB data against files on disk.

        Fills self._updateEntries with the entries whose compFile path is not
        among the on-disk copies of that JPG, and self._missingJpgs with the
        FITS file paths for which no JPG of that name was found on disk.

        @param diskData: JPG paths found on disk, as returned by
                         getJpegPaths().
        @type  diskData: dict(str: dict(str: list(str)))

        """
        for dateVersStr, dbEntries in self.masterDbData.items():
            jpgsOnDisk = diskData.get(dateVersStr, {})
            for mfExt, (compParts, fileParts) in dbEntries.items():
                compPath = compParts[2]
                jpgDbName = compPath.rpartition('/')[2]

                if jpgDbName in jpgsOnDisk:
                    candidates = jpgsOnDisk[jpgDbName]
                    if compPath in candidates:
                        continue  # DB entry already points at a disk copy

                    newJpgName = self._pickJpgOnDisk(
                        candidates, "deprecated" in fileParts[2])
                    if newJpgName is None:
                        # Never write a path left over from another entry.
                        Logger.addMessage(
                            "No suitable JPG found on disk for %s" % compPath)
                        continue

                    # Same shape as a masterDbData value, because
                    # updateDBFileNames() rebuilds compFile by joining the
                    # first element; the prefix before ':' is kept as it was.
                    self._updateEntries[mfExt] = (
                        (compParts[0], compParts[1], newJpgName), fileParts)

                elif 'PixelFileNoLongerAvailable' not in fileParts[0] \
                        and "deprecated" not in jpgDbName:
                    self._missingJpgs.add(fileParts[2])

        Logger.addMessage("Found %d wrong entries and %d missing entries." % (
            len(self._updateEntries), len(self._missingJpgs)))

    #--------------------------------------------------------------------------

    @staticmethod
    def _pickJpgOnDisk(candidates, wantDeprecated):
        """
        Choose which on-disk copy of a JPG the DB entry should point to.

        @param candidates:     Full paths of the copies found on disk.
        @type  candidates:     list(str)
        @param wantDeprecated: Whether the FITS file path is a deprecated one.
        @type  wantDeprecated: bool

        @return: Chosen path, or None if no copy has the wanted status.
        @rtype:  str

        """
        # NOTE(review): the deprecated-status matching only applies to more
        # than two copies, as in the original code; with exactly two the
        # first is taken regardless -- confirm that "> 2" is not meant to
        # be "> 1".
        if len(candidates) <= 2:
            return candidates[0]

        chosen = None
        for fileName in candidates:
            # The last copy with a matching status wins.
            if ("deprecated" in fileName) == wantDeprecated:
                chosen = fileName
        return chosen

    #--------------------------------------------------------------------------

    def getJpegPaths(self):
        """
        Create a dictionary of compressed image dates and their dirs.

        @return: For every date-version directory, the full paths found on
                 disk for each JPG file name.
        @rtype:  dict(str: dict(str: list(str)))

        """
        self.raidDisks = self.sysc.availableRaidFileSystem() + \
                         self.sysc.deprecatedDataStorage

        # Sub-directories of the products directory on the deprecated data
        # storage whose path contains "<deprecatedComprImDir>_20".
        deprMarker = self.sysc.deprecatedComprImDir + "_20"
        deprDirs = []
        for direc in self.sysc.deprecatedDataStorage:
            for subDir in os.listdir(
                    os.path.join(direc, self.sysc.productsDir)):
                productSubDir = os.path.join(self.sysc.productsDir, subDir)
                if deprMarker in productSubDir:
                    deprDirs.append(productSubDir)

        jpgList = fits.FitsList(self.sysc, prefix="CJ_")
        jpgList.createFitsDateDict(
            disklist=self.raidDisks,
            ingestDirectory=[self.sysc.compressImDir] + deprDirs,
            beginDateStr=self.startDateStr,
            endDateStr=self.endDateStr,
            versionStr=self.versionStr,
            forceLists=True)

        jpgDataDict = defaultdict(dict)
        for dateVersStr, jpgDirs in jpgList.invFitsDateDict.items():
            pathsOfName = jpgDataDict[dateVersStr]
            for jpgDir in jpgDirs:
                dateDir = os.path.join(jpgDir, dateVersStr)
                for jpgFile in os.listdir(dateDir):
                    pathsOfName.setdefault(jpgFile, []).append(
                        os.path.join(dateDir, jpgFile))
        return jpgDataDict

    #--------------------------------------------------------------------------

    def _dateVersStrings(self):
        """
        Yield the "YYYYMMDD_v<version>" string of every date from startMJD
        to endMJD inclusive.
        """
        for mjd in xrange(self.startMJD, self.endMJD + 1):
            date = mxTime.DateTimeFromMJD(mjd)
            yield "%04d%02d%02d_v%s" % (
                date.year, date.month, date.day, self.versionStr)

    #--------------------------------------------------------------------------

    def getMasterData(self):
        """
        Read multiframeIDs, extNum, fileName, compFile from the master DB.

        @return: For every dateVersStr a dictionary that maps
                 (multiframeID, extNum) to the pair
                 (compFile.rpartition(':'), fileName.rpartition(':')).
        @rtype:  dict(str: dict(tuple: tuple))

        """
        querySelect = "F.dateVersStr, F.multiframeID, MFD.extNum," \
                      " MFD.compFile, F.fileName"
        queryFrom = "MultiframeDetector as MFD, FlatFileLookUp as F"

        dbJpgsDict = defaultdict(dict)
        numFound = 0
        for dateVersStr in self._dateVersStrings():
            queryWhere = "F.multiframeID=MFD.multiframeID AND " \
                         "dateVersStr='%s'" % dateVersStr
            mfidlist = self.masterDb.query(querySelect, fromStr=queryFrom,
                                           whereStr=queryWhere)
            for dbDate, mfID, extNum, compFile, fileName in mfidlist:
                dbJpgsDict[dbDate][(mfID, extNum)] = (
                    compFile.rpartition(':'), fileName.rpartition(':'))
            numFound += len(mfidlist)

        # Report the total over the whole date range.
        Logger.addMessage("%d files found." % numFound)
        return dbJpgsDict

    #--------------------------------------------------------------------------

    def getDbData(self, archive):
        """
        Read multiframeIDs, extNum, compFile from the given database.

        @param archive: Connection to the database to read from.
        @type  archive: DbSession

        @return: For every dateVersStr a dictionary that maps
                 (multiframeID, extNum) to compFile.rpartition(':').
        @rtype:  dict(str: dict(tuple: tuple))

        """
        dbJpgsDict = defaultdict(dict)
        for dateVersStr in self._dateVersStrings():
            # The date-version string is selected as a literal first column
            # and matched against the compFile path.
            mfidlist = archive.query(
                "'%s', multiframeID, extNum, compFile" % dateVersStr,
                fromStr="MultiframeDetector",
                whereStr="compFile like '%%%s%%'" % dateVersStr)
            for dbDate, mfID, extNum, compFile in mfidlist:
                dbJpgsDict[dbDate][(mfID, extNum)] = compFile.rpartition(':')
        return dbJpgsDict

    #--------------------------------------------------------------------------

    def updater(self, database):
        """
        Update the given database.

        Every compFile path in the database that differs from the one read
        from the master DB is replaced by the master DB value.

        @param database: Database to update.
        @type  database: str

        """
        Logger.addMessage("Connecting to %s" % database)
        archive = DbSession(database, autoCommit=True,
                            isTrialRun=self.isTrialRun, userName=self.userName)

        # get a dict of entries from the DB
        Logger.addMessage("Getting file names...")
        dbJpgsDict = self.getDbData(archive)

        # build a dictionary of data to update
        newMfidDict = {}
        numUnknown = 0

        Logger.addMessage("Checking %s files..." % sum(
            len(entries) for entries in dbJpgsDict.values()))
        dateVersStrs = sorted(dbJpgsDict)
        # Monitor the sequence that is actually being looped over.
        progress = ForLoopMonitor(dateVersStrs)

        for dateVersStr in dateVersStrs:
            masterEntries = self.masterDbData.get(dateVersStr, {})
            for mfExt, compParts in dbJpgsDict[dateVersStr].items():
                masterEntry = masterEntries.get(mfExt)
                if masterEntry is None:
                    # Nothing to compare against: leave the entry untouched.
                    numUnknown += 1
                elif masterEntry[0][2] != compParts[2]:
                    newMfidDict[mfExt] = masterEntry
            progress.testForOutput()

        if numUnknown:
            Logger.addMessage("%s entries of %s are not in the master DB data"
                              " and were skipped." % (numUnknown, database))

        # update the DB
        if not newMfidDict:
            Logger.addMessage(database + " is up-to-date.")
            return

        Logger.addMessage("Updating %s entries in %s..." %
                          (len(newMfidDict), database))

        # Write to file if database is not the loadDB
        updateFile = None
        if archive.database.upper().startswith(self._publicDbPrefixes):
            path = os.path.join(
                "/mnt", archive.sysc.loadServer,
                "SqlUpdates", "JPGS_PubDB_%s.%s_%sv%s_%s.sql"
                % (archive.server, archive.database,
                   self.dateReq, self.versionStr,
                   mxTime.localtime().strftime('%Y%m%d_%H%M')))
            updateFile = File(path)
            updateFile.aopen()

        try:
            updateDBFileNames(archive, newMfidDict, updateFile,
                              isTest=self.isTrialRun)
        finally:
            # Release the handle even if the update fails.
            if updateFile is not None:
                updateFile.close()
308 309 310 #------------------------------------------------------------------------------ 311
def getAvailDBs(archive, mainServer, databases=None):
    """
    Get all or selected DBs from given server.

    @param archive:    The archive to be updated, e.g. WSA. A full
                       "server.database" path is reduced to its last part.
    @type  archive:    str
    @param mainServer: The server to query for databases, can be the name or
                       the synonym, e.g. dbload or dbpub
                       (for all public servers).
    @type  mainServer: str
    @param databases:  Optionally request specific database names. Only for
                       "dbpub" are all databases returned when none are
                       requested; for any other server the result is then
                       empty.
    @type  databases:  sequence(str)

    @return: List of full paths to databases on selected server.
    @rtype:  list(str)

    @raise SystemExit: If mainServer is neither a synonym nor a known server.

    """
    sysc = SystemConstants(archive.rpartition('.')[2])

    # The default of None must behave like "no databases requested" instead
    # of failing when iterated.
    requested = list(databases) if databases else []

    # get all public databases
    if mainServer == "dbpub":
        # Which databases are left out depends on the archive only, so
        # decide it once rather than per server.
        if sysc.isVSA():
            def isExcluded(db):
                return db.startswith(("UKIDSS", "TRANSIT", "WORLD"))
        else:
            def isExcluded(db):
                return db.startswith("V") or db.endswith("SIAP")

        availDatabases = []
        for server in sysc.publicServers:
            pubDBs = set(db for db in queries.getAllDBs(server)
                         if not isExcluded(db))
            if requested:
                pubDBs.intersection_update(requested)
            availDatabases.extend(server + '.' + db for db in pubDBs)

    # get all load server databases
    # NOTE(review): nothing is looked up on the load server, so a request
    # for "all" yields an empty list here -- confirm this is intended.
    elif mainServer == "dbload":
        availDatabases = [sysc.loadServer + '.' + db for db in requested]

    # get other specified databases
    elif mainServer in list(sysc.publicServers) + [sysc.loadServer]:
        availDatabases = [mainServer + '.' + db for db in requested]

    else:
        raise SystemExit("<ERROR> %s not known." % mainServer)

    return sorted(availDatabases)
359 #------------------------------------------------------------------------------ 360
def updateDBFileNames(archive, mfDict, updateFile=None, isTest=False):
    """
    Update the database entries for compFile.

    For databases whose name starts with UKIDSS, WORLD or TRANSIT the SQL
    UPDATE statements are written to updateFile; for any other database they
    are printed (isTest) or executed through archive.update().

    @param archive:    Connection to database to update.
    @type  archive:    DbSession
    @param mfDict:     Dictionary giving, for every (MfID, extNum) tuple, a
                       sequence whose first element holds the pieces of the
                       new compFile value; the pieces are joined together.
    @type  mfDict:     dict(tuple(int, int): sequence)
    @param updateFile: Write to this file for read only DBs.
    @type  updateFile: File obj
    @param isTest:     If True, print the statements instead of executing
                       them. Statements for read only DBs are written to
                       updateFile regardless.
    @type  isTest:     bool

    @raise ValueError: If there are updates for a read only DB but no
                       updateFile was supplied.

    """
    # Write to file on dbShare for public DBs
    writeToFile = bool(mfDict) and archive.database.upper().startswith(
        ("UKIDSS", "WORLD", "TRANSIT"))
    if writeToFile and updateFile is None:
        raise ValueError("An updateFile is required to update %s"
                         % archive.database)

    sqlLine = "UPDATE MultiframeDetector SET %s WHERE multiframeID=%d " \
              "AND extNum=%d"

    numRows = 0
    for mfID, extNum in sorted(mfDict):
        setStr = "compFile='%s'" % ''.join(mfDict[(mfID, extNum)][0])
        if writeToFile:
            numRows += 1
            updateFile.writetheline(sqlLine % (setStr, mfID, extNum))
        elif isTest:
            numRows += 1
            print(sqlLine % (setStr, mfID, extNum))
        else:
            numRows += archive.update(
                "MultiframeDetector", setStr,
                " AND ".join(["multiframeID=%s" % mfID,
                              "extNum=%s" % extNum]))

    Logger.addMessage("%s fileNames affected in MultiframeDetector." % numRows)
#------------------------------------------------------------------------------
# Entry point for script.

# Only run when executed from the command line, so that the module can still
# be imported without side effects.
if __name__ == "__main__":

    def _hasServerSeparator(value):
        """ Accept an 'updatedbs' value only if it has the form 'a::b'. """
        return "::" in value

    # Command-line arguments specific to CheckJpgs
    CLI.progArgs += [
        CLI.Argument("startdate", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("enddate", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("version", '1'),
    ]

    # Command-line options specific to CheckJpgs
    CLI.progOpts += [
        CLI.Option('U', 'updatedbs',
                   "release DBs to update; 'server::database' where server "
                   "can be 'dbload' or 'dbpub' and database a comma-seperated "
                   "list of DBs or 'all'",
                   "LIST", isValOK=_hasServerSeparator),
        CLI.Option('c', 'check',
                   "check and update the master DB for wrong entries of "
                   "compFile"),
        CLI.Option('o', 'outpath',
                   "new destination for produced files",
                   "DIR", os.curdir),
    ]

    cli = CLI(CheckJpgs.__name__, "$Revision: 8674 $", CheckJpgs.__doc__)
    Logger.isVerbose = False
    Logger.addMessage(cli.getProgDetails())

    # These are class-level settings and must be in place before the
    # instance is created, because __init__ already opens the DB connection.
    CheckJpgs.isTrialRun = cli.getOpt("test")
    CheckJpgs.userName = cli.getOpt("user")

    checkJpgs = CheckJpgs(
        database=cli.getArg("database"),
        beginDate=cli.getArg("startdate"),
        endDate=cli.getArg("enddate"),
        versionStr=cli.getArg("version"),
        checkDB=cli.getOpt("check"),
        updateRelDBs=cli.getOpt("updatedbs"),
        outPath=cli.getOpt("outpath"))
    checkJpgs.run()