Package wsatools :: Module Automator
[hide private]

Source Code for Module wsatools.Automator

   1  #------------------------------------------------------------------------------ 
   2  #$Id: Automator.py 10022 2013-08-29 08:51:25Z RossCollins $ 
   3  """ 
   4     Automatic curation routines. Determines what needs to run on a given 
   5     programme and automatically calls those curation tasks. 
   6   
   7     @author: N.J.G. Cross 
   8     @org:    WFAU, IfA, University of Edinburgh 
   9   
  10     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
  11     @contributors: R.S. Collins 
  12  """ 
  13  #------------------------------------------------------------------------------ 
  14  from __future__      import division, print_function 
  15  from future_builtins import map 
  16   
  17  from   collections import defaultdict 
  18  import gc 
  19  import os 
  20  import time 
  21   
  22  from   invocations.cu6.cu6         import Cu6 
  23  from   invocations.cu7.cu7         import Cu7 
  24  from   invocations.cu8.cu8         import Cu8 
  25  from   invocations.cu13.cu13       import Cu13 
  26  from   invocations.cu16.cu16       import Cu16 
  27  from   invocations.cu29.cu29       import Cu29 
  28   
  29  import wsatools.DataFactory             as df 
  30  import wsatools.DbConnect.CommonQueries as queries 
  31  from   wsatools.DbConnect.CuSession import CuSession 
  32  import wsatools.DbConnect.DbConstants   as dbc 
  33  from   wsatools.DbConnect.DbSession import DbSession, Join, SelectSQL 
  34  import wsatools.ExternalProcess         as extp 
  35  from   wsatools.FitsToDb            import ProvenanceFiller 
  36  import wsatools.FitsUtils               as fits 
  37  from   wsatools.Logger              import Logger 
  38  from   wsatools.QualityControl      import QualityFlagUpdater 
  39  from   wsatools.SourceMerger        import SynopticMerger, TilePawPrintMerger 
  40  from   wsatools.ProgrammeBuilder    import SynopticSetup, ProgrammeBuilder 
  41  from   wsatools.SystemConstants     import DepCodes, SystemConstants 
  42  import wsatools.Utilities               as utils 
  43  #------------------------------------------------------------------------------ 
  44  # @TODO: spatial/time variations - fit function f(x,y) to each intermediate 
  45  # @TODO: This should really be an Automator class, that has a subclass 
  46  #        DeepAutomator, which uses polymorphism for helper functions. 
  47   
