Package helpers :: Module UpdateDetectionTables
[hide private]

Source Code for Module helpers.UpdateDetectionTables

   1  #! /usr/bin/env python 
   2  #------------------------------------------------------------------------------ 
   3  # $Id: UpdateDetectionTables.py 9572 2012-12-12 21:04:07Z NicholasCross $ 
   4  """ 
   5     A general script for updating all detection tables. 
   6   
   7     @author: R.S. Collins 
   8     @org:    WFAU, IfA, University of Edinburgh 
   9   
  10     @todo: The calcNParts() code uses common methods and member variables that 
  11            should be encapsulated in their own class, that both the 
  12            DeblendParamFixer and AProf1Fixer classes should derive from 
  13            separately from the generic DetectionUpdater class? 
  14  """ 
  15  #------------------------------------------------------------------------------ 
  16  from __future__      import division, print_function 
  17  from future_builtins import zip 
  18   
  19  import math 
  20  import numpy 
  21  import os 
  22   
  23  from   wsatools.CLI                 import CLI 
  24  import wsatools.CSV                     as csv 
  25  import wsatools.DbConnect.DbConstants   as dbc 
  26  from   wsatools.DbConnect.CuSession import CuSession 
  27  from   wsatools.DbConnect.DbSession import DbSession, Join, SelectSQL, bulkCopy 
  28  import wsatools.DbConnect.Schema        as schema 
  29  import wsatools.FitsUtils                 as fits 
  30  from   wsatools.Logger              import Logger, ForLoopMonitor 
  31  from   wsatools.SystemConstants     import DepCodes 
  32  import wsatools.Utilities               as utils 
  33  #------------------------------------------------------------------------------ 
  34   
