Package wsatools :: Module SourceMerger
[hide private]

Source Code for Module wsatools.SourceMerger

   1  #------------------------------------------------------------------------------ 
   2  #$Id: SourceMerger.py 10224 2014-02-26 18:18:06Z NicholasCross $ 
   3  """ 
   4     Merges detections over all passbands into a binary source table. Can also 
   5     're-seam' the source table, marking duplicate sources as either being 
   6     primary or secondary depending upon their quality. 
   7   
   8     @warning: The order of sequences are very important in various parts of this 
   9               code. The numbering of passbands is sorted into filterID order 
  10               (with multiple epochs in-between e.g. Z, K_1, K_2, H_1, H_2) in 
  11               some places and by order in the table schema (e.g. mergeLog) in 
  12               other places. This is essentially the same, as the schema order is 
  13               created by a script that does indeed follow filterID order. The 
  14               numbering of passbands is important for cameo schemas & epochs for 
  15               merging, and detection outgests for pairing. Also, presently the 
  16               outgested detections for reseaming only must be in declination 
  17               order, despite being resorted into that order by the pairing code, 
  18               because they are read later in by the Python code, which expects 
  19               them to be in declination order. This could be recoded easily. 
  20   
  21     @todo: Implement View, FrameSetView, and FrameView objects. 
  22     @todo: Remove sort on outgest during reseaming and alter _analyseSeams() to 
  23            remove requirement that pointers are sorted by declination, which 
  24            will also speed up _getFrameSetSources() query as no sort by dec 
  25            will be required. 
  26     @todo: Make SourceMerger a base class, that CU6,7,9 have like an Outgester 
  27            class. 
  28   
  29     @author: N.C. Hambly 
  30     @org:    WFAU, IfA, University of Edinburgh 
  31   
  32     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
  33     @contributors: R.S. Collins 
  34  """ 
  35  #------------------------------------------------------------------------------ 
  36  from __future__      import division, print_function 
  37  from future_builtins import map, zip 
  38   
  39  from   collections import defaultdict, namedtuple, Counter 
  40  from   itertools   import groupby 
  41  import math 
  42  import numpy 
  43  from   operator    import attrgetter, itemgetter 
  44  import os 
  45  import shutil 
  46   
  47  import wsatools.Astrometry              as astro 
  48  from   wsatools.CLI                 import CLI 
  49  import wsatools.CSV                     as csv 
  50  import wsatools.DataFactory             as df 
  51  import wsatools.DbConnect.CommonQueries as queries 
  52  import wsatools.DbConnect.DbConstants   as dbc 
  53  from   wsatools.DbConnect.DbSession import Join, Ingester, odbc, Outgester, \ 
  54                                             SelectSQL 
  55  from   wsatools.DbConnect.CuSession import CuSession 
  56  import wsatools.DbConnect.Schema        as schema 
  57  from   wsatools.Logger              import Logger, ForLoopMonitor 
  58  import wsatools.pairing                as pairing 
  59  from   wsatools.SystemConstants     import DepCodes 
  60  import wsatools.Utilities               as utils 
  61  import wsatools.WfcamsrcInterface       as wfcamsrc 
  62  from   wsatools.ProgrammeBuilder    import SynopticSetup 
  63  #------------------------------------------------------------------------------ 
  64   
class SourceMerger(CuSession):
    """ Merges detections over all passbands into a binary source table.
        Subclass of CuSession; drives outgest of per-passband detections,
        C/C++ pairing and merging, ingest of the merged source table, and
        reseaming of duplicate sources in frame-set overlap regions.
    """
    #--------------------------------------------------------------------------
    # Define class constants (access as SourceMerger.varName)

    commencementMessage = ("Source merging... Sit back and relax. Progress "
                           "estimates are for this stage only (ingest and "
                           "seaming are still to come).")
    """ Message to log at the commencement of source merging.
    """
    freeCacheInterval = 1000
    """ Number of frame sets between calling FREEPROCCACHE during reseaming.
    """
    minEpochSep = 183
    """ Minimum epoch separation in days before a proper motion solution is
        worthwhile calculating.
    """
    seamErrorBitMask = 0xffffff00
    """ Error bit mask for removing innocuous information bits from error
        bit masks prior to seaming.
    """
    seamingPairCriterion = 0.8
    """ The match radius for associating the same source in overlap regions
        between frame sets in all mosaiced surveys. This parameter is passed
        directly to the C pairing applications, and is an angular radius
        expressed in arcseconds. Originally (e.g. EDR) this was set to 1.0
        arcsec; discussions with SJW etc. suggest this was somewhat overly
        generous so it has been reduced to sqrt(2.0) times the typical
        worst centroiding error, which is equal to about 2pix, or 0.8 arcsec.
        At the same time, duplicate detection in the seaming algorithm has been
        relaxed to not insist on the same passband set for duplicates.
    """
    swVersion = 1.0
    """ The merge software version. Keeps track of any changes after
        deployment; if changes to this module (or any module used by this
        one) require a complete remerging of all merged source tables in the
        archive, then increment this merge software version number. NB: use
        with caution, since the resulting catalogue processing will take a
        long time if the archive is large.
    """
    # Private class parameters - should not be altered
    _autoCommit = True
    _isPersistent = True
    _isVerbose = True  #: Turns off logging for certain sub-classes only.
    _useWorkDir = True
    _useExtNum = True  #: Use extNum in detection table selection

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    continueCompleteReseam = False
    """ Should only be used to continue reseaming a source table from scratch
        following a crash. This marks frame sets as old whilst reseaming,
        without the safety catch of resetting all frame sets to new initially.
    """
    dateRange = CuSession.sysc.obsCal.dateRange()
    """ Range of observation dates to merge. Use begin dates with caution, as
        for real runs you always need this to be the early default value.
    """
    doCompleteReseam = False
    """ Reset the merge-log to mark all frame sets as new, and reseam the
        entire source table. This allows us to mark frame sets as old whilst we
        are reseaming, which is unsafe if we were to be reseaming partial data.
    """
    #--------------------------------------------------------------------------
    # Define private member variable default values (access prohibited)

    _extractor = None
    """ Name of source catalogue extraction tool used to create this
        programme's detections.
    """
    _filterPasses = None
    """ A list of filterIDs and number of passes for this programme.
    """
    _frameSetIDName = ''
    """ Name of the column in the source table that identifies the unique ID of
        the frame set for a given source.
    """
    _idxInfo = None
    """ A dictionary of schema.Index object lists referenced by table name.
    """
    _mergeLog = ''
    """ Name of the merge log table for this programme.
    """
    _mfidIndices = None
    """ List of column indices in the merge log table that give the
        multiframeIDs to each passband's image.
    """
    _newFrameSetIdx = 0
    """ The column index in the merge log table that contains the newFrameSet
        attribute.
    """
    _numFilterCol = 0
    """ The number of columns in the merge log that contain passband-specific
        attribute values, for a given passband.
    """
    _numHeaderCol = 0
    """ The number of columns in the merge log table before the repeating
        set of columns for passband-specific attributes begin.
    """
    _outgester = None
    """ Outgester object for outgesting detections and sources for reseaming.
    """
    _progSchema = None
    """ Dictionary of programme's table schema referenced by table name.
    """
    _passbands = None
    """ List of short-names for passbands of this programme, e.g. k_2.
    """
    _sourceIDName = ''
    """ Name of the column in the source table for the unique source ID.
    """
    _sourceTable = ''
    """ Name of the table to ingest merged sources for this programme.
    """
    #--------------------------------------------------------------------------
    # Define specific command-line interface settings required by SourceMerger

    CLI.progArgs.append(CLI.Argument('programmeID', 'LAS'))
    CLI.progOpts.append(CLI.Option('v', 'verbose', 'display details for every '
                                   'frame set currently being processed'))

    #--------------------------------------------------------------------------