def autoCurate(cli, progID, releaseNum=0,
               dateRange=SystemConstants().obsCal.dateRange(),
               redoSources=False, redoProducts=None,
               redoRecal=False, redoNeigh=False, doCu6=False,
               skipMosaicCheck=False, minProdID=None, maxProdID=None,
               fileListForType=None, isRerelease=False, numCPUs=None,
               fields=None):
    """
    Automatically runs the necessary curation tasks to prepare a programme's
    data for release.

    @param cli: Standard command-line options for CuSessions.
    @type cli: CLI
    @param progID: Unique ID for programme, or acronym.
    @type progID: str
    @param releaseNum: Curate products of this specific WFAU-product version
                       number.
    @type releaseNum: int
    @param dateRange: Curate data observed between these nights only.
    @type dateRange: DateRange
    @param redoSources: If True, force re-creation of source table.
    @type redoSources: bool
    @param redoProducts: List of specified product types to recreate along with
                         any descendants.
    @type redoProducts: list(str)
    @param redoRecal: If True, force recalibration of deep survey detections
                      from variability analysis.
    @type redoRecal: bool
    @param redoNeigh: If True, force recreation of neighbour tables.
    @type redoNeigh: bool
    @param doCu6: If True, also run CU6 with default options at the end.
    @type doCu6: bool
    @param skipMosaicCheck: If True, force curation, even if mosaic provenance
                            is incomplete.
    @type skipMosaicCheck: bool
    @param minProdID: Minimum product ID to curate for deep stacks.
    @type minProdID: str
    @param maxProdID: Maximum product ID to curate for deep stacks.
    @type maxProdID: str
    @param fileListForType: Append newly created image and catalogue products
                            to file lists written to the file paths given in
                            this dictionary referenced by product type.
    @type fileListForType: str
    @param isRerelease: If True, deprecate existing products of the current
                        release number before proceeding with curation.
    @type isRerelease: bool
    @param numCPUs: Optionally limit number of CPUs to this number.
    @type numCPUs: int
    @param fields: List of fieldIDs to curate.
    @type fields: list(str)

    @return: True, if ready for release.
    @rtype: bool

    """
    # NOTE(review): the dateRange default is evaluated once at module import
    # time, so every call without an explicit dateRange shares that snapshot
    # -- confirm obsCal.dateRange() is intended to be a fixed constant.
    Logger.addMessage("Running automatic programme curation")
    # Temporarily change the command-line comment to the default one.
    cliComment = cli.resetOptArg("comment", CuSession.comment)

    # Connect to requested database and check given programme
    db = DbSession(autoCommit=True, isPersistent=True, cli=cli)
    programme = df.ProgrammeTable(db)
    programmeID = programme.setProgID(progID)
    Logger.addMessage(
        "Programme %s is a %s survey" % (programme.getAcronym().upper(),
        "deep" if programme.isDeep() else "shallow"))

    Logger.addMessage("Determining required products for this programme...")
    numCPUs = Cu13.getUsefulNumCPUs(numCPUs, programme)
    redoProducts = redoProducts or []
    fileListForType = fileListForType or fits.createFileListDict(programme)
    # releaseNum == 0 means "next available"; getReleaseNum only supports
    # non-surveys and raises SystemExit for surveys.
    releaseNum = releaseNum or getReleaseNum(programme)
    Logger.addMessage("This will be release number: %s" % releaseNum)

    # Use ProgrammeFrame to decide what needs to be done
    # @TODO: Check ProgrammeFrame to see if non-surveys share multiframes and
    #        if so, bulk copy all data into the latest detection table, update
    #        object ID and somehow mark old non-survey programmeID as dead,
    #        e.g. remove its detections. At the very least we need a warning
    #        to cease curation of non-surveys with multiple programmeIDs.
    # @TODO: Reset programme, deprecate all existing products, if non-survey!!
    if isRerelease and programme.isDeep():  # @FIXME: isDeep() or always?
        Logger.addMessage("Deprecating %s products with release number %s..."
                          % (programme.getAcronym().upper(), releaseNum))

        # @TODO: Test new code to see if doing it in parts works?
        #        Need rerelease reason - restack, retile, remosaic,
        #        recatalogue. Or other software issue.
        deprecateProducts(releaseNum, db, programme, deleteDeeps=True)

    # Deprecate deeps which contain deprecated OBs
    deprecateDodgyDeeps(db, programmeID)
    Logger.addMessage(
        "Querying database for existing products for this programme...")

    try:
        reqProdsOfType = queries.findRequiredDeepProducts(programme, minProdID,
                                                          maxProdID,
                                                          fieldIDs=fields)
    except queries.ProgrammeNotSetUpError as error:
        raise CuSession.CuError(error)

    Logger.addMessage("Determining synoptic table setup...")
    synSetup = SynopticSetup(programme, dateRange)

    # For each required product type work out what is already ingested, what
    # exists on disk awaiting ingest, and what still needs to be created.
    createdImagesOfType = defaultdict(dict)
    ingestProdsOfType = {}
    newProdsOfType = defaultdict(list)
    for pType in reqProdsOfType:
        ingestedProdIDs, numMultiframes = getIngestedProducts(db, programmeID,
            releaseNum, reqProdsOfType[pType], pType)

        # Redo stacks of previous release num
        ingestProdsOfType[pType] = set(reqProdsOfType[pType])
        if pType not in redoProducts:
            ingestProdsOfType[pType] -= ingestedProdIDs

        if numMultiframes < len(reqProdsOfType[pType]):
            Logger.addMessage("Database has an incomplete set of %s products"
                              % pType)

        Logger.addMessage("Searching file system for existing products...")
        createdImagesOfType[pType] = findCreatedProducts(db, programmeID,
            releaseNum, reqProdsOfType, ingestedProdIDs, pType)

        newProdsOfType[pType] = \
            ingestProdsOfType[pType].difference(createdImagesOfType[pType])

    areIngestProds = \
        any(createdImagesOfType[pType] for pType in createdImagesOfType)

    if areIngestProds:
        Logger.addMessage("Found non-ingested existing products; "
                          "adding to ingest file lists...")

        for pType in createdImagesOfType:
            if createdImagesOfType[pType]:
                fileList = (prod.imageName + '\n'
                            + prod.confName + '\n'
                            + prod.catName + '\n'
                            for prod in createdImagesOfType[pType].values())

                # NOTE(review): file() is the Python 2 builtin (removed in
                # Python 3); relies on refcounting to flush/close the handle.
                file(fileListForType[pType], 'a').writelines(fileList)

    if any(newProdsOfType[pType] for pType in newProdsOfType):
        if releaseNum > 1:
            Logger.addMessage(
                "Deprecating %s products with release numbers < %s..."
                % (programme.getAcronym().upper(), releaseNum))

            deprecateProducts(releaseNum, db, programme, depOld=True)

        comment = "Updating detection quality bit-flags for %s CASU products" \
            % programme.getAcronym().upper()

        Logger.addMessage(comment)
        prepareCu("QualityFlagUpdater", db, cli, comment=comment)
        QualityFlagUpdater.skipProvenanceCheck = True
        QualityFlagUpdater(str(programmeID), cli=cli).run()

        if db.sysc.isVSA():
            prepareCu("TilePawPrintMerger", db, cli)
            TilePawPrintMerger.dateRange = dateRange
            TilePawPrintMerger(str(programmeID), cli=cli).run()

        Logger.addMessage("Attempting to create missing required deep "
                          "products:")

        for pType in newProdsOfType:
            if newProdsOfType[pType]:
                Logger.addMessage(pType.title() + "s: " +
                                  utils.numberRange(newProdsOfType[pType]))

        prepareCu("Cu13", db, cli,
            comment="Creating deep products for release %s of " % releaseNum
                    + programme.getAcronym().upper())

        creator = Cu13(str(programmeID), cli=cli)
        creator.breakIntoParts = True
        creator.dateRange = dateRange
        creator.doQFlagCheck = False
        creator.fileListForType = fileListForType
        creator.newProductsOfType = createdImagesOfType
        creator.numCPUs = numCPUs
        creator.releaseNum = releaseNum
        creator.reprocessTypes = redoProducts
        creator.reqProdsOfType = newProdsOfType
        creator.useDeep = True
        # @TODO: CU13 doesn't force redo if the same night. Is this required?
        #        Also I think it might be a good idea to delete any existing
        #        products with the same product ID and release number,
        #        irrespective of night, in addition to deprecating their
        #        database entries.
        creator.run()

        # Reduce the number of expected files by the number not created.
        for pType in ingestProdsOfType:
            ingestProdsOfType[pType] -= \
                newProdsOfType[pType].difference(
                    creator.newProductsOfType[pType])

        del creator

    for pType in ingestProdsOfType:
        if ingestProdsOfType[pType]:
            areIngestProds = True
            Logger.addMessage(
                "Deprecating %ss to be ingested with product IDs: " % pType
                + utils.numberRange(ingestProdsOfType[pType]))

            deprecateProducts(releaseNum, db, programme,
                              (pType, ingestProdsOfType[pType]))

    # Add already created products to file lists
    fileLists = [fileList for fileList in fileListForType.values()
                 if os.path.exists(fileList)]

    if areIngestProds:
        if not ingestDeepStacks(db, fileLists, releaseNum) \
            or programme.isNonSurvey():
            # Stop here if running on the load db or on non-surveys, where the
            # file list is appended to for multiple non-surveys.
            # Restore command-line comment to the user-supplied one.
            cli.resetOptArg("comment", cliComment)
            return False

    isNewData = areIngestProds
    isNewDeepData = areIngestProds
    if not areIngestProds:
        # Nothing newly ingested this run: detect unmerged frames already in
        # the database by comparing against the merge-log contents.
        Logger.addMessage("Querying existing source data...")
        mergedMFIDs = getMergedMFIDs(db, programme)
        fTypes = \
            queries.getFrameSelection(programme.getAttr("sourceProdType"),
                                      deepOnly=True)

        multiframeIDs = db.query("Multiframe.multiframeID",
            fromStr=Join(["Multiframe", "ProgrammeFrame"], "multiframeID"),
            whereStr="programmeID=%d AND releaseNum=%d AND productID>0 AND %s "
                     "AND deprecated=0" % (programmeID, releaseNum, fTypes))

        # @TODO: check logic
        isNewDeepData = len(set(multiframeIDs) - mergedMFIDs) > 0
        fTypes = \
            queries.getFrameSelection(programme.getAttr("sourceProdType"))

        multiframeIDs = db.query("Multiframe.multiframeID",
            fromStr=Join(["Multiframe", "ProgrammeFrame"], "multiframeID"),
            whereStr="programmeID=%d AND releaseNum=%d AND productID>0 AND %s "
                     "AND deprecated=0" % (programmeID, releaseNum, fTypes))

        isNewData = len(set(multiframeIDs) - mergedMFIDs) > 0

    # If new deep products have been ingested...
    if programme.isDeep() and isNewDeepData:
        # Update Provenance but not for non-surveys, since this is invoked once
        # for all non-surveys by the NonSurveyRelease script
        if not programme.isNonSurvey():
            prepareCu("ProvenanceFiller", db, cli,
                comment="Updating Provenance for new deep stacks in "
                        + programme.getAcronym().upper())

            ProvenanceFiller.combiFrameTypes = {
                'undef': ['deep%', 'mosaic%'],
                'nOffsets': ['tiledeep%']}

            ProvenanceFiller.isQuickRun = True
            ProvenanceFiller(cli=cli).run()

        # Deprecate deep stack detector frames with no components
        deprecateMissingDeepFrames(db, programmeID)

        comment = "Updating detection quality bit-flags for %s WFAU products"
        comment %= programme.getAcronym().upper()
        Logger.addMessage(comment)
        prepareCu("QualityFlagUpdater", db, cli, comment=comment)
        QualityFlagUpdater.isDeepOnly = True
        QualityFlagUpdater.skipProvenanceCheck = skipMosaicCheck
        QualityFlagUpdater(str(programmeID), cli=cli).run()

        # @TODO: Include TilePawPrintMerger once s/w is ready.
        if db.sysc.isVSA():
            prepareCu("TilePawPrintMerger", db, cli)
            TilePawPrintMerger.deepOnly = True
            TilePawPrintMerger(str(programmeID), cli=cli).run()

    doSources = redoSources or isNewData
    doSeaming = doSources or db.queryEntriesExist(
        programme.getMergeLogTable(),
        where="newFrameSet=%s" % dbc.yes())

    if doSources or doSeaming:
        prepareCu("Cu7", db, cli)
        Cu7.continueCompleteReseam = True  # @NOTE: Turn off if appending
        Cu7.createNewTable = doSources
        # @TODO: source merge type - sourceProdType
        # @NOTE: If we ever append then the beginDate needs to go.
        Cu7.dateRange = dateRange
        Cu7(str(programmeID), cli=cli).run()

    # min and max used in multi-epoch data
    reqProducts = reqProdsOfType[programme.getAttr("sourceProdType")]
    minProdID = min(reqProducts)
    maxProdID = max(reqProducts)
    # isUncalibrated() is defined elsewhere in this module (not shown here).
    doRecal = programme.isDeep() and (redoRecal or isUncalibrated(programme))
    # @TODO: Check VIDEO / VIKING - is there an improvement?
    # @TODO: Eventually recal pawprints and use these
    if doRecal and programme.getAttr("epochFrameType") == "stack":
        prepareCu("Cu8", db, cli,
            comment="Relative recalibration of %s intermediate stacks"
                    % programme.getAcronym().upper())

        calPath = (db.sysc.calibrationDir() if db.isRealRun else
                   os.path.join(db.sysc.testOutputPath(), "calibrations"))

        utils.ensureDirExist(calPath)

        # @TODO: Make sure detection table is not zero length for deeps.
        Cu8.copyFits = not db.isRealRun
        Cu8.dateRange = dateRange
        Cu8.releaseNum = releaseNum
        if not db.isRealRun:
            Cu8.fromMf = (0,)  # Just update database, not FITS files

        Cu8.reqProdRange = (minProdID, maxProdID)
        Cu8.tablePathName = os.path.join(calPath, "wfau-p%s-r%s-%s.log"
            % (programmeID, releaseNum, time.strftime('%Y%m%d')))

        Cu8(str(programmeID), cli=cli).run()

    isNewSources = doSources or doSeaming
    if doCu6 and programme.isSynoptic():
        # @TODO: use startContinuous and check of SynopticMergeLog to see if
        #        needs updating.
        # @TODO: Keeps rechecking in VVV - why?
        doSynoptic = isNewSources or doRecal
        synDateRange = synSetup.getTimeRange(isCorrelated=True)
        if not doSynoptic:
            Logger.addMessage("Querying existing synoptic data...")
            intMFIDs = queries.getIntStacks(db, programmeID, reqProducts,
                programme.getAttr("sourceProdType"), synDateRange,
                intFrameType=programme.getAttr("epochFrameType"))

            mergedMFIDs = getMergedMFIDs(db, programme, isSynoptic=True)
            doSynoptic = len(intMFIDs - mergedMFIDs) > 0

        if doSynoptic:
            prepareCu("SynopticMerger", db, cli,
                comment="Producing synoptic source table of correlated "
                        "passbands")

            SynopticMerger.minProdID = \
                min(reqProdsOfType[programme.getAttr("epochFrameType")])

            SynopticMerger.maxProdID = \
                max(reqProdsOfType[programme.getAttr("epochFrameType")])

            # @TODO: recreate? What circumstances
            SynopticMerger.dateRange = synDateRange
            SynopticMerger(str(programmeID), cli=cli).run()

    # Only CU16 and CU6 rely upon detection deprecation flags being up-to-date
    if programme.isDeep():
        propagateDeprecations(programme)

    # Add in any survey team catalogues.
    doCu29 = db.sysc.isVSA() and (isNewSources
                                  or isCuTaskIncomplete(programme, cuNum=29))
    if doCu29:
        prepareCu("Cu29", db, cli)
        Cu29(str(programmeID), cli=cli).run()

    # Update schema for safety, unless multi-programme non-survey release
    if doCu29 or not programme.isNonSurvey():
        prepareCu("SurveyParser", db, cli)
        pb = ProgrammeBuilder(str(programmeID),
                              database=db.server + "." + db.database,
                              isTrialRun=True, userName=db.userName)

        pb.createSchemaOnly = True
        pb.run()
        del pb

    if redoNeigh or isNewSources or isCuTaskIncomplete(programme, cuNum=16):
        prepareCu("Cu16", db, cli)
        Cu16.dateRange = dateRange
        Cu16(str(programmeID), cli=cli).run()

    if doCu6 and programme.isDeep():
        prepareCu("Cu6", db, cli)
        Cu6.minProdID = minProdID
        Cu6.maxProdID = maxProdID
        Cu6.dateRange = dateRange
        Cu6(progID, cli=cli).run()

    # Restore command-line comment to the user-supplied one.
    cli.resetOptArg("comment", cliComment)
    Logger.addMessage("Automatic programme curation complete")

    # UDS should always return False so that CU21 is not invoked
    # NOTE(review): this is an *identity* comparison ("is not"), not "!=";
    # it only behaves like inequality if scienceProgs stores the very same
    # int object as programmeID (e.g. a small cached int) -- confirm this is
    # intentional rather than a latent bug.
    return programmeID is not db.sysc.scienceProgs.get("UDS")
