
Source Code for Module invocations.monitoring.CheckIngests

#! /usr/bin/env python
#------------------------------------------------------------------------------
#$Id: CheckIngests.py 10232 2014-03-03 14:10:13Z EckhardSutorius $
"""
   Checks ingests into Multiframe vs FlatFileLookup vs FITS files on disk.

   @author: E. Sutorius
   @org:    WFAU, IfA, University of Edinburgh
"""
#------------------------------------------------------------------------------
from   collections import defaultdict, namedtuple
import inspect
import mx.DateTime as mxTime
import os
import time

from   wsatools.CLI                 import CLI
from   wsatools.DbConnect.DbSession import DbSession
from   wsatools.File                import File
import wsatools.FitsUtils               as fits
from   wsatools.DbConnect.IngCuSession import IngCuSession
from   wsatools.Logger              import Logger, ForLoopMonitor
import wsatools.Utilities               as utils
#------------------------------------------------------------------------------

class CheckIngests(IngCuSession):
    """ Check if metadata ingests were successful.
    """

    #--------------------------------------------------------------------------

    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 beginDate=CLI.getOptDef("begin"),
                 endDate=CLI.getOptDef("end"),
                 outPath=CLI.getOptDef("outpath"),
                 cuNums=CLI.getOptDef("cunums"),
                 versionStr=CLI.getOptDef("version"),
                 excludedIDs=CLI.getOptDef("exclude"),
                 checkOmetries=CLI.getOptDef("ometries"),
                 isTrialRun=DbSession.isTrialRun,
                 comment=CLI.getArgDef("comment")):
        """
        @param beginDate: First date to process, eg. 20050101.
        @type beginDate: str
        @param comment: Descriptive comment as to why curation task is
                        being performed.
        @type comment: str
        @param checkOmetries: Check Astrometry and Photometry tables.
        @type checkOmetries: bool
        @param cuNums: Curation task numbers.
        @type cuNums: list[int]
        @param curator: Name of curator.
        @type curator: str
        @param database: Name of the database to connect to.
        @type database: str
        @param endDate: Last date to process, eg. 20050131.
        @type endDate: str
        @param excludedIDs: Excluded fileIDs.
        @type excludedIDs: list
        @param isTrialRun: If True, do not perform database modifications.
        @type isTrialRun: bool
        @param outPath: Log file directory.
        @type outPath: str
        @param versionStr: Version number.
        @type versionStr: str

        """
        # Initialize parent class
        super(CheckIngests, self).__init__(cuNum=0,
                                           curator=curator,
                                           comment=comment,
                                           database=database,
                                           autoCommit=False,
                                           isTrialRun=isTrialRun)

        beginDate, endDate = \
            self.sysc.obsCal.getDatesFromInput(beginDate, endDate)

        typeTranslation = {"curator":str,
                           "database":str,
                           "beginDate":str,
                           "endDate":str,
                           "cuNums":list,
                           "versionStr":str,
                           "excludedIDs":list,
                           "outPath":str,
                           "checkOmetries":bool,
                           "isTrialRun":bool,
                           "comment":str}

        super(CheckIngests, self).attributesFromArguments(
            inspect.getargspec(CheckIngests.__init__)[0], locals(),
            types=typeTranslation)

        if not self.versionStr:
            self.versionStr = str(max(
                self.sysc.obsCal.maxVersOfDate(self.beginDate),
                self.sysc.obsCal.maxVersOfDate(self.endDate)))

        if self.outPath == "./":
            self.outPath = "./check%s" % self.sysc.loadDatabase
        utils.ensureDirExist(self.outPath)

    #--------------------------------------------------------------------------

    def getDiskInfo(self):
        """ Read the available directories on the disks.
        """
        fitsDirs = fits.FitsList(self.sysc, prefix="ci_")
        fitsDirs.createFitsDateDict(ingestDirectory=self.sysc.fitsDir,
                                    beginDateStr=self.beginDate,
                                    endDateStr=self.endDate,
                                    versionStr=self.versionStr)
        return fitsDirs

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Compare the FITS files on disk with FlatFileLookup, Multiframe
            and the detection tables, and write lists of files that still
            need ingesting.
        """
        Logger.addMessage("Checking Ingests from %s to %s" % (
            self.beginDate, self.endDate))
        Logger.addMessage("Reading disk info...")

        fitsDirs = self.getDiskInfo()

        # get disk lists
        diskFileDict = defaultdict(list)
        for dateVersStr in sorted(fitsDirs.invFitsDateDict):
            dirPath = os.path.join(
                fitsDirs.invFitsDateDict[dateVersStr], dateVersStr)
            for fileName in os.listdir(dirPath):
                theFile = File(os.path.join(dirPath, fileName))
                if theFile.fileID not in self.excludedIDs \
                        and ".fit" in theFile.ext:
                    diskFileDict[theFile.fileID].append(theFile.name)
                del theFile

        obsDates = self.sysc.obsCal.getObsDates(self.beginDate, self.endDate,
                                                self.versionStr)
        obsMonths = sorted(set(x[:6] for x in obsDates))

        # get FlatFileLookup/Multiframe data
        ffluDict = defaultdict(list)
        mfidDict = defaultdict()
        mfDict = defaultdict(list)
        progIDbyFileID = defaultdict(list)
        fileIDbyProgID = defaultdict(list)
        db = DbSession(self.database)
        db.enableDirtyRead()

        Logger.addMessage("Querying metadata...")
        progress = ForLoopMonitor(obsDates)
        for dateVersStr in obsDates:
            ffluData = db.query(
                "multiframeID, fileName", "FlatFileLookup",
                " AND ".join(["multiframeID>0",
                              "dateVersStr='%s'" % dateVersStr]))
            for multiframeID, fileName in sorted(ffluData):
                theFile = File(fileName)
                if theFile.fileID not in self.excludedIDs:
                    mfidDict[multiframeID] = theFile.fileID
                    ffluDict[theFile.fileID] = [theFile.name, multiframeID]
                del theFile
            mfData = db.query(
                "fileName, catName, programmeID",
                "Multiframe m, programmeFrame p",
                " AND ".join(["m.multiframeID = p.multiframeID",
                              "m.multiframeID>0",
                              "fileName like '%%%s%%'" % dateVersStr,
                              "fileName not like '%%deprecated%%'"]))
            for fileName, catName, progID in sorted(mfData):
                theFile = File(fileName)
                if theFile.fileID not in self.excludedIDs:
                    if self.sysc.emptyFitsCatalogueFileName in catName:
                        #mfDict[theFile.fileID].extend([theFile.name, "empty"])
                        pass
                    else:
                        mfDict[theFile.fileID].extend([theFile.name, catName])
                    progIDbyFileID[theFile.fileID].append(progID)
                    fileIDbyProgID[progID].append(theFile.fileID)
                del theFile
            progress.testForOutput()

        # get Detection data
        theSurvs = defaultdict(list)
        photoSurvs = defaultdict(list)
        astroSurvs = defaultdict(list)
        if 4 in self.cuNums:
            if "VVV" in self.database:
                tmpProgIdDict = dict(db.query(
                    "programmeID, detectionTable", "Programme",
                    "programmeID=120"))
            else:
                tmpProgIdDict = dict(db.query(
                    "programmeID, detectionTable", "Programme",
                    "programmeID>=1"))
                if self.sysc.isVSA():
                    del tmpProgIdDict[100]  # remove SV-Orion
                    del tmpProgIdDict[101]  # remove SV-NGC253

            if "VVV" in self.database:
                theSurvs = {120: [x for x in db.query(
                    "name", "sysobjects", "name like 'vvvDetectionRaw20%'")
                    if any(m in x for m in obsMonths)]}
                if self.checkOmetries:
                    photoSurvs = {120: [x for x in db.query(
                        "name", "sysobjects",
                        "name like 'vvvDetectionPhotometry20%'")
                        if any(m in x for m in obsMonths)]}
                    astroSurvs = {120: [x for x in db.query(
                        "name", "sysobjects",
                        "name like 'vvvDetectionAstrometry20%'")
                        if any(m in x for m in obsMonths)]}
            else:
                for pid in tmpProgIdDict:
                    theSurvs[pid] = [tmpProgIdDict[pid] + "Raw"]
                    if self.checkOmetries:
                        photoSurvs[pid] = [tmpProgIdDict[pid] + "Photometry"]
                        astroSurvs[pid] = [tmpProgIdDict[pid] + "Astrometry"]

        #tmpProgIdDict.update(utils.invertDict(tmpProgIdDict))

        detData = defaultdict(lambda : defaultdict(int))
        photoData = defaultdict(lambda : defaultdict(int))
        astroData = defaultdict(lambda : defaultdict(int))
        tmpData = defaultdict()
        Logger.addMessage("Querying catalogues...")
        for pid in sorted(theSurvs):
            for detName in theSurvs[pid]:
                Logger.addMessage("... %s" % detName)
                tmpData = dict(db.query(
                    "multiframeID, count(*)", detName,
                    groupBy="multiframeID"))
                for mfid in tmpData:
                    detData[pid][mfid] += tmpData[mfid]
        if self.checkOmetries:  # and "VVV" in self.database:
            for pid in sorted(photoSurvs):
                for detName in photoSurvs[pid]:
                    Logger.addMessage("... %s" % detName)
                    tmpData = dict(db.query(
                        "multiframeID, count(*)", detName,
                        groupBy="multiframeID"))
                    for mfid in tmpData:
                        photoData[pid][mfid] += tmpData[mfid]
            for pid in sorted(astroSurvs):
                for detName in astroSurvs[pid]:
                    Logger.addMessage("... %s" % detName)
                    tmpData = dict(db.query(
                        "multiframeID, count(*)", detName,
                        groupBy="multiframeID"))
                    for mfid in tmpData:
                        astroData[pid][mfid] += tmpData[mfid]

        # compare ingests
        misFiles = defaultdict(list)
        missMfids = defaultdict(list)  # init here so the MFIDs summary below
                                       # also works for a CU3-only run
        if 3 in self.cuNums:
            for fileID in diskFileDict:
                if fileID not in ffluDict:
                    misFiles["fflu"].append((fileID, diskFileDict[fileID]))
                if fileID not in mfDict:
                    misFiles["mf"].append((fileID, diskFileDict[fileID]))
                elif len(diskFileDict[fileID]) == 3 and \
                        sorted(mfDict[fileID]) != sorted(diskFileDict[fileID])[::2]:
                    misFiles["cat"].append((fileID, diskFileDict[fileID]))

        if 4 in self.cuNums:
            for progID in sorted(theSurvs):
                allMfIds = set()
                for fileID in fileIDbyProgID[progID]:
                    if fileID in ffluDict and \
                            fileID in mfDict and len(mfDict[fileID]) > 1:
                        allMfIds.add(ffluDict[fileID][1])
                        if ffluDict[fileID][1] not in detData[progID] or \
                                detData[progID][ffluDict[fileID][1]] == 0:
                            if any(x in tmpProgIdDict.keys()
                                   for x in progIDbyFileID[fileID]):
                                misFiles["detmf"].append(
                                    (fileID,
                                     ["%s: %d" % tuple(ffluDict[fileID])]))
                                misFiles["detcat"].append(
                                    (fileID,
                                     mfDict[fileID]))
                                     #["%s\n%s" % tuple(mfDict[fileID])]))
                                missMfids['C'].append(ffluDict[fileID][1])

                if self.checkOmetries:  # and "VVV" in self.database:
                    for mfid in sorted(allMfIds):
                        fileID = mfidDict[mfid]
                        if mfid not in photoData[progID] \
                                or photoData[progID][mfid] == 0:
                            misFiles["detphoto"].append(
                                (fileID, mfDict[fileID]))
                                #["%s\n%s" % tuple(mfDict[fileID])]))
                            if mfid not in missMfids['P']:
                                missMfids['P'].append(mfid)
                        if mfid not in astroData[progID] \
                                or astroData[progID][mfid] == 0:
                            misFiles["detastro"].append(
                                (fileID, mfDict[fileID]))
                                #["%s\n%s" % tuple(mfDict[fileID])]))
                            if mfid not in missMfids['A']:
                                missMfids['A'].append(mfid)
                        if mfid not in detData[progID] \
                                or detData[progID][mfid] == 0:
                            misFiles["detraw"].append(
                                (fileID, mfDict[fileID]))
                                #["%s\n%s" % tuple(mfDict[fileID])]))
                            if mfid not in missMfids['R']:
                                missMfids['R'].append(mfid)

        outFile = File(os.path.join(
            self.outPath, "checkIngests_%s_%s_%s_%s.log" % (
                '_'.join(map(str, self.cuNums)),
                self.sysc.loadDatabase, self.beginDate, self.endDate)))
        outFile.wopen()
        if 3 in self.cuNums:
            toing3File = File(os.path.join(
                self.outPath, "toing_3_%s_%s_%s.list" % (
                    self.sysc.loadDatabase, self.beginDate, self.endDate)))
            toing3File.wopen()
        if 4 in self.cuNums:
            toing4File = File(os.path.join(
                self.outPath, "toing_4_%s_%s_%s.list" % (
                    self.sysc.loadDatabase, self.beginDate, self.endDate)))
            toing4File.wopen()

        for misType in misFiles:
            outFile.writetheline("%d missing in %s:" % (len(misFiles[misType]),
                                                        misType.upper()))
            for entry in sorted(misFiles[misType]):
                if "mf" in misType:
                    outFile.writetheline('%s : %s' % (entry[0],
                                                      ','.join(entry[1])))
                    if misType == "mf":
                        toing3File.writetheline('\n'.join(entry[1]))
                if misType in ["detcat", "detraw", "detphoto", "detastro"]:
                    outFile.writetheline('%s : %s' % (entry[0],
                                                      ','.join(entry[1])))
                    toing4File.writetheline('\n'.join(entry[1]))

        outFile.writetheline("MFIDs:")
        for misType in missMfids:
            outFile.writetheline(" %s: %s" % (misType,
                ','.join("%d" % i for i in sorted(missMfids[misType]))))
        outFile.close()
        Logger.addMessage("Output written to %s" % outFile.name)
        if 3 in self.cuNums:
            toing3File.close()
            Logger.addMessage("Ingest list written to %s" % (toing3File.name))
        if 4 in self.cuNums:
            toing4File.close()
            Logger.addMessage("Ingest list written to %s" % (toing4File.name))

#------------------------------------------------------------------------------
# Entry point for script.

# Allow module to be imported as well as executed from the command line
if __name__ == "__main__":
    # Define command-line interface settings for CheckIngests
    CLI.progOpts += [
        CLI.Option('b', "begindate",
                   "first date to process, eg. 20050101",
                   "DATE", str(IngCuSession.beginDateDef)),
        CLI.Option('e', "enddate",
                   "last date to process, eg. 20050131",
                   "DATE", str(IngCuSession.endDateDef)),
        CLI.Option('o', "outpath",
                   "directory where the output data is written to",
                   "PATH", './'),
        CLI.Option('v', "version",
                   "overwrite CASU version with this version",
                   "STR", ''),
        CLI.Option('x', "cunums",
                   "csv list of CU numbers to check (3, 4)",
                   "LIST", '3',
                   isValOK=lambda x: x.replace(',', '').isdigit() and \
                       all(int(y) in (3, 4) for y in x.replace(',', ''))),
        CLI.Option('O', "ometries",
                   "with CU4, also check the Astrometry and Photometry tables"),
        CLI.Option('X', "exclude",
                   "fileIDs (<dateVersStr>/<file root without cat>) to "
                   "be excluded",
                   "LIST")
    ]
    cli = CLI(CheckIngests, "$Revision: 10232 $")

    checkIngests = CheckIngests(cli.getOpt("curator"),
                                cli.getArg("database"),
                                cli.getOpt("begindate"),
                                cli.getOpt("enddate"),
                                cli.getOpt("outpath"),
                                cli.getOpt("cunums"),
                                cli.getOpt("version"),
                                cli.getOpt("exclude"),
                                cli.getOpt("ometries"),
                                cli.getOpt("test"),
                                cli.getArg("comment"))
    checkIngests.run()
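
For reference, the sketch below shows one way to drive the class above without going through the command line, mirroring what the __main__ block does via CLI. It is a minimal example and not part of the module: all argument values (curator, database name, dates, comment) are placeholders, and it assumes a configured wsatools installation with a reachable load database.

# Minimal usage sketch (hypothetical values) -- mirrors the __main__ block
# above, but passes the options directly instead of parsing them with CLI.
from invocations.monitoring.CheckIngests import CheckIngests

checker = CheckIngests(curator="ets",             # placeholder curator name
                       database="WSA",            # placeholder load database
                       beginDate="20050101",      # first night to check
                       endDate="20050131",        # last night to check
                       outPath="./",              # "./" becomes ./check<loadDatabase>
                       cuNums=[3, 4],             # check both CU3 and CU4 ingests
                       versionStr='',             # derive the CASU version from the dates
                       excludedIDs=[],            # no fileIDs excluded
                       checkOmetries=False,       # skip the Astrometry/Photometry tables
                       isTrialRun=False,
                       comment="Routine ingest check")
checker.run()

The log and "toing" list files are written to outPath, exactly as in the command-line run.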