class DetectionUpdater(CuSession):
    """ Updates the detection tables.

        Base curation session for all detection-table maintenance tasks.
        _onRun() iterates over the selected programmes, splits each
        programme's work into parts via calcNParts(), and calls the
        updateTable() hook for every detection table whose suffix is in
        tableSet.  Sub-classes override calcNParts(), setUpParameters()
        and updateTable() to define the specific task.
    """
    # Private class parameters (interpreted by CuSession) - should not be
    # altered
    _autoCommit = True
    _isPersistent = True

    #--------------------------------------------------------------------------
    # Define class constants (access as DetectionUpdater.varName)

    comment = "Updating detection tables."
    """ Default ArchiveCurationHistory comment.
    """
    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    #: Tables to include (suffixes matched against detection table names).
    tableSet = set(["Raw", "Astrometry", "Photometry"])
    #: First programme (for resuming interruptions).
    firstProg = None
    #: For re-doing earlier programmes following an interruption.
    lastProg = None

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Updates the database.

            Iterates over every programme in scope, invoking the
            updateTable() hook for each selected detection table of each
            part.  On entry self.programmeID is the (optional) single
            programme requested on the command line; it is then re-bound to
            the set of successfully completed programme IDs, which
            _onException() uses to decide whether the run counts as a
            success.
        """
        progList = ([self.programmeID] if self.programmeID else
            self.programme.getProgIDList(onlyNonSurvey=self.onlyNonSurveys,
                                         excludeSV=True))

        # Re-bound: from here on programmeID collects completed programmes.
        self.programmeID = set()

        # We want to do GPS and CAL last, as they take the longest.
        # arbSort places the listed keys first; the reverse() below then
        # leaves them at the end of the processing order.
        progList = utils.arbSort(progList,
            [dbc.calibrationProgrammeID(), self.sysc.scienceProgs.get("GPS")],
            isFullKeySet=False)

        progList.reverse()

        # Select programme range for continuing from interruptions
        if self.firstProg:
            firstProgID = self.programme.setProgID(self.firstProg)
            progList = progList[progList.index(firstProgID):]

        if self.lastProg:
            lastProgID = self.programme.setProgID(self.lastProg)
            progList = progList[:progList.index(lastProgID) + 1]

        tableSchema = self._parseSchemas(progList)

        # Copy data from each detection table
        for progID in progList:
            self.programme.setCurRow(programmeID=progID)
            nParts = self.calcNParts()
            for part in range(nParts):
                self.setUpParameters(part, nParts)

                for table in [self.programme.getDetectionTable(),
                              self.programme.getAstrometryTable(),
                              self.programme.getPhotometryTable()]:

                    # Only process tables whose suffix was requested.
                    if table.endswith(tuple(self.tableSet)):
                        self.updateTable(tableSchema[table])

            # Record this programme as completed for _onException().
            self.programmeID.add(self.programme.getAttr("programmeID"))

        Logger.addMessage("Finished.")

    #--------------------------------------------------------------------------

    def _onException(self):
        """ Only successfully completed tables are included in the
            ProgrammeCurationHistory, so success should always be True.
            @note: This will lead to a misleading log note if there are errors.
        """
        super(DetectionUpdater, self)._onException()
        # Non-empty set => at least one programme finished cleanly.
        if self.programmeID:
            self._success = True

    #--------------------------------------------------------------------------

    def _parseSchemas(self, progList):
        """ Returns, for the given programme list, the table schemas.

            @param progList: Programme IDs whose schema scripts to parse.

            @return: Dictionary of table name to schema.Table, covering
                     every table defined by every distinct schema script
                     used by the given programmes.
            @rtype:  dict
        """
        schemaList = []
        # Seed with the "no script" default so it is never parsed.
        parsedScripts = set([dbc.charDefault()])
        for progID in progList:
            progScript = self.programme.getSchemaScript(programmeID=progID)
            # Many programmes share a script; parse each script only once.
            if progScript not in parsedScripts:
                schemaList += schema.parseTables(progScript)
                parsedScripts.add(progScript)

        return dict((table.name, table) for table in schemaList)

    #--------------------------------------------------------------------------

    def calcNParts(self):
        """ @return: Number of parts to split the table update into.
            @rtype: int
        """
        # Override this method in a sub-class if required.
        return 1

    #--------------------------------------------------------------------------

    def setUpParameters(self, part, nParts):
        """ Prepares data for table update.

            @param part:   Zero-based index of the current part.
            @param nParts: Total number of parts.
        """
        # Override this method in a sub-class if required.
        pass

    #--------------------------------------------------------------------------

    def updateTable(self, table):
        """
        Perform specific update for this class to the given detection table.

        @param table: Schema for detection table to update.
        @type table: schema.Table

        """
        # Override this method in a sub-class to define a specific task
        raise DetectionUpdater.CuError("No task specified")
160 161 #------------------------------------------------------------------------------ 162
class DetectionCopier(DetectionUpdater):
    """ Copies detection table data from a source database.
    """
    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)
    # these need to be set from command-line options

    copyAll = True
    """ If False, then only copies detections where multiframes exist in the
        destination database. Otherwise all detections are copied regardless.
    """
    srcDb = None
    """ A DbSession to the source database.
    """
    #--------------------------------------------------------------------------

    def updateTable(self, table):
        """
        Copies detections from given table in source database.

        @param table: Schema for detection table to update.
        @type table: schema.Table

        """
        Logger.addMessage("Outgesting %s..." % table)

        # Optionally restrict the copy to multiframes already ingested into
        # the destination database.
        if self.copyAll:
            rowFilter = ''
        else:
            destMfIDs = SelectSQL("multiframeID",
                                  self.archive.tablePath("Multiframe"))
            rowFilter = "multiframeID IN (%s)" % destMfIDs

        srcQuery = SelectSQL('*', self.srcDb.tablePath(table.name),
                             where=rowFilter, orderBy=table.primaryKey())

        bulkCopy(srcQuery, table, fromDb=self.srcDb, toDb=self.archive,
                 fileTag=self.shareFileID)
193 194 #------------------------------------------------------------------------------ 195
class DefaultRowInserter(DetectionUpdater):
    """ Inserts default rows into every detection table selected.
    """
    def updateTable(self, table):
        """
        Insert default row into the given detection table.

        @param table: Schema for detection table to update.
        @type table: schema.Table

        """
        Logger.addMessage("Inserting default row into table %s..." % table)

        # One value per column, each taken from the schema's declared default.
        defaultRow = [column.getDefaultValue() for column in table.columns]
        self.archive.insertData(table.name, defaultRow)
210 211 #------------------------------------------------------------------------------ 212
class DeprecationPropagator(DetectionUpdater):
    """ Propagates the deprecation flag from MultiframeDetector into every
        detection table selected.
    """
    def updateTable(self, table):
        """
        Propagate deprecation flag from MultiframeDetector into given detection
        table.

        @param table: Schema for detection table to update.
        @type table: schema.Table

        """
        # Deprecation lives on the Raw table only.
        if not table.name.endswith("Raw"):
            return

        Logger.addMessage(
            "Updating deprecated flag for table %s..." % table)

        detectorJoin = Join([table.name, "MultiframeDetector"],
                            ["multiframeID", "extNum"])

        # Detector deprecation codes strictly between nonDep and reprocCASU
        # are propagated as the generic propagated-deprecation code.
        affectedRows = self.archive.update(table.name,
            entryList="%s.deprecated=%s" % (table, DepCodes.propDepDet),
            where="MultiframeDetector.deprecated BETWEEN %s AND %s AND "
                  "%s.deprecated!=%s" % (DepCodes.nonDep + 1,
                  DepCodes.reprocCASU - 1, table, DepCodes.propDepDet),
            fromTables=detectorJoin)

        # Codes at or above reprocCASU are copied through verbatim.
        affectedRows += self.archive.update(table.name,
            entryList="%s.deprecated=MultiframeDetector.deprecated" % table,
            where="MultiframeDetector.deprecated>=%s AND %s.deprecated"
                  "!=MultiframeDetector.deprecated"
                  % (DepCodes.reprocCASU, table),
            fromTables=detectorJoin)

        Logger.addMessage("%s rows affected." % affectedRows)
248 249 #------------------------------------------------------------------------------ 250
class ResetQualityFlags(DetectionUpdater):
    """ Resets the ppErrBits flag to zero for all programmes.
    """
    def updateTable(self, table):
        """
        Resets the ppErrBits flag to zero in the given table.

        @param table: Schema for detection table to update.
        @type table: schema.Table

        """
        # ppErrBits is stored on the Raw table only.
        if not table.name.endswith("Raw"):
            return

        numReset = self.archive.update(table.name, "ppErrBits=0",
                                       "ppErrBits!=0")
        Logger.addMessage("Reset %s ppErrBits values in table %s"
                          % (numReset, table))
266 267 #------------------------------------------------------------------------------ 268
class RowCounter(DetectionUpdater):
    """ Checks that the *DetectionRaw and *DetectionPhotometry tables all have
        the same row counts.
    """
    def updateTable(self, table):
        """
        Compares the given DetectionRaw table row count to that in
        DetectionPhotometry reports any discrepancies.

        @param table: Schema for detection table to update.
        @type table: schema.Table

        """
        # Only the Raw table triggers the comparison; its Photometry
        # counterpart is derived from the name.
        if not table.name.endswith("Raw"):
            return

        rawCount = self.archive.queryNumRows(table.name)
        photCount = self.archive.queryNumRows(
            table.name.replace("Raw", "Photometry"))

        if rawCount == photCount:
            Logger.addMessage("Programme %s is OK" %
                table.name.replace("DetectionRaw", "").upper())
        elif rawCount > photCount:
            Logger.addMessage(table.name + " has MORE rows than the "
                "photometry table: %s vs %s" % (rawCount, photCount))
        else:
            Logger.addMessage(table.name + " has LESS rows than the "
                "photometry table: %s vs %s" % (rawCount, photCount))
295 296 #------------------------------------------------------------------------------ 297
class ObjIDUpdater(DetectionUpdater):
    """ Makes sure that positive values of objID are unique.

        Processes each component (input) programme in turn as one "part",
        offsetting each part's positive objIDs by the cumulative maximum
        objID of the parts already processed, so no two component
        programmes share an objID value.
    """
    #: List of programme IDs contributing frames to this programme, the
    #: programme itself appended last (filled in by calcNParts()).
    inputProgs = None
    #: Maximum objID found for the current part's selection (0 if none).
    numberObjID = 0
    #: Cumulative objID offset applied to the current part.
    objIDoffset = 0
    #: WHERE clause selecting the current part's rows; built in
    #: setUpParameters() and reused by updateTable().
    _whereStr = None

    #--------------------------------------------------------------------------

    def calcNParts(self):
        """ @return: Number of parts to split the table update into.
            @rtype: int
        """
        # objID only exists on the Raw table.
        self.tableSet = set(["Raw"])
        Logger.addMessage("Processing programme "
                          + self.programme.getAcronym())

        Logger.addMessage("Finding input programmes...")
        # Self-join ProgrammeFrame to find every other programme sharing a
        # multiframe with this one.
        progIDStr = "P2.programmeID"
        self.inputProgs = self.archive.query(progIDStr,
            fromStr=Join([("ProgrammeFrame", "P1"), ("ProgrammeFrame", "P2")],
                         "multiframeID"),
            whereStr="P1.programmeID!=P2.programmeID AND P1.programmeID=%s"
                % self.programme.getAttr("programmeID"),
            orderBy=progIDStr, groupBy=progIDStr)

        # The programme itself is processed last, as the final part.
        self.inputProgs.append(self.programme.getAttr("programmeID"))

        return len(self.inputProgs)

    #--------------------------------------------------------------------------

    def setUpParameters(self, part, nParts):
        """ Prepares data for table update.

            Builds the WHERE clause selecting this component programme's
            detections and accumulates the objID offset from the previous
            part's maximum.

            @param part:   Zero-based index of the current part.
            @param nParts: Total number of parts.
        """
        Logger.addMessage("Finding maximum objID for component programme ID="
            "%s (%s of %s)" % (self.inputProgs[part], part + 1, nParts))

        # Select mfs not wanted first....
        # Exclude stack frames already handled by earlier parts (ii < part)
        # so their detections are not offset twice.
        pIDList = [str(pID) for ii, pID in enumerate(self.inputProgs)
                   if pID != self.programme.getAttr("programmeID")
                   and ii < part]

        if pIDList:
            badMfIDs = SelectSQL(
                select="Multiframe.multiframeID",
                table=Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
                where="programmeID IN (%s)" % ','.join(pIDList)
                    + " AND frameType LIKE '%stack'")

            self._whereStr = ("programmeID=%d AND objID>0"
                " AND P.multiframeID NOT IN (%s)"
                % (self.inputProgs[part], badMfIDs))
        else:
            self._whereStr = ("programmeID=%d AND objID>0" %
                self.inputProgs[part])

        # Offset for this part = sum of the previous parts' maxima; "or 0"
        # guards against a NULL maximum when the part has no rows.
        self.objIDoffset += self.numberObjID
        self.numberObjID = self.archive.queryAttrMax("objID",
            table=Join([("ProgrammeFrame", "P"),
                        (self.programme.getDetectionTable(), "D")],
                       "multiframeID"),
            where=self._whereStr) or 0

    #--------------------------------------------------------------------------

    def updateTable(self, table):
        """
        Updates the given detection table, offsetting this part's positive
        objID values by the accumulated offset so they are unique across
        component programmes.

        @param table: Schema for detection table to update.
        @type table: schema.Table

        """
        Logger.addMessage("Updating %s with correct values of objID..." % table)
        Logger.addMessage("...%s rows affected" %
            self.archive.update(table.name,
                "objID=objID+%s" % self.objIDoffset,
                fromTables=Join([(table.name, "D"), ("ProgrammeFrame", "P")],
                                "multiframeID"),
                where=self._whereStr))
378 379 #------------------------------------------------------------------------------ 380
class AProf1Fixer(DetectionUpdater):
    """ Updates the aProf1 column in the detection table, re-reading the
        correct REAL values from the original FITS catalogue files for
        deblended detections (aProf2 = -1).
    """
    def calcNParts(self):
        """ @return: Number of parts to split the table update into.
            @rtype: int
        """
        # aProf1 only exists on the Raw table.
        self.tableSet = set(["Raw"])
        Logger.addMessage("Processing programme "
                          + self.programme.getAcronym())

        Logger.addMessage("Altering tables for correct column type...")

        # aProf1 must be REAL to hold the FITS catalogue values.
        self.archive.alterColumn(self.programme.getDetectionTable(),
                                 "aProf1", "real")
        self.archive.alterColumn(self.programme.getAttr('detectionTable'),
                                 "aProf1", "real")

        Logger.addMessage("Calculating number of parts...")

        self.firstMFID, self.lastMFID = self.archive.query(
            selectStr="min(multiframeID),max(multiframeID)",
            fromStr=self.programme.getDetectionTable(),
            whereStr="multiframeID>0", firstOnly=True)

        self._selectStr = "r.multiframeID, r.extNum, r.seqNum, aProf1"
        self._fromStr = (self.programme.getDetectionTable() + " AS r, "
            "MultiframeDetector AS m, Multiframe AS f")
        # Base selection: only deblended rows (aProf2=-1) need fixing.  This
        # must stay constant across parts; per-part multiframeID ranges are
        # appended to a *local copy* in setUpParameters().
        self._whereStr = ("m.multiframeID=r.multiframeID AND "
            "m.extNum=r.extNum AND f.multiframeID=m.multiframeID"
            " AND aProf2=-1")

        # Split the update so the in-memory arrays stay within a fraction of
        # system memory.
        availMemory = self.sysc.one_kilobyte * utils.getSystemMemoryKb()
        memFracUse = 0.5  # Fraction of memory to use
        factor = 5        # How many times initial array is needed
        size = 8 * len(self._selectStr.split(','))
        size *= self.archive.queryNumRows(self._fromStr, self._whereStr)
        nParts = int(math.ceil(factor * size / (memFracUse * availMemory)))
        Logger.addMessage("...breaking into %s parts" % nParts)

        return nParts

    #--------------------------------------------------------------------------

    def setUpParameters(self, part, nParts):
        """ Prepares data for table update: queries this part's detection
            positions and reads the correct aProf1 values back from the
            FITS catalogue files.

            @param part:   Zero-based index of the current part.
            @param nParts: Total number of parts.
        """
        Logger.addMessage("Reading correct aProf1 values in from FITS files")

        partSize = (self.lastMFID - self.firstMFID) / nParts
        minMFID = int(self.firstMFID + part * partSize)
        maxMFID = int(self.firstMFID + (part + 1) * partSize) + 1

        # Append this part's multiframeID range to a LOCAL copy of the base
        # where-clause.  The previous code appended it to self._whereStr in
        # place, so every later part also carried all earlier parts'
        # mutually exclusive range clauses and selected (almost) no rows.
        whereStr = self._whereStr + (
            " AND r.multiframeID>=%s AND r.multiframeID<%s"
            % (minMFID, maxMFID))

        self.posArray = self.archive.query(self._selectStr, self._fromStr,
            whereStr=whereStr + " ORDER BY "
            + self._selectStr[:self._selectStr.rindex(',')])

        # Read correct (REAL) aProf1 from FITS file and put in an array of the
        # correct shape and with correct MFID structure.
        self.aProf1Arr = numpy.zeros(len(self.posArray), numpy.float32)
        dbData = numpy.array(self.posArray)
        fileNameInfo = self.archive.query("DISTINCT f.multiframeID,f.catName",
                                          self._fromStr, whereStr)

        progress = ForLoopMonitor(fileNameInfo)
        for mfID, fileName in fileNameInfo:
            # Catalogue names are stored as "server:path"; keep the path part.
            fileName = fileName.split(':')[-1]
            if self.sysc.emptyFitsCatalogueFileName not in fileName \
                and os.path.exists(fileName):

                # NOTE(review): the HDU list is never explicitly closed -
                # confirm fits.open() releases file handles on gc.
                for extNum, hdu in enumerate(fits.open(fileName)):
                    if extNum > 0 and hdu.header["NAXIS2"] > 0:
                        # DB extNum is the HDU index + 1 (primary HDU is 1).
                        indices = numpy.nonzero(numpy.logical_and(
                            numpy.equal(dbData[:, 0], mfID),
                            numpy.equal(dbData[:, 1], extNum + 1)))

                        # Deblends are flagged by Areal_2_profile == -1,
                        # matching the DB selection aProf2=-1.
                        seqNos = numpy.nonzero(
                            numpy.equal(hdu.data.field("Areal_2_profile"),
                                        -1))

                        self.aProf1Arr[indices] = \
                            hdu.data.field("Areal_1_profile")[seqNos]

            progress.testForOutput()

    #--------------------------------------------------------------------------

    def updateTable(self, table):
        """
        Updates database with the values of aProf1 from the files.

        @param table: Schema for detection table to update.
        @type table: schema.Table

        """
        Logger.addMessage("Updating %s with correct values of aProf1" % table)

        # For Loop Monitor
        progress = ForLoopMonitor(self.posArray)
        for (pos, aProf1) in zip(self.posArray, self.aProf1Arr):
            # pos is (multiframeID, extNum, seqNum, aProf1); drop the old
            # aProf1 for the key.
            self.archive.update(table.name, "aProf1=%s" % aProf1,
                where="multiframeID=%s AND extNum=%s AND seqNum=%s" % pos[:-1])
            progress.testForOutput()
486 487 #------------------------------------------------------------------------------ 488
489 -class DeblendParamFixer(DetectionUpdater):
490 """ Fixes the parameters of deblended objects in the detection tables. 491 """ 492 #: ? 493 apertures = [0.5, 1 / math.sqrt(2), 1, math.sqrt(2), 2, 2 * math.sqrt(2), 494 4, 5, 6, 7, 8, 10, 12] 495 #: ? 496 firstAperture = 7 497 #: ? 498 firstAperIndex = apertures.index(firstAperture) 499 #: Only process the last multiframeID for the programme. 500 doLastMfOnly = False 501 502 #-------------------------------------------------------------------------- 503
504 - def calcNParts(self):
505 """ @return: Number of parts to split the table update into. 506 @rtype: int 507 """ 508 self.tableSet = set(["Raw", "Photometry"]) 509 Logger.addMessage("Calculating number of parts for %s..." 510 % self.programme.getAcronym()) 511 512 self.firstMFID, self.lastMFID = self.archive.query( 513 selectStr="min(d.multiframeID),max(d.multiframeID)", 514 fromStr="%s as d,Multiframe as m" % 515 self.programme.getDetectionTable(), 516 whereStr="d.multiframeID=m.multiframeID AND d.multiframeID>0 " 517 "AND utDate< '%s'" % 518 self.sysc.obsCal.dateRange('08B').begin, firstOnly=True) 519 520 rawColumnNames = \ 521 self.archive.queryColumnNames(self.programme.getDetectionTable()) 522 523 aperNames = [colName for colName in rawColumnNames 524 if 'aperFlux' in colName and 'Err' not in colName 525 and 'err' not in colName 526 and int(colName.split('aperFlux')[1]) <= len(self.apertures)] 527 528 self._selectStr = ("r.multiframeID,r.extNum,r.seqNum,nustep,aProf1," 529 "aProf2,aProf8,isoFlux,pHeight,%s,isoMag,thresholdIsoph," 530 "coreRadius,gain,skyNoise,skyVar" % ','.join(aperNames)) 531 532 self._fromStr = ("%s AS r, %s AS p, MultiframeDetector AS m," 533 "Multiframe AS f" % (self.programme.getDetectionTable(), 534 self.programme.getPhotometryTable())) 535 536 self._whereStr = ("m.multiframeID=r.multiframeID AND m.extNum=r.extNum" 537 " AND r.multiframeID=p.multiframeID AND r.extNum=p.extNum AND " 538 "r.seqNum=p.seqNum AND f.multiframeID=m.multiframeID AND aProf2=-1 " 539 "AND utDate < '%s'" % self.sysc.obsCal.dateRange('08B').begin) 540 541 if self.doLastMfOnly: 542 self.dictParts = {0: (self.lastMFID, self.lastMFID + 1)} 543 return 1 544 545 availMemory = self.sysc.one_kilobyte * utils.getSystemMemoryKb() 546 memFracUse = 0.5 # Fraction on memory to use 547 factor = 5 # How many times initial array is needed 548 size = 8 * len(self._selectStr.split(',')) 549 totalRows = self.archive.queryNumRows(self._fromStr, self._whereStr) 550 551 if self.archive.isTrialRun: 552 sample = 
20 553 size *= sample 554 self._selectStr = "TOP %s " % sample + self._selectStr 555 else: 556 size *= totalRows 557 558 nParts = int(math.ceil(factor * size / (memFracUse * availMemory))) 559 Logger.addMessage("...breaking into %s parts" % nParts) 560 561 # Different multiframes have different numbers of rows 562 if totalRows > 0: 563 partSize = totalRows / nParts 564 mfSizes = self.archive.query( 565 selectStr="m.multiframeID,COUNT(*)", 566 fromStr=self._fromStr, 567 whereStr=self._whereStr + " group by m.multiframeID " 568 "order by m.multiframeID") 569 570 beginMfID, endMfID = self.firstMFID, self.firstMFID 571 part = 0 572 total = 0 573 self.dictParts = dict() 574 for ii, (mfID, totalRows) in enumerate(mfSizes): 575 total += totalRows 576 if total > partSize: 577 endMfID = mfID 578 self.dictParts[part] = (beginMfID, endMfID) 579 beginMfID = mfSizes[ii + 1][0] 580 total = 0 581 part += 1 582 endMfID = self.lastMFID + 1 583 self.dictParts[part] = (beginMfID, endMfID) 584 585 return nParts
586 587 #-------------------------------------------------------------------------- 588
589 - def setUpParameters(self, part, _nParts):
590 """ Prepares data for table update. 591 """ 592 Logger.addMessage("Retrieving database parameters for part %s of %s..." 593 % (part + 1, _nParts)) 594 595 # Different multiframes have different numbers of rows 596 deblendData = numpy.array( 597 self.archive.query(self._selectStr, self._fromStr, self._whereStr 598 + " AND r.multiframeID>=%s AND r.multiframeID<%s" 599 % self.dictParts[part] 600 + " ORDER BY r.multiframeID, r.extNum, r.seqNum OPTION (MAXDOP 1)")) 601 602 Logger.addMessage("Calculating weighted fluxes and errors...") 603 604 # Set any parameters to defaults which are incorrect or default 605 # set gain defaults 606 deblendData[:, self.firstAperIndex + len(self.apertures) + 3] = \ 607 numpy.where(numpy.greater( 608 deblendData[:, self.firstAperIndex + len(self.apertures) + 3], 0.), 609 deblendData[:, self.firstAperIndex + len(self.apertures) + 3], 5.0) 610 611 # set threshold defaults 612 deblendData[:, self.firstAperIndex + len(self.apertures) + 1] = \ 613 numpy.where(numpy.greater( 614 deblendData[:, self.firstAperIndex + len(self.apertures) + 1], 0.), 615 deblendData[:, self.firstAperIndex + len(self.apertures) + 1], 1.0) 616 617 # set skyNoise defaults 618 deblendData[:, self.firstAperIndex + len(self.apertures) + 4] = \ 619 numpy.where(numpy.greater( 620 deblendData[:, self.firstAperIndex + len(self.apertures) + 4], 0.), 621 deblendData[:, self.firstAperIndex + len(self.apertures) + 4], 1.0) 622 623 # set rCore defaults 624 deblendData[:, self.firstAperIndex + len(self.apertures) + 2] = \ 625 numpy.where(numpy.greater( 626 deblendData[:, self.firstAperIndex + len(self.apertures) + 2], 0.), 627 deblendData[:, self.firstAperIndex + len(self.apertures) + 2], 2.5) 628 629 self.apertureArray = numpy.transpose(numpy.multiply( 630 deblendData[:, self.firstAperIndex + len(self.apertures) + 2], 631 numpy.transpose(numpy.multiply( 632 numpy.ones([deblendData.shape[0], len(self.apertures)], 633 numpy.float32), 634 numpy.array(self.apertures, 
numpy.float32))))) 635 636 self.midPointRadii = numpy.multiply(numpy.add( 637 self.apertureArray[:, 1:len(self.apertures)], 638 self.apertureArray[:, :len(self.apertures) - 1]), 0.5) 639 640 self.posArray = deblendData[:, :3] 641 # mfID,eNum,sNum,aprof1,aProf2,aProf8,isoFlux,pHeight,apertures,isoMag 642 # Calculate initial KPH parameters 643 # Areal_1_profile different between FITS and DB 644 # - real and int (CASU F***UP) 645 self.radt = numpy.sqrt(numpy.divide(deblendData[:, 4], math.pi)) 646 self.kronRadArr = self.calcKronRad(deblendData) 647 self.petrRadArr = self.calcPetrRad(deblendData) 648 self.hallRadArr = self.calcHallRad(deblendData) 649 Logger.addMessage("Calculated radii", alwaysLog=False) 650 kronFluxArr = self.calcFlux(deblendData, self.kronRadArr) 651 petrFluxArr = self.calcFlux(deblendData, self.petrRadArr) 652 hallFluxArr = self.calcFlux(deblendData, self.hallRadArr) 653 Logger.addMessage("Calculated fluxes", alwaysLog=False) 654 655 # Group 656 groups = findGroups(deblendData) 657 Logger.addMessage("Found groups", alwaysLog=False) 658 659 # Weight parameters according to isophotal flux 660 self.kronFluxArrFin = self.weightFlux(deblendData, kronFluxArr, groups) 661 self.petrFluxArrFin = self.weightFlux(deblendData, petrFluxArr, groups) 662 self.hallFluxArrFin = \ 663 self.weightFlux(deblendData, hallFluxArr, groups, isHall=True) 664 665 # Calc errors 666 self.kronFluxErrArr = \ 667 self.calcErr(deblendData, self.kronFluxArrFin, self.kronRadArr) 668 self.petrFluxErrArr = \ 669 self.calcErr(deblendData, self.petrFluxArrFin, self.petrRadArr) 670 self.hallFluxErrArr = \ 671 self.calcErr(deblendData, self.hallFluxArrFin, self.hallRadArr) 672 673 Logger.addMessage("Calculating corrected magnitudes and errors...") 674 675 # Calculate mags 676 ZPCorr = self.calcZeropoint(deblendData) 677 678 # WHERE FOR WHEN FLUX<0. 
679 self.kronMagArr = calcMag(self.kronFluxArrFin, ZPCorr) 680 self.petrMagArr = calcMag(self.petrFluxArrFin, ZPCorr) 681 self.hallMagArr = calcMag(self.hallFluxArrFin, ZPCorr) 682 683 # Calc mag errors 684 # If -ve flux - default error 685 self.kronMagErrArr = \ 686 calcMagErr(self.kronFluxArrFin, self.kronFluxErrArr) 687 688 self.petrMagErrArr = \ 689 calcMagErr(self.petrFluxArrFin, self.petrFluxErrArr) 690 691 self.hallMagErrArr = \ 692 calcMagErr(self.hallFluxArrFin, self.hallFluxErrArr)
693 694 #-------------------------------------------------------------------------- 695
696 - def updateTable(self, table):
697 """ 698 Updates detection table deblended parameters with corrected values. 699 700 @param table: Schema for detection table to update. 701 @type table: schema.Table 702 703 """ 704 Logger.addMessage("Updating table %s..." % table) 705 progress = ForLoopMonitor(range(self.posArray.shape[0])) 706 for dI in range(self.posArray.shape[0]): 707 if table.name.endswith("Raw"): 708 updates = dict(kronRad=0.5 * self.kronRadArr[dI], 709 petroRad=0.5 * self.petrRadArr[dI], 710 hallRad=self.hallRadArr[dI], 711 kronFlux=self.kronFluxArrFin[dI], 712 petroFlux=self.petrFluxArrFin[dI], 713 hallFlux=self.hallFluxArrFin[dI], 714 kronFluxErr=self.kronFluxErrArr[dI], 715 petroFluxErr=self.petrFluxErrArr[dI], 716 hallFluxErr=self.hallFluxErrArr[dI]) 717 718 elif table.name.endswith("Photometry"): 719 updates = dict(kronMag=self.kronMagArr[dI], 720 petroMag=self.petrMagArr[dI], 721 hallMag=self.hallMagArr[dI], 722 kronMagErr=self.kronMagErrArr[dI], 723 petroMagErr=self.petrMagErrArr[dI], 724 hallMagErr=self.hallMagErrArr[dI]) 725 726 self.archive.update(table.name, updates.items(), 727 where="multiframeID=%s AND extNum=%s AND seqNum=%s" 728 % tuple(self.posArray[dI])) 729 730 progress.testForOutput()
731 732 #-------------------------------------------------------------------------- 733
734 - def calcKronRad(self, deblendData):
735 """ Calculate Kron radius to Mike's prescription. 736 """ 737 maxAperture = self.firstAperture 738 fAI = self.firstAperIndex 739 fluxInAnnuli = numpy.maximum(numpy.subtract( 740 deblendData[:, fAI + 1:fAI + maxAperture], 741 deblendData[:, fAI:fAI + maxAperture - 1]), 0.) 742 743 modulus = numpy.sum(numpy.multiply( 744 self.midPointRadii[:, 0:maxAperture - 1], fluxInAnnuli), 1) 745 746 modulus = numpy.add(numpy.multiply(numpy.multiply( 747 self.apertureArray[:, 0], deblendData[:, fAI]), 0.5), modulus) 748 749 norm = numpy.add(deblendData[:, fAI], numpy.sum(fluxInAnnuli, 1)) 750 rad1 = numpy.where(numpy.not_equal(norm, 0.), 751 numpy.divide(modulus, norm), 0.) 752 753 return numpy.maximum(self.radt, numpy.minimum(numpy.minimum( 754 numpy.multiply(5.0, self.radt), numpy.multiply(2., rad1)), 755 self.apertureArray[:, len(self.apertures) - 1]))
756 757 #-------------------------------------------------------------------------- 758
759 - def calcPetrRad(self, deblendData):
760 """ Calculate Petrosian radius to Mike's prescription. 761 """ 762 etaPetro = 0.2 763 fAI = self.firstAperIndex 764 fluxInAnnuli = numpy.subtract( 765 deblendData[:, fAI + 1:fAI + len(self.apertures)], 766 deblendData[:, fAI:fAI + len(self.apertures) - 1]) 767 768 annArea = numpy.subtract( 769 numpy.power(self.apertureArray[:, 1:len(self.apertures)], 2), 770 numpy.power(self.apertureArray[:, :len(self.apertures) - 1], 2)) 771 772 sbAnn = numpy.where(numpy.greater(annArea, 0.), 773 numpy.divide(fluxInAnnuli, annArea), 0.) 774 775 sbCirc = numpy.where( 776 numpy.greater(numpy.power( 777 self.apertureArray[:, :len(self.apertures) - 1], 2), 0.), 778 numpy.divide(deblendData[:, fAI:fAI + len(self.apertures) - 1], 779 numpy.power(self.apertureArray[:, :len(self.apertures) - 1], 2)), 780 0.) 781 782 etaInit = numpy.where(numpy.greater(sbCirc, 0.), 783 numpy.divide(sbAnn, sbCirc), 0.) 784 785 # Find first index when eta<0.2 786 etaIndex = findFirstIndex(etaInit, etaPetro, 'le') 787 788 # Be very careful with index. Mike makes j=j+1 after calc eta and 789 # etaold and etaArray is 12 across, whereas aperArray is 13 across 790 eta = etaInit[etaIndex] 791 etaOld = numpy.where(numpy.greater(etaIndex[1], 0), 792 etaInit[addIndex(etaIndex, -1)], 1.0) 793 794 apIndex = addIndex(etaIndex, 2) 795 apIndexU = (apIndex[0], 796 numpy.minimum(apIndex[1], len(self.apertures) - 1)) 797 798 midPointUpper = numpy.sqrt(numpy.multiply(0.5, numpy.add( 799 numpy.power(self.apertureArray[apIndexU], 2.), 800 numpy.power(self.apertureArray[addIndex(apIndexU, -1)], 2.)))) 801 802 midPointLower = numpy.sqrt(numpy.multiply(0.5, numpy.add( 803 numpy.power(self.apertureArray[addIndex(apIndexU, -1)], 2.), 804 numpy.power(self.apertureArray[addIndex(apIndexU, -2)], 2.)))) 805 806 etaRatUpper = numpy.where(numpy.not_equal(etaOld, eta), numpy.divide( 807 numpy.subtract(etaOld, etaPetro), numpy.subtract(etaOld, eta)), 0.) 
808 809 etaRatLower = numpy.where(numpy.not_equal(etaOld, eta), numpy.divide( 810 numpy.subtract(etaPetro, eta), numpy.subtract(etaOld, eta)), 0.) 811 812 radeta = numpy.add(numpy.multiply(etaRatUpper, midPointUpper), 813 numpy.multiply(etaRatLower, midPointLower)) 814 815 radeta = numpy.where( 816 numpy.greater_equal(apIndex[1], (len(self.apertures) - 1)), 817 self.apertureArray[apIndexU], radeta) 818 819 return numpy.maximum(self.radt, numpy.minimum(numpy.minimum( 820 numpy.multiply(5.0, self.radt), numpy.multiply(2., radeta)), 821 self.apertureArray[:, len(self.apertures) - 1]))
822 823 #-------------------------------------------------------------------------- 824
825 - def calcHallRad(self, deblendData):
826 """ Calculate Hall radius to Mike's prescription. 827 """ 828 fAI = self.firstAperIndex 829 peak = numpy.maximum( 830 numpy.multiply(1.5, deblendData[:, fAI + len(self.apertures) + 1]), 831 deblendData[:, fAI - 1]) 832 833 pkThrRat = numpy.log( 834 numpy.divide(peak, deblendData[:, fAI + len(self.apertures) + 1])) 835 836 radH = numpy.divide(numpy.multiply(5.0, self.radt), pkThrRat) 837 838 return numpy.maximum(self.radt, numpy.minimum(numpy.minimum( 839 numpy.multiply(5.0, self.radt), radH), 840 self.apertureArray[:, len(self.apertures) - 1]))
841 842 #-------------------------------------------------------------------------- 843
844 - def calcFlux(self, deblendData, radiusArr):
845 """ Calculates best flux given radius. 846 """ 847 bestIndex = \ 848 findFirstIndex(self.apertureArray, radiusArr, 'ge', isArray=True) 849 850 bestIndex = (bestIndex[0], numpy.maximum(bestIndex[1], 1)) 851 deltaRad = numpy.divide( 852 numpy.subtract(self.apertureArray[bestIndex], radiusArr), 853 numpy.subtract(self.apertureArray[bestIndex], 854 self.apertureArray[addIndex(bestIndex, -1)])) 855 856 return numpy.add( 857 numpy.multiply( 858 deblendData[addIndex(bestIndex, self.firstAperIndex)], 859 numpy.subtract(1., deltaRad)), 860 numpy.multiply( 861 deblendData[addIndex(bestIndex, self.firstAperIndex - 1)], 862 deltaRad))
863 864 #-------------------------------------------------------------------------- 865
def weightFlux(self, deblendData, fluxArr, groups, isHall=False):
    """ Sums up fluxes and apportions by comparing to ratio of
        isophotal to sumIso.

        @param deblendData: Array of deblend parameters; isophotal flux
                            column is self.firstAperIndex - 2.
        @param fluxArr:     Per-detection flux to be redistributed.
        @param groups:      Deblend groups as produced by findGroups():
                            (firstIndex, mfID, extNum, firstSeqNum,
                            nMembers, sumIso, isBad) tuples.
        @param isHall:      If True, take the larger of the weighted and
                            isophotal flux instead of falling back only
                            when the weighted flux is non-positive.

        @return: float32 array of re-apportioned fluxes; detections not
                 in any group are left at zero.
    """
    isoCol = self.firstAperIndex - 2
    result = numpy.zeros([fluxArr.size], numpy.float32)

    for firstIdx, _mfID, _eNum, _fSNum, nMembers, sumIso, _isBad in groups:
        end = firstIdx + nMembers
        isoFluxes = deblendData[firstIdx:end, isoCol]

        # Total flux of the whole blend, shared out in proportion to
        # each member's isophotal flux (guard against sumIso <= 0).
        totalFlux = sum(fluxArr[firstIdx + member]
                        for member in range(nMembers))
        weighted = (isoFluxes * totalFlux) / max(sumIso, 1)

        if isHall:
            result[firstIdx:end] = numpy.maximum(weighted, isoFluxes)
        else:
            result[firstIdx:end] = numpy.where(
                numpy.greater(weighted, 0.), weighted, isoFluxes)

    return result