445 446 #------------------------------------------------------------------------------ 447
def prepareCu(cuName, db, cli, comment=CuSession.comment):
    """
    Must be run before each CuSession in the batch script.

    @param cuName:  Name of the curation task about to be invoked; passed to
                    cli.getProgDetails() for logging.
    @type  cuName:  str
    @param db:      Current database connection; taken offline so the next
                    CuSession starts with a fresh connection.
    @type  db:      DbSession
    @param cli:     Standard command-line options for CuSessions; its
                    "comment" option is reset to the given value.
    @type  cli:     CLI
    @param comment: Curation comment to apply to the next CuSession; defaults
                    to the standard CuSession comment.
    @type  comment: str

    """
    # Open fresh database connection for each CU to improve reliability
    db.goOffline()
    gc.collect()  # reclaim memory between long-running curation tasks
    cli.resetOptArg("comment", comment)
    Logger.addMessage(cli.getProgDetails(cuName))
456 457 #------------------------------------------------------------------------------ 458 459
def deprecateMissingDeepFrames(db, programmeID):
    """
    Deprecate deep stack detector frames with no components.

    @param db:          Connection to database where deprecations are to be
                        made.
    @type  db:          DbSession
    @param programmeID: Only deprecrate frames belonging to this unique
                        programme ID.
    @type  programmeID: int

    """
    Logger.addMessage(
        "Deprecating missing detector frames of newly ingested deep stacks...")

    # Join frame tables so the programme and detector predicates can be
    # applied together.
    frameJoin = Join(["Multiframe", "ProgrammeFrame", "MultiframeDetector"],
                     ["multiframeID"])

    # NB: only this clause goes through %-formatting -- the LIKE pattern
    # below contains a literal '%' that must not be interpreted.
    progClause = ("programmeID=%s AND MultiframeDetector.deprecated=0"
                  % programmeID)

    # A deep-stack detector with zero total exposure time received no
    # component frames; flag it with deprecation code 103.
    numDeprecated = db.update("MultiframeDetector", "deprecated=103",
        where="totalExpTime=0 AND frameType LIKE 'deep%stack' AND "
              + progClause,
        fromTables=frameJoin)

    Logger.addMessage("...%s deep detector frames deprecated."
                      % numDeprecated)