191 - def _onRun(self):
192 """ Initialises member variables required for source merging. 193 """ 194 Logger.addMessage("Parsing schemas") 195 196 self._idxInfo = schema.parseIndices(self.sysc.nsIndexScript 197 if self.programme.isNonSurvey() else self.sysc.indexScript) 198 199 self._progSchema = dict((table.name, table) 200 for table in schema.parseTables(self.programme.getSchemaScript())) 201 202 self._outgester = Outgester(self.archive, self.shareFileID) 203 204 self._extractor = self.programme.getAttr("extractTool") 205 if self._extractor == dbc.charDefault(): 206 raise SourceMerger.CuError("Programme.extractTool is not defined " 207 "for programmeID=%s" % self.programmeID) 208 209 self._setProgTableNames() 210 211 srcColumns = self._progSchema[self._sourceTable].columns 212 self._sourceIDName = srcColumns[0].name 213 self._frameSetIDName = srcColumns[2].name 214 215 self._setProgPassbandDetails()
216 217 #-------------------------------------------------------------------------- 218
219 - def deprecateOldFrameSets(self):
220 """ 221 Deprecate frame sets that include one or more deprecated frames. 222 223 @return: Number of frame sets deprecated. 224 @rtype: int 225 226 """ 227 # Create the script on-the-fly from schema metadata (robust against 228 # schema evolution and polymorphic for any programme!): 229 passbands = [column.replace("mfID", "") for column 230 in self.archive.queryColumnNames(self._mergeLog) 231 if column.endswith("mfID")] 232 233 fromStr = self._mergeLog + "".join( 234 ", MultiframeDetector AS M" + passband for passband in passbands) 235 236 whereStr = "".join("%smfID=M%s.multiframeID AND %seNum=M%s.extNum AND " 237 % tuple([passband] * 4) for passband in passbands) 238 whereStr += "(mergeSwVersion<%s %s)" % (SourceMerger.swVersion, 239 "".join(" OR M%s.%s" % (passband, DepCodes.selectDeprecated) 240 for passband in passbands)) 241 whereStr += " AND %s.deprecated=0" % self._mergeLog 242 243 return self.archive.update(self._mergeLog, 244 "deprecated=%s" % DepCodes.reprocWFAU, whereStr, fromStr)
245 246 #-------------------------------------------------------------------------- 247
248 - def mergeSources(self, newFrameSets, matchRadius):
249 """ 250 Merge source detections from different passbands to create the source 251 table for ingest. 252 253 Method: For every detector frame set: 254 1) Obtain the detection list in each passband, bulk-outgest of binary 255 numerical data into the curation-client/load-server share file system; 256 2) Create the sets of pair pointer files, forwards and backwards, for 257 each distinct pair of passbands; 258 3) Create the set of merged pointers based on "hand-shake" pairing 259 between any pair of pointer sets (i.e. ensuring consistency of pair 260 pointers between the passbands); 261 4) Create the merged source list according to the schema prescription, 262 appending new sources to an accumulating binary file for ingest back 263 into the database. 264 265 Low-level C/C++ application codes are extensively used to do the heavy 266 CPU/IO work. 267 268 @param newFrameSets: List of frame sets to be merged. 269 @type newFrameSets: list(self.FrameSet) 270 @param matchRadius: Pair criterion in arcseconds. 271 @type matchRadius: float 272 273 @return: Pair of file names for the binary source table and the CSV 274 formatted merge-log table for ingest. Also returns a status to 275 say whether the source merging was complete or not. 
276 @rtype: str, str, bool 277 278 """ 279 if not self._passbands: # Sanity check 280 raise SourceMerger.CuError("This survey has no passbands defined.") 281 282 # Update the filter absorption values in wfcamsrc/aebv.csv 283 if "eBV" in self._progSchema[self._sourceTable].columns: 284 self._updateAebvCSVFile() 285 286 joinAlias = "Raw" 287 detTables = [(self.programme.getDetectionTable(), joinAlias), 288 (self.programme.getAstrometryTable(), "Astro"), 289 (self.programme.getPhotometryTable(), "Photo")] 290 291 commonCols = utils.getDuplicates(utils.unpackList( 292 map(str, self._progSchema[table].columns) 293 for table, _alias in detTables)) 294 295 detSchema = self._progSchema[self.programme.getAttr('detectionTable')] 296 detColumns = ', '.join(('%s.%s' % (joinAlias, column) 297 if str(column) in commonCols else str(column)) 298 for column in detSchema.columns) 299 300 detTables = Join([(self._outgester.tablePath(table), alias) 301 for table, alias in detTables], 302 ["multiframeID", "extNum", "seqNum"]) 303 304 Logger.addMessage("Outgesting default row", alwaysLog=self._isVerbose) 305 defRowQuery = SelectSQL(detColumns, detTables, 306 where="%s.multiframeID=%s AND %s.extNum=%s" % 307 (joinAlias, dbc.intDefault(), joinAlias, dbc.tinyIntDefault())) 308 309 defRowPathName = self._outgester.outgestQuery(defRowQuery) 310 if os.path.getsize(defRowPathName) == 0: 311 raise SourceMerger.CuError("Default detection row is empty.") 312 313 # Prepare info for C modules about the detection position column data 314 posColData, outgestRowSize = self._getPositionColumnData() 315 316 # List of detection files to outgest for each passband 317 # Create the "cameo" schema files for C++ program mkmerge 318 workPath = self.sysc.tempWorkPath() 319 fileIDs = [] 320 outgestFiles = [] 321 cameoFiles = [] 322 for passband in self._passbands: 323 fileID = passband + "_detections" 324 fileIDs.append(fileID) 325 326 outgestPathName = self._outgester.getFilePath(fileID) 327 
outgestFiles.append(outgestPathName) 328 329 cameoPathName = os.path.join(workPath, "cameo_%s.dat" % passband) 330 cameoFiles.append(cameoPathName) 331 data = ["%s %s %s" % (outgestPathName, passband, outgestRowSize)] 332 data += self._getCameoPassbandData(passband) 333 file(cameoPathName, 'w').writelines(line + '\n' for line in data) 334 335 if len(cameoFiles) is 1: # Handles the single passband case 336 cameoFiles *= 2 337 338 # Output file for C "pairing" program 339 mergeTablePathName = os.path.join(workPath, "mergetable.dat") 340 # Output file for C++ "mkmerge" program 341 resultsPathName = os.path.join(workPath, self._sourceTable + ".nat") 342 343 # Get the next available running source number from this source table 344 firstID = self._getNextID(self._sourceIDName, self._sourceTable, 345 addProgID=True) 346 347 Logger.addMessage("Creating a look-up dictionary of frame epochs", 348 alwaysLog=self._isVerbose) 349 epochDict = self._createEpochDict() 350 Logger.addMessage("Creating a look-up dictionary of frame pixel scales", 351 alwaysLog=self._isVerbose) 352 pixScaleDict = self._createPixScaleDict() 353 Logger.addMessage(self.commencementMessage, alwaysLog=self._isVerbose) 354 355 failures = [] 356 progress = ForLoopMonitor(newFrameSets) 357 for frameCount, frameSet in enumerate(newFrameSets): 358 Logger.addMessage("Processing frame set %s of %s" % (frameCount + 1, 359 len(newFrameSets)), alwaysLog=False) 360 361 Logger.addMessage("Outgesting detections...", alwaysLog=False) 362 try: 363 # Cycle through each passband, creating a file of detections 364 # where possible: 365 inputList = [] 366 for mfidIndex, fileID in zip(self._mfidIndices, fileIDs): 367 if frameSet[mfidIndex] == dbc.intDefault(): 368 # Create an empty file 369 os.system( 370 "touch " + self._outgester.getFilePath(fileID)) 371 372 outgestPathName = '' 373 else: 374 whereStr = "%s.multiframeID=%s AND %s.seqNum > 0" \ 375 % (joinAlias, frameSet[mfidIndex], joinAlias) 376 377 # @TODO: This should be 
more schema-driven 378 if self._useExtNum: 379 whereStr += " AND %s.extNum=%s" \ 380 % (joinAlias, frameSet[mfidIndex + 1]) 381 382 detTable = self.programme.getMonthlyDetectionTables( 383 self.dateRange, 384 excludeDeeps=isinstance(self, SynopticMerger), 385 mfIDs=[frameSet[mfidIndex]]) 386 387 outDetTables = detTables 388 if "Raw20" in detTable: 389 outDetTables = [(detTable, joinAlias), 390 (detTable.replace("Raw", "Astrometry"), "Astro"), 391 (detTable.replace("Raw", "Photometry"), "Photo")] 392 393 outDetTables = \ 394 Join([(self._outgester.tablePath(table), alias) 395 for table, alias in outDetTables], 396 ["multiframeID", "extNum", "seqNum"]) 397 398 outgestPathName = self._outgester.outgestQuery( 399 SelectSQL(detColumns, outDetTables, whereStr), fileID) 400 401 inputList.append(tuple([outgestPathName] + posColData)) 402 403 except Exception as error: 404 # Continue, but abandon this entire frame set if an outgest of 405 # any frame fails. This can occur due to NFS glitches. 406 Logger.addExceptionMessage(error) 407 Logger.addMessage("<Warning> Frame set %s is incomplete. " 408 "Skipping..." % frameSet.ID) 409 failures.append(frameSet) 410 else: 411 Logger.addMessage("Pairing & merging detections...", 412 alwaysLog=False) 413 414 # Handle the special case of "merging" single passbands 415 if len(inputList) is 1: 416 # @@NOTE: This also works if the second file name is blank, 417 # but there is no performance benefit. 418 inputList *= 2 419 mergeList = range(2) 420 else: 421 mergeList = range(len(self._passbands)) 422 423 # Do the pairing on all pair permutations from the above list 424 # of files. 
The output of this will be a set of pointer files 425 # over the various combinations 426 pairing.multiPair(inputList, matchRadius, workPath) 427 428 # Create a merge table of pointers for the next stage 429 path, mergeTableFileName = os.path.split(mergeTablePathName) 430 numMerged = pairing.merge(mergeList, path, mergeTableFileName) 431 Logger.addMessage("Frame set UID %s contains %s merged sources" 432 % (frameSet.ID, numMerged), alwaysLog=False) 433 434 if numMerged > 0: 435 # First check each file of detections; if it is empty, 436 # then substitute in the single row of default 437 # detection values to give mkmerge something to chew on 438 # in all cases 439 for filePathName in outgestFiles: 440 if os.path.getsize(filePathName) == 0: 441 os.remove(filePathName) 442 shutil.copy(defRowPathName, filePathName) 443 444 # Create ordered list of epochs for this frame set 445 epochs = [epochDict.get(frameSet[mfdIdx], 446 dbc.realDefault()) 447 for mfdIdx in self._mfidIndices] 448 449 uniqueEpochs = set(epochs) 450 uniqueEpochs.discard(dbc.realDefault()) 451 452 isLongBaseline = (len(uniqueEpochs) > 1 453 and max(uniqueEpochs) - min(uniqueEpochs) 454 > self.minEpochSep) 455 456 centrePos = ((frameSet.ra, frameSet.dec) 457 if isLongBaseline else None) 458 459 # Create ordered list of pixel scales for this frame set 460 pixScales = [pixScaleDict.get(frameSet[mfdIdx], 461 dbc.realDefault()) 462 for mfdIdx in self._mfidIndices] 463 464 if len(epochs) is 1: # Handles the single passband case 465 epochs *= 2 466 pixScales *= 2 467 468 Logger.addMessage("Appending to source table binary " 469 "file...", alwaysLog=False) 470 471 wfcamsrc.mergeDetections(mergeTablePathName, 472 resultsPathName, self._sourceTable, 473 self.programme.getSchemaScript(), cameoFiles, epochs, 474 pixScales, self.cuEventID, firstID, frameSet.ID, 475 self._extractor, self._frameSetIDName, 476 self._sourceIDName, centrePos, self.minEpochSep) 477 478 firstID += numMerged 479 480 # Clean up the temporary 
files, ignoring any "No such file" 481 # OS errors since, for generality, an image MAY contain no 482 # detected sources. 483 for filePathName in outgestFiles: 484 try: 485 os.remove(filePathName) 486 except OSError as error: 487 if "No such file" not in str(error): 488 raise 489 490 if self._isVerbose: 491 progress.testForOutput("Completed frameSetID %s" % frameSet.ID) 492 493 # Write the updated merge log of new (plus updated) frame sets (new 494 # frame sets flagged using the merge log attribute "new") to a CSV 495 # file 496 mergeLogPathName = os.path.join(self.sysc.tempWorkPath(), 497 'mergeLog.csv') 498 csv.File(mergeLogPathName, 499 'w+').writelines(set(newFrameSets) - set(failures)) 500 501 if failures: 502 Logger.addMessage("<WARNING> Not all frame sets were merged.") 503 504 return resultsPathName, mergeLogPathName, not failures
505 506 #-------------------------------------------------------------------------- 507
508 - def reseamSources(self):
509 """ 510 Reseam the merged source list for a given programme. Where the same 511 sources have been observed over multiple frame sets, this routine 512 identifies the best source candidate in overlap regions that produce 513 non-unique source pairs. The priOrSec flag is set to the framesetID, 514 with the best source so primary sources are (priOrSec = 0 OR 515 priOrSec=framesetID). 516 517 Method: identifies every current frame set for the programme that is 518 new or has a new adjacent frame; for old current frame sets that have 519 a new adjacent frame, resets the seam flags to default (ie. not seamed) 520 for safety; for every central frame / adjacent frames group, creates 521 lists of sources with attribute subselection required for determination 522 of seamless list; produces pair pointers between the central frame 523 sources and those in adjacent frames; for every central frame record 524 chooses the best source on the basis of a seaming algorithm which 525 incorporates such considerations as distance from optical axis, pixel 526 processing quality considerations etc.; sets all central frame set 527 sources to seam = 0 (i.e. inclusive); and finally resets all central 528 frame set sources that have duplicates in adjacent frame sets to have a 529 seam flag that either points to the central frame or the adjacent 530 frame, depending on which contains the source that is to be considered 531 the best. 532 533 """ 534 if self.doCompleteReseam: 535 Logger.addMessage("Marking all frame sets as new") 536 self.archive.update(self._mergeLog, "newFrameSet=%s" % dbc.yes()) 537 538 Logger.addMessage("Preparing for seaming stage") 539 540 # Define source table outgest data type 541 sourceRowData = self._getSrcPositionColumnData() 542 543 # Rather than hard-wired adjacent frame set tolerance, get the maximum 544 # image extent for frame set constituent images. This generalises to 545 # any image size for a survey. 
Note that the assumption is that the 546 # (assumed rectangular) image centres need to be within twice the 547 # maximum extent of the largest image for there to be any possible 548 # overlap. This is conservative in the sense that making this 549 # parameter as large as possible will pull in all possible overlap 550 # matches. A 10% additional increase is included for safety. 551 adjacentFrameTol = 2.2 * self._getMedianImageExtent() 552 currentFrameSets = self._getExistingFrameSets() 553 numNew = self.archive.queryNumRows(self._mergeLog, 554 whereStr="newFrameSet=%s" % dbc.yes()) 555 Logger.addMessage("...of which %s are marked as new" % numNew) 556 557 self._addSeamingIndices() 558 559 Logger.addMessage("Reseaming sources...") 560 progress = ForLoopMonitor(currentFrameSets) 561 for setCount, centralFrameSet in enumerate(currentFrameSets): 562 Logger.addMessage("Processing frame set %s of %s" % (setCount + 1, 563 len(currentFrameSets)), alwaysLog=False) 564 565 if setCount % SourceMerger.freeCacheInterval == 0: 566 self.archive.freeProcCache() 567 568 # Get a list of adjacent frames for this central frame: 569 adjacentFrames = astro.matchFrames(currentFrameSets, raIndex=2, 570 centrePos=(centralFrameSet.ra, centralFrameSet.dec), 571 tolerance=adjacentFrameTol) 572 573 # ...remember to exclude the central frame itself from the matches 574 adjacentFrames.remove(centralFrameSet) 575 576 if not adjacentFrames: 577 # The frame is isolated; if new, then reset the seam 578 # attributes to zero from the default at load-time: 579 if centralFrameSet[self._newFrameSetIdx] is dbc.yes(): 580 self.archive.updateEntries( 581 self._sourceTable, "priOrSec=%s" % dbc.no(), 582 [(self._frameSetIDName, centralFrameSet.ID)]) 583 584 # Test to see if the central frame is new or if there is a new set 585 # adjacent to this one (of course, no need to reseam the central 586 # frame set if it is isolated, i.e. 
has no adjacent frames): 587 if adjacentFrames and ( 588 centralFrameSet[self._newFrameSetIdx] is dbc.yes() 589 or dbc.yes() in set(frame[self._newFrameSetIdx] 590 for frame in adjacentFrames)): 591 # If the central frame is not new, reset the seam flags to 592 # default (i.e. not yet seamed) for the sake of safety: 593 if centralFrameSet[self._newFrameSetIdx] is dbc.no(): 594 self.archive.updateEntries( 595 self._sourceTable, "priOrSec=%s" % dbc.intDefault(), 596 [(self._frameSetIDName, centralFrameSet.ID)]) 597 598 try: 599 sourcesPathName = self._outgester.outgestQuery( 600 SelectSQL("ra, dec", 601 table=self._outgester.tablePath(self._sourceTable), 602 where=self._frameSetIDName + "=%s" % centralFrameSet.ID, 603 orderBy="dec"), "sources_0_") 604 except Exception as error: 605 Logger.addExceptionMessage(error) 606 else: 607 # Define arguments for C pairing code 608 set0 = tuple([sourcesPathName] + sourceRowData) 609 610 # Work through the central/adjacent frame pairs, producing 611 # outgest and handshake pair pointer files: 612 fileNames = [] 613 for j, frame in enumerate(adjacentFrames): 614 sourcesPathName = self._outgester.outgestQuery( 615 SelectSQL("ra, dec", 616 table=self._outgester.tablePath(self._sourceTable), 617 where=self._frameSetIDName + "=%s" % frame.ID, 618 orderBy="dec"), "sources_%s_" % (j + 1)) 619 set1 = tuple([sourcesPathName] + sourceRowData) 620 pFilePath = os.path.join(self.sysc.tempWorkPath(), 621 "pointers_%s_%s.dat") 622 pFile0 = pFilePath % (0, j + 1) 623 radius = SourceMerger.seamingPairCriterion 624 pairing.pairOff(set0, set1, pFile0, radius) 625 # @@TODO: Make even faster by not writing to binary 626 # file, instead just return the pointers? 
627 fileNames.append(pFile0) 628 629 # Obtain the optical axis point for each set comprising the 630 # RA/Dec of central and overlapping frame sets: 631 # DR2 seaming modification: set the "optical axis" for each 632 # frame set simply to the centre of the frame: 633 opticalAxes = [(math.radians(centralFrameSet.ra), 634 math.radians(centralFrameSet.dec))] 635 opticalAxes += [(math.radians(frameSet.ra), 636 math.radians(frameSet.dec)) 637 for frameSet in adjacentFrames] 638 639 # Seam this central frame by comparing overlap pairs for 640 # each central frame source in turn; returned is a data 641 # object of those source IDs that must have their seam 642 # index modified to a non-zero value for each of the 643 # central frame set and adjacent frame set identifiers: 644 seamResults = self._analyseSeams(centralFrameSet.ID, 645 adjacentFrames, fileNames, opticalAxes) 646 647 # Reset all seaming flags for the central frame to zero 648 # (i.e. default to no overlap pairs available) 649 self.archive.updateEntries( 650 self._sourceTable, "priOrSec=%s" % dbc.no(), 651 [(self._frameSetIDName, centralFrameSet.ID)]) 652 653 # Work through each adjacent frame set, setting the seam 654 # index for the source IDs listed to the required frame 655 # set ID: 656 for frameSetID, sourceList in seamResults.iteritems(): 657 self.archive.updateEntries( 658 self._sourceTable, "priOrSec=%s" % frameSetID, 659 [(self._sourceIDName, sourceList)]) 660 661 # Can incrementally mark frame sets as old, only if we're doing a 662 # complete reseam. 663 if self.doCompleteReseam or self.continueCompleteReseam: 664 self.archive.updateEntries( 665 self._mergeLog, "newFrameSet=%s" % dbc.no(), 666 [(self._frameSetIDName, centralFrameSet.ID)]) 667 668 progress.testForOutput("Completed frameSetID %s" 669 % centralFrameSet.ID) 670 671 Logger.addMessage("done.") 672 Logger.addMessage("Resetting individual new frame flags " 673 "after seaming:") 674 Logger.addMessage("No. 
of frame sets with flags reset to old: %s" % 675 self.archive.update(self._mergeLog, 676 [("new%sframe" % passband, str(dbc.no())) 677 for passband in self._passbands])) 678 679 # Reset all "new" merge log attributes 680 if not self.doCompleteReseam and not self.continueCompleteReseam: 681 self.archive.update(self._mergeLog, 682 [("newFrameSet", str(dbc.no()))])
683 684 #-------------------------------------------------------------------------- 685
686 - def _addSeamingIndices(self):
687 """ To optimise seaming we need to apply an index to the Source table 688 frameSetID, ra, dec columns (the CU7 index), which are outgested, 689 as well as a covering index on the ppErrBits and seqNum attributes 690 that are queried many times (the CU7_new index). 691 """ 692 indexExists = False 693 for index in self._idxInfo[self._sourceTable]: 694 if "CU7" in index.name: 695 indexExists = True 696 self.archive.addIndex(index) 697 698 if not indexExists: 699 Logger.addMessage("<WARNING> This programme has no CU7 indices " 700 "defined in %s." % self.sysc.indexScript)
701 702 #-------------------------------------------------------------------------- 703
    def _analyseSeams(self, centralSetID, adjacentFrames, fileNames,
                      opticalAxes):
        """
        Analyse a set of merged sources in an overlap region. Sources that are
        paired between a central frame set and its adjacent frame sets. The
        required seaming data are first extracted from the source table; then
        the function examines each central frame set merged source in turn,
        checking the adjacent frame set pairs and noting any identical
        record that should be used in preference to the central record. The
        function returns a data object containing lists of sources for each
        frame set ID (central and adjacents) that need their seam index
        attribute set to indicate the presence of a secondary identical source
        or a preferred primary source.

        @todo: Could speed things up by looking at function calls within loops
               consider passing entire lists to functions instead.

        @param centralSetID:   The UID of the central frame set.
        @type  centralSetID:   int
        @param adjacentFrames: The merge log rows of all adjacent frames.
        @type  adjacentFrames: list(tuple)
        @param fileNames:      Filenames of outgest source and pointer files.
        @type  fileNames:      list(str)
        @param opticalAxes:    Celestial coordinate pairs of the camera
                               optical axis.
        @type  opticalAxes:    list((float, float))

        @return: A duplicate sources dictionary; consisting of a list of
                 source IDs referenced by a certain frame set ID, for which
                 every source in the list must have their seaming flag value
                 set to that frame set ID.
        @rtype:  defaultdict(int:list(int))

        """
        ### LOAD IN POINTERS and SOURCE DATA ####
        # Read in the required seaming data for the central and adjacent frame
        # set merged sources; note that these data must be identical to those
        # bulk outgested and used to create the above pointers (both are
        # assumed to be in declination order - see module @warning).
        pointers = \
            [numpy.fromfile(fileName, numpy.int32) for fileName in fileNames]
        # sourceData[0] is the central frame set; sourceData[j + 1] pairs with
        # pointers[j] for the j-th adjacent frame set.
        sourceData = [self._getFrameSetSources(centralSetID)]
        sourceData.extend(
            self._getFrameSetSources(frameSet[0]) for frameSet in adjacentFrames)

        ### SANITY CHECKS ###
        # pointer lists must all be the same length, and the
        # first set of merged source data should also be of this length; there
        # should be a merged source data set for every pointer file plus the
        # central frame set:
        if len(pointers) != len(sourceData) - 1:
            raise SourceMerger.CuError(
                "Inconsistent data set during seam analysis: \n"
                " - no. of pointer files: %s\n" % len(pointers) +
                " - no. of adjacent source datasets: %s" % (len(sourceData) - 1))

        if any(len(ptrAry) != len(pointers[0])
               for ptrAry in pointers[1:]):
            raise SourceMerger.CuError(
                "Inconsistent data set during seam analysis: \n"
                " - pointer files have inconsistent lengths.")

        if len(sourceData[0]) != len(pointers[0]):
            raise SourceMerger.CuError(
                "Inconsistent data set during seam analysis: \n"
                " - central frame set source data and pointer files have\n"
                "   inconsistent lengths: central frame set sources: %s; no."
                " of pointers: %s" % (len(sourceData[0]), len(pointers[0])))

        #### CREATE LIST OF DUPLICATES ####
        duplicates = defaultdict(list)
        for i in range(len(pointers[0])):

            # Gather the central source plus every adjacent-set source paired
            # with it, with the matching frame set ID and optical axis.
            records = [sourceData[0][i]]
            frameSetIDs = [centralSetID]
            axes = [opticalAxes[0]]

            for j in range(len(pointers)):
                pp = pointers[j][i]

                # A pointer of -1 means no counterpart in adjacent set j.
                if pp > -1:
                    records.append(sourceData[j + 1][pp])
                    frameSetIDs.append(adjacentFrames[j][0])
                    axes.append(opticalAxes[j + 1])

            # Now, if there are pairs, we need to check for detections in the
            # same set of passbands as the central source, and then pick the
            # best source in the case that there are overlap pairs:
            if len(records) > 1:
                # How many detections in this record? The following indexing
                # is based on the attribute selection in getFrameSetSources:
                # seqNum columns start at `offset` and repeat every
                # `numPerPass` columns, preceded by the ppErrBits column.
                offset = 4
                numPerPass = 2
                # Index values point to sequence numbers in each passband;
                # a default indicates no detection:
                indices = range(offset,
                                offset + numPerPass * len(self._passbands),
                                numPerPass)
                # Count passbands with a real detection whose error bits carry
                # no serious flags (only the innocuous low byte may be set).
                # NB: the generator variable `i` deliberately shadows the
                # outer loop index but is scoped to the generator expression.
                numDetsList = [
                    sum(1 for i in indices if record[i] != dbc.intDefault() and
                        record[i - 1] <= 0xffffffff ^ SourceMerger.seamErrorBitMask)
                    for record in records]

                bestID = self._pickBestDuplicate(records, axes, frameSetIDs,
                                                 numDetsList)

                # records[0][0] is the central source's UID; file it under the
                # frame set that holds the best duplicate.
                duplicates[bestID].append(records[0][0])

        return duplicates