887 888 #-------------------------------------------------------------------------- 889
def calcErr(self, deblendData, fluxArr, radArr):
    """ Calculates the error in the flux.

        Combines Poisson noise on the flux with the sky noise and sky
        variance integrated over the aperture area.

        @param deblendData: Array of deblend parameters; gain, sky noise
                            and sky variance columns follow the aperture
                            magnitudes.
        @param fluxArr:     Flux per detection (negative values are
                            treated as zero for the Poisson term).
        @param radArr:      Aperture radius per detection.

        @return: Flux error per detection.
    """
    base = self.firstAperIndex + len(self.apertures)
    gain = deblendData[:, base + 3]
    skyNoise = deblendData[:, base + 4]
    skyVar = deblendData[:, base + 5]

    # Poisson (shot-noise) term; clamp negative fluxes to zero.
    shotTerm = numpy.maximum(fluxArr, 0.) / gain

    # Sky term scales with the aperture area, pi * r^2.
    skyTerm = (math.pi * radArr * radArr) * (skyNoise ** 2. + skyVar ** 2.)

    return numpy.sqrt(shotTerm + skyTerm)
903 904 #-------------------------------------------------------------------------- 905
def calcZeropoint(self, deblendData):
    """ Uses isophotal mag and isophotal flux to calculate ZP.

        Inverts mag = ZP - 2.5 log10(flux) for each detection.

        @param deblendData: Array of deblend parameters; isophotal flux
                            is column self.firstAperIndex - 2, isophotal
                            magnitude is self.firstAperIndex +
                            len(self.apertures).

        @return: Zero-point per detection.
    """
    isoFluxCol = self.firstAperIndex - 2
    isoMagCol = self.firstAperIndex + len(self.apertures)

    isoFlux = deblendData[:, isoFluxCol]
    isoMag = deblendData[:, isoMagCol]

    return isoMag + 2.5 * numpy.log10(isoFlux)