480 481 #------------------------------------------------------------------------------ 482
def deprecateDodgyDeeps(db, programmeID):
    """
    Deprecates deeps where some components have been deprecated.

    @param db:          Connection to database where deprecations are to be
                        made.
    @type  db:          DbSession
    @param programmeID: UID of current programme.
    @type  programmeID: int

    """
    def findDodgy(frameTypeSql, excludedCodes):
        # Distinct deep combiframes matching the given frame-type predicate
        # whose Provenance lists at least one deprecated component, ignoring
        # the listed component deprecation codes.
        return SelectSQL(
            "distinct md.multiframeID",
            "ProgrammeFrame as p,Multiframe as md,Provenance as v,"
            "Multiframe as mo",
            "p.programmeID=%s AND p.multiframeID=md.multiframeID AND "
            % programmeID
            + frameTypeSql
            + "md.deprecated IN (0,255) AND md.multiframeID=v.combiframeID AND "
              "v.multiframeID=mo.multiframeID AND mo.deprecated>0 AND "
              "mo.deprecated NOT IN (%s)" % excludedCodes)

    # Deep stacks (excluding tiles and mosaics) with deprecated components
    stackSql = ("md.frameType like '%deep%stack' AND md.frameType not like "
                "'%tile%' AND md.frameType not like '%mosaic%' AND ")

    numStacks = db.update("Multiframe",
                          [("deprecated", DepCodes.constitDep)],
                          "multiframeID in (%s)" % findDodgy(stackSql, "51"))

    Logger.addMessage("Deprecated %s deep stacks with deprecated components"
                      % numStacks)

    if 'tile' in db.sysc.productTypes:
        # Deep tiles (excluding mosaics) with deprecated components
        tileSql = ("md.frameType like 'tile%deep%stack' AND md.frameType not "
                   "like '%mosaic%' AND ")

        numTiles = db.update("Multiframe",
                             [("deprecated", DepCodes.constitDep)],
                             "multiframeID in (%s)"
                             % findDodgy(tileSql, "51,255"))

        Logger.addMessage(
            "Deprecated %s deep tiles with deprecated components" % numTiles)
