Source Code for Module invocations.cu19.cu19

#! /usr/bin/env python
#------------------------------------------------------------------------------
#$Id: cu19.py 10301 2014-05-01 14:10:57Z RossCollins $
"""
   Invoke CU19. Prepare release database for a given survey. Also incorporates
   CUs 18 and 20 from the original design.

   @author: R.S. Collins
   @org:    WFAU, IfA, University of Edinburgh

   @newfield contributors: Contributors, Contributors (Alphabetical Order)
   @contributors: J. Bryant, N.C. Hambly, E. Sutorius

   @todo: Resolve the complication over whether PTS/CAL should be considered
          non-surveys or not - it may well be easier to create survey-like DBs
          for them (though there is the consideration of deprecation
          selections).
   @todo: Create a Cu19._parseSchemas() sub-routine to tidy the code up a bit.
   @todo: Provide an Info message about the largest table size.
   @todo: Automatic determination of initial file-group file sizes. See trac
          ticket:89 for implementation details.
"""
#------------------------------------------------------------------------------
from __future__ import division, print_function

from   email.mime.text import MIMEText
import smtplib

from   wsatools.CLI                 import CLI
import wsatools.CSV                     as csv
import wsatools.DataFactory             as df
import wsatools.DbConnect.CommonQueries as queries
from   wsatools.DbConnect.CuSession import CuSession
import wsatools.DbConnect.DbConstants   as dbc
from   wsatools.DbConnect.DbSession import Database, DbSession, Join, odbc, \
                                           SelectSQL, bulkCopy
import wsatools.DbConnect.Schema        as schema
from   wsatools.Logger              import Logger, ForLoopMonitor
from   wsatools.SystemConstants     import DepCodes
import wsatools.Utilities               as utils
#------------------------------------------------------------------------------

class Cu19(CuSession):
    """ Prepare release database for a given survey.
    """
    #--------------------------------------------------------------------------
    # Define class constants (access as Cu19.varName)

    cuNum = 19
    propPeriodOfGrace = 1
    """ Additional proprietary period-of-grace in months. This constant defines
        an additional time interval that is added to the nominal proprietary
        period of the data, such that an allowance is made for the time-lag
        between ingest (which is the currently assumed start of the
        proprietary period, as opposed to the date of observation) and release.
        This period of grace is currently set to one month, since this is of
        the order of the time it takes between ingest and release of data.
    """
    # Private class parameters - should not be altered
    _autoCommit = True

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    #: Append to pre-existing release databases of the same name?
    appendDb = False
    #: Observation date range to release.
    dateRange = CuSession.sysc.obsCal.dateRange()
    #: Tuple of programme acronyms for which detections should not be released.
    excludeProgs = tuple()
    #: CSV string of field IDs; only data for these fields will be released.
    fieldIDs = ''
    #: Start copy from this table onward.
    fromTable = None
    #: Tuple of programme acronyms; only detections for these programmes will
    #: be copied.
    insertProgs = tuple()
    #: When inserting programme data, keep existing metadata?
    keepMetadata = False
    #: Overwrite pre-existing release databases of the same name?
    overwriteDb = False
    #: Resume from this chunk number for VVV detection outgests.
    resumeChunkNumber = 1

    #--------------------------------------------------------------------------
    # Define private member variable default values (access prohibited)

    _idxInfo = {}            #: Parsed index information.
    _isNonSurvey = True      #: Is release a non-survey proprietary product?
    _objectSchema = []       #: Parsed database objects schema.
    _releaseDatabase = None  #: Database object with release database options.
    _releaseServer = None    #: Server for release database.
    _surveyID = 0            #: Unique ID of survey to release.
    _tableList = []          #: Ordered list of tables to copy by table name.
    _tableSchema = {}        #: Parsed schemas dict referenced by table name.
    _versionNum = 0          #: Release version number.
    _workSpace = ''          #: Temporary outgest work space volume.

    #--------------------------------------------------------------------------
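    # Worked example for the propPeriodOfGrace constant above (illustrative
    # only; the dates and the nominal period are hypothetical): with a nominal
    # proprietary period of 12 months and propPeriodOfGrace = 1, data ingested
    # on 2013-03-15 would remain proprietary until 2014-04-15, i.e. 12 + 1
    # months after the ingest date rather than the observation date.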
    def __init__(self, surveyID, release=_releaseDatabase,
                 curator=CuSession.curator,
                 comment=CuSession.comment,
                 database=DbSession.database,
                 isTrialRun=DbSession.isTrialRun,
                 userName=DbSession.userName):
        """
        Initialises member data and prepares database connection.

        @param surveyID:   The unique identifier for the survey to release,
                           or db name, e.g. 1 for UKIDSS, 106 for TRANSIT.
        @type  surveyID:   int or str
        @param release:    Optionally override default name of release
                           database.
        @type  release:    str
        @param curator:    Name of curator.
        @type  curator:    str
        @param comment:    Descriptive comment as to why curation task is
                           being performed.
        @type  comment:    str
        @param database:   Name of the database to connect to.
        @type  database:   str
        @param isTrialRun: If True, do not perform database modifications,
                           just print the SQL statements to the terminal.
        @type  isTrialRun: bool
        @param userName:   Optionally override default database username.
        @type  userName:   str

        """
        # Initialise parent class
        super(Cu19, self).__init__(curator=curator, comment=comment,
            database=database, isTrialRun=isTrialRun, userName=userName)

        # Determine survey from either surveyID or dbNamePrefix
        surveyTable = df.Table('Survey', self.archive)
        if type(surveyID) is int or surveyID.isdigit():
            try:
                surveyTable.setCurRow(surveyID=int(surveyID))
            except ValueError:
                Logger.addMessage("<ERROR> Survey ID %s does not exist. "
                    "Please select from %s" % (surveyID,
                    ', '.join(surveyTable.getAttr('dbNamePrefix'))))
                raise SystemExit
        else:
            try:
                surveyTable.setCurRow(dbNamePrefix=surveyID.upper())
            except ValueError:
                Logger.addMessage("<ERROR> Survey %s does not exist. "
                    "Please select from %s" % (surveyID.upper(),
                    ', '.join(surveyTable.getAttr('dbNamePrefix'))))
                raise SystemExit

        Logger.addMessage("of survey: " + surveyTable.getAttr('name'))

        self._surveyID = surveyTable.getAttr('surveyID')
        self._isNonSurvey = (self._surveyID > dbc.nonSurveyProgrammeOffset() or
                             self._surveyID == 106 or self._surveyID == 200)
        self._versionNum = self._getNextID('releaseNum', 'Release',
                                           'surveyID=%s' % self._surveyID)

        # Obtain the list of programmes that comprise the specified survey
        self.programmeID = set(self.archive.query("programmeID",
            "SurveyProgrammes", "surveyID=%s" % self._surveyID))

        if not self.programmeID:
            Logger.addMessage("<ERROR> There are no programmes specified for "
                              "this survey in table SurveyProgrammes")
            raise SystemExit

        gap = "\n "
        Logger.addMessage("The following programmes are part of this survey:"
            + gap + gap.join(self.programme.getAttr('title', programmeID=pID)
                             for pID in self.programmeID))

        if release:
            self._releaseServer, releaseDbName = (release.split('.')
                if '.' in release else (self.archive.server, release))
        else:
            self._releaseServer = self.archive.server
            releaseDbName = surveyTable.getAttr('dbNamePrefix')

        Logger.addMessage("Name of database to be released: " + releaseDbName)

        # Define the database
        if self._isNonSurvey:
            # @note: Rob wants all non-survey databases on the same volume...
            dbVolume = self.sysc.catServerVolumes(self._releaseServer)[0]
            self._releaseDatabase = Database(releaseDbName, volumes=[dbVolume],
                primarySize="100 MB", logSize="10 MB", filegrowth="25%")

            Logger.addMessage("Will create database in "
                              + self._releaseDatabase._folders[0])
        else:
            surveys = ' '.join(self.programme.getAcronym(progID)
                               for progID in self.programmeID)

            filegroups = self.sysc.metadataFileGroups
            filegroups += self.sysc.surveyFileGroups(surveys)

            # @@TODO: should be automatically calculated
            pSize, fgSize = (("4 GB", "40 GB") if self.archive.isLoadDb else
                             ("16 MB", "16 MB"))

            if self.sysc.scienceProgs.get("VVV") in self.programmeID:
                filegroups = [("vvvDetection", "520 GB"),
                              ("vvvSource", fgSize),
                              ("vvvCuration", fgSize),
                              ("vvvIndices", fgSize)]
            else:
                filegroups = list(zip(filegroups, [fgSize] * len(filegroups)))

            Logger.addMessage("Filegroup size = " + fgSize)
            self._releaseDatabase = Database(releaseDbName, primarySize=pSize,
                volumes=self.sysc.catServerVolumes(self._releaseServer),
                dbDir=releaseDbName,
                filegroups=filegroups)

        # if self.archive.server == "ramses12":
        # Set temporary outgest work space volume to be the most free volume
        adminDb = DbSession(self._releaseServer + '.' + self.sysc.adminDatabase,
            autoCommit=True, isTrialRun=self.archive.isTrialRun,
            userName=self.archive.userName)

        self._workSpace = adminDb.uncPath(adminDb.getBestVolume())
        # self._workSpace = self.archive.getBestVolume()
        Logger.addMessage("Using volume %s for temporary outgests"
                          % self._workSpace)
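    # Illustrative usage sketch (not part of the original module): a release
    # could be prepared from an interactive session like so, using survey ID 1
    # (UKIDSS, as per the docstring above) and a release database name in the
    # style of the CLI examples below; the remaining arguments fall back to
    # the CuSession/DbSession defaults:
    #
    #     cu19 = Cu19(1, release="UKIDSSDR1PLUS")
    #     cu19.run()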

    #--------------------------------------------------------------------------

    def _onRun(self):
        """
        Verify, freeze, back up and release the database for a given survey.

        """
        # Get all schema parsing and checking done ASAP to catch errors early
        Logger.addMessage("Parsing schemas...")

        # Obtain correct list of schemas for indices and database objects
        self._idxInfo = schema.parseIndices(self.sysc.indexScript)
        self._objectSchema = schema.parseFuncProcs(self.sysc.procScript)
        if self._isNonSurvey:
            self._idxInfo.update(schema.parseIndices(self.sysc.nsIndexScript))
        else:
            self._objectSchema += (
                schema.parseFuncProcs(self.sysc.surveyProcScript)
                + schema.parseViews(self.sysc.surveyViewScript))

        if self.sysc.isVSA():
            # Assume only one programme
            progAcronym = \
                self.programme.getAcronym(list(self.programmeID)[0]).lower()

            # Fix for poor design of VSA schema
            self._objectSchema = [obj for obj in self._objectSchema
                                  if "TilePawTDOnly" not in obj.name
                                  or obj.name.startswith(progAcronym)]

        if self.insertProgs:
            progIDs = \
                set(self.programme.setProgID(prog) for prog in self.insertProgs)

            self.insertProgs = tuple(self.programme.getAcronym(progID).upper()
                                     for progID in progIDs)
            Logger.addMessage(
                "Only these programmes will be inserted into existing database: "
                + ", ".join(self.insertProgs))
        else:
            progIDs = utils.arbSort(self.programmeID,
                [self.sysc.scienceProgs.get("GPS")], isFullKeySet=False)

        # Parse excludeProgs into programme acronyms
        self.excludeProgs = \
            tuple(self.programme.getAcronym(self.programme.setProgID(prog))
                  for prog in self.excludeProgs)

        # Obtain correct list of schemas for programme-specific tables, sorted
        # so that GPS tables are the first to be copied.
        parsedScripts = set([dbc.charDefault().upper(),
                             dbc.charDefault().lower()])
        tableSchema = []
        neighboursSchema = []
        for progID in progIDs:
            self.programme.setCurRow(programmeID=progID)

            # Programme script tables
            progScript = self.programme.getSchemaScript()
            if progScript not in parsedScripts:
                tableSchema += schema.parseTables(progScript)
                parsedScripts.add(progScript)

            # Neighbour script tables
            neighboursScript = self.programme.getAttr("neighboursSchema")
            if neighboursScript not in parsedScripts:
                neighboursSchema += schema.parseTables(neighboursScript)
                parsedScripts.add(neighboursScript)

            if self.programme.getAcronym() not in self.excludeProgs:
                tableSchema.extend(table for table in neighboursSchema
                    if table.name.startswith(self.programme.getAcronym()))

        if self.keepMetadata:  # Already have all the tables we need to copy
            # Create ordered list of tables to copy by table name
            self._tableList = list(map(str, tableSchema))

        # Obtain correct list of schemas for the common tables
        tableSchema += schema.parseTables(self.sysc.metadataSchema())
        tableSchema += schema.parseTables(self.sysc.curationSchema())
        tableSchema += schema.parseTables(self.sysc.calibrationSchema())
        if not self.keepMetadata:  # Include common tables in copy
            # Create ordered list of tables to copy by table name
            self._tableList = list(map(str, tableSchema))

        # Store table schemas in a dictionary referenced by table name
        self._tableSchema = dict((table.name, table) for table in tableSchema)

        # Check database against schema
        Logger.addMessage("Checking that the database schema is correct.")
        try:
            self.archive.checkSchema(tableSchema, releasedOnly=True)
        except schema.MismatchError as error:
            raise Cu19.CuError(error)

        fromTable = self.fromTable
        if fromTable and fromTable.startswith("vvvDetectionRaw"):
            fromTable = "vvvDetection"

        if fromTable and fromTable not in self._tableList:
            raise Cu19.CuError("Unknown table: " + self.fromTable)

        # Open administrative DB connection, with auto-commit set on to enable
        # direct modifications unfettered by transaction support issues:
        adminDb = DbSession(self._releaseServer + '.' + self.sysc.adminDatabase,
            autoCommit=True, isTrialRun=self.archive.isTrialRun,
            userName=self.archive.userName)

        if self.appendDb:
            Logger.addMessage(
                "Updating database %s..." % self._releaseDatabase.name)
            try:
                self._releaseDatabase.attach(adminDb)
            except odbc.DatabaseError as error:
                if "activation error. The physical file name" in str(error):
                    raise Cu19.CuError(
                        "Cannot append to database %s as it does not exist."
                        % self._releaseDatabase.name)
                raise
            except Database.ExistsError:
                pass
        else:
            Logger.addMessage(
                "Creating database %s..." % self._releaseDatabase.name)
            for _attempt in range(2):
                try:
                    self._releaseDatabase.create(adminDb, self.overwriteDb)
                except odbc.DatabaseError as error:
                    if "file names listed could not be created" in str(error) \
                            and not self.overwriteDb:
                        try:
                            self._releaseDatabase.attach(adminDb)
                        except odbc.DatabaseError:
                            raise odbc.DatabaseError(error)
                    else:
                        raise
                except Database.ExistsError as error:
                    raise Cu19.CuError(error)
                else:
                    break

        # Change some standard database options that were set on creation
        # to options that are more efficient for bulk loading.
        self._releaseDatabase.setBulkLoadOptions(adminDb)

        # Add ProgrammeFrame indices to speed up outgests of detection tables
        for index in self._idxInfo["ProgrammeFrame"]:
            self.archive.addIndex(index)

        self._copyOutData()

        if self._releaseServer == self.archive.server:
            self._releaseDatabase.setReleaseOptions(adminDb)
            try:
                self._releaseDatabase.detach(adminDb)
            except odbc.DatabaseError as error:
                Logger.addExceptionWarning(error)
                Logger.addMessage(
                    "Database %s prepared, but still needs to be detached prior "
                    "to copy to the public server." % self._releaseDatabase.name)
            else:
                Logger.addMessage(
                    "Database %s prepared, detached and ready for copy to the "
                    "public server." % self._releaseDatabase.name)

        else:
            releaseServers = [self._releaseServer]
            if self._releaseServer == "ramses13":
                self._releaseDatabase.setReleaseOptions(adminDb)
            else:
                Logger.addMessage("Mirroring database...")
                releaseServers.append(self._releaseDatabase.mirror(adminDb))
                Logger.addMessage("done.")

            if self._isNonSurvey and self._surveyID not in (106, 200):
                self._setUserNames(releaseServers)

        if self.archive.isRealRun:
            self._sendMail()  # E-mail ETWS

    #--------------------------------------------------------------------------

    def _onException(self):
        """ Try a roll-back on exception.
        """
        super(Cu19, self)._onException()
        # Don't update programme curation history on exception
        self.programmeID = set()

    #--------------------------------------------------------------------------

    def _copyOutData(self):
        """ Fills the release database with the correct data.
        """
        # Open a connection to the release database
        releaseDb = DbSession(
            self._releaseServer + '.' + self._releaseDatabase.name,
            autoCommit=True, isTrialRun=self.archive.isTrialRun,
            userName=self.archive.userName)

        if self.insertProgs:
            # Must update metadata and curation tables too because of possible
            # reprocessing or new frames, e.g. GPS difference images, the
            # UDS mosaic, and because the ACHs etc. need updating. Existing
            # tables are removed by default in case of schema changes and for
            # simplicity.
            Logger.addMessage(
                "Removing existing %s%s tables..." % (("metadata, curation and "
                 if not self.keepMetadata else ''), ", ".join(self.insertProgs)))

            existingTables = [self._tableSchema[table]
                for table in self._tableList if releaseDb.existsTable(table)]

            releaseDb.dropObjects(existingTables)  # No FKs in release tables

        if self.appendDb:
            if self.fromTable and (not self.fromTable.endswith("Detection")
                and not self.fromTable.endswith("Source") or
                'X' in self.fromTable) and releaseDb.existsTable(self.fromTable):
                Logger.addMessage(
                    "Removing existing %s table..." % self.fromTable)
                releaseDb.dropObjects([self._tableSchema[self.fromTable]])

            # Edit table schema list to just copy missing tables
            Logger.addMessage(
                "Determining list of remaining tables to copy...")
            existingTables = [table for table in self._tableList
                              if releaseDb.existsTable(table)]

            fromTable = self.fromTable
            if fromTable and fromTable.startswith("vvvDetectionRaw"):
                fromTable = "vvvDetection"

            for table in existingTables:
                if table != fromTable:
                    self._tableList.remove(table)

        self._copyOutTableData(releaseDb)

        # This is just done for aesthetic reasons
        Logger.addMessage("Resetting empty catalogue filenames to default...")
        releaseDb.update("Multiframe", "catName=%r" % dbc.charDefault(),
                         where="catName LIKE '%empty%'")

        Logger.addMessage("Creating any views and stored db functions...")
        if not releaseDb.createObjects(self._objectSchema, ignoreNS=True,
                                       releasedOnly=True):
            msg = "Not all views and functions could be created!"
            if not self.overwriteDb:
                Logger.addMessage("<Info> %s This may just be because they "
                                  "already exist." % msg)
            else:
                Logger.addMessage("<WARNING> " + msg)

        Logger.addMessage("Updating release history with this row:")
        row = (self._surveyID, self._versionNum, utils.makeMssqlTimeStamp(),
               self.comment, self._releaseDatabase.name)
        Logger.addMessage(repr(row))
        releaseDb.insertData(dbc.releaseTableName(), row)
        self.archive.insertData(dbc.releaseTableName(), row)

        # Update index statistics - not sure if this is needed. Sometimes
        # indices weren't being used prior to this running.
        releaseDb.updateStatistics()
        if not self._isNonSurvey or self._surveyID in (106, 200):
            releaseDb.grantAccess("wsaro" if self.sysc.isWSA() else
                                  self.programme.getAcronym().lower() + "ro")

    #--------------------------------------------------------------------------

    def _getDateRangeSQL(self):
        """ Returns the date range selection clause for this survey release.
        """
        return " AND (utDate BETWEEN '%s' AND '%s'" % self.dateRange \
             + " OR utDate='%s')" % self.sysc.obsCal.lateDefaultDateTime
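    # Illustrative sketch (not part of the original module; the dates are
    # hypothetical): for a date range of ('2005-05-01', '2005-10-31') and a
    # late default date-time of '9999-12-31', the clause above renders as
    #
    #     " AND (utDate BETWEEN '2005-05-01' AND '2005-10-31'"
    #     " OR utDate='9999-12-31')"
    #
    # where the OR term presumably retains rows stamped with the default
    # date-time value.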

    #--------------------------------------------------------------------------

    def _getDeprecationClause(self):
        """ Returns the deprecation selection clause for this survey release.
        """
        # NB: If we want to exclude deprecated detections, add
        #     deprecated != DepCodes.propDepDet

        return ("deprecated<%s" % DepCodes.reprocCASU
                if self._isNonSurvey or not self.sysc.isWSA() else
                "deprecated IN (%s)" % ', '.join(map(str, [DepCodes.nonDep,
                    DepCodes.deepStackOnly, DepCodes.poorPhotom,
                    DepCodes.reprocAltn, DepCodes.extProdComp])))
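    # Illustrative sketch (not part of the original module; the numeric codes
    # are hypothetical): if DepCodes.reprocCASU were 128 and the five accepted
    # WSA codes were 0, 110, 111, 112 and 113, the two possible clauses would
    # render as
    #
    #     "deprecated<128"                         # non-survey or non-WSA
    #     "deprecated IN (0, 110, 111, 112, 113)"  # WSA survey release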

    #--------------------------------------------------------------------------

    def _copyOutTableData(self, releaseDb):
        """
        Copies out release table data into the new release db, to replicate all
        necessary load database objects and data that are relevant to that
        release. Called within the context of the release DB, having first
        created the release database to receive the replicated data.

        @param releaseDb: A connection to the database to be released.
        @type  releaseDb: DbSession

        @note: Uses BCP native binary outgests and ingests in preference to the
               MS SQL Server T-SQL statement SELECT INTO, as performance tests
               have revealed this to be much faster.
        """
        # Common selection terms
        dateRange = self._getDateRangeSQL()
        depClause = self._getDeprecationClause()
        progIDs = csv.join(self.programmeID)
        if not self.sysc.isWSA():
            # Include calibration metadata (not catalogues) for VISTA & OSA
            progIDs += ",%s" % dbc.calibrationProgrammeID()

        progClause = "programmeID IN (%s)" % progIDs

        if self.fieldIDs:
            if len(self.programmeID) > 1:
                raise Cu19.CuError("The fieldIDs option cannot be applied to "
                                   "multi-programme surveys")

            releaseMfs = queries.getMultiframes(self.programme, self.dateRange,
                                                self.fieldIDs, depClause)

            sourceIDs = queries.getSourceIDs(self.programme, self.fieldIDs)
            fieldFrameSets = \
                queries.getFrameSetIDs(self.programme, self.fieldIDs)

            # Include calibration frames and default row
            calMfs = SelectSQL("multiframeID",
                table=self.archive.tablePath("ProgrammeFrame"),
                where="programmeID=%s OR multiframeID=%s"
                      % (dbc.calibrationProgrammeID(), dbc.intDefault()))

        else:
            progMfs = SelectSQL("multiframeID",
                table=self.archive.tablePath("ProgrammeFrame"),
                where=progClause + " OR multiframeID=%s" % dbc.intDefault())

            releaseMfs = SelectSQL("multiframeID",
                table=self.archive.tablePath("Multiframe"),
                where="multiframeID IN (%s) AND %s%s" %
                      (progMfs, depClause, dateRange))

        releaseTables = [table for table in self._tableList
                         if self._tableSchema[table].releasable]

        Logger.addMessage("Copying %s tables into database %s" %
                          (len(releaseTables), releaseDb.database))

        # Bulk copy into release db for every released table
        for count, tableName in enumerate(releaseTables):
            Logger.addMessage("Outgesting %s/%s %s"
                              % (count + 1, len(releaseTables), tableName))

            tableSchema = self._tableSchema[tableName]
            if self._isNonSurvey:
                tableSchema.fileGroup = ''

            #### Select data to be outgested ####
            # The simplest case (copy everything in PK order):
            columns = '*'
            srcTable = self.archive.tablePath(tableName)
            whereStr = ''
            whereChunks = []
            orderBy = tableSchema.primaryKey() + " ASC"

            # Tables that can join with Multiframe must do so to select the
            # desired date range, those with programmeIDs must be
            # restricted to just the data from survey programmes, and
            # finally only non-deprecated data must be copied out:

            isDetTable = \
                tableName.endswith("Detection") and 'X' not in tableName

            isVVVDetTable = (isDetTable and not self.fieldIDs
                and self.sysc.scienceProgs.get("VVV") in self.programmeID)

            if isDetTable and not isVVVDetTable:
                if self.fieldIDs:
                    whereStr, whereChunks = \
                        self._splitOutgest(releaseDb, tableSchema,
                            whereStr="(multiframeID=%s OR multiframeID IN (%s))"
                                     % (dbc.intDefault(), releaseMfs),
                            reqDefRow=True)

                else:
                    columns, srcTable, whereStr, orderBy, detProgs = \
                        self._joinDetectionTables(tableName)

                    pkJoin = Join([tableName + "Raw", "Multiframe",
                                   "MultiframeDetector", "ProgrammeFrame"],
                                  ["multiframeID"])

                    pkWhere = "%sRaw.extNum=MultiframeDetector.extNum AND " \
                        "MultiframeDetector.%s%s AND (programmeID IN (%s) OR " \
                        "ProgrammeFrame.multiframeID=%s)" \
                        % (tableName, depClause, dateRange, csv.join(detProgs),
                           dbc.intDefault())

                    whereStr, whereChunks = \
                        self._splitOutgest(releaseDb, tableSchema, whereStr,
                            specialJoin=(pkJoin, pkWhere, tableName + "Raw",
                                         "R"),
                            reqDefRow=True)

            elif tableName.endswith("Source") and "Synoptic" not in tableName \
                    and 'X' not in tableName:

                if tableName.startswith(self.excludeProgs):
                    whereStr = "sourceID=0"
                else:
                    if self.fieldIDs:
                        whereStr = "frameSetID IN (%s)" % fieldFrameSets

                    whereStr, whereChunks = \
                        self._splitOutgest(releaseDb, tableSchema, whereStr)

            elif self.fieldIDs \
                    and tableName.endswith(("Variability", "VarFrameSetInfo")):

                whereStr = "frameSetID IN (%s)" % fieldFrameSets

            elif tableName.endswith("SynopticSource") and 'X' not in tableName:

                if self.fieldIDs:
                    whereStr = "synFrameSetID IN (%s)" \
                        % queries.getSynFrameSetIDs(self.programme,
                              self.dateRange, self.fieldIDs)

                whereStr, whereChunks = \
                    self._splitOutgest(releaseDb, tableSchema, whereStr)

            elif self.fieldIDs and "BestMatch" in tableName:

                whereStr = "sourceID IN (%s)" % sourceIDs

            elif tableName.endswith("MergeLog"):

                whereStr = depClause
                if self.fieldIDs and "Synoptic" not in tableName:
                    whereStr += " AND frameSetID IN (%s)" % fieldFrameSets

            elif tableName.endswith("TileSet"):

                whereStr = "tlmfID IN (%s)" % releaseMfs

            elif tableName.endswith("TilePawPrints"):

                tileSetTable = \
                    self.archive.tablePath(tableName.replace("PawPrints", "Set"))

                whereStr = "tileSetID IN (%s)" % SelectSQL(
                    select="tileSetID",
                    table=tileSetTable,
                    where="tlmfID IN (%s)" % releaseMfs)

            elif tableName == "Multiframe":

                whereStr = (releaseMfs.whereStr if not self.fieldIDs else
                    "(multiframeID IN (%s) OR multiframeID IN (%s))"
                    % (releaseMfs, calMfs))

            elif tableName in ("MultiframeDetector", "MultiframeEsoKeys",
                               "FlatFileLookUp"):

                alias = "This"
                columns = alias + ".*"
                srcTable = Join(
                    [(self.archive.tablePath(tableName), alias),
                     (self.archive.tablePath("Multiframe"), 'M')],
                    ["multiframeID"])

                depAlias = alias if tableName == "MultiframeDetector" else "M"
                whereStr = "%s.%s" % (depAlias, depClause)
                if not self.fieldIDs:
                    whereStr += " AND M.multiframeID IN (%s)%s" \
                                % (progMfs, dateRange)
                else:
                    whereStr += " AND (M.multiframeID IN (%s) OR "\
                        "M.multiframeID IN (%s))" % (releaseMfs, calMfs)

                orderBy = tableSchema.primaryKey(alias) + " ASC"

            elif tableName in ("CurrentAstrometry", "PreviousMFDZP",
                               "MultiframeDetectorEsoKeys") \
                    or tableName.endswith("AstrometricInfo"):

                alias = "This"
                columns = alias + ".*"
                srcTable = Join(
                    [(self.archive.tablePath(tableName), alias),
                     (self.archive.tablePath("Multiframe"), 'M'),
                     (self.archive.tablePath("MultiframeDetector"), 'D')],
                    ["multiframeID"])

                whereStr = "%s.extNum=D.extNum AND D.%s" % (alias, depClause)
                if not self.fieldIDs:
                    whereStr += " AND M.multiframeID IN (%s)%s" \
                                % (progMfs, dateRange)
                else:
                    whereStr += " AND (M.multiframeID IN (%s) OR "\
                        "M.multiframeID IN (%s))" % (releaseMfs, calMfs)

                orderBy = tableSchema.primaryKey(alias) + " ASC"

            elif tableName == "ArchiveCurationHistory":

                whereStr = "rolledBack=0 AND cuEventID IN (%s)" % SelectSQL(
                    select="cuEventID",
                    table=self.archive.tablePath("ProgrammeCurationHistory"),
                    where=progClause)

            elif tableName.startswith("Survey") or tableName == "Release":

                whereStr = "surveyID=%s" % self._surveyID

            elif tableName == "Provenance":

                if not self.fieldIDs:
                    whereStr = "combiframeID IN (%s) AND multiframeID IN (%s)"\
                               % (releaseMfs, releaseMfs)
                else:
                    whereStr = "(combiframeID IN (%s) OR combiframeID IN (%s))"\
                        " AND (multiframeID IN (%s) OR multiframeID IN (%s))"\
                        % (releaseMfs, calMfs, releaseMfs, calMfs)

            elif "Programme" in tableName or tableName.startswith("Required")\
                    or tableName == "ProductLinks":

                whereStr = progClause
                if tableName == "Programme":
                    whereStr += " OR programmeID = %s" % dbc.intDefault()
                elif tableName == "ProgrammeFrame":
                    whereStr = "multiframeID IN (%s)" % releaseMfs
                    if self.fieldIDs:
                        whereStr += " OR " + calMfs.whereStr

            elif self.fieldIDs and any(phrase in tableName
                                       for phrase in ['X', "Neighbours"]):

                whereStr = "masterObjID IN (%s)" % sourceIDs

            progress = ForLoopMonitor(whereChunks)
            chunks = (whereChunks or [whereStr]) if not isVVVDetTable else []
            for whereStr in chunks:
                progress.preMessage("Outgesting chunk")

                bulkCopy(query=SelectSQL(columns, srcTable, whereStr, orderBy),
                         tableSchema=tableSchema,
                         fromDb=self.archive,
                         toDb=releaseDb,
                         fileTag=self.shareFileID,
                         # @@GOTCHA: Even small detection table outgests
                         # without whereChunks have failed with this turned
                         # on, so always turn it off for detection tables.
                         isSmallTable=not isDetTable)

            if isVVVDetTable:
                # Have to do it differently via monthly-split detection tables
                self._bulkCopyVVVDetTable(tableSchema, releaseDb)

            # Create indices
            for index in self._idxInfo[tableName]:
                if tableName.startswith("vvv"):
                    releaseDb.goOffline()  # Reset DB connection to speed VVV

                releaseDb.addIndex(index, ignoreNS=True, releasedOnly=True,
                                   usingDefaultFG=self._isNonSurvey)

            # Create statistics
            releaseDb.createStatistics([tableSchema])

    #--------------------------------------------------------------------------

    def _bulkCopyVVVDetTable(self, tableSchema, releaseDb):
        """ Copy VVV detection table by individual monthly table chunks.
        """
        rawColumns = [str(column) for column
                      in self._tableSchema[tableSchema.name + "Raw"].columns]

        rawAlias = 'R'
        columns = ', '.join(('%s.%s' % (rawAlias, column)
                             if str(column) in rawColumns else str(column))
                            for column in tableSchema.columns)

        joinStr = ("D.extNum=R.extNum AND A.extNum=R.extNum AND "
            "P.extNum=R.extNum AND A.seqNum=R.seqNum AND P.seqNum=R.seqNum"
            " AND D." + self._getDeprecationClause())

        lastMonthTable = \
            self.programme.getMonthlyDetectionTables(self.dateRange,
                                                     excludeDeeps=True)[-1]

        detTables = self.programme.getMonthlyDetectionTables(self.dateRange)
        if self.fromTable and self.fromTable in detTables:
            detTables = detTables[detTables.index(self.fromTable):]

        progress = ForLoopMonitor(detTables)
        for detTable in detTables:
            progress.preMessage("Outgesting %s table" % detTable)
            srcTables = [
                (self.archive.tablePath(detTable), rawAlias),
                (self.archive.tablePath(detTable.replace("Raw", "Astrometry")),
                 'A'),
                (self.archive.tablePath(detTable.replace("Raw", "Photometry")),
                 'P'),
                (self.archive.tablePath("MultiframeDetector"), 'D')]

            pkSrcTables = [detTable, "MultiframeDetector"]
            pkWhere = detTable + ".extNum=MultiframeDetector.extNum AND " \
                "MultiframeDetector." + self._getDeprecationClause()

            whereStr = joinStr
            if detTable == lastMonthTable:
                endDateClause = " AND utDate <= '%s'" % self.dateRange.end
                srcTables.append((self.archive.tablePath("Multiframe"), 'M'))
                whereStr += endDateClause
                pkSrcTables.append("Multiframe")
                pkWhere += endDateClause

            pkJoin = Join(pkSrcTables, "multiframeID")
            chunkOffset = self.resumeChunkNumber - 1

            whereStr, whereChunks = \
                self._splitOutgest(releaseDb, tableSchema, whereStr,
                    specialJoin=(pkJoin, pkWhere, detTable, "R"),
                    isResumable=False)

            chunkProgress = ForLoopMonitor(whereChunks)
            for whereStr in (whereChunks or [whereStr]):
                chunkProgress.preMessage("Outgesting chunk", chunkOffset)

                query = SelectSQL(columns, Join(srcTables, "multiframeID"),
                    whereStr, orderBy=tableSchema.primaryKey(rawAlias))

                bulkCopy(query, tableSchema, self.archive, releaseDb,
                         self.shareFileID, drive=self._workSpace,
                         isSmallTable=False)

        # @TODO: Why not let splitOutgest do this through the reqDefRow option?
        # Insert default row
        defs = (column.getDefaultValue() for column in tableSchema.columns)
        releaseDb.insertData(tableSchema, defs)
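    # Illustrative note (not part of the original module): each month of VVV
    # detections is assumed to live in its own Raw/Astrometry/Photometry table
    # trio, with getMonthlyDetectionTables() returning the monthly Raw tables
    # in date order - hence the last element can be treated as the final month
    # and given the utDate cut-off above.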

    #--------------------------------------------------------------------------

    def _joinDetectionTables(self, tableName):
        """
        Prepares the necessary query to join the split detection tables to
        create the given detection table.

        """
        joinAlias = 'R'
        joinList = [(self._tableSchema[tableName + "Raw"], joinAlias),
                    (self._tableSchema[tableName + "Astrometry"], 'A'),
                    (self._tableSchema[tableName + "Photometry"], 'P'),
                    (self._tableSchema["Multiframe"], 'M'),
                    (self._tableSchema["MultiframeDetector"], 'D'),
                    (self._tableSchema["ProgrammeFrame"], 'F')]

        commonCols = utils.getDuplicates(utils.unpackList(
            map(str, table.columns) for table, _alias in joinList))

        columns = ', '.join(('%s.%s' % (joinAlias, column)
                             if str(column) in commonCols else str(column))
                            for column in self._tableSchema[tableName].columns)

        srcTable = Join([(self.archive.tablePath(table.name), alias)
                         for table, alias in joinList],
                        ["multiframeID"])

        # Where different programmes share the same detection table,
        # e.g. science verification, this is an important criterion.
        detProgs = (self.programmeID.intersection(
            self.programme.getProgIDsfromTable(tableName))
            if not tableName.startswith(self.excludeProgs) else [0])

        whereStr = ("D.extNum=R.extNum AND A.extNum=R.extNum AND "
            "P.extNum=R.extNum AND A.seqNum=R.seqNum AND P.seqNum=R.seqNum AND "
            "D.%s AND (F.multiframeID=%s OR programmeID IN (%s))%s" %
            (self._getDeprecationClause(), dbc.intDefault(), csv.join(detProgs),
             self._getDateRangeSQL()))

        orderBy = self._tableSchema[tableName].primaryKey(joinAlias) + " ASC"

        return columns, srcTable, whereStr, orderBy, detProgs
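    # Illustrative sketch (not part of the original module; "gps" is a
    # hypothetical programme prefix): for tableName "gpsDetection" the join
    # above recombines gpsDetectionRaw (R), gpsDetectionAstrometry (A) and
    # gpsDetectionPhotometry (P) with the metadata tables, and the WHERE
    # clause takes roughly this shape:
    #
    #     D.extNum=R.extNum AND A.extNum=R.extNum AND P.extNum=R.extNum
    #     AND A.seqNum=R.seqNum AND P.seqNum=R.seqNum
    #     AND D.deprecated<... AND (F.multiframeID=... OR programmeID IN (...))
    #     AND (utDate BETWEEN '...' AND '...' OR utDate='...')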

    #--------------------------------------------------------------------------

    def _sendMail(self):
        """ Sends e-mail notification to ETWS about successful release.
        """
        text = '%s at %s by %s: "%s"\n' % (self._releaseDatabase,
            utils.makeMssqlTimeStamp(), self.curator, self.comment)

        msg = MIMEText(text, 'plain')
        msg['Subject'] = "DB Release"
        msg['From'] = "etws@roe.ac.uk"
        msg['To'] = "etws@roe.ac.uk"
        s = smtplib.SMTP('localhost')
        s.sendmail("etws@roe.ac.uk", ["etws@roe.ac.uk"], msg.as_string())
        s.quit()

    #--------------------------------------------------------------------------

    def _setUserNames(self, servers):
        """
        Sets up user name permissions on the given list of servers.

        @param servers: List of server names.
        @type  servers: sequence(str)

        """
        for server in servers:
            Logger.addMessage("Setting up user names on " + server)
            db = DbSession(server + '.' + self._releaseDatabase.name,
                autoCommit=True, isTrialRun=self.archive.isTrialRun,
                userName=self.archive.userName)

            surveyName = self._releaseDatabase.name.split('v')[0].lower()
            nsUserName = surveyName + "ro"
            nsPassword = nsUserName + "pw"
            db.createUser(nsUserName, nsPassword)
            db.grantAccess("userAdmin")

    #--------------------------------------------------------------------------

    def _splitOutgest(self, releaseDb, table, whereStr='', specialJoin=None,
                      reqDefRow=False, isResumable=True):
        """
        Splits the outgest of the given table over several chunks defined by
        a where clause. NB: Try to avoid the isResumable=False option in a
        production environment, as it is not a reliable way to carry out
        operations.

        """
        # Check to see how much has been ingested already
        pk = csv.values(table.primaryKey())[0]
        maxPK = (0 if not isResumable or not releaseDb.existsTable(table.name)
                 else releaseDb.queryAttrMax(pk, table.name)) or 0

        pkRemaining = "%s>%s" % (pk, maxPK)
        if whereStr:
            originalWhereStr = whereStr
            whereStr += " AND "

        if not specialJoin:
            srcTable = self.archive.tablePath(table)
            pkWhere = whereStr + pkRemaining
            groupBy = pk
        else:
            srcTable, pkWhere, pkTable, pkAlias = specialJoin
            pkWhere += " AND " + pkTable + '.' + pkRemaining
            groupBy = pkTable + '.' + pk
            pk = pkAlias + '.' + pk
            pkRemaining = pkAlias + '.' + pkRemaining

        # Break whereStr up to produce chunks of <= 100 M rows; larger values
        # produce weird query plans...
        chunkSize = 1e8
        if ',' in table.primaryKey():
            pkCounts = self.archive.queryNumRows(srcTable, pkWhere, groupBy)
            # @TODO: Experiment with changing pkCounts to query MFD for det
            #        tables
            pkRanges = utils.groupByCounts(pkCounts, groupSize=chunkSize)
        else:
            # @TODO: Make this code more elegant.
            pkQuery = "MAX(%s) AS pkMax, MIN(%s) AS pkMin" % (pk, pk)
            pkRange = self.archive.query(pkQuery, srcTable, pkWhere)[0]
            pkRanges = []
            curPk = pkRange.pkMin
            while True:
                nextPk = curPk + chunkSize - 1
                if nextPk >= pkRange.pkMax:
                    pkRanges.append((curPk, pkRange.pkMax))
                    break
                pkRanges.append((curPk, nextPk))
                curPk = nextPk + 1

        if len(pkRanges) == 1:
            pkRanges = []

        whereChunks = []
        if pkRanges:
            if not isResumable:
                pkRanges = pkRanges[self.resumeChunkNumber - 1:]

                # Revert to default value after first table is resumed
                self.resumeChunkNumber = Cu19.resumeChunkNumber

            whereChunks = [whereStr + "%s BETWEEN %s AND %s"
                           % (pk, pkMin, pkMax) for pkMin, pkMax in pkRanges]

            if reqDefRow and not maxPK:
                # Copying by chunks skips the default row, so it needs to be
                # put in first manually, so that partial re-runs don't try to
                # re-copy the default row.
                releaseDb.createObjects([table])
                releaseDb.insertData(table.name,
                    [c.getDefaultValue() for c in table.columns])

        elif maxPK:
            whereStr += pkRemaining

        elif whereStr:  # No splitting necessary
            whereStr = originalWhereStr

        return whereStr, whereChunks
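    # Worked example for the splitting logic above (illustrative; the numbers
    # are hypothetical): for a single-column primary key spanning values 1 to
    # 250,000,000 with chunkSize = 1e8, the while loop produces
    #
    #     pkRanges = [(1, 100000000), (100000001, 200000000),
    #                 (200000001, 250000000)]
    #
    # which become where-clause chunks of the form
    #
    #     "... AND multiframeID BETWEEN 100000001 AND 200000000"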

#------------------------------------------------------------------------------
# Entry point for script.

# Allow module to be imported as well as executed from the command line
if __name__ == '__main__':
    # Define additional command-line interface options for Cu19
    CLI.progArgs += [
        CLI.Argument("surveyID", "U05A100"),
        CLI.Argument("begin_date", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("end_date", "05A", isValOK=CLI.isDateOK)]

    CLI.progOpts += [
        CLI.Option('a', "append",
            "append tables to any pre-existing release database of the same "
            "name"),
        CLI.Option('f', "from",
            "continue appending to existing database from this table",
            "NAME"),
        CLI.Option('i', "insert",
            "copy only survey detection data for these programmes, e.g. UDS",
            "LIST"),
        CLI.Option('k', "keep_metadata",
            "when inserting keep existing metadata"),
        CLI.Option('l', "fields",
            "release data from these fieldIDs only",
            "LIST"),
        CLI.Option('n', "number",
            "resume from this chunk number (VVV detection outgests only)",
            "NUMBER", str(Cu19.resumeChunkNumber),
            isValOK=lambda val: val.isdigit() and int(val) > 0),
        CLI.Option('o', "overwrite",
            "overwrite any pre-existing release database of the same name"),
        CLI.Option('r', "release",
            "release database name e.g. UKIDSSDR1PLUS",
            "NAME"),
        CLI.Option('v', "verbose",
            "log additional verbose debug messages"),
        CLI.Option('x', "exclude",
            "exclude detections from these programmes, e.g. GPS",
            "LIST")]

    cli = CLI(Cu19, "$Revision: 10301 $")
    Logger.isVerbose = cli.getOpt("verbose")
    Logger.addMessage(cli.getProgDetails())

    cu19 = Cu19(cli.getArg("surveyID").replace('/', ''), cli.getOpt("release"),
                cli.getOpt("curator"), cli.getArg("comment"),
                cli.getArg("database"), cli.getOpt("test"), cli.getOpt("user"))

    cu19.appendDb = cli.getOpt("append")
    if not cu19.appendDb and (cli.getOpt("insert") or cli.getOpt("from")):
        cu19.appendDb = True

    try:
        cu19.dateRange = cu19.sysc.obsCal.dateRange(cli.getArg("begin_date"),
                                                    cli.getArg("end_date"))
    except Exception as error:
        eType = "Invalid Option"
        Logger.addExceptionMessage(error, eType)
        raise SystemExit(eType + ": see log " + cu19._log.pathName)

    if cli.getOpt("exclude"):
        cu19.excludeProgs = csv.values(cli.getOpt("exclude"))

    cu19.fieldIDs = cli.getOpt("fields")
    cu19.fromTable = cli.getOpt("from")
    cu19.resumeChunkNumber = int(cli.getOpt("number"))
    if cli.getOpt("insert") and not cli.getOpt("from"):
        # not "from" avoids a particular mistake in option choices
        cu19.insertProgs = csv.values(cli.getOpt("insert"))
    cu19.keepMetadata = cli.getOpt("keep_metadata")

    cu19.overwriteDb = cli.getOpt("overwrite") and not cu19.appendDb
    cu19.run()

#------------------------------------------------------------------------------
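# Illustrative invocation sketch (not part of the original module; the survey,
# semesters and database name are examples, and the positional "comment"
# argument is assumed to be inherited from the standard CuSession CLI
# arguments):
#
#     python cu19.py 1 05A 05B "DR1 release" -r UKIDSSDR1PLUS -o
#
# This would release UKIDSS (survey ID 1) data observed between semesters 05A
# and 05B into a freshly created UKIDSSDR1PLUS database, overwriting any
# pre-existing database of that name.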