914 915 #------------------------------------------------------------------------------ 916
class MjdStackUpdater(DetectionUpdater):
    """ Updates values of mjd in OB stacks.
    """
    #--------------------------------------------------------------------------

    def calcNParts(self):
        """ @return: Number of parts to split the table update into.
            @rtype:  int
        """
        # Only the Raw detection tables carry the mjd column to correct.
        self.tableSet = set(["Raw"])
        return 1

    #--------------------------------------------------------------------------

    def updateTable(self, table):
        """
        Updates database with the values of mjd taken from
        MultiframeDetector.mjdMean for OB stacks.

        @param table: Schema for detection table to update.
        @type  table: schema.Table

        """
        sourceTables = \
            "%s D, MultiframeDetector M, Multiframe F" % table.name

        # Restrict to plain stacks (not tiles or deep stacks) that have a
        # valid mjdMean and a still-defaulted detection mjd.
        criteria = ("D.multiframeID=M.multiframeID AND "
                    "D.extNum=M.extNum AND M.multiframeID=F.multiframeID AND "
                    "f.frameType like '%stack' AND f.frameType not like '%tile%' "
                    "AND f.frameType not like '%deep%' AND M.mjdMean>0. "
                    "AND D.mjd<0.")

        Logger.addMessage("Updating %s with correct values of mjd..." % table)
        numRows = self.archive.update(table.name, "mjd=M.mjdMean",
                                      fromTables=sourceTables, where=criteria)

        Logger.addMessage("...%s rows affected" % numRows)