528 #------------------------------------------------------------------------------ 529
def deprecateProducts(releaseNum, db, programme, prodIDs=None, depOld=False,
                      deleteDeeps=False):
    """
    Deprecates products of given release numbers for the current programme.
    Optionally supply a specific list of product IDs to deprecate.

    @param releaseNum: Release number currently being curated.
    @type releaseNum: int
    @param db: Connection to database where deprecations are to be made.
    @type db: DbSession
    @param programme: Programme table for current database with the current row
                      set to that of the programme for which the products will
                      be deprecated.
    @type programme: df.ProgrammeTable
    @param prodIDs: List of product IDs to deprecate with prodType.
    @type prodIDs: tuple(prodType, list(int))
    @param depOld: If True, deprecate files with release numbers older than
                   the current release, else just deprecate files with the
                   current release number.
    @type depOld: bool
    @param deleteDeeps: If True, also delete the deprecated deep product files
                        from disk (when permitted) and mark their database
                        fileName entries as no longer available.
    @type deleteDeeps: bool

    """
    # Nothing older than release 1 can exist, so there is nothing to do.
    if depOld and releaseNum <= 1:
        return

    if prodIDs:
        prodType, pIDs = prodIDs
        frameTypePredicate = queries.getFrameSelection(prodType, noDeeps=False,
                                                       selType='%stack%')
    else:
        prodType = ''
        frameTypePredicate = "frameType LIKE '%deep%'"

    progPredicate = " AND programmeID=%s" % programme.getAttr("programmeID")
    productPredicate = (" AND productID IN (%s)" % ','.join(map(str, pIDs))
                        if prodIDs else '')

    # By making releaseNum in fileName, doesn't deprecate deeps that
    # have changed their releaseNum with no new data
    releasePredicate = (" AND releaseNum BETWEEN 1 AND " + str(releaseNum - 1)
                        if depOld else
                        " AND releaseNum=%d and fileName like '%%_v%d%%'" %
                        (releaseNum, releaseNum))

    # Only deprecate deeps
    dmfIDs = SelectSQL("Multiframe.multiframeID",
        table=Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
        where="deprecated=0 AND " + frameTypePredicate + progPredicate
              + releasePredicate + productPredicate)

    if deleteDeeps:
        for mfID, fileName in db.query("multiframeID, fileName", "Multiframe",
            whereStr="multiframeID IN (%s)" % dmfIDs):

            # fileName is stored as "host:path"; keep only the path part.
            fName = fileName.split(":")[-1]
            if os.path.exists(fName):
                # Only delete files in the test output area, or when running
                # as the 'scos' operations account.
                if (db.sysc.testOutputPath() not in fName
                    and os.getenv('USER') != 'scos'):
                    Logger.addMessage("<WARNING> Cannot delete %s, because it "
                                      "is write-protected" % fName)
                else:
                    Logger.addMessage("Deleting %s, which is obsolete" % fName)
                    os.remove(fName)
                    db.update("Multiframe",
                        "fileName='PixelFileNoLongerAvailable:%s'" % fName,
                        where="multiframeID=%s" % mfID)

    numDep = db.update("Multiframe", "deprecated=%s" % DepCodes.reprocWFAU,
        where="multiframeID IN (%s) and deprecated=0" % dmfIDs)

    Logger.addMessage("...%s products deprecated." % numDep)
    if numDep:
        Logger.addMessage("Propagating deprecations to MultiframeDetector...")
        numDep = db.update("MultiframeDetector",
            entryList="MultiframeDetector.deprecated=%s" % DepCodes.reprocWFAU,
            where="Multiframe.deprecated=%s AND " % DepCodes.reprocWFAU
                  + "MultiframeDetector.deprecated!=%s" % DepCodes.reprocWFAU,
            fromTables=Join(["Multiframe", "MultiframeDetector"],
                            ["multiframeID"]))

        Logger.addMessage("...%s frames deprecated." % numDep)

    #@@NOTE: Propagation of deprecation flag to detections should also occur
    #        here but is skipped to reduce the number of times it is called as
    #        this isn't immediately necessary at this stage.

    # For the current-release case, widen the predicate so intermediate
    # (non-deep) frames of this release have their assignments reset too.
    if not depOld:
        releasePredicate = " AND releaseNum=%d" % releaseNum

    Logger.addMessage("Resetting intermediate %s product assignments..."
                      % prodType)

    imfIDs = SelectSQL("Multiframe.multiframeID",
        table=Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
        where="frameType NOT LIKE '%%deep%%'%s"
              % (" AND " + frameTypePredicate if prodIDs else "")
              + releasePredicate + progPredicate + productPredicate)

    numRes = db.update("ProgrammeFrame",
        entryList=[("productID", dbc.intDefault()),
                   ("releaseNum", dbc.smallIntDefault())],
        where="multiframeID IN (%s)" % imfIDs)

    Logger.addMessage("...%s productIDs reset" % numRes)
