Package invocations :: Package cu21 :: Module NonSurveyRelease
[hide private]

Source Code for Module invocations.cu21.NonSurveyRelease

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  # $Id: NonSurveyRelease.py 10022 2013-08-29 08:51:25Z RossCollins $ 
  4  """ 
  5     Releases non-survey data up to the given last night. First prepares the 
  6     non-survey data by applying quality control, before automatically 
  7     determining what kind of survey the programme is, which allows the curation 
  8     tables to be updated and survey tables defined and created. Finally, 
  9     performs all necessary release curation tasks on the complete set of 
 10     non-survey data in the load database and creates a release database on the 
 11     public catalogue server. 
 12   
 13     @author: R.S. Collins 
 14     @org:    WFAU, IfA, University of Edinburgh 
 15   
 16     @todo: Have an option to prepare the release of just shallow or deep surveys 
 17            (i.e. first stage run). However, this requires the script to do 
 18            the ProgrammeBuilder setup first to determine whether it is shallow 
 19            or deep, then the ProgrammeTable needs to test and skip if not the 
 20            right type (without failure). 
 21  """ 
 22  #------------------------------------------------------------------------------ 
 23  from __future__      import division, print_function 
 24  from future_builtins import map 
 25   
 26  from   operator import itemgetter 
 27  import os 
 28  import shutil 
 29  import time 
 30   
 31  from   invocations.cu19.cu19 import Cu19 
 32   
 33  from   wsatools.Automator           import autoCurate 
 34  from   wsatools.CLI                 import CLI 
 35  import wsatools.CSV                     as csv 
 36  from   wsatools.DataFactory         import ProgrammeTable 
 37  from   wsatools.DbConnect.CuSession import CuSession 
 38  from   wsatools.DbConnect.DbSession import DbSession, Join, SelectSQL 
 39  from   wsatools.FitsToDb            import ProvenanceFiller 
 40  import wsatools.FitsUtils               as fits 
 41  from   wsatools.Logger              import Logger 
 42  from   wsatools.ProgrammeBuilder    import ProgrammeBuilder, commitSchema 
 43  from   wsatools.QualityControl      import NonSurveyDeprecator 
 44  from   wsatools.SystemConstants     import SystemConstants 
 45  import wsatools.Utilities               as utils 
 46  #------------------------------------------------------------------------------ 
 47   