947 #------------------------------------------------------------------------------ 948
def calcMag(fluxArr, zpArr):
    """ Converts fluxes to magnitudes, mag = ZP - 2.5 log10(flux).

        @param fluxArr: Array of fluxes.
        @param zpArr:   Array of zero-points (same shape as fluxArr).

        @return: Array of magnitudes; entries with non-positive flux are
                 set to the database default real value.
    """
    # numpy.where evaluates both branches, so log10 would emit spurious
    # RuntimeWarnings for non-positive fluxes - suppress them; the
    # affected entries are replaced by the default value anyway.
    with numpy.errstate(divide='ignore', invalid='ignore'):
        return numpy.where(numpy.greater(fluxArr, 0.),
            numpy.subtract(zpArr, numpy.multiply(2.5, numpy.log10(fluxArr))),
            dbc.realDefault())
955 956 #----------------------------------------------------------------------------- 957
def calcMagErr(fluxArr, fluxErrArr):
    """ Converts flux errors to magnitude errors,
        magErr = 1.0857 * fluxErr / flux.

        @param fluxArr:    Array of fluxes.
        @param fluxErrArr: Array of flux errors (same shape as fluxArr).

        @return: Array of magnitude errors; entries with non-positive
                 flux are set to the database default real value.
    """
    # numpy.where evaluates both branches, so the division would emit
    # spurious RuntimeWarnings where flux is zero - suppress them; the
    # affected entries are replaced by the default value anyway.
    with numpy.errstate(divide='ignore', invalid='ignore'):
        return numpy.where(numpy.greater(fluxArr, 0.),
            numpy.multiply(1.0857, numpy.divide(fluxErrArr, fluxArr)),
            dbc.realDefault())