634 635 #------------------------------------------------------------------------------ 636
def findCreatedProducts(db, programmeID, releaseNum, reqProdsOfType,
                        ingestedIDs, prodType):
    """
    Searches file system to find existing products from the list provided of
    given file type and release number for the given programme.

    @param db: Connection to database containing the observations.
    @type db: DbSession
    @param programmeID: Unique ID of desired programme's products.
    @type programmeID: int
    @param releaseNum: Release number of desired products.
    @type releaseNum: int
    @param reqProdsOfType: List of required productIDs for each product type.
    @type reqProdsOfType: list(int)
    @param ingestedIDs: List of already ingested productIDs for product type.
    @type ingestedIDs: list(int)
    @param prodType: Product type.
    @type prodType: str

    @return: Details of products already created reference by productID.
    @rtype: dict(int: Cu13.ProductDetails)

    """
    filterTable = df.Table("Filter", db)
    imageProds = {}
    prodInfo = queries.getProductInfo(db, programmeID, prodType,
                                      reqProdsOfType[prodType])
    # 1073741824 == 2**30: presumably the "finished" bit of
    # bitProcessingFlag -- below that value the product is still being
    # processed; equal to it the product is complete. TODO confirm against
    # the ProductProcessing schema.
    # @TODO: Should be <=107 --> finished but not ingested!
    currentlyProcessingProducts = db.query(
        "fileName",
        "ProductProcessing",
        "bitProcessingFlag<1073741824 AND programmeID=%s AND productType='%s'"
        % (programmeID, prodType))

    currentlyFinishedProducts = db.query(
        "fileName",
        "ProductProcessing",
        "bitProcessingFlag=1073741824 AND programmeID=%s AND productType='%s'"
        % (programmeID, prodType))

    for productID in reqProdsOfType[prodType]:
        if productID not in ingestedIDs:
            prodFilePath = fits.findProduct(programmeID, releaseNum, productID,
                                            prodType, prodInfo, filterTable,
                                            not db.isRealRun, db.sysc)
            # Check confidence file AND catalogue
            if (prodFilePath and os.path.exists(fits.getConfMap(prodFilePath))
                and os.path.exists(fits.getCatalogue(prodFilePath))
                and prodFilePath in currentlyFinishedProducts):

                # Image, confidence map and catalogue all exist and the
                # product is marked finished: record it for ingest.
                imageProds[productID] = Cu13.ProductDetails(productID,
                    multiframeID=dbc.intDefault(),
                    imageName=prodFilePath,
                    confName=fits.getConfMap(prodFilePath),
                    catName=fits.getCatalogue(prodFilePath))

            elif (prodFilePath
                  and prodFilePath not in currentlyProcessingProducts):
                # A file exists but is neither finished nor being processed:
                # treat it as an abandoned partial product and remove it
                # (plus any confidence map / catalogue) from disk.
                Logger.addMessage("Incomplete product %s: %d formed that is "
                                  "not in ProductProcessing"
                                  " Deleting %s"
                                  % (prodType, productID, prodFilePath))

                os.remove(prodFilePath)
                if os.path.exists(fits.getConfMap(prodFilePath)):
                    os.remove(fits.getConfMap(prodFilePath))

                if os.path.exists(fits.getCatalogue(prodFilePath)):
                    os.remove(fits.getCatalogue(prodFilePath))

    return imageProds
705 706 #------------------------------------------------------------------------------ 707
def getIngestedProducts(db, programmeID, releaseNum, prodIDs, productType):
    """
    Searches database for all existing products ingested in the database for
    the given programme and date range. If deep product IDs provided then just
    these existing products of the given release number are retrieved,
    otherwise all non-deprecated observations with catalogues of the given type
    are retrieved.

    @param db:          Connection to database containing the observations.
    @type  db:          DbSession
    @param programmeID: Unique ID of desired programme's observations.
    @type  programmeID: int
    @param releaseNum:  Release number of data to retrieve.
    @type  releaseNum:  int
    @param prodIDs:     List of product IDs for all required deep products.
    @type  prodIDs:     list(int)
    @param productType: Frame type string pattern of required product.
    @type  productType: str

    @return: Set of product IDs already assigned in the database and the total
             number of multiframes with assigned product IDs.
    @rtype: tuple(set(int), int)

    """
    # Assemble the selection predicate in named stages for readability.
    idList = ','.join(str(pid) for pid in prodIDs)
    frameSelection = queries.getFrameSelection(productType)
    predicate = ("programmeID=%s AND releaseNum=%s AND productID IN (%s)"
                 " AND %s AND confID>0 AND %s"
                 % (programmeID, releaseNum, idList, frameSelection,
                    DepCodes.selectNonDeprecated))

    rows = db.query("Multiframe.multiframeID, productID",
                    fromStr=Join(["Multiframe", "ProgrammeFrame"],
                                 ["multiframeID"]),
                    whereStr=predicate)

    # @TODO: Check that unique set of mfIDs could differ in total from prodIDs
    uniqueMfIDs = set(row.multiframeID for row in rows)
    assignedProdIDs = set(row.productID for row in rows)

    return assignedProdIDs, len(uniqueMfIDs)
