
Source Code for Module invocations.cu16.cu16

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  #$Id: cu16.py 10018 2013-08-22 13:45:52Z RossCollins $ 
  4  """ 
  5     Invoke CU16. Create neighbour and default join tables with external 
  6     catalogues for a given programme. 
  7   
  8     @note: Load server must have an active SQL link to external catalogue 
  9            servers (for Sloan DSS cross-match, and if a new external catalogue 
  10            has been included since the last run of this script). This is vital 
  11            because we outgest to the load server share directory from the external 
  12            catalogue server, and need the SQL link to cross-query with Sloan DSS. 
 13   
 14     @author: R.S. Collins 
 15     @org:    WFAU, IfA, University of Edinburgh 
 16  """ 
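# --- Editor's note: illustrative usage sketch, not part of the original module.
# The option letters below come from the CLI definitions in the __main__ block
# at the end of this file; the programme, survey and table names are made-up
# placeholders, and the 'test' option (flag spelling assumed here as -t) is
# inherited from the standard CLI/CuSession options rather than defined here.
#
#     python cu16.py LAS                              # all RequiredNeighbours joins
#     python cu16.py LAS -s MGC                       # only joins with external survey MGC
#     python cu16.py LAS -n lasSourceXmgcDetection    # a single neighbour table
#     python cu16.py LAS -f lasSourceXmgcDetection    # resume from this table after a crash
#     python cu16.py VVV -b 2010-04-01 -e 2010-09-30  # restrict detection date range
#     python cu16.py LAS -t                           # trial run, ingest disabled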
 17  #------------------------------------------------------------------------------ 
 18  from   __future__ import division, print_function 
 19  from   itertools  import dropwhile 
 20  from   operator   import attrgetter 
 21  import os 
 22   
 23  from   wsatools.CLI                 import CLI 
 24  import wsatools.DataFactory             as df 
 25  import wsatools.DbConnect.CommonQueries as queries 
 26  from   wsatools.DbConnect.CuSession import CuSession 
 27  import wsatools.DbConnect.DbConstants   as dbc 
 28  from   wsatools.DbConnect.DbSession import DbSession, Ingester, Join, \ 
 29                                             Outgester, SelectSQL 
 30  import wsatools.DbConnect.Schema        as schema 
 31  import wsatools.ExternalProcess         as extp 
 32  from   wsatools.Logger              import ForLoopMonitor, Logger 
 33  from   wsatools.ProgrammeBuilder    import SynopticSetup 
 34  from   wsatools.SystemConstants     import DepCodes, SystemConstants 
 35  import wsatools.Utilities               as utils 
 36  #------------------------------------------------------------------------------ 
 37   
 38  class Cu16(CuSession):
 39      """
 40      Create neighbour and default join tables with external catalogues for a
 41      given programme.
 42  
 43      """
 44      #--------------------------------------------------------------------------
 45      # Class parameters - should not be altered
 46      cuNum = 16                         # Overrides CuSession default
 47      _autoCommit = True                 # Overrides CuSession default
 48      _isPersistent = True               # Overrides CuSession default
 49      tempDetTable = "vvvDetectionTemp"  # VVV requires temporary detection table
 50  
 51      #--------------------------------------------------------------------------
 52      # Public member variables (with defaults) - set from command-line options
 53  
 54      #: Detections should be cross-matched only for this date range.
 55      dateRange = CuSession.sysc.obsCal.dateRange()
 56      #: Enable neighbour table ingest?
 57      enableIngest = not CLI.getOptDef('test')
 58      #: Optional specific external survey for which to create neighbour tables.
 59      extSurveyID = None
 60      #: Optionally continue from this neighbour table onwards.
 61      fromTableName = ''
 62      #: Database from which to outgest data.
 63      outgestDb = ''
 64      #: Optional specific neighbour table to create.
 65      soleTableName = ''
 66  
 67      #--------------------------------------------------------------------------
 68      # Private member variables - initialised by class
 69  
 70      #: A DbSession.Ingester object for performing ingests.
 71      _ingester = None
 72      #: A DbSession.Outgester object for performing outgests.
 73      _outgester = None
 74  
 75      #--------------------------------------------------------------------------
 76  
 77      def _onRun(self):
78 """ Create neighbour and default join tables with external catalogues. 79 """ 80 if self.programme.isDeep() and not self.programme.isUkidss(): 81 synSetup = SynopticSetup(self.programme, self.dateRange) 82 synSetup.updateSynopticMatchRadius() 83 self.dateRange = synSetup.getTimeRange(isCorrelated=False) 84 85 if not self.outgestDb: 86 self._outgester = Outgester(self.archive, tag=self.shareFileID) 87 else: 88 self._outgester = Outgester(DbSession( 89 database=self.outgestDb, 90 autoCommit=self.archive._autoCommit, 91 isTrialRun=self.archive.isTrialRun, 92 userName=dbc.loadServerRwUsername(), 93 isPersistent=self.archive._isPersistent), tag=self.shareFileID) 94 95 # Get the details of all required joins for this programme: 96 requiredJoins = self._outgester.query( 97 selectStr="tableID, surveyID, extTableID, extProgID, " 98 "joinCriterion, neighbourTable", 99 fromStr="RequiredNeighbours", 100 whereStr="programmeID=%s" % self.programmeID) 101 102 # If continuing CU16 from a crash, then select from given table only 103 if self.fromTableName: 104 Logger.addMessage("Starting from table: " + self.fromTableName) 105 predicate = lambda join: join.neighbourTable != self.fromTableName 106 requiredJoins = list(dropwhile(predicate, requiredJoins)) 107 108 # If requested to join with only a single specified external survey, 109 # then strip requiredJoins down to just those between the programme and 110 # that given external survey. 111 if self.extSurveyID: 112 # Determine external survey from either surveyID or surveyName 113 extSurveyTable = df.Table('ExternalSurvey', self._outgester) 114 if type(self.extSurveyID) is int or self.extSurveyID.isdigit(): 115 try: 116 extSurveyTable.setCurRow(surveyID=int(self.extSurveyID)) 117 except ValueError: 118 raise Cu16.CuError("External survey ID %s does not " 119 "exist. Please select from %s" % (self.extSurveyID, 120 ', '.join(extSurveyTable.getAttr('surveyName')))) 121 else: 122 try: 123 extSurveyTable.setCurRow(surveyName=self.extSurveyID) 124 except ValueError: 125 raise Cu16.CuError("External survey %s does not " 126 "exist. Please select from %s" % (self.extSurveyID, 127 ', '.join(extSurveyTable.getAttr('surveyName')))) 128 129 Logger.addMessage("only for external survey: %s (%s)" % 130 (extSurveyTable.getAttr('surveyName'), 131 extSurveyTable.getAttr('description'))) 132 self.extSurveyID = extSurveyTable.getAttr('surveyID') 133 requiredJoins = [join for join in requiredJoins 134 if join.surveyID == self.extSurveyID] 135 136 # If creating just a single specific neighbourTable, strip down list. 137 if self.soleTableName: 138 Logger.addMessage("only for neighbour table: " + self.soleTableName) 139 requiredJoins = [join for join in requiredJoins 140 if join.neighbourTable == self.soleTableName] 141 142 # Only proceed if it's necessary: 143 if not requiredJoins: 144 raise Cu16.CuError("No neighbour tables or default joins with " 145 "external catalogues are specified for this programme.") 146 147 # Parse and check schemas 148 if self.enableIngest: 149 schemaScript = self.programme.getAttr('neighboursSchema') 150 if schemaScript == dbc.charDefault(): 151 # Pick one schema script at random if not defined 152 schemaScript = \ 153 list(set(self.programme.getAll('neighboursSchema')) 154 - set([dbc.charDefault()]))[0] 155 156 Logger.addMessage( 157 "<Warning> %s has no neighbour table schema entry in the " 158 "Programme table, using %s instead." 
% 159 (self.programme.getAcronym().upper(), schemaScript)) 160 161 tableSchema = schema.parseTables(schemaScript, 162 map(attrgetter("neighbourTable"), requiredJoins), 163 reqAllTables=False) 164 165 sdssTables = self._outgester.query("neighbourTable", 166 Join(["RequiredNeighbours", "ExternalSurvey"], "surveyID"), 167 whereStr="programmeID=%s" % self.programmeID 168 + " AND surveyName LIKE 'SDSS%'") 169 170 self._ingester = NeighboursIngester(self.archive, tableSchema, 171 sdssTables, self.shareFileID) 172 173 progTablePathName = {} 174 try: 175 # Outgest source position table(s) for this programme. 176 allEmptyFiles = True 177 for tableID in set(attr.tableID for attr in requiredJoins): 178 progTablePathName[tableID] = \ 179 self._getSourcePosTable(progID=self.programmeID, 180 tableID=tableID) 181 182 allEmptyFiles = (allEmptyFiles and 183 os.path.getsize(progTablePathName[tableID]) == 0) 184 185 # Only continue if the programme catalogue has any entries 186 if allEmptyFiles: 187 msg = "This programme has no entries. Nothing to do." 188 raise Cu16.CuError(msg) 189 190 self._createNeighbourTables(requiredJoins, progTablePathName) 191 192 finally: 193 # Tidy-up, don't need programme source position table any more 194 for tableID in progTablePathName: 195 os.remove(progTablePathName[tableID]) 196 os.remove(progTablePathName[tableID].replace('.dat', '.dtd')) 197 198 # Delete temporary files in source position table share path 199 os.system('rm -fv ' + os.path.join(self.sysc.sourcePosTablesPath(), 200 '*%s*' % self.shareFileID)) 201 202 # Delete temporary VVV outgest table 203 if self.programmeID == self.sysc.scienceProgs.get("VVV"): 204 tempDetTable = schema.Table() 205 tempDetTable.name = Cu16.tempDetTable 206 self._outgester.dropTable(tempDetTable, [])
207 208 #-------------------------------------------------------------------------- 209
210      def _createNeighbourTables(self, requiredJoins, progTablePathName):
211 """ Create neighbour table for given programme and ingest into db. 212 """ 213 matcher = MatchingInterface() 214 215 for (tableID, surveyID, extTableID, extProgID, joinCriterion, 216 neighbourTable) in requiredJoins: 217 218 doIngest = self.enableIngest 219 matchRadius = joinCriterion * 3600.0 220 neighbourTablePathName = os.path.join( 221 self.sysc.sourcePosTablesPath(), 222 neighbourTable + self.shareFileID + '.dat') 223 224 isCrossNeighbs = not (surveyID == dbc.intDefault() and 225 extTableID == dbc.intDefault()) 226 227 # Calculate neighbours table 228 if isCrossNeighbs: 229 extCatPathName = self._getSourcePosTable( 230 surveyID=surveyID if surveyID != dbc.intDefault() else None, 231 progID=extProgID if extProgID != dbc.intDefault() else 232 self.programmeID, 233 tableID=extTableID) 234 235 Logger.addMessage( 236 "Creating cross-neighbour table " + neighbourTable) 237 238 if os.path.getsize(extCatPathName) > 0: 239 matcher.createCrossNeighboursTable( 240 progTablePathName[tableID], extCatPathName, 241 neighbourTablePathName, matchRadius) 242 243 else: 244 doIngest = False 245 246 # Tidy-up non-static outgest tables as they are always outgest 247 if surveyID == dbc.intDefault(): 248 os.remove(extCatPathName) 249 os.remove(extCatPathName.replace('.dat', '.dtd')) 250 else: 251 Logger.addMessage("Creating neighbour table " + neighbourTable) 252 matcher.createNeighboursTable(progTablePathName[tableID], 253 neighbourTablePathName, matchRadius) 254 255 if doIngest: 256 self._ingester.ingestTable(neighbourTable, neighbourTablePathName)
257 258 #-------------------------------------------------------------------------- 259
260      def _getSourcePosTable(self, surveyID=None, progID=None, tableID=None):
261          """
262          Returns the path to the binary flat-file for the source position table
263          of the given database source table. If the flat-file does not already
264          exist (or the source table is a non-static UKIDSS table) then it is
265          first outgested from the database. Creates data-type definition (.dtd)
266          files to allow the C++ Matching code to interpret the outgest files.
267  
268          @param surveyID: External survey ID, if table is part of an external
269                           non-UKIDSS survey.
270          @type surveyID:  int
271          @param progID:   Programme ID, either of UKIDSS survey or external
272                           survey if it exists.
273          @type progID:    int
274          @param tableID:  Table ID number from RequiredNeighbours
275                           (UKIDSS: 1 = detection, 2 = source; or extTableID).
276          @type tableID:   int
277  
278          @return: File path name of the outgested source table.
279          @rtype:  str
280  
281          """
282          # Query for server, dbName, tableName, and sourceID, ra & dec col names
283          if surveyID:
284              tableName, sourceID, ra, dec = self._outgester.query(
285                  selectStr="extTableName, sourceIDName, coord1, coord2",
286                  fromStr=dbc.externalProgrammeTable(),
287                  whereStr="surveyID=%s AND extTableID=%s" % (surveyID, tableID),
288                  firstOnly=True)
289  
290              if not tableName:
291                  raise Cu16.CuError("%s does not have an entry in %s for "
292                      "survey %s table %s" % (self._outgester.database,
293                      dbc.externalProgrammeTable(), surveyID, tableID))
294  
295              dbName = self._outgester.query("databaseName", "ExternalSurvey",
296                  whereStr="surveyID=%s" % surveyID, firstOnly=True)
297  
298              if not dbName:
299                  raise Cu16.CuError("Survey %s not in %s.ExternalSurvey"
300                      % (surveyID, self._outgester.database))
301  
302              fileName = dbName + tableName
303              tableName = '.'.join([dbName, dbc.loadServerDbsOwner(), tableName])
304              outPathName = \
305                  os.path.join(self.sysc.sourcePosTablesPath(), fileName + '.dat')
306  
307          else:
308              tableName, sourceID = self._outgester.query(
309                  selectStr='tableName, sourceIDName',
310                  fromStr=dbc.programmeTable(),
311                  whereStr='programmeID=%s AND tableID=%s' % (progID, tableID),
312                  firstOnly=True)
313  
314              if not tableName:
315                  raise Cu16.CuError("%s does not have an entry in %s for "
316                      "programme %s table %s" % (self._outgester.database,
317                      dbc.programmeTable(), progID, tableID))
318  
319              ra, dec = 'ra', 'dec'
320              fileName = tableName
321              tableName = self._outgester.tablePath(tableName)
322              outPathName = os.path.join(self.sysc.sourcePosTablesPath(),
323                                         fileName + self.shareFileID + ".dat")
324  
325          # Create the flat file if a UKIDSS survey or if the external survey
326          # flat file does not exist
327          if not surveyID or not os.path.exists(outPathName) \
328            or os.path.getsize(outPathName) == 0 \
329            or not os.path.exists(outPathName.replace('.dat', '.dtd')):
330  
331              Logger.addMessage("Outgesting table " + tableName)
332              utils.ensureDirExist(self.sysc.sourcePosTablesPath())
333  
334              columns = ', '.join([sourceID, ra, dec])
335              table = tableName
336              whereStr = ''
337  
338              if not surveyID and tableID == 1:
339                  table, whereStr = self._getDetectionPosQuery(tableName)
340  
341              elif DbSession(self._outgester.server + '.' + tableName.split('.')[-3])\
342                  .queryAllowsNulls(tableName.split('.')[-1], [ra, dec]):
343  
344                  raise Cu16.CuError("Table %s contains nullable columns. "
345                      "Outgests in this format are not supported. "
346                      "Modify the table schema to disallow nulls."
347                      % tableName)
348  
349              query = SelectSQL(columns, table, whereStr, orderBy=dec)
350  
351              Logger.addMessage("Outgesting...")
352              numRows = self._outgester.outgestQuery(query, fileName, outPathName,
353                                                     createDtdFile=True)
354  
355              Logger.addMessage("%s rows outgested." % numRows)
356  
357              if numRows == 0:
358                  Logger.addMessage("<WARNING> Empty outgest file: "
359                                    + outPathName)
360  
361          # Make external survey flat files read only
362          if surveyID:
363              os.chmod(outPathName, 0444)
364  
365          return outPathName
366 367 #-------------------------------------------------------------------------- 368
369      def _getDetectionPosQuery(self, tableName):
370          """ Returns the tables and clauses required for a query to outgest
371              the positions of detections in the detection table.
372          """
373          whereStr = "Raw.seqNum>0 AND Raw." + DepCodes.selectNonDepAndTest
374  
375          # @TODO: Give VVV its own sysc for publicServers for better check
376          if self.programmeID == self.sysc.scienceProgs.get("VVV") \
377            and not self._outgester.database.startswith("VVV"):
378  
379              # VVV requires the creation of a temporary table for eff. outgest
380              Logger.addMessage("Filling %s..." % Cu16.tempDetTable)
381  
382              # Ensure it doesn't exist
383              tempDetTable = schema.Table()
384              tempDetTable.name = Cu16.tempDetTable
385              self._outgester.dropTable(tempDetTable, [])
386  
387              detTables = self.programme.getMonthlyDetectionTables(self.dateRange,
388                                                                    excludeDeeps=True)
389  
390              progress = ForLoopMonitor(detTables)
391              for monthIdx, detTable in enumerate(detTables):
392                  # NB: Ignore start date for VVV (only)
393                  dateLimit = ("" if monthIdx != len(detTables) - 1 else
394                               " AND utDate <= '%s'" % self.dateRange.end)
395  
396                  mfCols = "multiframeID, extNum, seqNum, objID"
397                  astCols = "multiframeID, extNum, seqNum, ra, dec"
398                  script = "WITH MF(%s) AS (%s)" % (mfCols,
399                      SelectSQL("Multiframe." + mfCols,
400                          Join(["Multiframe", (detTable, "Raw")], "multiframeID"),
401                          whereStr + " AND frameType LIKE 'tilestack%'" + dateLimit))
402  
403                  script += ", AST(%s) AS (%s)" % (astCols,
404                      SelectSQL(astCols, detTable.replace("Raw", "Astrometry")))
405  
406                  querySQL = SelectSQL("objID, ra, dec",
407                      Join(["MF", "AST"], ["multiframeID", "extNum", "seqNum"]))
408  
409                  destSQL = "INTO %s " % Cu16.tempDetTable
410                  if monthIdx == 0:
411                      script += " " + str(querySQL).replace("FROM",
412                                                            "%sFROM" % destSQL)
413                  else:
414                      script += " INSERT %s%s" % (destSQL, querySQL)
415  
416                  self._outgester._executeScript(script)
417                  progress.testForOutput("Copied " + detTable)
418  
419              tempIdx = schema.Index()
420              tempIdx.attrList = "dec"
421              tempIdx.tableName = Cu16.tempDetTable
422              tempIdx.name = "idx_%s_%s" % (tempIdx.tableName, tempIdx.attrList)
423              self._outgester.addIndex(tempIdx, usingDefaultFG=True)
424  
425              return self._outgester.tablePath(Cu16.tempDetTable), ''
426  
427          if self._outgester.server in self._outgester.sysc.publicServers \
428            or self.programmeID == self.sysc.scienceProgs.get("VVV"):
429              # Release database detection tables are straight outgests
430              tableJoin = Join([(tableName, "Raw"),
431                                (self._outgester.tablePath("Multiframe"), "mf")],
432                               attributes=["multiframeID"])
433          else:
434              # Load database detection tables need to be joined
435              tableJoin = Join(
436                  tables=[(tableName + "Raw", "Raw"),
437                          (tableName + "Astrometry", "Astrometry")],
438                  attributes=["multiframeID", "extNum", "seqNum"],
439                  subJoin=(self._outgester.tablePath("Multiframe"),
440                           "multiframeID"))
441  
442          # @TODO: This should produce sensible minimal clauses.
443          frameTypes = queries.getFrameSelection(
444              imageType=self.programme.getAttr("epochFrameType"),
445              noDeeps=True)
446  
447          whereStr += " AND utDate BETWEEN '%s' AND '%s'" % self.dateRange \
448                    + " AND " + frameTypes
449  
450          return tableJoin, whereStr
451 452 #------------------------------------------------------------------------------ 453
454  class MatchingInterface(object):
455 """ Use this class to call the C++ Matching code. """ 456
457      def __init__(self):
458          """ Just initialises data members. """
459          self._fileSizeLimit = 2 * SystemConstants.one_gigabyte
460          self._largeSwitchOn = {True: "large", False: "small"}
461          self._swapSwitchOn = {True: "swap", False: "noswap"}
462 463 #-------------------------------------------------------------------------- 464
465      def createNeighboursTable(self, mainCatPathName, outputPathName,
466                                matchRadius):
467 """ 468 Creates a neighbours table for the supplied source table. 469 470 @param mainCatPathName: Input path of the source table flat-file. 471 @type mainCatPathName: str 472 @param outputPathName: Output path for the neighbours table flat-file. 473 @type outputPathName: str 474 @param matchRadius: Join criterion - search radius in arcseconds. 475 @type matchRadius: float 476 477 """ 478 argDir1, argMainFileName = os.path.split(mainCatPathName) 479 argDir2, argOutputFileName = os.path.split(outputPathName) 480 481 self._testArguments(argDir1, argDir2) 482 483 isLargeFile = (os.path.getsize(mainCatPathName) > self._fileSizeLimit) 484 485 cmd = "find_neighbours " + argDir1 + " " + argMainFileName + " binary " 486 cmd += self._largeSwitchOn[isLargeFile] + " " + str(matchRadius) 487 cmd += " index filter " + argOutputFileName + " binary" 488 489 for line in extp.run(cmd): 490 Logger.addMessage(line)
491 492 #-------------------------------------------------------------------------- 493
494      def createCrossNeighboursTable(self, mainCatPathName, extCatPathName,
495                                     outputPathName, matchRadius):
496 """ 497 Creates a cross neighbours table of neighbours in the given external 498 catalogue to the supplied sources. 499 500 @param mainCatPathName: Path to the source table flat-file. 501 @type mainCatPathName: str 502 @param extCatPathName: Path to the external source table flat-file. 503 @type extCatPathName: str 504 @param outputPathName: Output path for the neighbours table flat-file. 505 @type outputPathName: str 506 @param matchRadius: Join criterion - search radius in arcseconds. 507 @type matchRadius: float 508 509 """ 510 argDir1, argMainFileName = os.path.split(mainCatPathName) 511 _argDir2, argExtFileName = os.path.split(extCatPathName) 512 argDir3, argOutputFileName = os.path.split(outputPathName) 513 514 self._testArguments(argDir1, argDir3) 515 516 mainFileSize = os.path.getsize(mainCatPathName) 517 extFileSize = os.path.getsize(extCatPathName) 518 519 # Main catalogue should be the largest 520 isSwapped = (mainFileSize < extFileSize) 521 if isSwapped: 522 argMainFileName, argExtFileName = argExtFileName, argMainFileName 523 mainFileSize, extFileSize = extFileSize, mainFileSize 524 525 isLargeMainFile = (mainFileSize > self._fileSizeLimit) 526 isLargeExtFile = (extFileSize > self._fileSizeLimit) 527 528 matcher = "list" if matchRadius > 16 else "index" 529 530 cmd = "crosscat_neighbours " + argDir1 + " " + argMainFileName 531 cmd += " binary " + self._largeSwitchOn[isLargeMainFile] + " " 532 cmd += argExtFileName + " binary " + self._largeSwitchOn[isLargeExtFile] 533 cmd += " %s %s filter %s binary %s" % (matchRadius, matcher, 534 argOutputFileName, self._swapSwitchOn[isSwapped]) 535 536 for line in extp.run(cmd): 537 Logger.addMessage(line)
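    #   Editor's note: an illustrative sketch, not part of the original module.
    #   With made-up file names, a made-up share directory and a 10 arcsec
    #   radius, the command assembled by the method above resembles (shown
    #   wrapped here; it is issued as a single command line):
    #
    #       crosscat_neighbours /share lasSource.dat binary small
    #           BestDR7PhotoObjAll.dat binary small 10.0 index filter
    #           lasSourceXBestDR7PhotoObjAll.dat binary noswap
    #
    #   The "index" matcher switches to "list" for radii above 16 arcsec, and
    #   "swap" replaces "noswap" (with the two input files exchanged) when the
    #   external catalogue file is larger than the main catalogue file.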
538 539 #-------------------------------------------------------------------------- 540
541      def _testArguments(self, argDir1, argDir2):
542          """ Make sure user has supplied sensible file path attributes. """
543          if argDir1 != argDir2:
544              raise Exception(
545                  "Neighbours table must be created in the same directory "
546                  "as the source tables: %s!=%s" % (argDir1, argDir2))
547 548 #------------------------------------------------------------------------------ 549
550  class NeighboursIngester(Ingester):
551 """ 552 A special version of the ingester class for Neighbours tables. Designed to 553 add extra attributes for Sloan DSS cross-match tables. 554 555 @todo: This should no longer be a subclass of Ingester. Instead these 556 functions should become methods of the Cu16 class and call the 557 parent Ingester class directly. 558 559 """ 560 sdssTables = None 561
562      def __init__(self, dbCon, tableSchemas, sdssTables, tag=Ingester.fileTag):
563 """ 564 Makes connection to the requested database, and reads the schemas of 565 the tables to be created by the supplied list of files. 566 567 @param dbCon: A database connection. 568 @type dbCon: DbSession 569 @param tableSchemas: Schema for tables to be ingested this session, as 570 provided by L{schema.parseTables()}. 571 @type tableSchemas: list(Schema.Table) 572 @param sdssTables: A list of neighbour table names that have the 573 additional SDSS attributes. 574 @param sdssTables: list(str) 575 @param tag: A tag string to append to the file name on the 576 share directory, so file clean up can be rm *tag*. 577 @type tag: str 578 579 """ 580 self.sdssTables = sdssTables 581 super(NeighboursIngester, 582 self).__init__(dbCon, tableSchemas, tag, skipSchemaCheck=True) 583 584 # Sloan tables are default at ingest stage initially and create default 585 # tables where schema doesn't exist. NB: Can't use iteritems() here 586 # as we are expanding the dictionary that we iterate over! 587 for tableName, tableSchema in self._schema.items(): 588 if not tableSchema: 589 Logger.addMessage("Reverting to default neighbour table " 590 "structure for " + tableName) 591 592 self._schema[tableName] = \ 593 self._stdNeighbourTableSchema(tableName) 594 595 elif tableName in self.sdssTables: 596 # Create a default neighbour table for Sloan DSS for ingest, 597 # saving the desired table schema for future reference. 598 self._schema[tableName + 'ext'], self._schema[tableName] = \ 599 tableSchema, self._stdNeighbourTableSchema(tableName) 600 601 # However, for VISTA especially don't change the file group! 602 self._schema[tableName].fileGroup = tableSchema.fileGroup 603 else: 604 # Remove primary keys and other constraints to speed up ingest 605 self._schema[tableName].constraints = []
606 607 #-------------------------------------------------------------------------- 608
609      def ingestTable(self, tableName, filePathName):
610 """ 611 Loads the created neighbours table into the database. 612 613 @param tableName: Name of table to create/update in the database. 614 @type tableName: str 615 @param filePathName: Full path to the neighbour table flat file on the 616 curation server. 617 @type filePathName: str 618 619 """ 620 super(NeighboursIngester, 621 self).ingestTable(tableName, filePathName, overWrite=True) 622 623 # Attach primary key after ingest for neighbour tables because the lack 624 # of constraints speeds up the ingest 625 pk = schema.PrimaryKeyConstraint() 626 pk.name, pk.tableName = "pk_%s" % tableName, tableName 627 pk.attrList = "masterObjID, slaveObjID" 628 Logger.addMessage("Attaching primary key to table...") 629 self.createObjects([pk]) 630 Logger.addMessage("done!") 631 632 # Sloan Neighbour tables need extra attributes 633 if tableName in self.sdssTables: 634 self._completeSDSS(tableName)
635 636 #-------------------------------------------------------------------------- 637
638      def _completeSDSS(self, tableName):
639 """ 640 Re-insert the additional two columns in the SDSS cross-match table and 641 fill with data read from the database with the latest DR of SDSS. 642 643 @param tableName: Name of the SDSS cross-match table in the database. 644 @type tableName: str 645 646 @todo: Alternative method would be to outgest these attributes along 647 with source positions, passing them through the C++ code, to 648 enable direct ingest. But that requires extensive C++ 649 redevelopment for just the SDSS xmatch. 650 651 """ 652 Logger.addMessage( 653 "Adding extra attributes to Sloan DSS Neighbour table...") 654 655 # Retrieve details from schema file 656 sdssType = self._schema[tableName + 'ext'].attribute['sdssType'] 657 sdssPrimary = self._schema[tableName + 'ext'].attribute['sdssPrimary'] 658 659 # Add the columns to the database 660 self.addColumn(tableName, sdssType.name, sdssType.dataType, 661 sdssType.defaultStr) 662 663 self.addColumn(tableName, sdssPrimary.name, 664 sdssPrimary.dataType, sdssPrimary.defaultStr) 665 666 if self.server in self.sysc.ingestDbForServer: 667 # Use the load database skinny SDSS flags table 668 sdssTableName = '%s.%s.SDSS%s' \ 669 % (self.sysc.ingestDbForServer[self.server], 670 dbc.loadServerDbsOwner(), 671 tableName.split('X', 1)[-1] 672 .replace(dbc.sdssSourceTableName(), 'Flags') 673 .replace('All', '')) 674 else: 675 # Use the SDSS database directly 676 serverName, dbName = self.query("server, databaseName", 677 Join(["ExternalSurvey", "RequiredNeighbours"], ["surveyID"]), 678 whereStr="neighbourTable = '%s'" % tableName, firstOnly=True) 679 680 sdssTableName = '%s.%s.%s.%s%s' % ( 681 (serverName, dbName, dbc.loadServerDbsOwner()) + 682 tableName.partition(dbc.sdssSourceTableName())[1:]) 683 684 # Fill the new columns with the appropriate values in the DSS db 685 self.update(tableName, 686 entryList="sdssType = DSS.type, sdssPrimary = (CASE WHEN " 687 "(DSS.status & 0x00002000) > 0 THEN 1 ELSE 0 END)", 688 where="slaveObjId = DSS.objId", 689 fromTables=tableName + ", %s AS DSS" % sdssTableName) 690 691 Logger.addMessage("done!")
692 693 #-------------------------------------------------------------------------- 694
695      def _stdNeighbourTableSchema(self, tableName):
696 """ 697 Defines the standard default Neighbour table schema. 698 699 @param tableName: Name of the Neighbour table that will be given the 700 default schema. 701 @type tableName: str 702 703 @return: A list of the default neighbour table schema attributes. 704 @rtype: Schema.Table 705 706 """ 707 stdSchema = schema.Table() 708 stdSchema.name = tableName 709 stdSchema.fileGroup = \ 710 ("Curation_FG" if self.sysc.isWSA() else "") 711 712 stdSchema.columns = [ 713 schema._parseAttribute('masterObjID bigint not null'.split()), 714 schema._parseAttribute('slaveObjID bigint not null'.split()), 715 schema._parseAttribute('distanceMins real not null'.split())] 716 717 for attr in stdSchema.columns: 718 stdSchema.attribute[attr.name] = attr 719 720 return stdSchema
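    #   Editor's note: illustrative sketch only, not part of the original
    #   module (the table name is a made-up example, and the Curation_FG file
    #   group is only applied on the WSA). The default schema built above,
    #   together with the primary key attached in ingestTable(), corresponds
    #   roughly to:
    #
    #       CREATE TABLE lasSourceXmgcDetection (
    #           masterObjID  bigint NOT NULL,  -- ID in the programme source/detection table
    #           slaveObjID   bigint NOT NULL,  -- ID in the matched (external) table
    #           distanceMins real   NOT NULL   -- angular separation of the matched pair
    #       )
    #
    #       ALTER TABLE lasSourceXmgcDetection ADD CONSTRAINT
    #           pk_lasSourceXmgcDetection PRIMARY KEY (masterObjID, slaveObjID)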
721 722 #------------------------------------------------------------------------------ 723 # Entry point for script. 724 725 # Allow module to be imported as well as executed from the command line 726 if __name__ == '__main__': 727 # Define additional command-line options for Cu16 728 CLI.progArgs.append(CLI.Argument('programmeID', 'LAS')) 729 CLI.progOpts += [ 730 CLI.Option('b', "begin", 731 "observation date of first detections (deep surveys only) to match" 732 " e.g. 2004-04-01 or 20040401, or semester e.g. 05A_SV", 733 "DATE", isValOK=CLI.isDateOK), 734 CLI.Option('e', "end", 735 "observation date of last detections (deep surveys only) to match" 736 " e.g. 2006-07-31 or 20060731, or semester e.g. 07A", 737 "DATE", isValOK=CLI.isDateOK), 738 CLI.Option('f', 'from', 'continue from this neighbour table', 'NAME'), 739 CLI.Option('n', 'table', 'only for this neighbour table', 'NAME'), 740 CLI.Option('o', 'outgest', 'outgest from the default database (%s), ' 741 'rather than this database' % DbSession.database), 742 CLI.Option('s', 'extsurvey', 'only for this external survey e.g. MGC', 743 'NAME/ID'), ] 744 745 cli = CLI(Cu16, '$Revision: 10018 $') 746 Logger.isVerbose = False 747 Logger.addMessage(cli.getProgDetails()) 748 749 cu16 = Cu16(cli.getArg('programmeID'), cli=cli) 750 try: 751 cu16.dateRange = \ 752 cu16.sysc.obsCal.dateRange(cli.getOpt("begin"), cli.getOpt("end")) 753 754 except Exception as error: 755 eType = "Invalid Option" 756 Logger.addExceptionMessage(error, eType) 757 raise SystemExit(eType + ": see log " + cu16._log.pathName) 758 759 cu16.enableIngest = not cli.getOpt('test') 760 cu16.extSurveyID = cli.getOpt('extsurvey') 761 cu16.fromTableName = cli.getOpt('from') 762 cu16.soleTableName = cli.getOpt('table') 763 if cli.getOpt('outgest'): 764 cu16.outgestDb = DbSession.database 765 766 cu16.run() 767 768 #------------------------------------------------------------------------------ 769 # Change log: 770 # 771 # 29-Nov-2005, RSC: Original version of the new C++ matching code CU16 version 772 # 13-Dec-2005, RSC: Initial check-in of "working" test version 773 # 13-Dec-2005, RSC: Major redesign of functions and redesign of dtd files 774 # 14-Dec-2005, RSC: * Reverted to original design of dtd files, which are now 775 # sorted in the correct order. 776 # * Improved error handling. 777 # * Minor outgest bug fix. 
778 # 15-Dec-2005, RSC: * File transfer bug fix - now using "cp" instead of "mv" 779 # - expected file size is checked before transfer 780 # - more error checking for transfers 781 # * Separate outgest directory on ahmose for easy clean-up 782 # * SSA outgest bug fixed 783 # * Preparing to use TestWSArsc for ingest testing 784 # 2-Feb-2006, RSC: * Timestamps added to database outgest files on ahmose 785 # * Debugged ingest 786 # * Removed code debug features 787 # * Added helpful comments and dbName to logfile 788 # 10-Feb-2006, RSC: Working version - save for Sloan cross-catalogues matches 789 # * Various ingest bug fixes 790 # - tables now emptied before ingest 791 # - overcame slow windows nfs issues 792 # - working Sloan issue solution - but way too slow, 793 # so commented out for now 794 # * Script now tidies up its own mess 795 # * The unlikely case of an empty catalogue is now handled 796 # well 797 # * Temporary mods for test purposes 798 # 13-Feb-2006, RSC: Sloan cross-matches now work 799 # 16-Feb-2006, RSC: * Adapted code to adopt the new OO design for curation 800 # tasks 801 # * Neighbour tables are now dropped/recreated using the 802 # SQL wrapper, instead of just emptying the existing table 803 # * Neatened up use of RequiredNeighbours table 804 # * Improved script execution 805 # 17-Feb-2006, RSC: Improved OO design splitting functionality into separate 806 # objects in separate files in preparation for install 807 # 11-Apr-2006, RSC: * Curator name is now a command line option 808 # * Neatened up exception handling 809 # 11-May-2006, RSC: Significant overhaul of design to allow outgest of 810 # detection as well as source tables for neighbours 811 # 27-Jun-2006, RSC: Upgraded to use the new command-line interface. 812 # 10-Jul-2006, RSC: Deprecated the old CU16 now that the new CU16 has proven 813 # itself for DR1. A working version of the old CU16 can 814 # still be obtained by checking out the DR1_FINAL tag in CVS 815 # 2-Aug-2006, RSC: Upgraded to latest CuSession OO framework design. 816 # 4-Aug-2006, RSC: Upgraded to handle separate files for different 817 # programmes' neighbour table schemas. 818 # 11-Apr-2007, RSC: Upgraded for new RequiredNeighbours schema to allow cross- 819 # matches between UKIDSS surveys. 820