964 965 #------------------------------------------------------------------------------ 966
def findGroups(deblendData):
    """
    Groups of deblended parameters have consecutive seqNum and start with
    aProf8=0, except in 2005-2006, when it is impossible to separate groups
    that appear on consecutive seqNum. Also after 2006, there are a small
    number of single deblends - some other issue that has aProfN=-1 where N>=2.

    @param deblendData: 2-D array; the first 8 columns are
                        (mfID, extNum, seqNum, xPixelSize, aProf1, aProf2,
                        aProf8, isophotal flux).

    @return: List of (firstIndex, mfID, extNum, firstSeqNum, nMembers,
             sumIso, isBadGroup) tuples, one per multi-member group.

    """
    # Guard: an empty detection array has no groups (previously this
    # raised IndexError on deblendList[0]).
    if len(deblendData) == 0:
        return []

    deblendList = list(deblendData[:, :8])  # and isophotal

    # First deblend in table
    mfID, eNum, fSNum, _xPSze, _ap1, ap2, ap8, sumIso = deblendList[0]
    fIndex = 0
    nMembers = 1

    # Just need groups, not bad groups.
    isBadGroup = not (ap2 == -1 and ap8 == 0)

    groups = []
    for ii, (mfID, eNum, sNum, _xPSze, _ap1, ap2, ap8, isoFlux) \
      in enumerate(deblendList[1:]):

        isAprof8set = ap2 == -1 and ap8 == 0

        # New group by spacing: seqNum not consecutive, or a different
        # frame/extension (ii indexes the previous row of deblendData).
        isNewGroup = (sNum != deblendData[ii][2] + 1
            or mfID != deblendData[ii][0] or eNum != deblendData[ii][1])

        if isNewGroup or isAprof8set:
            # New group -- seqNum changes by more than 1 or aProf8=0.
            # Only multi-member groups are recorded.
            if nMembers > 1:
                groups.append((fIndex, deblendData[ii][0], deblendData[ii][1],
                               fSNum, nMembers, sumIso, isBadGroup))

            # Initialise new group
            fSNum = sNum
            fIndex = ii + 1
            nMembers = 1
            sumIso = isoFlux
            isBadGroup = not isAprof8set
        else:
            # Add in new members
            sumIso += isoFlux
            nMembers += 1

    # Add in last group (deblendData[ii] is still inside this group
    # whenever nMembers > 1).
    if nMembers > 1:
        groups.append((fIndex, deblendData[ii][0], deblendData[ii][1], fSNum,
                       nMembers, sumIso, isBadGroup))

    return groups