744 745 746 #------------------------------------------------------------------------------ 747
def getMergedMFIDs(db, programme, isSynoptic=False):
    """
    Retrieves all multiframe IDs listed in the merge log (synoptic if
    requested) of the given programme.

    @param db:         Connection to database containing desired merge log.
    @type  db:         DbSession
    @param programme:  Programme table for current database with the current
                       row set to the desired programme.
    @type  programme:  df.ProgrammeTable
    @param isSynoptic: If True, retrieve multiframe IDs from the synoptic
                       merge log.
    @type  isSynoptic: bool

    @return: All multiframe IDs listed in the programme's merge log table.
    @rtype: set(int)

    """
    mfidColumns = df.PassbandList(programme, isSynoptic).getMfCols()
    logTable = (programme.getSynopticMergeLogTable() if isSynoptic
                else programme.getMergeLogTable())

    rows = db.query(', '.join(mfidColumns), logTable)

    merged = set()
    if len(mfidColumns) > 1:
        # Each row is a tuple with one multiframe ID per passband column.
        for bandMfIDs in rows:
            merged.update(bandMfIDs)
    else:
        # Single passband: each row is a single multiframe ID.
        merged.update(rows)

    # Drop the placeholder value used for missing passbands.
    merged.discard(dbc.intDefault())

    return merged
781 782 #------------------------------------------------------------------------------ 783
def getReleaseNum(programme):
    """
    Queries database to find the next available release number for a given
    survey.

    @param programme: Database Programme table information with the row set
                      to the current programme.
    @type  programme: DataFactory.ProgrammeTable

    @return: Next available release number.
    @rtype: int

    """
    # Surveys must have their release number stated explicitly on the
    # command line; only non-surveys may have it derived automatically.
    if not programme.isNonSurvey():
        raise SystemExit("<ERROR> When curating survey programmes, the "
            "release number must be expressly stated as the relevant "
            "WFAU-product version number, which must not be zero.")

    lastRelease = programme._db.queryAttrMax("releaseNum", "Release",
        "surveyID=%s" % programme.getAttr("programmeID"))

    # No previous release recorded means this is release 1.
    return lastRelease + 1 if lastRelease else 1
807 808 #------------------------------------------------------------------------------ 809
def ingestDeepStacks(db, fileLists, releaseNum):
    """
    Ingests into given database the deep stack products listed in the given
    file lists.

    @param db:         Connection to ingest database.
    @type  db:         DbSession
    @param fileLists:  List of paths to files containing the list of deep
                       stack product files to ingest.
    @type  fileLists:  list(str)
    @param releaseNum: Release number of data to ingest.
    @type  releaseNum: int

    @return: True, if stacks and catalogues are ingested.
    @rtype:  bool

    """
    # Never ingest directly into a load database: the chron ingestion job
    # owns it, and a parallel ingest could reuse the same multiframe IDs.
    if db.database.startswith(db.sysc.loadDatabase):
        for filePath in fileLists:
            if filePath and os.path.exists(filePath):
                Logger.addMessage(
                    "File list %s must be ingested as part of the ongoing "
                    "chron ingestion job to avoid ingesting files with the same "
                    "multiframe ID" % filePath)
        return False

    Logger.addMessage("Ingesting deep stacks and catalogues...")

    # Run DataBuilder as command line script
    dbScript = os.path.join(db.sysc.curationCodePath(),
        'invocations', 'cu0', 'DataBuilder.py')

    inScript = os.path.join(db.sysc.curationCodePath(),
        'invocations', 'cu0', 'IngestCUFiles.py')

    dbCmd = "%s -f -o '%s' -r -v %s" % (dbScript, db.sharePath(), releaseNum)
    inCmd = "%s %s.%s" % (inScript, db.server, db.database)

    for filePath in fileLists:
        if filePath and os.path.exists(filePath):
            # Remove old CU03ED_database files
            # (open() in a with-block fixes the handle leak of the old
            # Python-2 file() call)
            with open(filePath) as fileList:
                outputDirs = set(os.path.dirname(line) for line in fileList)

            for path in outputDirs:
                for cuNum in [3, 4]:
                    flagPath = \
                        os.path.join(path, 'CU0%sED_%s' % (cuNum, db.database))
                    if os.path.exists(flagPath):
                        Logger.addMessage("Removing old flag: " + flagPath)
                        os.remove(flagPath)

            # Build and ingest CU3
            subDir = 'products/' + getProductType(filePath)
            args = " -x 3 -l %s -s %s %s.%s"
            args %= (filePath, subDir, db.server, db.database)
            extp.run(dbCmd + args, parseStdOut=False)
            extp.run(inCmd, parseStdOut=False)

            # If no CU03ED flag, produce one, so the CU3 ingest is not redone
            for path in outputDirs:
                flagPath = os.path.join(path, 'CU03ED_%s' % db.database)
                if not os.path.exists(flagPath):
                    Logger.addMessage("Writing: " + flagPath)
                    # Just touch an empty flag file; the with-block ensures
                    # the handle is closed immediately
                    with open(flagPath, 'w'):
                        pass

            # Build and ingest CU4
            args = " -x 4 -l %s -s %s %s.%s"
            args %= (filePath, subDir, db.server, db.database)
            extp.run(dbCmd + args, parseStdOut=False)
            extp.run(inCmd, parseStdOut=False)

    return True