48 -class NonSurveyCurator(object):
49 """ 50 Performs post-data-ingest curation on non-survey data and automatically 51 prepares release databases. 52 53 @todo: This class borrows heavily on the L{CuSession} design, so need to 54 derive both L{CuSession} and this class from a class that represents 55 the common super set. 56 """ 57 #: Connection to the database to curate (DbSession). 58 archive = None 59 #: Command-line options (CLI). 60 cli = None 61 #: Range of observation nights to curate - only end date is applied. 62 dateRange = SystemConstants().obsCal.dateRange() 63 #: List of programmes that fail curation. 64 failures = [] 65 #: Locations of file lists of new products to ingest ref. by product type. 66 fileListForType = None 67 #: Just determine programmes to curate? 68 isDryRun = False 69 #: Perform the second part of the curation process that releases database? 70 isReleaseRun = False 71 #: All programmes currently being curated are shallow? 72 isShallowOnly = False 73 #: List of programmes to be curated, can be acronym or unique ID as string. 74 progIDs = [] 75 #: Database Programme table entries (ProgrammeTable). 76 progTable = None 77 #: Stage currently being curated. 78 stage = "No curation task performed" 79 #: List of programmes that have completed curation. 80 successes = [] 81 #: System constants object. 82 sysc = SystemConstants() 83 #: Temporary working directory location. 84 workSpace = '' 85 86 #-------------------------------------------------------------------------- 87
88 - def __init__(self, cli):
89 """ Initialises member variables from command-line options, which 90 enables it to determine the list of programmes to curate. 91 """ 92 self.cli = cli 93 self.isDryRun = cli.getOpt("dry_run") 94 self.isReleaseRun = cli.getOpt("release") 95 self.isShallowOnly = cli.getOpt("shallow_only") 96 # Reset mutable member variables. 97 self.failures = [] 98 self.successes = [] 99 100 Logger.addMessage("Querying database to prepare programme " 101 "selection...") 102 103 # Connect to database 104 self.archive = DbSession(autoCommit=True, isPersistent=True, cli=cli) 105 self.sysc = self.archive.sysc 106 self.progTable = ProgrammeTable(self.archive) 107 self.progIDs = self._getProgsToRelease() 108 109 Logger.addMessage("Checking catalogue server share paths...") 110 self.archive.testSharePath() 111 112 if not self.isReleaseRun: 113 # @@TODO: Need a better way to come up with a unique working 114 # directory. Can't rely upon process ID, as the same one is used 115 # for each CuSession. All codes need to use the CuSession.workDir 116 # member variable instead of sysc.tempWorkPath(), which is created 117 # with whatever unique name is available (using _X in a while loop 118 # rather than .old). sysc.tempWorkPath() will then just point to 119 # "tmp". Need to change all codes when working on VSA branch. This 120 # is also ideal with-block territory. 121 self.workSpace = self.sysc.tempWorkPath() + "nsr" 122 if os.path.exists(self.workSpace): 123 os.rename(self.workSpace, self.workSpace + ".old") 124 Logger.addMessage("<Warning> Working directory %s exists, " 125 "moved to %s.old" % (2 * (self.workSpace,))) 126 127 os.mkdir(self.workSpace) 128 self.fileListForType = fits.createFileListDict(self.progTable, 129 fileTag='nonsurveys', workingDir=self.workSpace)
130 131 #-------------------------------------------------------------------------- 132
133 - def run(self):
134 """ Executes the _onRun() method of the session within a try block, so 135 that all the standard exceptions may be dealt with correctly. 136 137 @todo: This should become part of abstracted class at a level above 138 CuSession. 139 """ 140 logFileName = '%s_NonSurveyRelease_%s.log' % \ 141 (self.archive.database, time.strftime('%Y%m%d_%H%M')) 142 try: 143 self._onRun() 144 except BaseException as error: 145 with utils.noInterrupt(): 146 stdErrMsg = CuSession.logException(error) 147 if error.args: 148 stdErrMsg += ": see log " + logFileName 149 raise SystemExit(stdErrMsg) 150 finally: 151 try: 152 self._onCompletion() 153 finally: 154 # Save last log entries 155 if not self.archive.isTrialRun: 156 Logger(logFileName)
157 158 #-------------------------------------------------------------------------- 159
160 - def _getProgsToRelease(self):
161 """ 162 Queries database and non-survey registration files to determine 163 selection of programmes to release according to command-line options. 164 165 @return: List of acronyms of programmes to release. 166 @rtype: list(str) 167 168 """ 169 if self.cli.getOpt("progID") != "NONSURVEYS": 170 progIDs = set(self.progTable.setProgID(progID) for progID 171 in csv.values(self.cli.getOpt("progID"))) 172 else: 173 # All non-survey programmes with ingested catalogues 174 allProgs = \ 175 [self.progTable.getAttr("dfsIdString", programmeID=progID) 176 for progID in self.progTable.getProgIDList(onlyNonSurvey=True)] 177 178 # Remove those that are non-registered 179 progNames = (set(allProgs) if self.cli.getOpt("non_registered") else 180 self.getRegisteredProgs().intersection(allProgs)) 181 182 if not self.cli.getOpt("all"): # Remove those already released 183 progNames -= set(self.archive.query("dfsIdString", 184 fromStr="Programme, Release", 185 whereStr="Programme.programmeID=Release.surveyID")) 186 187 progIDs = set(self.progTable.setProgID(progName) 188 for progName in progNames) 189 190 if self.cli.getOpt("exclude"): # Remove those requested 191 progIDs -= set(self.progTable.setProgID(prog) 192 for prog in csv.values(self.cli.getOpt("exclude"))) 193 194 # Remove any programmes not setup if releasing 195 if self.isReleaseRun: 196 progIDs.intersection_update(self.archive.query("programmeID", 197 "Programme, Survey", "Programme.programmeID=Survey.surveyID")) 198 199 # Remove deep surveys if requested 200 if self.isShallowOnly: 201 progIDs = set(progID for progID in progIDs 202 if not self.progTable.isDeep(programmeID=progID)) 203 204 # Remove those without observations in semester range. 205 for progID, firstNight in self.getFirstNights(progIDs): 206 if utils.makeDateTime(firstNight) > self.dateRange.end: 207 progIDs.discard(progID) 208 209 # Remove those that share observations with another programme (for now) 210 xSemProgs = set() 211 for progID in progIDs: 212 if self.archive.queryNumRows("ProgrammeFrame", 213 whereStr="multiframeID IN (%s)" 214 % SelectSQL("multiframeID", "ProgrammeFrame", 215 "programmeID=%s" % progID), 216 distinctAttr="programmeID") is not 1: 217 218 if self.cli.getOpt("progID") != "NONSURVEYS": 219 msg = ("<WARNING> %s (%s) is a cross-programme " 220 "non-survey. If its detections are not shared amongst " 221 "all programmeIDs then this script will fail to do the " 222 "right thing.") 223 else: 224 xSemProgs.add(progID) 225 msg = ("<Info> %s (%s) is a, presently " 226 "unsupported, cross-programme non-survey and so is " 227 "excluded from release.") 228 229 Logger.addMessage(msg 230 % (self.progTable.getAcronym(progID).upper(), progID)) 231 232 progIDs -= xSemProgs 233 234 # Determine if remaining programmes are all shallow 235 if not self.isShallowOnly and self.isReleaseRun: 236 self.isShallowOnly = \ 237 all(not self.progTable.isDeep(programmeID=progID) 238 for progID in progIDs) 239 240 return \ 241 [self.progTable.getAcronym(progID) for progID in sorted(progIDs)]
242 243 #-------------------------------------------------------------------------- 244
245 - def _moveToShare(self):
246 """ 247 Moves the file containing the list of deep products to ingest to the 248 share path of the server hosting the database where the products will 249 be stored. 250 251 """ 252 ingestDir = self.archive.sharePath("ingestLists") 253 utils.ensureDirExist(ingestDir) 254 for fileListPath in self.fileListForType.values(): 255 if os.path.exists(fileListPath): 256 ingestPath = \ 257 os.path.join(ingestDir, os.path.basename(fileListPath)) 258 259 if os.path.exists(ingestPath): 260 Logger.addMessage("<WARNING> %s already exists! Renaming " 261 "to %s." % (ingestPath, ingestPath + ".old")) 262 263 shutil.move(ingestPath, ingestPath + ".old") 264 265 shutil.move(fileListPath, ingestPath) 266 Logger.addMessage("<IMPORTANT> Please invoke parallel CUs 2-4 " 267 "on this file list: " + ingestPath)
268 269 #-------------------------------------------------------------------------- 270
271 - def _onCompletion(self):
272 """ Always executes after a run to log results and tidy up. 273 """ 274 if self.successes: 275 Logger.addMessage( 276 "These non-survey programmes have been successfully %s:\n%s" % 277 ("released" if self.isReleaseRun else "prepared", 278 '\n'.join(self.successes))) 279 280 if self.failures: 281 Logger.addMessage( 282 "These non-survey programmes failed at the given stages:\n%s" % 283 '\n'.join('%s at %s' % f for f in self.failures)) 284 285 # Wipe the temporary working directory 286 if self.workSpace: 287 try: 288 shutil.rmtree(self.workSpace) 289 except Exception as error: 290 Logger.addExceptionMessage(error)
291 292 #-------------------------------------------------------------------------- 293
294 - def _onRun(self):
295 """ Runs automatic curation and release for all programmes. 296 """ 297 Logger.addMessage("The following %s programmes will be prepared: %s" 298 % (len(self.progIDs), ', '.join(self.progIDs))) 299 300 # PART A - QC and set-up 301 if not self.isReleaseRun and not self.isDryRun: 302 try: 303 self._runPartA() 304 finally: 305 if not self.cli.getOpt('qc_only') and self.progIDs: 306 commitSchema(self.archive, "updated") 307 elif not self.cli.getOpt('qc_only'): 308 Logger.addMessage("<Info> Schema changes not committed " 309 "into SVN due to curation failure. If any changes exist " 310 "then these will have to be manually committed.") 311 312 self.isReleaseRun = not self.cli.getOpt('qc_only') \ 313 and (self.isReleaseRun or self.isShallowOnly) 314 315 # PART B - Release 316 if self.isReleaseRun and self.progIDs: 317 try: 318 self._runPartB() 319 finally: 320 # @@GOTCHA: Need to check progIDs again in case all failed 321 if self.progIDs and not self.isShallowOnly: 322 # @@GOTCHA: Automator modifies the cross-match criteria 323 Logger.addMessage("Updating schema documentation") 324 Logger.addMessage( 325 self.cli.getProgDetails("ProgrammeBuilder")) 326 327 for progID in self.progIDs: 328 pb = ProgrammeBuilder(progID, 329 database=self.cli.getArg("database"), 330 isTrialRun=True, 331 userName=self.cli.getOpt("user")) 332 333 pb.createSchemaOnly = True 334 pb.run() 335 del pb 336 337 commitSchema(self.archive, "updated")
338 339 #-------------------------------------------------------------------------- 340
341 - def _prepareData(self, progID):
342 """ 343 Performs part A auto-curation for given programme. 344 345 @param progID: Programme to curate, can be acronym or unique ID. 346 @type progID: str 347 348 """ 349 # Re-run QC to propagate detection deprecations for this prog 350 Logger.addMessage(self.cli.getProgDetails("NonSurveyDeprecator")) 351 NonSurveyDeprecator(programmeID=progID, cli=self.cli).run() 352 353 if not self.cli.getOpt('qc_only'): 354 # Setup requirements for this non-survey programme 355 self.stage = "Setup" 356 Logger.addMessage(self.cli.getProgDetails("ProgrammeBuilder")) 357 ProgrammeBuilder.numberStks = int(self.cli.getOpt("max_stacks")) 358 ProgrammeBuilder(progID, cli=self.cli).run() 359 360 # Create deep stacks if required otherwise curate stacks 361 self.stage = "Automated Curation (Part A)" 362 autoCurate(self.cli, progID, dateRange=self.dateRange, 363 redoProducts=[pType for pType in self.sysc.productTypes 364 if pType != 'mosaic'], 365 redoSources=True, fileListForType=self.fileListForType)
366 367 #-------------------------------------------------------------------------- 368
369 - def _releaseData(self, progID, relDate):
370 """ 371 Performs part B auto-curation and release for given programme. 372 373 @param progID: Programme to curate, can be acronym or unique ID. 374 @type progID: str 375 @param relDate: Date for release database name YYYYMMDD. 376 @type relDate: str 377 378 """ 379 self.stage = "Automated Curation (Part B)" 380 381 # Curate deep/synoptic data if available 382 isComplete = \ 383 autoCurate(self.cli, progID, dateRange=self.dateRange, doCu6=True) 384 385 if not isComplete: 386 raise CuSession.CuError("Programme %s is not ready for release." 387 % progID) 388 389 # Create release database (no overwrite to avoid mistakes) 390 self.stage = "CU19" 391 Logger.addMessage(self.cli.getProgDetails("CU19")) 392 progName = (progID.upper() if not progID.isdigit() else 393 self.progTable.getAcronym(int(progID)).upper()) 394 395 releaseServer = self.sysc.publicServers[0] 396 releaseDb = "%sv%s" % (progName, relDate) 397 Cu19(progID, releaseServer + '.' + releaseDb, 398 self.cli.getOpt("curator"), 'Release of ' + releaseDb, 399 self.cli.getArg("database"), self.cli.getOpt("test"), 400 self.cli.getOpt("user")).run()
401 402 #-------------------------------------------------------------------------- 403
404 - def _runPartA(self):
405 """ PART A - QC and set-up via automatic curation for each programme. 406 """ 407 self.stage = "QC" 408 # Quality control for all frame and multiframe data 409 Logger.addMessage(self.cli.getProgDetails("NonSurveyDeprecator")) 410 NonSurveyDeprecator(programmeID='NONSURVEYS', cli=self.cli).run() 411 412 self._toEachProg(self._prepareData) 413 414 Logger.addMessage("Non-survey release script part A " 415 "(quality control and setup) complete.") 416 417 isIngestRequired = any(os.path.exists(filePath) 418 for filePath in self.fileListForType.values()) 419 420 self.isShallowOnly = (not isIngestRequired 421 and "curation not started" not in map(itemgetter(0), self.failures)) 422 423 if self.isShallowOnly and self.progIDs: 424 self.successes = [] 425 Logger.addMessage( 426 "<Info> Only shallow surveys curated (no deep stacks created), " 427 "so continuing with part B of the release...") 428 429 elif isIngestRequired: 430 # Move deep stack file list to load server ready for ingest 431 self._moveToShare()
432 433 #-------------------------------------------------------------------------- 434
435 - def _runPartB(self):
436 """ PART B - Release following final automatic curation. 437 """ 438 if not self.isShallowOnly: 439 self.stage = "Provenance update for new deeps" 440 Logger.addMessage(self.cli.getProgDetails("ProvenanceFiller")) 441 ProvenanceFiller.combiFrameTypes = {'undef': ['deep%'], 442 'nOffsets': ['tiledeep%']} 443 ProvenanceFiller.isQuickRun = True 444 ProvenanceFiller(cli=self.cli).run() 445 446 self._toEachProg(self._releaseData, relDate=time.strftime('%Y%m%d')) 447 448 Logger.addMessage( 449 "Non-survey release script part B (data release) complete.")
450 451 #-------------------------------------------------------------------------- 452
453 - def _toEachProg(self, method, **kwds):
454 """ 455 Wrapper method to apply the given method to every programme in 456 self.progIDs, whilst handling exceptions, so that if one programme 457 fails its failure is logged and execution is continued with the next 458 programme. 459 460 @param method: Method to apply to each progID in self.progIDs. 461 @type method: instancemethod or function 462 @param kwds: Optional keyword argument list for the method. 463 @type kwds: dict 464 465 """ 466 for progID in self.progIDs: 467 try: 468 method(progID, **kwds) 469 except BaseException as error: 470 with utils.noInterrupt(): 471 CuSession.logException(error) 472 self.failures.append((progID, self.stage)) 473 Logger.addMessage( 474 "<ERROR> Curation for programme %s failed at stage %s." % 475 self.failures[-1]) 476 477 if isinstance(error, KeyboardInterrupt) \ 478 or str(error) == KeyboardInterrupt.__name__: 479 stage = "(curation not started)" 480 remains = self.progIDs[self.progIDs.index(progID) + 1:] 481 self.failures.extend((pID, stage) for pID in remains) 482 break 483 else: 484 self.successes.append(progID) 485 486 if self.failures: 487 self.progIDs = [progID for progID in self.progIDs 488 if progID not in map(itemgetter(0), self.failures)] 489 490 if not self.progIDs: 491 raise CuSession.CuError("All non-surveys failed!")
492 493 #-------------------------------------------------------------------------- 494
495 - def getFirstNights(self, progIDs):
496 """ 497 Database query to determine the first night of observation for given 498 progs. 499 500 @param progIDs: List of programme IDs to query. 501 @type progIDs: list(int) 502 503 @return: List of programmes with date of the first night of 504 observation. 505 @rtype: list(tuple(int, str)) 506 507 """ 508 if not progIDs: 509 return [] 510 511 return self.archive.query("programmeID, MIN(utDate)", 512 Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]), 513 "programmeID IN (%s) " % ','.join(map(str, progIDs)) + 514 "GROUP BY programmeID")
515 516 #-------------------------------------------------------------------------- 517
518 - def getRegisteredProgs(self):
519 """ @return: Set of registered non-survey programme dfsIdStrings. 520 @rtype: set(str) 521 """ 522 progNames = set() 523 for fileName in os.listdir(self.sysc.nonSurveyRegPath()): 524 if '.reg' in fileName: 525 path = self.sysc.nonSurveyRegPath(fileName) 526 for line in utils.ParsedFile(path): 527 if line.startswith("programme="): 528 progNames.add(line.replace("programme=", "")) 529 break 530 531 return progNames
532 533 #------------------------------------------------------------------------------ 534 # Entry point for script. 535 536 # Allow module to be imported as well as executed from the command line 537 if __name__ == "__main__": 538 # Define command-line interface settings for NonSurveyRelease 539 CLI.progArgs.remove('programmeID') 540 CLI.progArgs.append(CLI.Argument("end_date", "07A", isValOK=CLI.isDateOK)) 541 542 CLI.progOpts += [ 543 CLI.Option('a', "all", 544 "release all registered programmes not just newly registered ones"), 545 CLI.Option('d', "dry_run", 546 "just print list of programmes that will be curated"), 547 CLI.Option('m', "max_stacks", 548 "set the maximum number of components of a deep stack", 549 "NUMBER", str(ProgrammeBuilder.numberStks), 550 isValOK=lambda val: val.isdigit()), 551 CLI.Option('n', "non_registered", 552 "include non-registered programmes in release"), 553 CLI.Option('p', "progID", 554 "release just the non-survey data with these programme IDs", 555 "LIST", "NONSURVEYS"), 556 CLI.Option('q', "qc_only", 557 "just update the quality control deprecations for these data"), 558 CLI.Option('r', "release", 559 "create release databases (only run when quality controlled and setup)" 560 ), 561 CLI.Option('s', "shallow_only", 562 "release only shallow surveys (with option --release)"), 563 CLI.Option('x', "exclude", 564 "exclude these programme IDs", 565 "LIST")] 566 567 # Change default comment 568 CLI.progArgs['comment'] = "Preparing non-survey for release" 569 570 cli = CLI("NonSurveyRelease", "$Revision: 10022 $", __doc__) 571 Logger.isVerbose = cli.getOpt('verbose') 572 Logger.addMessage(cli.getProgDetails()) 573 574 if cli.getOpt("dry_run") and not cli.getOpt("test"): 575 exit("Please specify -t/--test when supplying -d/--dry_run") 576 577 if cli.getOpt("shallow_only") and not cli.getOpt("release"): 578 exit("Option -s/--shallow_only may only be used in combination with " 579 "option -r/--release") 580 581 if not any([cli.getOpt("shallow_only"), cli.getOpt("release"), 582 cli.getOpt("qc_only")]): 583 CLI.check64bitServer() 584 585 cal = SystemConstants(cli.getArg("database").split('.')[-1]).obsCal 586 try: 587 NonSurveyCurator.dateRange = cal.dateRange(end=cli.getArg("end_date")) 588 except Exception as error: 589 raise SystemExit("Illegal Option: " + str(error)) 590 591 NonSurveyDeprecator.endDate = NonSurveyCurator.dateRange.end 592 Cu19.dateRange = ProgrammeBuilder.dateRange = NonSurveyCurator.dateRange 593 594 NonSurveyCurator(cli).run() 595 596 #------------------------------------------------------------------------------ 597 # Change log: 598 # 599 # 14-Aug-2008, RSC: Original version. 600