1018 1019 #------------------------------------------------------------------------------ 1020
def findFirstIndex(mainArray, value, logic, isArray=False):
    """ Returns 2-D tuple of arrays. First entry in the pair is arange.

        Finds, for every row of mainArray, the column index of the first
        element satisfying the comparison against value; rows where no
        element satisfies it get the last column index.

        @param mainArray: 2-D array to search, row by row.
        @param value:     Scalar, or 1-D array with one entry per row of
                          mainArray when isArray is True.
        @param logic:     Comparison: 'le', 'lt', 'ge' or 'gt'.
        @param isArray:   If True, value is a per-row array.

        @return: Index pair (arange over rows, first matching column per
                 row) suitable for fancy indexing.
    """
    if isArray:
        if value.shape[0] != mainArray.shape[0]:
            raise Exception("Arrays are inconsistent shapes: %d, %d"
                            % (value.shape[0], mainArray.shape[0]))

        # Column vector, so the comparison broadcasts across each row.
        value = numpy.reshape(value, [value.shape[0], 1])

    # Dispatch on the comparison; broadcasting replaces the former
    # explicit multiply-by-ones expansion (avoids an NxM temporary).
    comparisons = {'le': numpy.less_equal, 'lt': numpy.less,
                   'ge': numpy.greater_equal, 'gt': numpy.greater}
    try:
        compare = comparisons[logic]
    except KeyError:
        raise Exception("given incorrect logic value")

    testArray = compare(mainArray, value).astype(numpy.int16)

    # If no value meets criteria - index = max.
    return (numpy.arange(mainArray.shape[0]),
            numpy.where(numpy.equal(numpy.max(testArray, 1), 1),
                        numpy.argmax(testArray, 1), (mainArray.shape[1] - 1)))