884 885 #------------------------------------------------------------------------------ 886
def getProductType(filePath):
    """
    Checks type of product being ingested.

    @param filePath: Path to a file listing product file paths; each listed
                     path is expected to contain a 'products' component.
    @type  filePath: str

    @return: The single product type common to all listed files (the path
             component immediately following 'products').
    @rtype:  str

    @raise Exception: If the listed files do not share exactly one product
                      type.

    """
    # open() in a with-block replaces the Python-2-only file() builtin and
    # guarantees the handle is closed; the parsing logic is unchanged.
    with open(filePath) as fileList:
        prodTypes = set(fileName.split('products')[1].split('/')[0]
                        for fileName in fileList)

    if len(prodTypes) != 1:
        raise Exception("Incorrect number of product types")

    return prodTypes.pop()
897 898 #------------------------------------------------------------------------------ 899 900
def isCuTaskIncomplete(programme, cuNum, refCuNum=13):
    """
    Tests whether a programme has had the given CU task run for the current
    release.

    @param programme: Programme table for current database with the current
                      row set to the programme to test.
    @type  programme: df.ProgrammeTable
    @param cuNum:     Unique CU task ID for the CU to test for
                      incompleteness.
    @type  cuNum:     int
    @param refCuNum:  Unique CU task ID for the CU that should always occur
                      sometime prior to the CU being tested on a given
                      release run.
    @type  refCuNum:  int

    @return: True, if the reference CU has run successfully but the given CU
             has not run since it, i.e. the CU task is still incomplete for
             the current release. (The previous docstring stated the
             opposite of what the code returns.)
    @rtype:  bool

    """
    # Chronologically ordered list of successful, non-rolled-back curation
    # events restricted to the two CUs of interest
    curationHistory = programme._db.query("cuID",
        Join(["ArchiveCurationHistory", "ProgrammeCurationHistory"],
            ["cuEventID"]),
        whereStr="cuID IN (%s, %s) AND programmeID=%s AND status=%s AND "
            "rolledBack=%s ORDER BY ArchiveCurationHistory.cuEventID"
            % (refCuNum, cuNum, programme.getAttr("programmeID"),
               dbc.yes(), dbc.no()))

    # Incomplete when the reference CU appears in the history but the most
    # recent event of the pair is not the tested CU. The membership test
    # also guards the [-1] index against an empty history.
    return refCuNum in curationHistory and curationHistory[-1] != cuNum
930 931 #------------------------------------------------------------------------------ 932
def isUncalibrated(programme):
    """
    Tests a deep programme to see if CU8 needs to be applied to it.

    @param programme: Programme table for current database with the current
                      row set to the programme to test.
    @type  programme: df.ProgrammeTable

    @return: True, if given programme has not had a successful CU8 run
             following a deep catalogue ingest.
    @rtype:  bool

    """
    # CU8 must re-run after each deep catalogue ingest (the reference CU,
    # default 13); delegate the history check to isCuTaskIncomplete.
    return isCuTaskIncomplete(programme, cuNum=8)
947 948 #------------------------------------------------------------------------------ 949 950 # @TODO: This will become a common SQL query function, also used by 951 # UpdateDetectionTables, if not elsewhere. Could replace current 952 # deprecateOldDetections() used by CU4 953
def propagateDeprecations(programme):
    """
    Propagate deprecations from MultiframeDetector to the detection table of
    the given programme.

    The three UPDATE statements below are applied in order to every monthly
    detection table; each handles a different band of deprecation codes, so
    their WHERE clauses keep them from overwriting each other's results.

    @param programme: Programme table for current database with the current row
                      set to that of the programme where the deprecations
                      require propagating.
    @type programme: df.ProgrammeTable

    """
    Logger.addMessage("Updating deprecated flag for table %s..."
        % programme.getDetectionTable())

    # Total number of detection rows updated across all monthly tables
    numRows = 0
    for table in programme.getMonthlyDetectionTables():
        # Match each detection to its frame extension in MultiframeDetector
        tableJoin = \
            Join([table, "MultiframeDetector"], ["multiframeID", "extNum"])

        # Detections from frames deprecated for special reasons which need to
        # be kept later, set deprecation in detection to the same as in
        # MultiframeDetector
        numRows += programme._db.update(table,
            entryList=table + ".deprecated=MultiframeDetector.deprecated",
            where="MultiframeDetector.%s and MultiframeDetector.deprecated>0"
                % DepCodes.selectNonDepAndTest,
            fromTables=tableJoin)

        # Detections from frames deprecated on grounds of quality have
        # deprecated flags set to the value of 102.
        # NOTE(review): the exact code values come from the DepCodes
        # constants (propDepDet, nonDep, reprocCASU), which are defined
        # outside this view -- confirm 102/128/255 against DepCodes.
        numRows += programme._db.update(table,
            entryList="%s.deprecated=%s" % (table, DepCodes.propDepDet),
            where="MultiframeDetector.deprecated BETWEEN %s AND %s AND "
                "%s.deprecated!=%s AND MultiframeDetector.%s"
                % (DepCodes.nonDep + 1, DepCodes.reprocCASU - 1, table,
                   DepCodes.propDepDet, DepCodes.selectDepAndNotTest),
            fromTables=tableJoin)

        # Detections from frames deprecated due to reprocessing have deprecated
        # flags set to the reprocessing deprecation value (either 128 or 255).
        numRows += programme._db.update(table,
            entryList="%s.deprecated=MultiframeDetector.deprecated" % table,
            where=table + ".deprecated!=MultiframeDetector.deprecated AND "
                "MultiframeDetector.deprecated>=%s" % DepCodes.reprocCASU,
            fromTables=tableJoin)

    Logger.addMessage("%s rows affected." % numRows)
1001 1002 #------------------------------------------------------------------------------ 1003