811 812 #-------------------------------------------------------------------------- 813
814 - def _createEpochDict(self):
815 """ @return: A dictionary of frame epochs referenced by multiframeID. 816 @rtype: dict(int, float) 817 """ 818 return dict(self.archive.query("Multiframe.multiframeID, mjdObs", 819 Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]), 820 "Multiframe.multiframeID > 0 AND frameType LIKE '%stack%' AND " + 821 "programmeID=%s" % self.programmeID))
822 823 #-------------------------------------------------------------------------- 824
825 - def _createPixScaleDict(self):
826 """ @return: A dictionary of frame pixel scales referenced by 827 multiframeID. 828 @rtype: dict(int, float) 829 """ 830 # @@NOTE: Assume xPixSize == yPixSize 831 keyValuePairs = "Multiframe.multiframeID, xPixSize" 832 return dict(self.archive.query(keyValuePairs, 833 Join(["Multiframe", "CurrentAstrometry", "ProgrammeFrame"], 834 ["multiframeID"]), 835 "Multiframe.multiframeID > 0 AND frameType LIKE '%stack%' AND " + 836 "programmeID=%s" % self.programmeID, groupBy=keyValuePairs))
837 838 #-------------------------------------------------------------------------- 839
    def _getCameoPassbandData(self, pb):
        """
        Returns the cameo schema for this passband. The schema describes the
        contents of the individual detection records, i.e. the positions of
        quantities to be merged and their units. NB: if the base schema for the
        detection tables is changed, then this function may need to be updated
        to reflect those changes.

        @param pb: Short name of passband.
        @type pb: str

        @return: Cameo schema attributes for this passband: Name, first byte on
                 the record, SQL data type, and units.
        @rtype: list(str)

        """
        cameoData = []
        detSchema = self._progSchema[self.programme.getAttr('detectionTable')]

        # Make a case insensitive look-up dictionary
        detColumns = dict((name.lower(), detSchema.attribute[name])
                          for name in detSchema.attribute)

        # Celestial co-ordinates & errors: always first in the cameo schema
        for colName in ("ra", "dec", "xErr", "yErr"):
            column = detColumns[colName.lower()]
            colPos = detSchema.getBytePos(colName)
            # Unit comes from the schema's --/U tag; default if untagged
            colUnit = column.tag.get("--/U", dbc.charDefault()).title()
            colData = [colName, str(colPos), column.dataType, colUnit]
            cameoData.append(' '.join(colData))

        # Magnitude columns: source-table columns for this passband map back
        # onto detection-table columns once the passband prefix is stripped
        for column in self._progSchema[self._sourceTable].columns:
            if column.name.startswith(pb) and "mag" in column.name.lower():
                # Special case: <pb>SerMag2DErr holds the detection table's
                # SerFit2DChi2 quantity rather than a stripped-prefix name
                colName = (column.name[len(pb):]
                           if not column.name.endswith("SerMag2DErr") else
                           "SerFit2DChi2")
                colPos = detSchema.getBytePos(colName)
                colUnit = column.tag.get("--/U", "mag")
                colData = [column.name, str(colPos), column.dataType, colUnit]
                # A zero/None byte position means the stripped name is not a
                # real detection column, so skip it
                if colPos:
                    cameoData.append(' '.join(colData))
                # else: confused filters e.g. h2PetroMag when pb='h'

        # Detection attributes propagated from detection into source:
        for colName in ("HlCorSMjRadAs", "Gausig", "Ell", "PA", "ErrBits",
                        "Deblend", "Class", "ClassStat", "ppErrBits",
                        "SeqNum", "ObjID", "ExtNum", "Mjd", "AverageConf"):
            try:
                column = detColumns[colName.lower()]
            except KeyError:
                pass  # Not all attributes are in all programme's schema
            else:
                colPos = detSchema.getBytePos(colName)
                colUnit = column.tag.get("--/U", dbc.charDefault()).title()
                colData = [colName, str(colPos), column.dataType, colUnit]
                # NB: the passband prefix is concatenated directly onto the
                # joined string here (no separator), unlike the loops above
                cameoData.append(pb + ' '.join(colData))

        return cameoData