1054 1055 #------------------------------------------------------------------------------ 1056
def addIndex(arrayIndex, scalarValue):
    """ Shifts the column component of a fancy-index pair.

        @param arrayIndex:  (rowIndices, colIndices) pair as returned by
                            findFirstIndex().
        @param scalarValue: Offset added to every column index.

        @return: New (rowIndices, shiftedColIndices) pair; row indices
                 are passed through unchanged.
    """
    rowIndices, colIndices = arrayIndex

    return (rowIndices, numpy.add(colIndices, scalarValue))
if __name__ == "__main__":
    # Define additional command-line options for DetectionUpdater
    CLI.progOpts += [
        CLI.Option('a', "add_defaults",
            "insert true default rows into detection tables"),
        CLI.Option('b', "fix_deblends",
            "fix deblended extended source parameters according to CASU "
            "prescription"),
        CLI.Option('B', "fix_deblends_only",
            "fix deblended extended source parameters according to CASU "
            "prescription, but not aProf1"),
        CLI.Option('e', "exclude",
            "list of table endings to exclude, e.g. 'Raw, Astrometry'",
            "LIST"),
        CLI.Option('f', "flag_deprecated",
            "propagate deprecation flags from MultiframeDetector into detection "
            "tables"),
        CLI.Option('i', "include",
            "list of table endings to include, e.g. 'Raw, Photometry'",
            "LIST", ', '.join(DetectionUpdater.tableSet)),
        CLI.Option('k', "first_programme",
            "first programme to include in the list of programmes, for resuming "
            "interrupted runs",
            "ID"),
        CLI.Option('l', "last_programme",
            "last programme to include in the list of programmes, for redoing "
            "earlier programmes following an interrupted run",
            "ID"),
        CLI.Option('m', "mjd_stack",
            "Update mjd values for OB stacks using mjdMean in MultiframeDetector"),
        CLI.Option('o', "uniqueObjID",
            "Update objIDs in programmes that have been merged together to make "
            "them unique"),
        CLI.Option('p', "progID",
            "include just this programme "
            "(specify 'NONSURVEYS' for just non-surveys)",
            "ID", isValOK=lambda x: ',' not in x),
        CLI.Option('q', "quality_flag_reset",
            "reset quality bit flags in DetectionRaw"),
        CLI.Option('r', "row_count",
            "compare row counts between the Raw and Photometry tables"),
        CLI.Option('s', "sync_from",
            "copy detection data from this source database",
            "NAME"),
        CLI.Option('z', "do_last_mf_only",
            "Only process the last multiframe in each programme")]

    # Change default comment: @@TODO: This should reflect the task's comment
    CLI.progArgs['comment'] = DetectionUpdater.comment

    cli = CLI(DetectionUpdater.__name__, "$Revision: 9572 $", __doc__)
    Logger.addMessage(cli.getProgDetails())
    Logger.isVerbose = False

    # Build the ordered list of curation tasks from the selected options.
    tasks = list()
    if cli.getOpt("sync_from"):
        DetectionCopier.srcDb = DbSession(cli.getOpt("sync_from"),
            autoCommit=True, userName=cli.getOpt("user"), isPersistent=True)
        tasks.append(DetectionCopier)

    if cli.getOpt("add_defaults"):
        tasks.append(DefaultRowInserter)

    if cli.getOpt("flag_deprecated"):
        tasks.append(DeprecationPropagator)

    if cli.getOpt("quality_flag_reset"):
        # Destructive operation - require explicit confirmation.
        # (Fixed: the message previously named a nonexistent option,
        # "quality_flat_reset".)
        if not CLI.isConfirmed("The -q/--quality_flag_reset option will wipe "
                               "existing ppErrBits values"):
            exit()

        tasks.append(ResetQualityFlags)

    if cli.getOpt("row_count"):
        tasks.append(RowCounter)

    if cli.getOpt("fix_deblends"):
        tasks.append(AProf1Fixer)
        tasks.append(DeblendParamFixer)

    if cli.getOpt("fix_deblends_only"):
        tasks.append(DeblendParamFixer)

    if cli.getOpt("uniqueObjID"):
        tasks.append(ObjIDUpdater)

    if cli.getOpt("mjd_stack"):
        tasks.append(MjdStackUpdater)

    # Default task when no specific option is supplied.
    if not tasks:
        tasks.append(DetectionUpdater)

    for Task in tasks:
        task = Task(
            programmeID=cli.getOpt("progID") if cli.getOpt("progID") else '',
            cli=cli)

        task.tableSet = set(csv.values(cli.getOpt("include")))
        if cli.getOpt("exclude"):
            task.tableSet -= set(csv.values(cli.getOpt("exclude")))

        task.firstProg = cli.getOpt("first_programme")
        task.lastProg = cli.getOpt("last_programme")

        if isinstance(task, DeblendParamFixer):
            task.doLastMfOnly = cli.getOpt("do_last_mf_only")

        task.run()
        del task