898 899 #-------------------------------------------------------------------------- 900
901 - def _getExistingFrameSets(self):
902 """ @return: All frame sets in MergeLog for this programme. 903 @rtype: list(tuple) 904 """ 905 Logger.addMessage("Creating list of existing frame sets...") 906 frameSets = self.archive.query( 907 selectStr="*", 908 fromStr=self._mergeLog, 909 whereStr="deprecated = 0 ORDER BY dec", 910 ResultsTuple=self.FrameSet) 911 912 Logger.addMessage("...number of existing frame sets: %s " 913 % len(frameSets)) 914 915 return frameSets
916 917 #-------------------------------------------------------------------------- 918
919 - def _getFrameSetSources(self, frameSetID):
920 """ 921 Get merged source data from a given frame set for the purposes of 922 reseaming. The function makes a selection of attributes from those 923 available for each merged source; those attributes are for use by the 924 seaming algorithm to decide which is the best record amongst a set of 925 duplicates. This function also returns two scalar values which encode 926 the offset and number of attributes per passband as dictated by the 927 selection. 928 929 @param frameSetID: The UID of the source frame set. 930 @type frameSetID: int 931 932 @return: Attributes for the set of sources in the specified frame set. 933 @rtype: list(tuples) 934 935 """ 936 return self.archive.query( 937 selectStr=self._sourceIDName + ", ra, dec, " + 938 ", ".join("%sppErrBits, %sSeqNum" % 939 (pb, pb) for pb in self._passbands), 940 fromStr=self._sourceTable, 941 whereStr=self._frameSetIDName + "=%s ORDER BY dec" % frameSetID)
942 943 #-------------------------------------------------------------------------- 944
945 - def _getMedianImageExtent(self):
946 """ 947 Get the median image extent for all frame sets defined in a particular 948 merge log. The idea is to use this is a measurement to define the 949 adjacent frame set tolerance when seaming overlap regions, rather than 950 using the Programme attribute (or indeed a hardwired-value) to 951 generalise the seaming procedure. 952 953 @return: The median image extent, in degrees. 954 @rtype: float 955 956 """ 957 extents = list() 958 for passband in self._passbands: 959 extents += self.archive.query('imageExtent', 960 "CurrentAstrometry AS CA, %s AS L" % self._mergeLog, 961 whereStr="CA.multiframeID=L.%smfID AND CA.extNum=L.%seNum AND " 962 "CA.multiframeID>0 AND CA.extNum>0" % 963 (passband, passband)) 964 extents = [extent / 60 for extent in extents if extent > 0.0] 965 if extents: 966 return numpy.median(extents) 967 else: 968 return dbc.realDefault()
969 970 #-------------------------------------------------------------------------- 971
972 - def _getNextID(self, attrName, tableName, where='', addProgID=False):
973 """ 974 Obtains next available ID number in the db for a given ID attribute. 975 976 @param attrName: Name of ID attribute. 977 @type attrName: str 978 @param tableName: Name of the table where attribute resides. 979 @type tableName: str 980 @param where: Optional SQL WHERE clause. 981 @type where: str 982 @param addProgID: If True, prefix initial ID with the programme UID to 983 make ID unique across the entire archive. 984 @type addProgID: bool 985 986 @return: Next available ID number. 987 @rtype: int 988 989 """ 990 nextID = super(SourceMerger, 991 self)._getNextID(attrName, tableName, where) 992 if addProgID and nextID is 1: 993 nextID = self.programmeID * 65536L * 65536L + 1L 994 return nextID
995 996 #-------------------------------------------------------------------------- 997
998 - def _getPositionColumnData(self):
999 """ Prepare info for C modules about the detection position column data 1000 from the table schema. 1001 """ 1002 detSchema = self._progSchema[self.programme.getAttr('detectionTable')] 1003 1004 posColData = [] 1005 for colName in ("ra", "dec"): 1006 column = detSchema.attribute[colName] 1007 colPos = detSchema.getBytePos(column.name) 1008 colSize = self.sysc.sqlDataTypeSize[column.dataType] 1009 colUnit = column.tag.get("--/U").upper()[:3] 1010 posColData.append((colPos, colSize, colUnit)) 1011 1012 posColData.append(detSchema.getBytePos()) 1013 1014 return posColData, posColData[-1]
1015 1016 #-------------------------------------------------------------------------- 1017
1018 - def _getProgFilters(self):
1019 """ @return: List of filter IDs and number of passes for the programme. 1020 @rtype: list(tuple(int, int)) 1021 """ 1022 return self.archive.query( 1023 selectStr="filterID, nPass", 1024 fromStr="RequiredFilters", 1025 whereStr="programmeID=%s ORDER BY filterID ASC" % self.programmeID)
1026 1027 #-------------------------------------------------------------------------- 1028
1029 - def _getSrcPositionColumnData(self):
1030 """ Define source position column data of a source table outgest for 1031 reseaming stage. Currently, RA and Dec are all that are required 1032 for the overlap pairing stage and the analysis is done later in 1033 Python by executing a query on the DB to get all required analysis 1034 merged source data. 1035 """ 1036 sourceRowData = [] 1037 srcSchema = self._progSchema[self._sourceTable] 1038 1039 colPos = 0 1040 for colName in ("ra", "dec"): 1041 column = srcSchema.attribute[colName] 1042 colSize = self.sysc.sqlDataTypeSize[column.dataType] 1043 colUnit = column.tag.get("--/U").upper()[:3] 1044 sourceRowData.append((colPos, colSize, colUnit)) 1045 colPos += colSize 1046 1047 sourceRowData.append(colPos) 1048 1049 return sourceRowData
1050 1051 #-------------------------------------------------------------------------- 1052
1053 - def _isQualityFlaggingComplete(self):
1054 """ @return: True if quality bit flags have been fully updated for this 1055 programme's detection table. 1056 @rtype: bool 1057 """ 1058 # @@GOTCHA: Need to include the selection on programmeID as well as 1059 # multiframeID to handle the case where a multiframe is shared 1060 # amongst two programmes with the different detection table. This also 1061 # covers the case of non-surveys that have entries for the default 1062 # programmeID. Also, must check that detections exist, as if any 1063 # multiframe has no detections then they will have ppErrBitsStatus 1064 # values of zero by default. 1065 1066 return not self.archive.queryEntriesExist( 1067 Join(["Multiframe", "ProgrammeFrame", 1068 self.programme.getDetectionTable()], 1069 ["multiframeID"]), 1070 where="utDate BETWEEN '%s' AND '%s'" % self.dateRange 1071 + " AND ppErrBitsStatus=0 AND programmeID=%s" % self.programmeID)
1072 1073 #-------------------------------------------------------------------------- 1074
    def _pickBestDuplicate(self, overlapSources, overlapAxes, overlapFrameIDs,
                           overlapDetectNos):
        """
        Chooses the best source from a set of duplicates in an overlap region.
        Given a list of duplicate sources with positions and qualities, and a
        list of their respective image optical axis points, choose the best
        image out of the set of duplicates. Originally, this was
        based on a seaming algorithm that took into account angular distance
        from the axis until the precise meaning of the bit-wise quality
        flags was clarified, and used proximity to the optical axis only as
        the criterion for judging the best source, choosing only from those
        sources having the maximum number of constituent detections.
        For DR2 et seq., the seaming algorithm has been modified to:
          a) mask off innocuous "information"-only error bits amongst the
             available passbands;
          b) eliminate any duplicate that has "warning" quality bits set
             but bringing all back into contention if this would result in no
             primary image;
          c) check the filter complement, choosing the merged source with
             the most filter coverage;
          d) then, and only then, use proximity to the frame set centre if
             there is still more than one potential primary; the tangent point
             for this test is, therefore, the centre of the frame set and not
             the optical axis.

        @param overlapSources: Merged source data for the duplicates.
        @type overlapSources: list(tuple)
        @param overlapAxes: Tangent point for each source.
        @type overlapAxes: list((float, float))
        @param overlapFrameIDs: UIDs for the frame sets of the sources.
        @type overlapFrameIDs: list(int)
        @param overlapDetectNos: Counts of the no. of detections in each frame.
        @type overlapDetectNos: list(int)

        @return: The UID of the frame set which contains the best source.
        @rtype: int

        """
        # Check the post-processing error bit mask attributes:
        bitChecks = []
        numBadBits = []
        for source, frameID in zip(overlapSources, overlapFrameIDs):
            # From the prescription in getFrameSetSources:
            # index 3 + 2i is the ppErrBits value for passband i
            sourceBits = [source[3 + (2 * i)]
                          for i in range(len(self._passbands))]
            numBadBits.append(0)
            for bit in sourceBits:
                # NOTE(review): 0xffffffff ^ mask is the 32-bit complement of
                # the error mask; a value above it presumably flags a bad /
                # default passband entry -- confirm against outgest encoding
                if bit > 0xffffffff ^ SourceMerger.seamErrorBitMask:
                    numBadBits[-1] += 1
            # OR all passband flags together; source is "good" if no masked
            # warning bit is set in any band
            allBits = reduce(lambda x, y: x | y, sourceBits)
            bitChecks.append(not (allBits & SourceMerger.seamErrorBitMask))
            if bitChecks[-1]:
                # Remember the most recent good frame set in case it turns
                # out to be the only one
                bestID = frameID

        # If only one source is good no need to do further checking:
        if bitChecks.count(True) == 1:
            return bestID

        # All duplicates flagged bad: bring everything back into contention
        allBad = True not in bitChecks
        maxSep = 2.0 * math.pi
        maxDets = max(overlapDetectNos)
        # Fewest bad-bit passbands amongst the best-covered duplicates
        minBad = min(nBad for numDets, nBad
                     in zip(overlapDetectNos, numBadBits)
                     if numDets == maxDets)

        for source, axis, frameID, numDets, bitsOK, nBad in zip(overlapSources,
            overlapAxes, overlapFrameIDs, overlapDetectNos, bitChecks,
            numBadBits):
            # Pick only from amongst those sources that have the maximum
            # number of constituent detections:
            if (bitsOK or allBad) and numDets == maxDets and nBad == minBad:
                # Idea is to find that duplicate that has the minimum
                # offset from the frame set centre as measured by the
                # maximum absolute value of delta-xi and delta-eta in
                # each case:
                pos = (math.radians(source[1]), math.radians(source[2]))
                xi, xn = astro.convSkyToTangent(pos, axis)
                sep = max(abs(xi), abs(xn))
                if sep < maxSep:
                    maxSep = sep
                    bestID = frameID

        return bestID
1157 1158 #-------------------------------------------------------------------------- 1159
1160 - def _removeDeprecatedSources(self, tableName):
1161 """ 1162 Remove merged sources that are based on deprecated frame sets. 1163 1164 @param tableName: Source table to edit. 1165 @type tableName: str 1166 1167 @return: Number of sources deleted. 1168 @rtype: int 1169 1170 """ 1171 deprecatedFrameSets = SelectSQL(self._frameSetIDName, self._mergeLog, 1172 where="deprecated > 0") 1173 1174 return self.archive.delete(tableName, 1175 whereStr=self._frameSetIDName + " IN (%s)" % deprecatedFrameSets)
1176 1177 #-------------------------------------------------------------------------- 1178
1179 - def _setProgPassbandDetails(self):
1180 """ Initialisation routine for this programme's merge-log and passband 1181 data. 1182 1183 @todo: This will be a *lot* simpler when I've implemented the 1184 FrameSetView class. 1185 """ 1186 self._filterPasses = self._getProgFilters() 1187 1188 # Create list of passband names 1189 filterTable = df.Table("Filter", self.archive) 1190 self._passbands = [] 1191 for filterID, numEpochs in self._filterPasses: 1192 shortName = \ 1193 filterTable.getAttr('shortName', filterID=filterID).lower() 1194 if numEpochs is 1: 1195 self._passbands.append(shortName) 1196 elif numEpochs > 1: 1197 self._passbands += ["%s_%s" % (shortName, obsNum + 1) 1198 for obsNum in range(numEpochs)] 1199 1200 ## Calculate useful frame set indices from merge log attributes 1201 1202 mlCols = self.archive.queryColumnNames(self._mergeLog) 1203 if not mlCols: 1204 Logger.addMessage("<ERROR> Merge-log table %s not found" 1205 % self._mergeLog) 1206 raise SystemExit 1207 1208 mlCols[0] = "ID" 1209 self.FrameSet = namedtuple(self._mergeLog, mlCols) 1210 1211 # The number of attributes before repeating filter set attributes. 1212 for index, attr in enumerate(mlCols): 1213 if attr.endswith('mfID'): 1214 break 1215 1216 self._numHeaderCol = index 1217 1218 # The number of attributes repeated for each filter. 1219 for index, attr in enumerate(mlCols[self._numHeaderCol + 1:]): 1220 if attr.endswith('mfID') or attr == 'mergeSwVersion': 1221 break 1222 1223 self._numFilterCol = index + 1 1224 1225 finalIdx = self._numHeaderCol + len(self._passbands) * self._numFilterCol 1226 self._mfidIndices = \ 1227 range(self._numHeaderCol, finalIdx, self._numFilterCol) 1228 1229 self._newFrameSetIdx = finalIdx + 1
1230 1231 #-------------------------------------------------------------------------- 1232
1233 - def _setProgTableNames(self):
1234 """ Initialises table name member variables for current programme. 1235 """ 1236 self._mergeLog = self.programme.getMergeLogTable() 1237 self._sourceTable = self.programme.getSourceTable()
1238 1239 #-------------------------------------------------------------------------- 1240
1241 - def _updateAebvCSVFile(self):
1242 """ 1243 Put the latest aebv values from table Filter into aebv.csv for mkmerge. 1244 1245 @todo: Make this method a fundamental part of a new WfcamsrcInterface 1246 class, to replace the WfcamsrcInterface module? 1247 """ 1248 Logger.addMessage("Filling aebv.csv with latest values") 1249 filePath = os.path.join(self.sysc.srcPath, 'wfcamsrc', 'aebv.csv') 1250 lines = ('%s,%s\n' % (aFilter.shortName.title(), aFilter.aebv) 1251 for aFilter in self.archive.query("shortName, aebv", "Filter")) 1252 1253 file(filePath, 'w').writelines(lines)
1254 1255 #------------------------------------------------------------------------------ 1256
class SynopticMerger(SourceMerger):
    """ Create/update merged synoptic source catalogue for a given programme.
    """
    #--------------------------------------------------------------------------
    # Define class constants (access as SynopticMerger.varName)
    commencementMessage = "Synoptic source merging..."
    """ Message to log at the commencement of source merging.
    """
    pairingCriterion = 0.8
    """ Tight merging criterion because sources should not move in short space
        of time.
    """
    frameSetChunkSize = 150
    """ Number of frame sets in each chunk.
    """
    # Class parameters - should not be altered
    cuNum = 6  # Overrides CuSession default
    _isVerbose = False  # Turn off SourceMerger logging since this is chunked

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    maxProdID = None
    """ Maximum product ID to include in merged frame sets.
    """
    minProdID = None
    """ Minimum product ID to include in merged frame sets.
    """
    prepareOnly = False
    """ Only prepare the frame sets and update mergelog, without source merging?
    """
    skipQualityCheck = False
    """ Override quality bit flag check?
    """
    recreate = False
    """ Recreate from scratch
    """

    #--------------------------------------------------------------------------
    # Define private member variable default values (access prohibited)

    # NOTE(review): these are mutable class-level attributes, so the same
    # list objects are shared by every SynopticMerger instance unless they
    # are rebound (not just appended to) before use -- confirm they are
    # reassigned per run, otherwise state leaks between instances.
    _goodFrameSets = []
    _incompFrameSets = []
    _deprecFrameSets = []
    _newFrameSets = []

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Create/update merged source catalogue.
        """
        super(SynopticMerger, self)._onRun()

        self.synSetup = SynopticSetup(self.programme, self.dateRange)

        # Schemas to ingest: always the merge log; the source table too,
        # unless we are only preparing frame sets
        ingSchema = [self._progSchema[self._mergeLog]]
        if not self.prepareOnly:
            ingSchema.append(self._progSchema[self._sourceTable])
        try:
            ingester = Ingester(self.archive, ingSchema, self.shareFileID,
                                skipSchemaCheck=True)
        except schema.MismatchError as error:
            raise SynopticMerger.CuError(error)

        if not self.prepareOnly and not self.skipQualityCheck:
            Logger.addMessage("Checking that the quality bit flags are all set")

            if not self._isQualityFlaggingComplete():
                raise SynopticMerger.CuError("Detection quality bit flags have"
                    " not been updated for this programme!")

        if self.recreate:
            # Full rebuild: drop & recreate the source table, empty merge log
            Logger.addMessage("Destroying & recreating %s and emptying %s" %
                              (self._sourceTable, self._mergeLog))
            self.archive.dropTable(self._progSchema[self._sourceTable],
                                   self._progSchema.values())
            self.archive.truncate(self._mergeLog)
            self.archive.createTable(self._progSchema[self._sourceTable],
                                     self._progSchema.values())

        # Create a new list of frame sets to be merged in this run.
        # Include meanMjdObs.
        self._getMergeLogInfo()
        self._makeNewFrameSets()
        if self.mergeLogInfo.values():
            self._checkMissingFrameSets()

        Logger.addMessage("...found %s complete existing frame sets"
                          % len(self._goodFrameSets))

        Logger.addMessage("...found %s existing frame sets that need updating"
                          % len(self._incompFrameSets))

        Logger.addMessage("...found %s existing frame sets that need deprecating"
                          % len(self._deprecFrameSets))

        Logger.addMessage("...found %s new frame sets" % len(self._newFrameSets))
        # Incomplete and deprecated -- delete from mergelog
        # Add in incomplete to new
        self._deleteOldFrameSets()
        if not self._newFrameSets:
            Logger.addMessage("No new synoptic source merging required for "
                              "this programme.")

        if self._newFrameSets and not self.prepareOnly:
            Logger.addMessage("Synoptic source merging...")

            # Outgest and merge detections with C/C++ code, creating binary
            # source table for ingest
            # @TODO: do a frameset at a time?
            chunkedFrameSets = list(utils.splitList(self._newFrameSets,
                chunkSize=SynopticMerger.frameSetChunkSize))

            progress = ForLoopMonitor(chunkedFrameSets)
            for frameSets in chunkedFrameSets:
                progress.preMessage("Preparing chunk")
                sourceTablePathName, mergeLogPathName, isComplete = \
                    self.mergeSources(frameSets, SynopticMerger.pairingCriterion)

                if not isComplete:
                    raise SynopticMerger.CuError("Synoptic merging failed and "
                                                 "needs to be re-run.")

                try:
                    ingester.ingestTable(self._mergeLog, mergeLogPathName,
                                         isCsv=True)

                    ingester.ingestTable(self._sourceTable, sourceTablePathName,
                                         self._idxInfo, isOrdered=True)

                except BaseException as error:
                    # Ingest failed mid-chunk: log, then remove this chunk's
                    # merge-log rows so the database stays consistent before
                    # re-raising for the caller to handle
                    Logger.addExceptionMessage(error)
                    Logger.addMessage(
                        "Attempting to rollback %s..." % self._mergeLog)

                    frameSetIDs = (frameSet.ID for frameSet in frameSets)
                    Logger.addMessage("%s rows removed." %
                        self.archive.delete(self._mergeLog,
                                            "synframeSetID IN (%s)"
                                            % ','.join(map(str, frameSetIDs))))
                    raise

        elif self._newFrameSets:
            # prepareOnly: record the new frame sets without merging sources
            self._ingestMergeLogOnly(ingester)

        self._insertDefaultRows()
1404 1405 #-------------------------------------------------------------------------- 1406
    def _checkFrameSet(self, frameSet):
        """ Checks frame set against current mergelog

            Classifies the given frame set as good (identical to an existing
            merge-log entry), incomplete (partial match to one entry), or new
            (no match), appending it to the corresponding member list.
        """
        # matches - list of filter index, frameSetID
        matches = []
        if not self.mergeLogInfo.values():
            # No existing merge log: everything is a new frame set
            self._newFrameSets.append(self.FrameSet(*frameSet))
            return

        # Count of this frame set's non-default passbands
        noNonDefFS = 0
        for index, shtName in enumerate(self._passbands):
            # NOTE(review): assumes per-filter (mfID, extNum) pairs start at
            # offset 9 with a stride of 3 in the frame set row -- this must
            # stay in step with the merge-log schema; confirm if it changes
            mfID, extNum = frameSet[9 + 3 * index], frameSet[10 + 3 * index]
            # is mfID in mergeLog
            if mfID > 0: noNonDefFS += 1
            if mfID > 0 and mfID in self.mergeLogInfo["%smfid" % shtName]:

                # find indices of frame sets with this mfID
                mlIndices = [ind for ind, multID
                             in enumerate(self.mergeLogInfo["%smfid" % shtName])
                             if mfID == multID]

                # is there an extNum match too?
                if extNum in [self.mergeLogInfo["%senum" % shtName][ind]
                              for ind in mlIndices]:

                    # First merge-log row matching both mfID and extNum
                    matchIndex = mlIndices[
                        [indx for indx, eNum
                         in enumerate([self.mergeLogInfo["%senum" % shtName][ind]
                                       for ind in mlIndices])
                         if extNum == eNum][0]]

                    matches.append((index, matchIndex,
                        self.mergeLogInfo[self._frameSetIDName.lower()][matchIndex]))

        # Distinct existing frame sets that this frame set's filters matched
        uniqueFSIDs = set(fsID for _idx, _mIdx, fsID in matches)

        # If all filters match to same old frame set -- keep
        if len(uniqueFSIDs) == 1:
            # Check number of non-default passbands in mergelog or in frame set?
            noNonDefML = len([self.mergeLogInfo["%smfid" % shtName][matches[0][1]]
                              for shtName in self._passbands
                              if (self.mergeLogInfo["%smfid" % shtName][matches[0][1]] > 0)])

            if len(matches) == noNonDefFS and len(matches) == noNonDefML:
                # take old frameSet info
                self._goodFrameSets.append(
                    self.FrameSet(*self.mergeLogInfoList[matches[0][1]]))

            # If some match one and some missing - add in and recalc
            elif len(matches) < noNonDefFS or len(matches) < noNonDefML:
                #mtchIndc=[index for index,_fsID in matches]
                #newFilters=[shtName for index,shtName in enumerate(self._passbands)
                #            if index not in mtchIndc]
                self._incompFrameSets.append((list(uniqueFSIDs)[0], frameSet))

        # if some match one and some another - problem?
        elif len(uniqueFSIDs) > 1:
            # @TODO: Not necessarily a problem but be careful
            raise SynopticMerger.CuError("Frameset %s in programme %s is drawn"
                                         " from multiple old frame sets: %s"
                                         % (frameSet[0], self.programmeID,
                                            ','.join(map(str, uniqueFSIDs))))

        # If no matches - new
        else:
            self._newFrameSets.append(self.FrameSet(*frameSet))
1473 1474 #-------------------------------------------------------------------------- 1475
1476 - def _checkMissingFrameSets(self):
1477 """ Are there any existing frame sets not in the current lists 1478 """ 1479 for index, frameSetID \ 1480 in enumerate(self.mergeLogInfo[self._frameSetIDName]): 1481 1482 # is it in goodFrameSets 1483 matches = [fSet for fSet in self._goodFrameSets 1484 if fSet.ID == frameSetID] 1485 1486 matches += [fSet for fsID, fSet in self._incompFrameSets 1487 if fsID == frameSetID] 1488 1489 if not matches: 1490 self._deprecFrameSets.append( 1491 self.FrameSet(*self.mergeLogInfoList[index]))
1492 1493 #-------------------------------------------------------------------------- 1494
1495 - def _deleteOldFrameSets(self):
1496 """ Remove old frame sets. 1497 """ 1498 deprecatedFSIDs = [fsID for fsID, _fSet in self._incompFrameSets] 1499 deprecatedFSIDs += [fSet.ID for fSet in self._deprecFrameSets] 1500 if deprecatedFSIDs: 1501 Logger.addMessage("...removing deprecated and imcomplete frame sets") 1502 self.archive.delete(self._sourceTable, 1503 whereStr="synframeSetID in (%s)" % 1504 ','.join(map(str, deprecatedFSIDs))) 1505 1506 self.archive.delete(self._mergeLog, 1507 whereStr="synframeSetID in (%s)" % 1508 ','.join(map(str, deprecatedFSIDs))) 1509 1510 if self._incompFrameSets: 1511 Logger.addMessage("...adding updated incomplete frame sets into " 1512 "new frame sets to be processed") 1513 1514 self._newFrameSets += [self.FrameSet(*([fsID] + fSet[1:])) 1515 for fsID, fSet in self._incompFrameSets]
1516 1517 #-------------------------------------------------------------------------- 1518
1519 - def _getMergeLogInfo(self):
1520 """ Creates view of current merge log 1521 """ 1522 tableName = self._mergeLog 1523 progSchema = self.programme.getSchemaScript() 1524 columns = schema.parseTables(progSchema, [tableName])[0].columns 1525 self.mergeLogInfoList = self.archive.query("*", self._mergeLog) 1526 self.mergeLogInfo = df.View(map(str, columns), self.mergeLogInfoList)
1527 1528 #-------------------------------------------------------------------------- 1529
1530 - def _getProgFilters(self):
1531 """ @return: List of filter IDs and number of passes for the programme. 1532 @rtype: list(tuple(int, int)) 1533 """ 1534 return self.archive.query( 1535 selectStr="filterID, 1", 1536 fromStr="RequiredFilters", 1537 whereStr="programmeID=%s ORDER BY filterID ASC" % 1538 self.programmeID)
1539 1540 #-------------------------------------------------------------------------- 1541
1542 - def _ingestMergeLogOnly(self, ingester):
1543 """ 1544 Write the updated merge log of new (plus updated) frame sets (new 1545 frame sets flagged using the merge log attribute "new") to a CSV file. 1546 1547 """ 1548 mergeLogPathName = \ 1549 os.path.join(self.sysc.tempWorkPath(), 'synopticMergeLog.csv') 1550 1551 csv.File(mergeLogPathName, 'w+').writelines(set(self._newFrameSets)) 1552 1553 # Ingest merge log & source table 1554 try: 1555 ingester.ingestTable(self._mergeLog, mergeLogPathName, isCsv=True) 1556 except odbc.DatabaseError as error: 1557 if 'STREAM' in str(error): 1558 Logger.addMessage( 1559 "<ERROR> %s ingest failed. " % self._mergeLog + 1560 "Possibly because of a discrepency between the schema " 1561 "and the entries in RequiredFilters.") 1562 raise
1563 1564 #-------------------------------------------------------------------------- 1565
    def _insertDefaultRows(self):
        """ Inserts default rows for every frame into synoptic source table.

            A default row (synSeqNum < 0) is added for every frame set that
            does not already have one, carrying the frame set's ID and
            cuEventID with default values in every other column.
        """
        Logger.addMessage(
            "Inserting default rows into %s..." % self._sourceTable)

        # find synframeSetIDs without default rows
        missingDefaults = self.archive.query("synFrameSetID", self._mergeLog,
            "synFrameSetID NOT IN (%s)" % SelectSQL("synFrameSetID",
            table=self._sourceTable, where="synFrameSetID>0 AND synSeqNum<0"))

        if not missingDefaults:
            Logger.addMessage("done. No missing defaults")
            return

        # Columns copied through from existing rows rather than defaulted
        nonDefColumns = ["synFrameSetID", "cuEventID"]

        nonDefValues = SelectSQL(
            select=", ".join(nonDefColumns),
            table=self._sourceTable,
            where="synFrameSetID IN (%s)" % ','.join(map(str, missingDefaults)),
            groupBy=", ".join(nonDefColumns))

        # NOTE(review): "column not in nonDefColumns" compares schema Column
        # objects against a list of strings -- this only selects the pass-
        # through columns correctly if Column equality is name-based; confirm
        # against the schema module's Column implementation.
        selectStr = ", ".join(
            (str(column.getDefaultValue()) if column not in nonDefColumns else
             "T.%s" % column)
            for column in self._progSchema[self._sourceTable].columns)

        numRows = self.archive.copyIntoTable(self._sourceTable,
            sourceTable="(%s) AS T" % nonDefValues, columns=selectStr)

        Logger.addMessage("done. %s rows inserted." % numRows)
1598 1599 #-------------------------------------------------------------------------- 1600
def _makeNewFrameSets(self):
    """
    Creates new frame sets as appropriate based on available intermediate
    stack frames. For every required pointing, the time-ordered intermediate
    stacks are grouped either into a single correlated epoch (optionally
    constrained by a "special filter" OB / exposure-duration selection) or
    into multi-band epochs separated by the programme's band-merging
    criterion. Each resulting frame set is passed to self._checkFrameSet().

    @return: List of new frame sets.
    @rtype: list(tuple)

    @FIXME: If incomplete/deprecated, some frameSetIDs can be repeated
    """
    useSpecialFilter = False
    # Optional SQL fragments appended to the intermediate-stack query when a
    # special-filter (OB and/or exposure-duration) selection is in force.
    addPars = ''
    addTables = ''
    addConst = ''
    if self.synSetup.isCorSingleEpoch:
        bandMergeTime = self.programme.getAttr("bandMergingCriterion")
        if bandMergeTime < 0:
            bandMergeTime = 0.

        Logger.addMessage("Creating a single epoch of correlated frame sets...")
        if self.synSetup.specialFilter:
            filterTable = df.Table("Filter", self.archive)

            useSpecialFilter = True
            matchOB = False
            matchDur = False
            filterTable.setCurRow(shortName='%s'
                                  % self.synSetup.specialFilter['Name'])
            specialFilterID = filterTable.getAttr("filterID")
            if 'OB' in self.synSetup.specialFilter:
                matchOB = True
                OBfilterNameList = self.synSetup.specialFilter['OB'].split('.')
                OBfilterIDList = []
                for filtName in OBfilterNameList:
                    filterTable.setCurRow(shortName='%s' % filtName)
                    OBfilterIDList.append(filterTable.getAttr("filterID"))

                # VSA/OSA keep the OB identifier in MultiframeEsoKeys; WSA
                # exposes it as msbID on Multiframe.
                addPars += (',obsID' if (self.sysc.isVSA() or self.sysc.isOSA())
                            else ',msbID as obsID')
                addTables += (',MultiframeEsoKeys as e'
                              if (self.sysc.isVSA() or self.sysc.isOSA()) else '')
                addConst += ' and e.multiframeID=m.multiframeID'

            if 'DUR' in self.synSetup.specialFilter:
                matchDur = True
                filtDur = float(self.synSetup.specialFilter['DUR'])
                addPars += ',MAX(totalExpTime) as duration'
                addTables += ',MultiframeDetector as d'
                # @FIXED: the conditional expression must be parenthesised.
                # Previously "str % x if cond else ''" parsed as
                # "(str % x) if cond else ''", so the whole join constraint
                # and GROUP BY clause were dropped whenever 'OB' was not
                # requested, yielding invalid SQL with the MAX() aggregate.
                addConst += (' and d.multiframeID=m.multiframeID group by '
                             'm.multiframeID,m.mjdObs,m.filterID%s'
                             % (',obsID'
                                if 'OB' in self.synSetup.specialFilter else ''))
    else:
        bandMergeTime = self.programme.getAttr("bandMergingCriterion")
        if bandMergeTime < 0:
            msg = "No time specified for band pass merging"
            raise SynopticMerger.CuError(msg)

        Logger.addMessage("Creating correlated frame sets with spacing %s "
                          "minutes..." % bandMergeTime)

    bandMergeTime /= 1440  # convert minutes to days (mjdObs units)

    nextFrameSetID = \
        self._getNextID(self._frameSetIDName, self._mergeLog, addProgID=True)

    curAstHdr = ['centralRA', 'centralDec', 'cx', 'cy', 'cz', 'htmID',
                 'multiframeID', 'extNum']

    frameHdr = ['filterID', 'mjdObs', 'filename', 'confID', 'raBase',
                'decBase']

    dPre = ", D."
    fPre = ", F."
    reqOffsetPos = self.sysc.isVSA() \
        and self.programme.getAttr("epochFrameType") == 'stack'

    # @FIXME: remove...
    pointings = self.archive.query("DISTINCT ra, dec, stackRadius"
        + (",offsetPos" if reqOffsetPos else ""),
        fromStr="Required" + self.programme.getAttr("epochFrameType"),
        whereStr="programmeID=%s AND productID BETWEEN %s AND %s"
        % (self.programmeID, self.minProdID, self.maxProdID))

    for point in pointings:
        # Every observation of this pointing in every required passband.
        mfIDs = set()
        for filterID, _nPass in self._filterPasses:
            mfIDs.update(
                queries.getAllFieldObs(self.archive, point.ra, point.dec,
                    60 * point.stackRadius, filterID, self.programmeID,
                    frameType=self.programme.getAttr("epochFrameType"),
                    dateRange=self.synSetup.dateRangeOfTable[self.synSetup.corBMTable],
                    offsetPos=(point.offsetPos if reqOffsetPos else None),
                    noDeeps=True))

        # Now get info for merging and order by time
        intStacks = self.archive.query(
            selectStr="mjdObs, m.filterID, m.multiframeID%s" % addPars,
            fromStr="Multiframe as m%s" % addTables,
            whereStr="m.multiframeID IN (%s) %s"
            % (", ".join(map(str, mfIDs)), addConst),
            orderBy="mjdObs")

        frameSetIDs = []
        if self.synSetup.isCorSingleEpoch:
            # Find the set of all bands in this pointing. If more than one,
            # take the one with shortest duration. If none is complete,
            # match as many as possible. If multiple epochs exist in one
            # band, choose the one closest to the central epoch of the
            # other bands.
            if useSpecialFilter:
                dataInFormat = False
                # Defensive init: these are referenced in the selection loop
                # below even when only one of OB/DUR is configured.
                completeOBs = []
                durStacks = []
                if matchOB:
                    # Find OBs within which every requested filter occurs.
                    iStacksOB = sorted(intStacks, key=attrgetter("obsID"))
                    for obsID, obStacks in groupby(iStacksOB,
                                                   attrgetter("obsID")):
                        obStacks = list(obStacks)
                        if not set(OBfilterIDList).difference(
                                obStk.filterID for obStk in obStacks):
                            completeOBs.append(obsID)

                    if completeOBs:
                        dataInFormat = True

                if matchDur:
                    durStacks = [epStack for epStack in intStacks
                                 if epStack.duration == filtDur
                                 and epStack.filterID == specialFilterID]
                    if durStacks:
                        dataInFormat = True

                if dataInFormat:
                    mustHaveFrames = []
                    # Apply the configured selection criteria in preference
                    # order; the first criterion that succeeds wins.
                    hasCompleted = False
                    constrainFilters = []
                    for selType in self.synSetup.specialFilter['order']:
                        if selType == 'OB' and completeOBs and not hasCompleted:
                            if len(completeOBs) == 1:
                                obsID = completeOBs.pop()
                                mustHaveFrames.extend(
                                    [epStack for epStack in intStacks
                                     if epStack.obsID == obsID])
                                constrainFilters = OBfilterIDList
                                hasCompleted = True
                            elif matchDur:
                                # Several complete OBs: disambiguate using
                                # the requested exposure duration.
                                obsIDDur = [epStack.obsID
                                            for epStack in intStacks
                                            if epStack.obsID in completeOBs
                                            and epStack.duration == filtDur
                                            and epStack.filterID == specialFilterID]
                                if len(obsIDDur) == 1:
                                    obsID = obsIDDur.pop()
                                    mustHaveFrames.extend(
                                        [epStack for epStack in intStacks
                                         if epStack.obsID == obsID])
                                    constrainFilters = OBfilterIDList
                                    hasCompleted = True
                            else:
                                # No duration criterion: keep every frame of
                                # every complete OB.
                                mustHaveFrames.extend(
                                    [epStack for epStack in intStacks
                                     if epStack.obsID in completeOBs])
                                constrainFilters = OBfilterIDList
                                hasCompleted = True

                        if selType == 'DUR' and durStacks and not hasCompleted:
                            mustHaveFrames.extend(durStacks)
                            constrainFilters.append(specialFilterID)
                            hasCompleted = True

                    # Now remove stacks in same filter as mustHaveFrames
                    # (set membership hoisted out of the comprehension).
                    mustHaveMfIDs = set(frame.multiframeID
                                        for frame in mustHaveFrames)
                    intStacks = [epStack for epStack in intStacks
                                 if (epStack.filterID not in constrainFilters
                                     or epStack.multiframeID in mustHaveMfIDs)]

            # intStacks - with preSelected constraints...
            # Build candidate groupings starting at successive epochs.
            initPos = 0
            possFrameSets = []
            while initPos < len(intStacks):
                initMjdObs = intStacks[initPos][0]

                numObs = defaultdict(int)
                groupNos = []
                groupInd = []
                goodGroup = True
                allFiltersUsed = False
                for ii, stkInf in enumerate(intStacks):
                    if stkInf.mjdObs >= initMjdObs and goodGroup:
                        numObs[stkInf.filterID] += 1
                        if not allFiltersUsed:
                            if len(numObs) > 1:
                                groupNos.append((stkInf.multiframeID,
                                                 stkInf.filterID,
                                                 stkInf.mjdObs))
                            else:
                                # Still in a single-band run: keep only the
                                # most recent frame as the group seed.
                                groupNos = [(stkInf.multiframeID,
                                             stkInf.filterID, stkInf.mjdObs)]

                            groupInd.append(ii)

                        allFiltersUsed = not set(
                            fID for fID, _nP in self._filterPasses).difference(
                            numObs)

                possFrameSets.append((groupNos, allFiltersUsed))
                initPos = max(groupInd) + 1

            # @FIXED: compare the length, not the list itself (the old
            # "possFrameSets > 1" was always True on Python 2 and a
            # TypeError on Python 3).
            if len(possFrameSets) > 1:
                # Find one with shortest duration. Only frame sets covering
                # all filters are candidates, unless no set covers them all.
                # @FIXED: durations are paired with their possFrameSets
                # index; previously the index into the filtered duration
                # list was used to index the unfiltered possFrameSets list,
                # selecting the wrong frame set.
                areAFU = any(afu for _fSet, afu in possFrameSets)
                durationList = []
                for idx, (fSet, allFiltUsed) in enumerate(possFrameSets):
                    if allFiltUsed or not areAFU:
                        mjdObsList = [mjdObs for _mfID, _fID, mjdObs in fSet]
                        durationList.append(
                            (max(mjdObsList) - min(mjdObsList), idx))

                minDur = min(dur for dur, _idx in durationList)
                # As before, ties resolve to the last minimal-duration set.
                index = [idx for dur, idx in durationList
                         if dur == minDur].pop()
                possFrameSets = [possFrameSets[index]]

            if possFrameSets:  # guard: no stacks -> no frame set (was IndexError)
                # remove any multiple epochs
                fSet, _allFiltUsed = possFrameSets.pop()
                groupNos = self._selectBestMfIDs(fSet)
                frameSetIDs.append(','.join(map(str, groupNos)))
        else:
            # group in time, but only one frame per filter
            initPos = 0
            while initPos < len(intStacks):
                # Select frames up to bandMergeCriterion ahead, stopping as
                # soon as any filter would repeat.
                initMjdObs = intStacks[initPos][0]
                finalMjdObs = initMjdObs + bandMergeTime

                numObs = defaultdict(int)
                groupNos = []
                groupInd = []
                goodGroup = True
                for ii, (mjdObs, filterID, mfID) in enumerate(intStacks):
                    if mjdObs >= initMjdObs and goodGroup:
                        numObs[filterID] += 1
                        goodGroup = numObs[filterID] <= 1
                        if goodGroup and mjdObs <= finalMjdObs:
                            groupNos.append(mfID)
                            groupInd.append(ii)

                frameSetIDs.append(','.join(map(str, groupNos)))
                initPos = max(groupInd) + 1

        # Obtain full set of details for each frame in the pointings.
        for frameIDs in frameSetIDs:
            allFrames = self.archive.query(
                selectStr="D." + dPre.join(curAstHdr) + fPre
                          + fPre.join(frameHdr),
                fromStr="Multiframe AS F, MultiframeDetector AS MFD, "
                        "CurrentAstrometry AS D",
                whereStr="D.multiframeID=F.multiframeID AND F.%s"
                         " AND D.multiframeID=MFD.multiframeID"
                         " AND D.extNum=MFD.extNum AND MFD.%s"
                         " AND D.multiframeID IN (%s)"
                % (DepCodes.selectNonDeprecated,
                   DepCodes.selectNonDeprecated, frameIDs))

            # Group frames by detector to produce frame sets
            # (groupby() requires the sort by the same key).
            allFrames.sort(key=attrgetter("extNum"))
            for _extNum, group in groupby(allFrames, attrgetter("extNum")):
                frames = list(group)
                if frames:
                    frameSet = [nextFrameSetID, self.cuEventID]

                    # @@TODO: Instead of taking the first frame's details
                    # at random - determine true frame set centre.
                    frameSet.extend(frames[0][:6])
                    frameSet.append(numpy.mean([f.mjdObs for f in frames]))

                    # @@NOTE: This just selects the last frame of each
                    # filter in the randomly ordered list. It's neither
                    # efficient nor consistent.
                    for filterID, _nPass in self._filterPasses:
                        multiframeID = dbc.intDefault()
                        extNum = dbc.tinyIntDefault()
                        newFrame = dbc.tinyIntDefault()
                        for frame in frames:
                            if filterID == frame.filterID:
                                multiframeID = frame.multiframeID
                                extNum = frame.extNum
                                newFrame = dbc.no()

                        frameSet += [multiframeID, extNum, newFrame]

                    frameSet += \
                        [SynopticMerger.swVersion, dbc.yes(), dbc.no()]
                    if self.synSetup.isCorSingleEpoch:
                        duration = (max(f.mjdObs for f in frames) -
                                    min(f.mjdObs for f in frames))
                        frameSet.append(duration)

                    # @TODO: check to see if in mergelog
                    self._checkFrameSet(frameSet)
                    nextFrameSetID += 1
1896 1897 #-------------------------------------------------------------------------- 1898
1899 - def _selectBestMfIDs(self, frameSetInfo):
1900 """ Chooses best mfIDs to match time info 1901 """ 1902 # First select filters with only one epoch 1903 filtCount = Counter([fID for _mfID, fID, _mjdObs in frameSetInfo]) 1904 singles = [fID for fID in filtCount if filtCount[fID] == 1] 1905 multis = [fID for fID in filtCount if filtCount[fID] > 1] 1906 groupNos = [] 1907 mjds = [] 1908 for filterID in singles: 1909 groupNos.extend([mfID for mfID, fID, _mjdObs in frameSetInfo if fID == filterID]) 1910 mjds.extend([mjdObs for _mfID, fID, mjdObs in frameSetInfo if fID == filterID]) 1911 1912 # If any filters left, find mjdObs that minimises differences 1913 for filterID in multis: 1914 meanGroupMjd = numpy.mean(mjds) 1915 bestMfID = sorted([(mfID, abs(mjdObs - meanGroupMjd)) 1916 for mfID, fID, mjdObs in frameSetInfo if fID == filterID], 1917 key=itemgetter(1))[0][0] 1918 1919 groupNos.append(bestMfID) 1920 mjds.extend([mjdObs for mfID, _fID, mjdObs in frameSetInfo if mfID == bestMfID]) 1921 return groupNos
1922 #-------------------------------------------------------------------------- 1923
1924 - def _setProgTableNames(self):
1925 """ Initialises table name member variables for current programme. 1926 """ 1927 self._mergeLog = self.programme.getSynopticMergeLogTable() 1928 self._sourceTable = self.programme.getSynopticSourceTable()
1929 1930 #------------------------------------------------------------------------------ 1931
class TilePawPrintMerger(SourceMerger):
    """ Create/update merged tile-pawprint catalogue for a given programme.
    """
    #--------------------------------------------------------------------------
    # Define class constants (access as TilePawPrintMerger.varName)

    commencementMessage = "Tile-pawprint source merging..."
    """ Message to log at the commencement of source merging.
    """
    pairingCriterion = 0.8
    """ Tight merging criterion because sources should not move in short space
        of time.
    """
    _useExtNum = False
    """ Override useExtNum selection
    """
    # Class parameters - should not be altered
    cuNum = 26  # Overrides CuSession default
    frameSetChunkSize = 150
    _isVerbose = False

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    deepOnly = False
    """ Match deep tile and pawprint data only
    """
    prepareOnly = False
    """ Only prepare the tile sets and update mergelog, without source merging?
    """
    skipQualityCheck = False
    """ Override quality bit flag check?
    """
    recreate = False
    """ Recreate tables
    """
    addDefaults = False
    """ Add defaults only
    """
    newTileList = None
    """ Create table for these multiframes only
    """
    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Create/update merged tile<->pawprint catalogue.

            Drives the full CU26 run: prepares the ingester, optionally
            recreates the tables, builds the list of new tile sets, then
            either just ingests the merge log (prepareOnly) or outgests,
            merges and ingests each chunk of tile sets.
        """
        # @todo: Add in date range selection
        super(TilePawPrintMerger, self)._onRun()

        # Merge-log schema is always ingested; source table only on a full run.
        ingSchema = [self._progSchema[self._mergeLog]]
        if not self.prepareOnly:
            ingSchema.append(self._progSchema[self._sourceTable])

        try:
            ingester = Ingester(self.archive, ingSchema, self.shareFileID,
                                skipSchemaCheck=True)
        except schema.MismatchError as error:
            raise TilePawPrintMerger.CuError(error)

        if self.addDefaults:
            self._insertDefaultRows()

        if self.recreate and not self.addDefaults:
            Logger.addMessage("Destroying & recreating %s and emptying %s" %
                              (self._sourceTable, self._mergeLog))
            self.archive.dropTable(self._progSchema[self._sourceTable],
                                   self._progSchema.values())
            self.archive.truncate(self._mergeLog)
            self.archive.createTable(self._progSchema[self._sourceTable],
                                     self._progSchema.values())

        # Create a new list of frame sets to be merged in this run.
        # @TODO ....
        newTileSets = []
        if not self.addDefaults:
            newTileSets = self._makeNewTileSets()

        Logger.addMessage("...found %s new tile sets" % len(newTileSets))
        if not newTileSets:
            Logger.addMessage("No new tile<->pawprint merging required for "
                              "this programme.")

        if self.prepareOnly:
            self._ingestMergeLogOnly(ingester, newTileSets)
        else:
            # Process tile sets in fixed-size chunks to bound outgest size.
            chunkedTileSets = list(utils.splitList(newTileSets,
                chunkSize=TilePawPrintMerger.frameSetChunkSize))

            progress = ForLoopMonitor(chunkedTileSets)
            for tileSets in chunkedTileSets:
                progress.preMessage("Preparing chunk")

                # Outgest and merge detections with C/C++ code, creating binary
                # table for ingest
                mergedTablePathName, mergeLogPathName, isComplete = \
                    self.mergeSources(tileSets, self.pairingCriterion)

                if not isComplete:
                    raise TilePawPrintMerger.CuError("Tile-Pawprint merging "
                        "failed and needs to be re-run.")

                try:
                    ingester.ingestTable(self._mergeLog, mergeLogPathName,
                                         isCsv=True)

                    ingester.ingestTable(self._sourceTable,
                                         mergedTablePathName,
                                         self._idxInfo, isOrdered=True)

                except BaseException as error:
                    # Merge-log rows for this chunk may already be in; remove
                    # them so a re-run regenerates the same tile sets.
                    Logger.addExceptionMessage(error)
                    Logger.addMessage(
                        "Attempting to rollback %s..." % self._mergeLog)

                    tileSetIDs = (tileSet.ID for tileSet in tileSets)
                    Logger.addMessage("%s rows removed."
                        % self.archive.delete(self._mergeLog,
                        "tileSetID IN (%s)" % ','.join(map(str, tileSetIDs))))

                    raise

            self._insertDefaultRows()

    #--------------------------------------------------------------------------

    def _ingestMergeLogOnly(self, ingester, newTileSets):
        """ Write the updated merge log of new (plus updated) frame sets (new
            frame sets flagged using the merge log attribute "new") to a CSV
            file)

            @param ingester:    Ingester prepared for the merge-log schema.
            @param newTileSets: Tile sets to write; duplicates are removed
                                via set() before writing.
        """
        mergeLogPathName = os.path.join(self.sysc.tempWorkPath(),
                                        'tilePawPrintMergeLog.csv')

        csv.File(mergeLogPathName, 'w+').writelines(set(newTileSets))
        try:
            ingester.ingestTable(self._mergeLog, mergeLogPathName, isCsv=True)
        except odbc.DatabaseError as error:
            # A "STREAM" error typically means a column-count mismatch
            # between the CSV rows and the table schema.
            if 'STREAM' in str(error):
                Logger.addMessage("<ERROR> %s ingest failed. " % self._mergeLog
                    + "Possibly because of a discrepency between the schema "
                    "and the entries in RequiredFilters.")
            raise

    #--------------------------------------------------------------------------

    def _makeNewTileSets(self):
        """
        Creates new tile sets as appropriate based on available intermediate
        stack frames.

        @return: List of new tile sets.
        @rtype: list(tuple)

        """
        Logger.addMessage("Creating list of tile sets to be merged...")
        # Expected number of pawprint offsets = number of non-tile mfID
        # columns in the merge log.
        mlCols = self.archive.queryColumnNames(self._mergeLog)
        expNOffsets = len([col for col in mlCols
                           if ('mfID' in col and 'tl' not in col)])

        nextFrameSetID = \
            self._getNextID(self._frameSetIDName, self._mergeLog,
                            addProgID=True)

        curAstHdr = ['centralRA', 'centralDec', 'cx', 'cy', 'cz', 'htmID',
                     'multiframeID']

        frameType = 'tile%deep%stack' if self.deepOnly else '%tile%stack'
        dateSel = ("utDate BETWEEN '%s' AND '%s' AND " % self.dateRange
                   if not self.deepOnly else "")

        selString = ''
        if self.newTileList:
            selString = 'F.multiframeID IN (%s) AND ' \
                % ','.join(map(str, self.newTileList))

        # Tiles of this programme not yet present (non-deprecated) in the
        # tile-set table; positions taken from CurrentAstrometry extNum=2.
        tiles = self.archive.query(
            selectStr="D." + ", D.".join(curAstHdr),
            fromStr="Multiframe as F,ProgrammeFrame as p,CurrentAstrometry as D",
            whereStr="programmeID=%s and F.multiframeID=p.multiframeID and "
                     "frameType like '%s' and F.deprecated=0 and "
                     "D.multiframeID=F.multiframeID and D.extNum=2 and %s %s"
                     "F.multiframeID NOT IN (%s)"
            % (self.programmeID, frameType, dateSel, selString,
               SelectSQL("tlmfID", self.programme.getTileSetTable(),
                         "deprecated=0")))

        frameSets = []
        for tile in tiles:
            # Constituent pawprints of this tile from Provenance.
            mfIDs = self.archive.query("multiframeID", "Provenance",
                whereStr="combiframeID=%s" % tile.multiframeID)

            if len(mfIDs) != expNOffsets:
                Logger.addMessage("<WARNING> Tile %s does not have expected "
                    "number of offsets: %s out of expected %s"
                    % (tile.multiframeID, len(mfIDs), expNOffsets))
                continue

            # Obtain full set of details for each frame in the pointings.
            pawPFrames = self.archive.query("multiframeID, offsetID",
                fromStr="Multiframe",
                whereStr="multiframeID IN (%s)" % ','.join(map(str, mfIDs)),
                orderBy="offsetID")

            frameSet = [nextFrameSetID, self.cuEventID]

            # Use tile position
            frameSet.extend(tile)
            frameSet.extend(frame.multiframeID for frame in pawPFrames)
            frameSet += [TilePawPrintMerger.swVersion, dbc.yes(), dbc.no()]
            frameSets.append(self.FrameSet(*frameSet))
            nextFrameSetID += 1

        return frameSets

    #--------------------------------------------------------------------------

    def _setProgTableNames(self):
        """ Initialises table name member variables for current programme.
        """
        self._mergeLog = self.programme.getTileSetTable()
        self._sourceTable = self.programme.getTilePpwTable()

    #--------------------------------------------------------------------------

    def _setProgPassbandDetails(self):
        """ Initialisation routine for this programme's merge-log and passband
            data.

            @todo: This will be a *lot* simpler when I've implemented the
                   FrameSetView class.
        """
        ## Calculate useful frame set indices from merge log attributes
        mlCols = self.archive.queryColumnNames(self._mergeLog)
        if not mlCols:
            Logger.addMessage("<ERROR> Merge-log table %s not found"
                              % self._mergeLog)
            raise SystemExit

        expNOffsets = sum('mfID' in col and 'tl' not in col for col in mlCols)

        # Passbands here are really tile ("tl") plus pawprint offsets o1..oN.
        self._passbands = ["tl"]
        for offset in range(expNOffsets):
            self._passbands += ["o%d" % (offset + 1)]

        mlCols[0] = "ID"
        self.FrameSet = namedtuple(self._mergeLog, mlCols)

        # The number of attributes before repeating filter set attributes.
        for index, attr in enumerate(mlCols):
            if attr.endswith('mfID'):
                break

        self._numHeaderCol = index

        # The number of attributes repeated for each filter.
        for index, attr in enumerate(mlCols[self._numHeaderCol + 1:]):
            if attr.endswith('mfID') or attr == 'mergeSwVersion':
                break

        self._numFilterCol = index + 1

        finalIdx = self._numHeaderCol \
            + len(self._passbands) * self._numFilterCol
        self._mfidIndices = \
            range(self._numHeaderCol, finalIdx, self._numFilterCol)

        self._newFrameSetIdx = finalIdx + 1

    #--------------------------------------------------------------------------

    def _insertDefaultRows(self):
        """ Inserts default row into TileSet and TilePawPrints table.
        """
        if not self.archive.queryEntriesExist(self._mergeLog, "tileSetID<0"):
            Logger.addMessage("Inserting default row into %s..."
                              % self._mergeLog)
            # TileSet default row.
            self.archive.insertData(self._mergeLog, [column.getDefaultValue()
                for column in self._progSchema[self._mergeLog].columns])

        if not self.prepareOnly and \
            not self.archive.queryEntriesExist(self._sourceTable,
                                               "tileSetID<0"):
            # Default rows in TilePawprints
            # @TODO: THIS SHOULD BE SCHEMA DRIVEN!
            nonDefColumns = ["cuEventID", "tlExtNum", "o1ExtNum", "o2ExtNum",
                             "o3ExtNum", "o4ExtNum", "o5ExtNum", "o6ExtNum"]

            nonDefValues = SelectSQL(
                select="cuEventID, 0 AS tlExtNum, 0 AS o1ExtNum, 0 AS o2ExtNum,"
                       "0 AS o3ExtNum, 0 AS o4ExtNum, 0 AS o5ExtNum, "
                       "0 AS o6ExtNum",
                table=self._mergeLog,
                where="tileSetID<0")

            Logger.addMessage("Inserting default row into %s..."
                              % self._sourceTable)

            # Columns keep their schema defaults except the non-default
            # columns, which come from the merge-log default row (T).
            selectStr = ", ".join(
                (str(column.getDefaultValue())
                 if column not in nonDefColumns else
                 "T.%s" % column)
                for column in self._progSchema[self._sourceTable].columns)

            numRows = self.archive.copyIntoTable(self._sourceTable,
                sourceTable="(%s) AS T" % nonDefValues, columns=selectStr)

            Logger.addMessage("done. %s rows inserted." % numRows)

            # find tileSetIDs without default rows
            # NOTE(review): this repair runs only when the table-level
            # default row was absent - confirm it should indeed be skipped
            # on reruns where the default row already exists.
            missingDefaults = self.archive.query("tileSetID", self._mergeLog,
                "tileSetID>0 AND tileSetID NOT IN (%s)"
                % SelectSQL("tileSetID",
                table=self._sourceTable,
                where="tileSetID>0 AND tileSetSeqNum<0"))

            # @TODO: Reduce code duplication with SynopticMerger
            if missingDefaults:
                nonDefColumns = ["tileSetID", "cuEventID"]

                nonDefValues = SelectSQL(
                    select=", ".join(nonDefColumns),
                    table=self._sourceTable,
                    where="tileSetID IN (%s)"
                          % ','.join(map(str, missingDefaults)),
                    groupBy=", ".join(nonDefColumns))

                selectStr = ", ".join(
                    (str(column.getDefaultValue())
                     if column not in nonDefColumns else
                     "T.%s" % column)
                    for column in self._progSchema[self._sourceTable].columns)

                numRows = self.archive.copyIntoTable(self._sourceTable,
                    sourceTable="(%s) AS T" % nonDefValues, columns=selectStr)

                Logger.addMessage("done. %s rows inserted." % numRows)
2262 2263 #------------------------------------------------------------------------------ 2264