Package invocations :: Package cu13 :: Module cu13
[hide private]

Source Code for Module invocations.cu13.cu13

   1  #! /usr/bin/env python 
   2  #------------------------------------------------------------------------------ 
   3  #$Id: cu13.py 10077 2013-09-20 14:51:03Z RossCollins $ 
   4  """ 
   5     Invoke CU13 to create required products. 
   6   
   7     @author: N.J.G. Cross 
   8     @org:    WFAU, IfA, University of Edinburgh 
   9   
  10     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
  11     @contributors: R.S. Collins, E. Sutorius 
  12  """ 
  13  #------------------------------------------------------------------------------ 
  14  from   __future__      import division, print_function 
  15  from   future_builtins import map, zip 
  16   
  17  from   collections import defaultdict, namedtuple 
  18  import math 
  19  import multiprocessing 
  20  import numpy 
  21  from   operator    import attrgetter, itemgetter 
  22  import os 
  23  import pyfits 
  24  from   Scientific.Functions import LeastSquares as LSQ 
  25  import shutil 
  26  import time 
  27   
  28  import wsatools.Astrometry              as astro 
  29  import wsatools.Calibration             as calib 
  30  from   wsatools.CLI                 import CLI 
  31  import wsatools.CSV                     as csv 
  32  import wsatools.DataFactory             as df 
  33  import wsatools.DbConnect.CommonQueries as queries 
  34  from   wsatools.DbConnect.CuSession import CuSession 
  35  import wsatools.DbConnect.DbConstants   as dbc 
  36  from   wsatools.DbConnect.DbSession import Join, SelectSQL 
  37  import wsatools.DbConnect.Schema        as schema 
  38  import wsatools.ExternalProcess         as extp 
  39  import wsatools.FitsUtils               as fits 
  40  from   wsatools.Logger              import Logger, ForLoopMonitor 
  41  import wsatools.ProvenanceUtils         as pu 
  42  from   wsatools.SExtractorInterface import SExtractorInterface 
  43  import wsatools.Statistics              as stats 
  44  from   wsatools.SystemConstants     import DepCodes, SystemConstants 
  45  import wsatools.Utilities               as utils 
#------------------------------------------------------------------------------

class Cu13(CuSession):
    """
    Creates deep stacks and confidence images from intermediate stacks
    derived from MSB observations. The deep stacks are described in the
    RequiredStack table and have a UID comprising the programmeID and
    productID. The deep stacks are produced using the CASU fitsio_stack
    code. Various options allow the operator to choose which stacks to
    produce and how many intermediate stacks to use. The database release
    number (DR#) defines the directory where the mosaic will be found.

    """

    # @TODO: Stacking etc for OSA...
    # @TODO: Testing - seeing, ZP, class .....
    #

    #--------------------------------------------------------------------------
    # Class data types

    #: A required product pointing: position, filter and optional dual-mode ID.
    Pointing = namedtuple("Pointing", "index ra dec filterID dualID")

    #: Full identification of one product: IDs plus image/confidence/catalogue
    #  file names.
    ProductDetails = namedtuple("ProductDetails",
                                "productID multiframeID imageName confName catName")

    #--------------------------------------------------------------------------
    # Class constants (access as Cu13.varName)
    cuNum = 13

    #: Airmass value for FITS products (must be a float).
    airmass = 1.0

    #: External zeropoint accuracy in mags
    externalZPAccuracy = 0.01

    #: Default aperture; 3 by default,
    #  7 for UDS.
    aperture = defaultdict(lambda : 3, UDS=7)

    #: Extinction value for FITS products (must be a float).
    extinct = 0.0

    #: Maximum amount of memory that CU13 should consume in kilobytes.
    memoryLimit = 0.5 * utils.getSystemMemoryKb()

    #: ? (undocumented in original; presumably the number of objects per
    #  statistics bin -- TODO confirm against usage further down the file)
    numObjsBin = 100

    #: DetectionTableLargeSizeLimit
    largeDetTableLimit = 1.0e9

    # Private class parameters - should not be altered
    _autoCommit = True
    _isPersistent = True
    _reqDbLock = False
    _useWorkDir = True

    #--------------------------------------------------------------------------
    # Public member variable default values (access as obj.varName)

    # Configurations: override the defaults to change script behaviour
    casuSoftDir = None       #: CASU software directory
    filterer = "nebuliser"   #: Image filtering program, e.g. nebuliser.
    stacker = "imstack"      #: Image stacker program, e.g. fitsio_stack.
    tiler = "mosaic"         #: Image tiler program, e.g. mosaic

    # Maps each processing step name to a bit index; appears to record
    # completed stages as a bitmask (see hasProcessBeenRun) -- TODO confirm.
    bitWiseProcessingDict = {
        # stacking
        'imstack':0,
        # tiling
        'nebuliser':3,
        'mosaic':4,
        'filtmos':5,
        # external mosaic
        'imageproc':6,
        # headers
        'prepHeaders':8,
        # extraction
        'sex':12,
        'imcore':14,
        'wcsfit':15,
        'intUpdate':18,
        'grout':19,
        'classify':20,
        # finish off
        'calcZP':24,
        'testProduct':25,
        'complete':30,

    }

    # Inputs: these need to be set from command-line options
    breakIntoParts = False   #: Stack with intermediate stages?
    dateRange = CuSession.sysc.obsCal.dateRange()       #: Nights to stack.
    dirDate = time.strftime("%Y%m%d", time.gmtime())    #: Output dir date.
    doQFlagCheck = True      #: Check that detection quality flags are set?
    extractor = None         #: Override database choice of catalogue extractor.
    isCompressed = True      #: Compress deep stack files?
    maxProdID = None         #: Maximum product ID of stack to be produced.
    minProdID = None         #: Minimum product ID of stack to be produced.
    notDepOnly = False       #: Use non-deprecated stacks only
    numCPUs = multiprocessing.cpu_count()   #: Number of CPUs to utilise.
    numParts = None          #: Force stacking to uses this number of stages.
    numStack = None          #: Number of intermediate stacks to be used.
    redoHeader = False       #: Don't create stacks just update headers?
    releaseNum = None        #: Release number of stacks to create.
    reprocessTypes = None    #: List of product types that need reprocessing.
    restartDir = None        #: Rerun in this directory
    reqProdIDs = None        #: List of product IDs for stacks to be created.
    useDeep = False          #: Use previous stack as part of this stack?
    stackOnly = False        #: Only create stacks

    # Working variables
    deepProductTable = None
    """ A DataFactory.View of the contents of the database table
        Required{Product} for current programme and product type.
    """
    existingProducts = None
    """ List of details of products already ingested in the database.
    """

    fileListForType = None
    """ Dictionary of paths to the ingest file lists referenced by product
        type. This may be preset by the calling task or else defaults will be
        used.
    """
    prodInfo = None
    """ The contents of the database table Required{Product} for current
        programme and product type.
    """
    pType = None
    """ Product type currently being curated.
    """
    reprocess = False
    """ Reprocess existing products of the current type?
    """
    reqProdsOfType = None
    """ Dictionary of lists of product IDs referenced by product type, giving
        the products required to be created. This may be preset by the calling
        task or else is automatically determined from the database.
    """
    _tmpWorkDir = None
    """ Common temporary working directory for all threads.
    """
    isDetTableTooBig = False
    """ Test whether detection table is too big to get OB cat data from archive.
    """

    # Outputs
    newProductsOfType = None
    """ Dictionary of ProductDetails for each type of product successfully
        created.
    """

    startedProductsOfType = None
    """ Dictionary of ProductDetails for each type of product successfully
        created.
    """

    prodDirectory = None
    """ Dictionary of directory for each type of product successfully
        created.
    """
    extractIgnoreMsgs = ["WCSFIT: Matching routine failed -- "
                         "MATCHSTDS: No rows in object catalogue",
                         "CLASSIFY: missing header info",
                         "CLASSIFY: can't update header",
                         ""]
    """ List of messages to ignore when extracting
    """

    missExtensionErrorMsgs = ["Error running imcore_conf",
                              "imcore_backstats: all values appear to be zero",
                              "Error running classify"]
    """ List of messages to ignore when extracting if one bad extension
    """

    tileOBDict = None
    """ Dictionary of OB tiles for each tile product...
    """

    pptTileDict = None
    """ Look up for tile mfID based on pawprint mfID
    """

    obNonTiles = None
    """ Look up for OB components without a tile
    """

    prodMfIDDict = None
    """ Lookup for mfID of product if single epoch
    """

    deepProductTableDict = None
    """ Dictionary of product tables - one for each productType
    """
    #--------------------------------------------------------------------------
    def _onRun(self):
        """ Create products.

            Main CU13 entry point (invoked by the CuSession framework).
            Walks every required product type for the current programme,
            works out which products are complete, reusable or new, updates
            ProgrammeFrame bookkeeping for completed products, and invokes
            createProducts() for the rest.
        """
        # VVV detections are too numerous to pull OB catalogue data from the
        # archive, so that path is disabled for it.
        self.isDetTableTooBig = self.programmeID in [self.sysc.scienceProgs.get("VVV")]
        # self.isDetTableTooBig = True

        # Initial CU checks - will raise CuError if setup is incorrect
        self.numCPUs = Cu13.getUsefulNumCPUs(self.numCPUs, self.programme,
                                             self.stackOnly)

        if self.numCPUs == multiprocessing.cpu_count():
            Logger.addMessage("<IMPORTANT> At certain times, this task will "
                "require use of all available CPUs. Please ensure no other "
                "tasks are running.")

        if self.doQFlagCheck and not self.isDetTableTooBig:
            self.checkQBFlags()

        # Initialise member variables
        self.extractor = (self.extractor
                          or self.programme.getAttr("extractTool"))
        softVersNum = self.getSoftVersion()
        if self.extractor == 'CASU':
            Logger.addMessage("Processing with CASU tools: %s" % softVersNum)
        else:
            Logger.addMessage("Processing with Astromatic: %s" % softVersNum)

        self.fileListForType = (self.fileListForType
                                or fits.createFileListDict(self.programme))

        # These may have been preset by a calling task; only default them
        # when unset.
        if not self.newProductsOfType:
            self.newProductsOfType = defaultdict(dict)
        if not self.startedProductsOfType:
            self.startedProductsOfType = defaultdict(dict)
        if not self.prodDirectory:
            self.prodDirectory = defaultdict(dict)

        self.reprocessTypes = \
            (list(self.reprocessTypes) if self.reprocessTypes else [])
        self._tmpWorkDir = self.sysc.tempWorkPath()

        Logger.addMessage("Checking for existing %s products..."
                          % self.programme.getAcronym().upper())

        filterTable = df.Table("Filter", self.archive)
        self.findContents()

        if not self.reqProdsOfType:
            self.reqProdsOfType = \
                queries.findRequiredDeepProducts(self.programme, self.minProdID,
                                                 self.maxProdID, self.reqProdIDs)

        # Get reprocess status.
        reprocStatus = queries.getRedoStatus(self.archive, self.programmeID,
                                             self.reprocessTypes)
        productTypes = (['stack'] if self.stackOnly else
                        self.programme.getProductTypes())
        if 'tile' in productTypes:
            # Make sure that stack req are there: every tile's component
            # stacks must themselves be in the required-product set.
            self.reqProdsOfType['stack'] = set(
                self.archive.query(
                    "intProductID", "ProductLinks",
                    "programmeID=%s AND combiProdType='tile' AND "
                    "combiProductID IN (%s) AND intProdType='stack'"
                    % (self.programmeID,
                       ','.join(map(str, self.reqProdsOfType['tile'])))))

        self.deepProductTableDict = dict()
        # NOTE: the loop variable is deliberately the attribute self.pType,
        # so helper methods see the product type currently being curated.
        for self.pType in productTypes:
            tableName = "Required" + self.pType.title()
            curSchema = self.sysc.curationSchema()
            columns = schema.parseTables(curSchema, [tableName])[0].columns
            output = queries.getProductInfo(self.archive, self.programmeID,
                                            self.pType,
                                            self.reqProdsOfType[self.pType])

            self.deepProductTableDict[self.pType] = df.View(map(str, columns), output)

            self.reprocess = reprocStatus[self.pType]
            # Needs to be updated after each product cycle to take into
            # account any new products in the previous layer
            self.existingProducts = self.archive.query(
                selectStr="productID, M.frameType, releaseNum, "
                          "M.multiframeID, M.fileName AS imgName, "
                          "M2.fileName AS confName, M.catName",
                fromStr="ProgrammeFrame P, Multiframe M, Multiframe M2",
                whereStr="M.multiframeID=P.multiframeID AND programmeID=%s"
                         " AND M.%s AND M2.multiframeID=M.confID"
                         % (self.programmeID, DepCodes.selectNonDeprecated)
                         + " AND productID>0 AND M.frameType LIKE '%stack'")

            if self.pType not in self.reqProdsOfType \
                    or not self.reqProdsOfType[self.pType]:
                Logger.addMessage("No %s products to process" % self.pType)
                continue

            Logger.addMessage("Preparing %s products..." % self.pType.upper())
            # @TODO: Why does the next few parts take so long for VHS.
            self.setUpProductsInfo()
            Logger.addMessage("Setup products")
            # If stacks are components of tiles...
            if self.pType == 'stack':
                self.imageProductDict = queries.getAllFieldObsProd(
                    self.archive, self.reqProdsOfType[self.pType],
                    self.programmeID, self.pType, self.dateRange,
                    deprecated="deprecated=0",
                    noDeeps=((not self.useDeep) or self.reprocess),
                    onlyTileComps=self.programme.isShallowTiled())
                self.idImageProductDict = queries.getAllFieldObsProd(
                    self.archive, self.reqProdsOfType[self.pType],
                    self.programmeID, self.pType, self.dateRange,
                    deprecated=DepCodes.selectNonDeprecated,
                    noDeeps=((not self.useDeep) or self.reprocess),
                    onlyTileComps=self.programme.isShallowTiled())
                self.getProdMfID()
            else:
                self.imageProductDict = None
                self.idImageProductDict = None
                self.deepProdMfIDDict = None
            if self.pType == 'stack' and 'tile' in productTypes:
                # Simplify
                # Check if current deeps are from before Aug 2012
                # If so, and not reprocess stacks or ProductProcessing contains data
                potDodgyPids = self.arePotDodgyDeeps()
                if (len(potDodgyPids) > 0 and not self.reprocess and
                        not self.archive.queryEntriesExist("ProductProcessing",
                        "programmeID=%s and productType='stack'" % self.programmeID)):
                    if not CLI.isConfirmed("There are existing deep pawprints which "
                            "have been stacked in the incorrect order, so tiles "
                            "will have poor photometry. These are productIDs %s. "
                            "Confirm if you want to continue"
                            % (','.join(map(str, potDodgyPids)))):
                        raise SystemExit()
                self.tileOBDict = defaultdict(list)
                self.obNonTiles = defaultdict(list)
                self.pptTileDict = {}
                self.compDict = {}
                self.prodMfIDDict = {}
                # Get expected order
                self.getExpectedOrder(filterTable)  #
                # Only reset initially
                # Reset any malformed products.
                # Maps "dirName/fileName" (last two path components) -> mfID.
                self.fileNameMfIDLookUp = dict([(os.path.join(os.path.basename(
                    os.path.dirname(fName)), os.path.basename(fName)), mfID)
                    for fName, mfID in self.archive.query(
                        selectStr="fileName,m.multiframeID",
                        fromStr="Multiframe as m,ProgrammeFrame as p",
                        whereStr="p.programmeID=%s and m.multiframeID=p.multiframeID "
                                 "and frameType='stack'" % self.programmeID)])
                # If reusing deep products....
                #if (self.areMalformedProducts() and not reprocStatus[self.pType]
                #    and not self.archive.queryEntriesExist("ProductProcessing",
                #    "programmeID=%s and productType='stack'" % self.programmeID)):
                #    raise Cu13.CuError("There are existing deep pawprints which "
                #        "have been stacked in the incorrect order, so tiles "
                #        "will have poor photometry. Please rerun with the "
                #        "stack reprocessing option")
            completeProducts = []
            probList = []
            newProducts = []
            #prodDep = set()
            componentsOfProd = {}
            productOfID = {}
            Logger.addMessage("Checking %s %ss..."
                              % (len(self.reqProdsOfType[self.pType]), self.pType))

            for productID in sorted(self.reqProdsOfType[self.pType]):
                # set up class
                bestProduct = ProductDefinition(self, productID, filterTable)
                createdProduct = \
                    self.newProductsOfType[self.pType].get(productID)
                # Now determine which products need additional processing
                if self.pType == 'stack' and 'tile' in productTypes:
                    components = self.compDict[productID]
                    bestProduct.componentOrder = self.setComponentOrder(productID, components)
                    bestProduct.compProductType = bestProduct.productType
                    if components:
                        bestProduct.newProduct = True
                    else:
                        bestProduct.prodMfID = self.prodMfIDDict[productID]
                else:
                    components = bestProduct.findRequiredInputs()

                if components and not createdProduct:
                    # New inputs available: schedule for creation.
                    newProducts.append(productID)
                    componentsOfProd[productID] = components
                    productOfID[productID] = bestProduct

                elif bestProduct.prodMfID:
                    # Already in the archive: just tie up bookkeeping below.
                    completeProducts.append((productID, bestProduct.prodMfID))

                elif createdProduct:
                    Logger.addMessage("ProductID=%s already created: %s"
                        % (productID, createdProduct.imageName), alwaysLog=False)

                else:
                    raise Cu13.CuError("No data found for this %s:"
                        " productID=%s" % (self.pType, productID))

                #if (self.pType == "mosaic" and bestProduct.getDualProductID()):
                #    prodDep.add(bestProduct.getDualProductID())

                if bestProduct.problemComponents:
                    probList += bestProduct.problemComponents

            if not completeProducts:
                Logger.addMessage("ProgrammeFrame.productID is up-to-date for"
                                  " completed %s products." % self.pType)
            else:
                Logger.addMessage("Updating ProgrammeFrame.productID for "
                                  "complete %ss..." % self.pType)
                total = 0
                for productID, multiframeID in completeProducts:
                    total += self.archive.update("ProgrammeFrame",
                        entryList=[("productID", productID),
                                   ("releaseNum", self.releaseNum)],
                        where="programmeID=%s AND multiframeID=%s"
                              % (self.programmeID, multiframeID))
                    # update conf
                    self.archive.update("ProgrammeFrame",
                        entryList=[("productID", productID),
                                   ("releaseNum", self.releaseNum)],
                        where="programmeID=%s AND Multiframe.multiframeID=%s"
                              " AND Multiframe.confID=ProgrammeFrame.multiframeID"
                              % (self.programmeID, multiframeID),
                        fromTables="Multiframe")

                Logger.addMessage("...assigned %s completed products." % total)

            if not newProducts:
                Logger.addMessage("<Info> No new data available to create new"
                                  " required %s products." % self.pType)
            else:
                newProducts.sort()
                self.createProducts(newProducts, productOfID, componentsOfProd)

            if probList:
                Logger.addMessage("%s deep stacks have problematic components"
                                  % len(probList))

                logOutputDir = (self.sysc.missingFilesFilePath()
                                if self.archive.isRealRun else
                                os.path.join(self.sysc.testOutputPath(),
                                             self.sysc.productLogDir))

                utils.ensureDirExist(logOutputDir)
                probFileName = "cu13_probFilelist%s_v%s.log"
                probFileName %= (self.dirDate, self.releaseNum)
                probFileList = os.path.join(logOutputDir, probFileName)
                Logger.addMessage("Writing problem file list to "
                                  + probFileList)

                csv.File(probFileList, 'a').writelines(probList)

        # Finish off
        for pType in self.fileListForType:
            if os.path.exists(self.fileListForType[pType]):
                Logger.addMessage("Ingest release %s %ss from file list %s"
                                  % (self.releaseNum,
                                     pType, self.fileListForType[pType]))
503 504 #-------------------------------------------------------------------------- 505
506 - def getSoftVersion(self):
507 """ 508 """ 509 if self.extractor == 'CASU': 510 cmdPath = None 511 if self.casuSoftDir: 512 return 'casutools-' + os.path.dirname(self.casuSoftDir).split('casutools-')[1] 513 else: 514 # first find path 515 cmd = 'imstack' 516 for path in os.environ["PATH"].split(":"): 517 if os.path.exists(os.path.join(path, cmd)): 518 cmdPath = path 519 if cmdPath: 520 return 'casutools-' + os.readlink(os.path.dirname(cmdPath)).split('casutools-')[1] 521 else: 522 return "UNDEFINED" 523 else: 524 stdout = extp.run('sex --version') 525 if stdout: 526 return stdout[0] 527 else: 528 return "UNDEFINED"
529 #-------------------------------------------------------------------------- 530
531 - def arePotDodgyDeeps(self):
532 """ Simple test - anything before Aug 2012 is redone. 533 """ 534 # @FIXME If old deeps contain deprecated inputs, do not include 535 # Check if any Required X has multiple OBs - deeps needed and 536 # latest deep is at position... 537 deepStacks = set(pID for pID in self.reqProdsOfType['stack'] 538 if self.imageProductDict[pID]) 539 540 # Get ones that are deepStacks and not goodOldDeepPids. 541 goodOldDeepPids = set(self.archive.query( 542 "distinct s.productID", 543 "Multiframe as m,ProgrammeFrame as p,RequiredStack as s", 544 "p.programmeID=%s and p.productID>0 and p.multiframeID=m.multiframeID " 545 "and frameType like 'deep%%stack' and frameType not like " 546 "'%%tile%%' and deprecated in (0,255) and creationDate " 547 "BETWEEN '20120831' AND '21000101' and s.filterID=m.filterID and " 548 "s.programmeID=p.programmeID and " 549 "dbo.fGreatCircleDist(s.ra,s.dec,(15.*rabase),decbase)<(60.*s.stackRadius)" 550 % self.programmeID)) 551 552 potDodgyDeeps = deepStacks - goodOldDeepPids 553 554 # Are any of these oldDeepsPids 555 # Could just be new pointings 556 # @FIXME: Sometimes components in old deeps deprecated and now just a 557 # single pointing...... 558 # Optional is it safe to continue... instead of error, listing 559 # PIDs which may have problems... 560 oldDeepsPids = self.archive.query( 561 "distinct s.productID", 562 "Multiframe as m,ProgrammeFrame as p,RequiredStack as s", 563 "p.programmeID=%s and p.productID>0 and p.multiframeID=m.multiframeID " 564 "and frameType like 'deep%%stack' and frameType not like " 565 "'%%tile%%' and deprecated in (0,255) and s.filterID=m.filterID and " 566 "s.programmeID=p.programmeID and " 567 "dbo.fGreatCircleDist(s.ra,s.dec,(15.*rabase),decbase)<(60.*s.stackRadius) " 568 "and creationDate BETWEEN '20090930' AND '20120831'" 569 % self.programmeID) 570 571 return potDodgyDeeps.intersection(oldDeepsPids)
572 573 #-------------------------------------------------------------------------- 574
575 - def getProdMfID(self):
576 """ Get dictionary of productMfIDs 577 """ 578 self.deepProdMfIDDict = {} 579 if not self.reprocess and self.useDeep: 580 frameTypeSel = queries.getFrameSelection('stack', deepOnly=True) 581 productMfIDs = self.archive.query( 582 selectStr="s.productID,m.multiframeID", 583 fromStr="ProgrammeFrame as p,Multiframe as m, RequiredStack as s", 584 whereStr="s.programmeID=p.programmeID AND " 585 "m.multiframeID=p.multiframeID AND %s AND " 586 "deprecated IN (%s,%s) AND s.programmeID=%d AND " 587 "m.filterID=s.filterID AND dbo.fGreatCircleDist(s.ra, " 588 "s.dec, 15*rabase, decbase) < 60*s.stackRadius" 589 % (frameTypeSel, DepCodes.nonDep, 590 DepCodes.reprocWFAU, self.programmeID), 591 orderBy="s.productID,m.multiframeID DESC") 592 # Select first mfID for each product... 593 if productMfIDs: 594 prodID = productMfIDs[0].productID 595 firstOne = True 596 for productID, mfID in productMfIDs: 597 if productID == prodID and firstOne: 598 self.deepProdMfIDDict[productID] = mfID 599 firstOne = False 600 else: 601 if productID != prodID: 602 self.deepProdMfIDDict[productID] = mfID 603 prodID = productID 604 firstOne = False
605 606 607 #-------------------------------------------------------------------------- 608
609 - def areMalformedProducts(self):
610 """ Check to see if their are problems with OBs in wrong order in existing products 611 """ 612 badExistProd = False 613 for productID in self.reqProdsOfType['tile']: 614 # NO: This gets previous components if any, but doesn't look at previous deeps... 615 pwpProductIDs = self.pawprintProdDict[productID] 616 # check existing and new products 617 # productID,frameType,releaseNum,mfID,fileName,confName,catName 618 #ppProd = [(exPrd.productID, exPrd.imgName) for exPrd in self.existingProducts 619 # if 'tile' not in exPrd.frameType and 620 # exPrd.frameType.endswith('stack') and 621 # exPrd.productID in pwpProductIDs] 622 # productID,mfID,fileName,confName,catName 623 #ppProd += [(nwPrd.productID, nwPrd.fileName) for nwPrd in self.newProductsOfType['stack'] 624 # if nwPrd.productID in pwpProductIDs] 625 for pwpProdID in pwpProductIDs: 626 if productID in self.inputDeepPaw: 627 ppProd = [(nwPrd.productID, nwPrd.imageName) for nwPrd in self.inputDeepPaw[productID] 628 if nwPrd.productID == pwpProdID] 629 badExistProd = self.testProv(ppProd, productID, badExistProd) 630 631 return badExistProd
632 633 #------------------------------------------------------------------------------ 634
635 - def getExpectedOrder(self, filterTable):
636 """ For each tile work out best OB order 637 """ 638 pLinks = self.archive.query( 639 selectStr="combiProductID,intProductID", 640 fromStr="ProductLinks", 641 whereStr="programmeID=%s and combiProdType='tile' and " 642 "intProdType='stack'" % self.programmeID) 643 tileProds = sorted(list(set([cPID for cPID, _iPID in pLinks]))) 644 self.pawprintProdDict = dict() 645 for tileID in tileProds: 646 self.pawprintProdDict[tileID] = [iPID for cPID, iPID in pLinks 647 if tileID == cPID] 648 self.inputDeepPaw = dict() 649 for productID in self.reqProdsOfType['tile']: 650 obTileDict = dict() 651 obTileSet = set() 652 for pwpProdID in self.pawprintProdDict[productID]: 653 pwpProd = ProductDefinition(self, pwpProdID, filterTable) 654 self.compDict[pwpProdID] = pwpProd.findRequiredInputs() 655 self.prodMfIDDict[pwpProdID] = pwpProd.prodMfID 656 pwpList = (self.archive.query( 657 "%s as productID,m.multiframeID,m.fileName as imageName," 658 "c.fileName as confName,m.catName" % pwpProdID, 659 "Multiframe as m,Multiframe as c", 660 "m.confID=c.multiframeID and m.multiframeID=%s" 661 % pwpProd.prodMfID) 662 if pwpProd.prodMfID else self.compDict[pwpProdID]) 663 664 # Break any deeps into parts... 
665 obPwpList = self.getOBList(productID, pwpProdID, pwpList) 666 obTileDict[pwpProdID] = self.getTiles(obPwpList) if obPwpList else [] 667 668 [obTileSet.add((tileInfo.combiframeID, tileInfo.mjdObs)) for tileInfo in 669 obTileDict[pwpProdID] if tileInfo.combiframeID > 0] 670 self.obNonTiles[pwpProdID] = [(tileInfo.mjdObs, tileInfo.multiframeID) 671 for tileInfo in obTileDict[pwpProdID] 672 if tileInfo.combiframeID < 0] 673 self.pptTileDict.update([(tileInfo.multiframeID, tileInfo.combiframeID) 674 for tileInfo in obTileDict[pwpProdID]]) 675 676 # Get ordered list of inputs with no of times they occur 677 obTileOrder = sorted(list(obTileSet), key=itemgetter(1)) 678 679 compTiles = [] 680 inCompTiles = [] 681 for tlmfID, _mjdobs in obTileOrder: 682 isComplete = (len([pwpProdID for pwpProdID in self.pawprintProdDict[productID] 683 if tlmfID in [tlm for tlm, _ppm, _mjd in obTileDict[pwpProdID]]]) 684 == len(self.pawprintProdDict[productID])) 685 if isComplete: 686 compTiles.append((tlmfID, True)) 687 else: 688 inCompTiles.append((tlmfID, False)) 689 # Non-tiles...pawprints not associated 690 691 # @TODO: Do we need to group? 692 # self.groupNonTiles(obNonTiles) 693 694 695 696 if not compTiles: 697 self.tileOBDict[productID] = None 698 else: 699 self.tileOBDict[productID] = compTiles + inCompTiles
700 701 #----------------------------------------------------------------- 702 703
704 - def getOBList(self, productID, pwpProdID, pwpList):
705 """ Splits any deeps into components 706 """ 707 deepMfIDs = [pwp.multiframeID for pwp in pwpList 708 if '_dp' in pwp.imageName] 709 self.inputDeepPaw[productID] = [Cu13.ProductDetails(pwpProdID, pwp.multiframeID, pwp.imageName, pwp.confName, pwp.catName) for pwp in pwpList 710 if '_dp' in pwp.imageName] 711 obList = [pwp for pwp in pwpList if '_dp' not in pwp.imageName] 712 if deepMfIDs: 713 obList += self.archive.query( 714 selectStr="%s as productID,m.multiframeID,m.fileName as imageName," 715 "c.fileName as confName,m.catName" % dbc.intDefault(), 716 fromStr="Multiframe as m,Provenance as v,Multiframe as c", 717 whereStr="v.combiframeID in (%s) and v.multiframeID=" 718 "m.multiframeID and c.multiframeID=m.confID" % 719 ','.join(map(str, deepMfIDs))) 720 return obList
721 722 #----------------------------------------------------------------- 723
724 - def getTiles(self, pawprintComps):
725 """ Find OB tiles made from these components 726 """ 727 tiles = self.archive.query( 728 selectStr="combiframeID,v.multiframeID,mjdObs", 729 fromStr="Provenance as v,Multiframe as m", 730 whereStr="v.multiframeID in (%s) and m.multiframeID=v.combiframeID " 731 "and m.frameType='tilestack' ORDER BY mjdObs" % ','.join(map(str, 732 [comp.multiframeID for comp in pawprintComps]))) 733 missingOBppts = (set([comp.multiframeID for comp in pawprintComps]) - 734 set([gt.multiframeID for gt in tiles])) 735 if missingOBppts: 736 missTiles = self.archive.query( 737 selectStr="(-ROW_NUMBER() OVER (ORDER BY m.multiframeID)) as combiframeID," 738 "m.multiframeID,mjdObs", 739 fromStr="Multiframe as m,MultiframeEsoKeys as e", 740 whereStr="m.multiframeID in (%s) and m.multiframeID=e.multiframeID " 741 "ORDER BY mjdObs" % ','.join(map(str, missingOBppts))) 742 tiles += missTiles 743 return tiles
744 745 #-------------------------------------------------------------------------- 746 747
    def testProv(self, pawprintProducts, tileProdID, badOrder):
        """ Get list of OBs that went into pawprint.

            Reads the PROV#### provenance keywords from each existing
            pawprint product file and checks that the recorded OB order is
            consistent with the expected tile order in self.tileOBDict.

            @param pawprintProducts: (productID, fileName) pairs of existing
                                     pawprint products to check.
            @param tileProdID:       productID of the parent tile.
            @param badOrder:         Accumulator: prior result, returned True
                                     if any checked product is out of order.
            @return: Updated badOrder flag.
            @rtype:  bool
        """
        # Check first frame in e
        for productID, fileName in pawprintProducts:
            # Open fileName. read in header info
            hdulist = fits.open(fileName)
            fMfIDList = []
            provNo = 1
            key = 'PROV%04d' % provNo
            # All extensions have same list
            while key in hdulist[1].header:
                # Translate each provenance file name into its multiframeID.
                fMfIDList.append(self.fileNameMfIDLookUp[hdulist[1].header[key]])
                provNo += 1
                key = 'PROV%04d' % provNo
            hdulist.close()
            for ii, obPaw in enumerate(fMfIDList):
                if ii >= len(self.tileOBDict[tileProdID]):
                    # More prov than in good tileOBs
                    # May be in self.obNonTiles
                    if not (obPaw in self.obNonTiles[productID]):
                        badOrder = True
                # For positions within the expected list: a *complete* tile
                # entry (flag True) must match this pawprint's parent tile;
                # incomplete entries (flag False) are not constrained.
                elif not ((self.pptTileDict[obPaw] == self.tileOBDict[tileProdID][ii][0] and
                           self.tileOBDict[tileProdID][ii][1]) or
                          not self.tileOBDict[tileProdID][ii][1]):
                    badOrder = True

        return badOrder
777 #-------------------------------------------------------------------------- 778
779 - def setComponentOrder(self, productID, components):
780 """ Creates an order for the stacking. 781 """ 782 # DEEPS... 783 # components - productID multiframeID imageName confName catName 784 tileProdID = self.archive.query("combiProductID", "ProductLinks", 785 "programmeID=%s and combiProdType='tile' and intProdType='stack' " 786 "and intProductID=%s" % (self.programmeID, productID), 787 firstOnly=True) 788 compOrder = [comp for comp in components if self.sysc.deepSuffix in comp.imageName] 789 790 compOrder += [comp for tlmfID, _isComp in self.tileOBDict[tileProdID] 791 for comp in components 792 if self.sysc.deepSuffix not in comp.imageName and self.pptTileDict[comp.multiframeID] == tlmfID] 793 compOrder += [comp for _mjd, mfID in self.obNonTiles[productID] 794 for comp in components 795 if self.sysc.deepSuffix not in comp.imageName and comp.multiframeID == mfID] 796 return compOrder
797 798 #-------------------------------------------------------------------------- 799
800 - def createProducts(self, newProducts, productOfID, componentsOfProd):
801 """ 802 Creates the products in the given list. 803 804 """ 805 spaceReq = self.calcDiskSpaceRequired(len(newProducts)) 806 outputDir = self.createProductDir(spaceReq) 807 for productID in newProducts: 808 productOfID[productID].outputDir = outputDir 809 productType = productOfID[productID].productType 810 811 Logger.addMessage("%ss will be produced in %s" 812 % (self.pType.title(), outputDir)) 813 814 Logger.addMessage("Number of %ss to be created or updated: %s" 815 % (self.pType, len(newProducts))) 816 817 Logger.addMessage("...with productIDs: " 818 + utils.numberRange(newProducts)) 819 820 # For each process, work out how many threads can be used 821 if productType == 'stack': 822 # stack - do the following processes 823 self.createStacks(newProducts, productOfID, componentsOfProd) 824 825 elif productType == 'tile': 826 # tile - do the following processes 827 self.createTiles(newProducts, productOfID, componentsOfProd) 828 829 elif productType == 'mosaic': 830 self.createMosaics(newProducts, productOfID, componentsOfProd) 831 832 # Splitting the list of products to work on into chunks of numThreads 833 # @TODO: This will always be as slow as slowest product, instead prefer 834 # to always have X active threads at any time so it starts a new 835 # product in a given thread as soon as one has finished, working 836 # through the original list. To do this we need to split the 837 # list into X=numThreads parts, with each chunk being total/X. 838 # (or even better have a common list to all threads that each 839 # pops off the current worker to minimise joining time at end) 840 # *However*, could only do this if we commit to doing all tiles 841 # followed by all catalogues in series, due to the fact 842 # catalogues can't be parallelised due to database queries. 
843 newFiles = [] 844 for productID in newProducts: 845 product = productOfID[productID] 846 if os.path.exists(product.fileName()): 847 newFiles.append(product.fileName()) 848 newFiles.append(product.confName()) 849 if os.path.exists(product.catName()): 850 newFiles.append(product.catName()) 851 self.newProductsOfType[self.pType][productID] = \ 852 Cu13.ProductDetails(productID, dbc.intDefault(), 853 product.fileName(), product.confName(), 854 product.catName()) 855 856 filePath = self.fileListForType[self.pType] 857 Logger.addMessage("Saving the list of %ss created for release %s to %s" 858 % (self.pType, self.releaseNum, filePath)) 859 860 file(filePath, 'a').write('\n'.join(newFiles) + '\n')
861 862 #-------------------------------------------------------------------------- 863
864 - def calcProcessLayers(self, processType, products, productOfID, 865 componentsOfProducts):
866 """ 867 Organise products into processing layers. 868 869 """ 870 # No of threads per product 871 if processType == 'nebuliser': 872 noCPUsPerProduct = 6 873 else: 874 noCPUsPerProduct = 1 875 876 # Order products by memory used 877 # check products to see if process has been completed: 878 products = self.hasProcessBeenRun(products, productOfID, processType) 879 if processType == 'imstack': 880 memoryPerProduct = [(productID, self.getRequiredMemory( 881 self.getImageList(componentsOfProducts[productID]))) 882 for productID in products] 883 884 elif processType == 'mosaic' or processType == 'filtmos': 885 memoryPerProduct = [] 886 for productID in products: 887 bestProd = productOfID[productID] 888 microStepping = (bestProd.deepProductTable['nuStep'][bestProd.index] 889 if self.sysc.hasMicroStep else 1) 890 bitPix = 4 891 memoryPerProduct.append((productID, 892 (3 * bitPix * self.sysc.fieldOfViewTile[0] 893 * self.sysc.fieldOfViewTile[1] 894 * (3600 * math.sqrt(microStepping) / self.sysc.instrumentPixScale) ** 2) 895 / SystemConstants.one_kilobyte)) 896 897 elif processType == 'imcore' or processType == 'sex': 898 # @TODO: Might be lower for SExtractor? 899 memoryPerProduct = [] 900 for productID in products: 901 bestProd = productOfID[productID] 902 memoryPerProduct.append((productID, 903 3 * getFileSizeKiloBytes(bestProd.fileName(), True))) 904 905 elif processType == 'imageproc': 906 memoryPerProduct = [] 907 for productID in products: 908 bestProd = productOfID[productID] 909 pixSize = bestProd.deepProductTable['pixelSize'][bestProd.index] 910 microStep = (bestProd.deepProductTable['nuStep'][bestProd.index] 911 if self.sysc.hasMicroStep else 1) 912 if pixSize < 0.: 913 pixSize = self.sysc.instrumentPixScale / math.sqrt(microStep) 914 raExt = bestProd.deepProductTable['raExtent'][bestProd.index] 915 decExt = bestProd.deepProductTable['decExtent'][bestProd.index] 916 bitPix = 4 917 memoryPerProduct.append((productID, 918 (3 * bitPix * raExt * decExt * ((3600. 
/ pixSize) ** 2) / 919 SystemConstants.one_kilobyte))) 920 921 else: 922 memoryPerProduct = [(productID, self.getRequiredMemory([ 923 self.getImageList(componentsOfProducts[productID])[0]])) 924 for productID in products] 925 926 # How many threads can be used and how many layers are needed 927 return self.organiseProducts(memoryPerProduct, noCPUsPerProduct)
928 929 #-------------------------------------------------------------------------- 930
931 - def hasProcessBeenRun(self, products, productOfID, processType):
932 """ 933 """ 934 bitWiseValue = 2 ** self.bitWiseProcessingDict[processType] 935 completeProducts = self.archive.query("productID, fileName", 936 fromStr="ProductProcessing", 937 whereStr="programmeID=%s AND productType = '%s' AND productID IN (%s)" 938 " AND (bitProcessingFlag&%s>0 OR bitProcessingFlag&%s>0)" 939 % (self.programmeID, productOfID[products[0]].productType, 940 ','.join(map(str, products)), bitWiseValue, 941 2 ** self.bitWiseProcessingDict['complete'])) 942 943 completeProducts = set(productID 944 for productID, fileName in completeProducts 945 if productOfID[productID].fileName() == fileName) 946 947 return sorted(set(products) - completeProducts)
948 949 #-------------------------------------------------------------------------- 950
951 - def organiseProducts(self, memoryPerProduct, noProcessesPerProduct):
952 """ 953 """ 954 # First are any products over the memory limit? 955 maxNThreads = self.numCPUs 956 productInfo = [] 957 processLayers = [] 958 for productID, memory in memoryPerProduct: 959 960 if memory > Cu13.memoryLimit: 961 numParts = self.numParts or \ 962 int(math.ceil(memory / Cu13.memoryLimit)) 963 964 # These must be processed separately 965 # All the way through (at least until stacking finished) 966 processLayers.append(([(productID, numParts)], True)) 967 else: 968 nThreads = min(int(math.floor(Cu13.memoryLimit / memory)), 969 maxNThreads) 970 971 productInfo.append((productID, nThreads)) 972 973 productInfo = sorted(productInfo, key=itemgetter(1)) 974 prodIndex = 0 975 prodProcNo = 0 976 index = noProcessesPerProduct * prodIndex + prodProcNo 977 while index < (len(productInfo) * noProcessesPerProduct): 978 productID, nThreads = productInfo[prodIndex] 979 productsInLayer = [] 980 for _ii in range(nThreads): 981 if (index < len(productInfo) * noProcessesPerProduct): 982 productsInLayer.append((productInfo[prodIndex][0], prodProcNo)) 983 prodProcNo += 1 984 if prodProcNo == noProcessesPerProduct: 985 prodProcNo = 0 986 prodIndex += 1 987 index = noProcessesPerProduct * prodIndex + prodProcNo 988 processLayers.append((productsInLayer, False)) 989 990 # Now organise products.... 991 return processLayers
992 993 #-------------------------------------------------------------------------- 994
995 - def getImageList(self, compOfProducts):
996 """ 997 """ 998 imageList = [] 999 for _prodID, _mfID, imageName, confName, catName in compOfProducts: 1000 # calculate BSCALE 1001 imageName = imageName.split(':')[-1] 1002 confName = confName.split(':')[-1] 1003 catName = catName.split(':')[-1] 1004 imageList.append(fits.FileList(imageName, confName, catName)) 1005 1006 return imageList
1007 1008 #-------------------------------------------------------------------------- 1009
1010 - def compressProd(self, bestProduct):
1011 """ Compressed all necessary files associated with the product. 1012 """ 1013 compressFileList = [bestProduct.fileName(), bestProduct.confName()] 1014 if bestProduct.isTile(): 1015 compressFileList += [bestProduct.filteredName(), 1016 bestProduct.filtConfName()] 1017 Logger.addMessage("Compressing FITS files... %s" 1018 % ','.join(compressFileList), alwaysLog=False) 1019 fits.compressFits(compressFileList)
1020 1021 #-------------------------------------------------------------------------- 1022
1023 - def fixBadExtensions(self, bestProduct, zeroExtNums):
1024 """ 1025 """ 1026 confFile = fits.open(bestProduct.confName(), 'update') 1027 for extNum in zeroExtNums: 1028 # Set confidence to equal 100 1029 confFile[extNum].data.fill(100) 1030 fits.checkForUnicodeError(confFile, bestProduct.confName()) 1031 confFile.close()
1032 1033 #-------------------------------------------------------------------------- 1034
1035 - def createCatalogue(self, bestProduct, components, dualImageProduct=None, 1036 isQuick=False):
1037 """ 1038 Extracts the catalogue for the given product, using the extraction 1039 tool for the programme. 1040 1041 """ 1042 # @TODO: Is this OBSOLETE. Not quite yet. Needs rewriting 1043 Logger.addMessage("Extracting catalogue for %s %s" 1044 % (bestProduct.productType, bestProduct.fileName()), 1045 alwaysLog=(bestProduct.productType == 'mosaic')) 1046 1047 #isNewCat = not self.archive.isTrialRun and not bestProduct.catExists()\ 1048 # and (self.extractor == "CASU" or self.extractor == "SEX") 1049 #@TODO: Revert to above 1050 #isNewCat = not bestProduct.catExists() \ 1051 # and (self.extractor == "CASU" or self.extractor == "SEX") 1052 productID = bestProduct.deepProductTable["productID"][bestProduct.index] 1053 prodOfID = {productID:bestProduct} 1054 componentsOfProd = {productID:components} 1055 # @TODO: use ProductProcessing to determine what is to be run 1056 if self.extractor == "CASU": 1057 processLayers = self.calcProcessLayers('imcore', [productID], prodOfID, None) 1058 for productInfo, _isParts in processLayers: 1059 productIDs = [prodID for prodID, _partNum in productInfo] 1060 self.runInitialCasuExtraction(productIDs, prodOfID, 1061 componentsOfProd, stackInfoDict=None) 1062 elif self.extractor == "SEX": 1063 # Only use DUAL FILTER if mosaic 1064 processLayers = self.calcProcessLayers('sex', [productID], prodOfID, None) 1065 for productInfo, _isParts in processLayers: 1066 productIDs = [prodID for prodID, _partNum in productInfo] 1067 if productIDs: 1068 sx = SExtractorInterface(self.archive, bestProduct, dualImageProduct) 1069 sx.extractCatalogue() 1070 sx.calcAperCor() 1071 self.productProcessing(productIDs, prodOfID, 'sex') 1072 processLayers = self.calcProcessLayers('calcZP', [productID], prodOfID, 1073 componentsOfProd) 1074 for productInfo, _isParts in processLayers: 1075 productIDs = [prodID for prodID, _partNum in productInfo] 1076 if productIDs: 1077 Logger.addMessage("Calculating zero-point and updating headers...", 1078 
alwaysLog=(bestProduct.productType == 'mosaic')) 1079 1080 fits.removeDuplicateKeywords(bestProduct.catName()) 1081 1082 # Update FITS headers 1083 if not isQuick: 1084 updateHeaders(bestProduct, self.calcZP(bestProduct, components), 1085 pu.Provenance(self.archive, bestProduct.fileName(), 1086 components=components)) 1087 self.productProcessing([productID], prodOfID, 'calcZP') 1088 self.testProduct(bestProduct, components) 1089 self.productProcessing([productID], prodOfID, 'testProduct') 1090 # Do not compress... 1091 self.productProcessing([productID], prodOfID, isComplete=True)
1092 1093 #-------------------------------------------------------------------------- 1094
1095 - def createStacks(self, newProducts, productOfID, componentsOfProd):
1096 """ 1097 Creates the deep stack of given product ID in the output directory when 1098 supplied a list of component images (from L{queries.getStacks()}). 1099 If the number of images being stacked requires more than the available 1100 amount of physical system memory then the stacking is broken up into 1101 parts, unless overridden by the L{Cu13.breakIntoParts} option. 1102 1103 @return: List of broken extensions. 1104 @rtype: list(int) 1105 1106 """ 1107 processLayers = self.calcProcessLayers('imstack', newProducts, 1108 productOfID, componentsOfProd) 1109 1110 numToBeStacked = len(set(prodID 1111 for productInfo, _isParts in processLayers 1112 for prodID, _nParts in productInfo)) 1113 stackInfoDict = {} 1114 Logger.addMessage("Stacking %s stacks..." % numToBeStacked) 1115 progress = ForLoopMonitor(processLayers) 1116 for productInfo, isParts in processLayers: 1117 productIDs = [prodID for prodID, _partNum in productInfo] 1118 if productIDs: 1119 self.productProcessing(productIDs, productOfID, initialise=True) 1120 1121 if isParts: 1122 # Stacks exceed memory limit 1123 # create stacks in series (breaking each into parts) 1124 for productID, numParts in productInfo: 1125 bestProduct = productOfID[productID] 1126 currentFiles, expTimes = \ 1127 bestProduct.getImageDetails(componentsOfProd[productID]) 1128 1129 stackInfoDict[productID] = (currentFiles, expTimes) 1130 partFiles = [] 1131 # @TODO: Is the concept of iterations really useful? 1132 # It could lead to an infinite loop... 
1133 iteration = 0 1134 isComplete = self.redoHeader and bestProduct.imageExists() 1135 part = 1 1136 while not isComplete: 1137 stkSize = (0 if not self.breakIntoParts else 1138 self.getRequiredMemory(currentFiles)) 1139 1140 if iteration == 0: 1141 numParts = self.numParts or \ 1142 int(math.ceil(stkSize / Cu13.memoryLimit)) 1143 else: 1144 numParts = int(math.ceil(stkSize / Cu13.memoryLimit)) 1145 1146 if numParts <= 1: 1147 self.stackImages(currentFiles, bestProduct, expTimes) 1148 break 1149 1150 nImagePerPart = \ 1151 max(int(math.ceil(len(currentFiles) / numParts)), 2) 1152 1153 splitInputFiles = \ 1154 list(utils.splitList(currentFiles, nImagePerPart, 1155 noSingles=True)) 1156 1157 numParts = len(splitInputFiles) 1158 if numParts <= 1: 1159 self.stackImages(currentFiles, bestProduct, expTimes) 1160 break 1161 1162 Logger.addMessage("Splitting the stacking into %s parts" 1163 % numParts, alwaysLog=False) 1164 1165 deepPartFiles = [] 1166 for inputFiles in splitInputFiles: 1167 bestProduct.part = part 1168 self.stackImages(inputFiles, bestProduct, expTimes) 1169 deepPartFiles.append(bestProduct.files()) 1170 part += 1 1171 1172 bestProduct.part = None 1173 iteration += 1 1174 partFiles += deepPartFiles 1175 currentFiles = deepPartFiles 1176 1177 self.productProcessing([productID], productOfID, 'imstack') 1178 1179 # Delete any parts 1180 for filePath in partFiles: 1181 os.remove(filePath.image) 1182 if filePath.conf: 1183 os.remove(filePath.conf) 1184 if filePath.cat: 1185 os.remove(filePath.cat) 1186 1187 else: # run layers in parallel 1188 prodThreads = [] 1189 for productID in productIDs: 1190 bestProduct = productOfID[productID] 1191 allFiles, expTimes = \ 1192 bestProduct.getImageDetails(componentsOfProd[productID]) 1193 stackInfoDict[productID] = (allFiles, expTimes) 1194 prodThreads.append( 1195 multiprocessing.Process(target=self.stackImages, 1196 args=(allFiles, bestProduct, expTimes))) 1197 1198 # Do stacking 1199 if productIDs: 1200 msg = 
"Stacking images with productIDs " 1201 msg += utils.numberRange(productIDs) 1202 Logger.addMessage(msg, alwaysLog=False) 1203 1204 self.runThreads(prodThreads) 1205 self.productProcessing(productIDs, productOfID, 'imstack') 1206 1207 msg = msg.replace('ing', 'ed') if productIDs else '' 1208 progress.testForOutput(msg) 1209 # @TODO: Parallel process prepHeaders 1210 products = \ 1211 self.hasProcessBeenRun(newProducts, productOfID, 'prepHeaders') 1212 1213 if products: 1214 msg = ("of all new products" if newProducts == products else 1215 "with productIDs " + utils.numberRange(products)) 1216 1217 Logger.addMessage("Updating headers for images " + msg) 1218 1219 for productID in products: 1220 bestProduct = productOfID[productID] 1221 if productID in stackInfoDict: 1222 allFiles, expTimes = stackInfoDict[productID] 1223 else: 1224 allFiles, expTimes = \ 1225 bestProduct.getImageDetails(componentsOfProd[productID]) 1226 allFiles = self.getOBLevel(allFiles, bestProduct) 1227 fits.prepHeaders(bestProduct.fileName(), bestProduct.confName(), 1228 allFiles, expTimes, 1229 dfsIDString=(self.programme.getAttr("dfsIDString") 1230 if self.sysc.isWSA() else None), 1231 bScale=(bestProduct.bScale 1232 if self.sysc.hasEsoRelease else None)) 1233 1234 self.productProcessing([productID], productOfID, 'prepHeaders') 1235 1236 # Delete any intermediate stack confidence images 1237 for filePath in allFiles: 1238 if bestProduct.outputDir in filePath.conf: 1239 os.remove(filePath.conf) 1240 self.runInitialCasuExtraction(newProducts, productOfID, 1241 componentsOfProd, stackInfoDict)
1242 1243 #-------------------------------------------------------------------------- 1244
1245 - def getOBLevel(self, allFiles, bestProduct):
1246 """ 1247 """ 1248 if bestProduct.isTile() or bestProduct.isMosaic(): 1249 return allFiles 1250 deeps = [af for af in allFiles if self.sysc.deepSuffix in af.image] 1251 nonDeeps = [af for af in allFiles if self.sysc.deepSuffix not in af.image] 1252 while deeps: 1253 # Should only be one deep 1254 if len(deeps) > 1: 1255 Cu13.CuError("More than one old deep used to construct %s" % bestProduct.fileName()) 1256 # Get OB files.. 1257 provFileNames = self.archive.query( 1258 "m2.fileName", "Multiframe as m1,Provenance as v,Multiframe as m2", 1259 "v.combiframeID=m1.multiframeID and m1.fileName like '%%%s' and " 1260 "v.multiframeID=m2.multiframeID order by m2.mjdObs" % 1261 os.path.join(os.path.basename(os.path.dirname(deeps[0].image)), 1262 os.path.basename(deeps[0].image))) 1263 newFiles = [fits.FileList(pfn, fits.getConfMap(pfn), fits.getCatalogue(pfn)) for pfn in provFileNames] 1264 nonDeeps = newFiles + nonDeeps 1265 deeps = [af for af in newFiles if self.sysc.deepSuffix in af.image] 1266 return nonDeeps
1267 1268
1269 - def runInitialCasuExtraction(self, newProducts, productOfID, 1270 componentsOfProd, stackInfoDict=None):
1271 """ 1272 """ 1273 # process catalogues in parallel 1274 processLayers = self.calcProcessLayers('imcore', newProducts, 1275 productOfID, componentsOfProd) 1276 Logger.addMessage("Extracting...") 1277 areZeroSizeExtsDict = {} 1278 for productID in newProducts: 1279 bestProduct = productOfID[productID] 1280 areZeroSizeExtsDict[productID] = \ 1281 any(ii > 0 and hdu.header["TEXPTIME"] == 0 1282 for ii, hdu in enumerate(fits.open(bestProduct.fileName()))) 1283 1284 progress = ForLoopMonitor(processLayers) 1285 for productInfo, _isParts in processLayers: 1286 productIDs = [prodID for prodID, _partNum in productInfo] 1287 prodThreads = [] 1288 for productID in productIDs: 1289 bestProduct = productOfID[productID] 1290 prodThreads.append(multiprocessing.Process( 1291 target=self.extractCat, 1292 args=(bestProduct, areZeroSizeExtsDict[productID]))) 1293 1294 # Do extracting 1295 if productIDs: 1296 msg = "Extracting products: " + utils.numberRange(productIDs) 1297 Logger.addMessage(msg, alwaysLog=False) 1298 1299 self.runThreads(prodThreads) 1300 self.productProcessing(productIDs, productOfID, 'imcore') 1301 1302 msg = msg.replace('ing', 'ed') if productIDs else '' 1303 progress.testForOutput(msg) 1304 1305 self.fitWCS(newProducts, productOfID, componentsOfProd, inParallel=True, 1306 areZeroSizeExtsDict=areZeroSizeExtsDict) 1307 1308 # Fix any conf images - use zeroExtNums 1309 proc = 'intUpdate' if bestProduct.isTile() else 'calcZP' 1310 products = self.hasProcessBeenRun(newProducts, productOfID, proc) 1311 if products: 1312 msg = ("all new products" if newProducts == products else 1313 "products " + utils.numberRange(products)) 1314 1315 Logger.addMessage("Calculating ZP and updating headers for " + msg) 1316 1317 progress = ForLoopMonitor(products) 1318 for productID in products: 1319 if bestProduct.isTile(): 1320 Logger.addMessage("ProductID=%s" % productID, alwaysLog=False) 1321 bestProduct = productOfID[productID] 1322 # @TODO: Why not componentsOfProd[productID] 
1323 components = bestProduct.findRequiredInputs() 1324 zpOfExt = self.calcZP(bestProduct, components, isGrt=True) 1325 prov = pu.Provenance(self.archive, bestProduct.fileName(), 1326 components=components) 1327 1328 updateHeaders(bestProduct, zpOfExt, prov, isGrt=True) 1329 self.productProcessing([productID], productOfID, 'intUpdate') 1330 else: 1331 Logger.addMessage("ProductID=%s" % productID, alwaysLog=False) 1332 bestProduct = productOfID[productID] 1333 Logger.addMessage("Calculating zero-point and updating headers...", 1334 alwaysLog=(bestProduct.productType == 'mosaic')) 1335 1336 fits.removeDuplicateKeywords(bestProduct.catName()) 1337 1338 # Update FITS headers 1339 updateHeaders(bestProduct, 1340 self.calcZP(bestProduct, componentsOfProd[productID]), 1341 pu.Provenance(self.archive, bestProduct.fileName(), 1342 components=componentsOfProd[productID])) 1343 1344 if productID in stackInfoDict: 1345 _allFiles, expTimes = stackInfoDict[productID] 1346 else: 1347 _allFiles, expTimes = \ 1348 bestProduct.getImageDetails(componentsOfProd[productID]) 1349 1350 # Check expTimes - TODO: Do this within fixBadExtensions? 1351 zeroExtNums = \ 1352 [i + 1 for i in utils.getListIndices(expTimes, value=0)] 1353 1354 self.fixBadExtensions(bestProduct, zeroExtNums) 1355 self.productProcessing([productID], productOfID, 'calcZP') 1356 1357 progress.testForOutput("Completed productID %s" % productID) 1358 1359 if not bestProduct.isTile(): 1360 self.finishOffProducts(newProducts, productOfID, componentsOfProd)
1361 1362 #-------------------------------------------------------------------------- 1363
1364 - def createTiles(self, newProducts, productOfID, componentsOfProd):
1365 """ 1366 """ 1367 # nebuliser 1368 1369 isCompleteDict = {} 1370 for productID in newProducts: 1371 isCompleteDict[productID] = self.redoHeader and \ 1372 productOfID[productID].imageExists() 1373 1374 processLayers = self.calcProcessLayers('nebuliser', newProducts, 1375 productOfID, componentsOfProd) 1376 Logger.addMessage("Filtering...") 1377 progress = ForLoopMonitor(processLayers) 1378 filtCompsOfProd = {} 1379 for productInfo, _isParts in processLayers: 1380 productIDs = [prodID for prodID, _partNum in productInfo] 1381 self.productProcessing(productIDs, productOfID, initialise=True) 1382 prodThreads = [] 1383 filterComps = [] 1384 for productID, prodPart in productInfo: 1385 if not isCompleteDict[productID]: 1386 inputImage = componentsOfProd[productID][prodPart] 1387 filteredImage = fits.getFilteredImageName(inputImage.imageName, 1388 directory=(self._tmpWorkDir if not self.archive.isRealRun 1389 and self.sysc.testOutputPath() not in inputImage.imageName 1390 else None)) 1391 1392 filterComps.append(Cu13.ProductDetails(inputImage.productID, 1393 inputImage.multiframeID, filteredImage, inputImage.confName, 1394 inputImage.catName)) 1395 1396 if prodPart == 5: # TODO: there could be 3 pawprints, not 6. 
1397 filtCompsOfProd[productID] = filterComps 1398 filterComps = [] 1399 1400 prodThreads.append( 1401 multiprocessing.Process(target=self.filterImages, 1402 args=(inputImage, filteredImage))) 1403 1404 if productInfo: 1405 uniquePiDs = [productID for productID, pp in productInfo 1406 if pp == 5] 1407 1408 msg = "Filtering products: " + utils.numberRange(uniquePiDs) 1409 Logger.addMessage(msg, alwaysLog=False) 1410 self.runThreads(prodThreads) 1411 1412 # @TODO: Need to work if NOFFSETS=3 1413 # This only updates ProductProcessing if the last stack in the 1414 # tile (offset 5) has been completed 1415 self.productProcessing(uniquePiDs, productOfID, 'nebuliser') 1416 1417 msg = msg.replace('ing', 'ed') if productInfo else '' 1418 progress.testForOutput(msg) 1419 1420 # Now tile 1421 # @TODO: Would be more efficient to merge layers together! 1422 processLayers = [(pInfo, 1) for pInfo, _isParts in 1423 self.calcProcessLayers('mosaic', newProducts, 1424 productOfID, componentsOfProd)] 1425 processLayers += [(pInfo, 2) for pInfo, _isParts in 1426 self.calcProcessLayers('filtmos', newProducts, 1427 productOfID, componentsOfProd)] 1428 # @FIXME: Why does this often fall over after mosaicing, before filtmos? 
1429 Logger.addMessage("Mosaicing...") 1430 progress = ForLoopMonitor(processLayers) 1431 for productInfo, mosType in processLayers: 1432 productIDs = [prodID for prodID, _partNum in productInfo] 1433 prodThreads = [] 1434 for productID in productIDs: 1435 bestProduct = productOfID[productID] 1436 isFiltered = False 1437 if mosType == 1: 1438 components = componentsOfProd[productID] 1439 else: 1440 isFiltered = True 1441 if productID in filtCompsOfProd: 1442 components = filtCompsOfProd[productID] 1443 else: 1444 filteredImageDict = dict([(inputImage.imageName, 1445 fits.getFilteredImageName(inputImage.imageName)) 1446 for inputImage in componentsOfProd[productID]]) 1447 1448 for imageName in filteredImageDict: 1449 if not filteredImageDict[imageName] \ 1450 and not self.archive.isRealRun: 1451 filteredImageDict[imageName] = \ 1452 fits.getFilteredImageName(imageName, 1453 directory=(self._tmpWorkDir 1454 if not self.archive.isRealRun 1455 and self.sysc.testOutputPath() 1456 not in imageName 1457 else None)) 1458 1459 components = [Cu13.ProductDetails(inputImage.productID, 1460 inputImage.multiframeID, 1461 filteredImageDict[inputImage.imageName], 1462 inputImage.confName, inputImage.catName) 1463 for inputImage in componentsOfProd[productID]] 1464 1465 prodThreads.append(multiprocessing.Process( 1466 target=self.tileImages, args=(components, bestProduct, 1467 isFiltered))) 1468 1469 if productIDs: 1470 msg = "Mosaicing products: " + utils.numberRange(productIDs) 1471 Logger.addMessage(msg, alwaysLog=False) 1472 self.runThreads(prodThreads) 1473 self.productProcessing(productIDs, productOfID, 1474 process=('mosaic' if mosType == 1 else 'filtmos')) 1475 1476 msg = msg.replace('ing', 'ked') if productIDs else '' 1477 progress.testForOutput(msg) 1478 1479 products = \ 1480 self.hasProcessBeenRun(newProducts, productOfID, 'prepHeaders') 1481 1482 if products: 1483 msg = ("all new products" if newProducts == products else 1484 "products " + utils.numberRange(products)) 
1485 1486 Logger.addMessage("Updating headers for " + msg) 1487 1488 for productID in products: 1489 for inputImage in componentsOfProd[productID]: 1490 filteredImage = fits.getFilteredImageName(inputImage.imageName, 1491 directory=(self._tmpWorkDir if not self.archive.isRealRun 1492 and self.sysc.testOutputPath() not in inputImage.imageName 1493 else None)) 1494 1495 if os.path.exists(filteredImage): 1496 os.remove(filteredImage) 1497 1498 bestProduct = productOfID[productID] 1499 allFiles, expTimes = \ 1500 bestProduct.getImageDetails(componentsOfProd[productID]) 1501 allFiles = self.getOBLevel(allFiles, bestProduct) 1502 fits.prepHeaders(bestProduct.fileName(), bestProduct.confName(), 1503 allFiles, expTimes, 1504 dfsIDString=(self.programme.getAttr("dfsIDString") 1505 if self.sysc.isWSA() else None), 1506 bScale=(bestProduct.bScale 1507 if self.sysc.hasEsoRelease else None)) 1508 1509 self.productProcessing([productID], 1510 productOfID, 'prepHeaders') 1511 1512 # extract + wcsfit 1513 self.runInitialCasuExtraction(newProducts, productOfID, 1514 componentsOfProd) 1515 1516 # grout 1517 tmpGroutDir = self.sysc.tempWorkPath() 1518 shutil.rmtree(tmpGroutDir) 1519 utils.ensureDirExist(tmpGroutDir) 1520 processLayers = self.calcProcessLayers('grout', newProducts, 1521 productOfID, componentsOfProd) 1522 Logger.addMessage("Grouting...") 1523 progress = ForLoopMonitor(processLayers) 1524 for productInfo, _isParts in processLayers: 1525 productIDs = [prodID for prodID, _partNum in productInfo] 1526 prodThreads = [] 1527 if productIDs: 1528 Logger.addMessage("Setting up links for products: " 1529 + utils.numberRange(productIDs), alwaysLog=False) 1530 1531 for productID in productIDs: 1532 bestProduct = productOfID[productID] 1533 self.createSoftLinks(bestProduct, tmpGroutDir) 1534 catName = bestProduct.preGroutCatName() 1535 finalCatName = bestProduct.catName() 1536 prodThreads.append(multiprocessing.Process( 1537 target=self.groutCat, args=(catName, finalCatName, 1538 
tmpGroutDir))) 1539 1540 if productIDs: 1541 msg = "Grouting catalogues for products: " 1542 msg += utils.numberRange(productIDs) 1543 Logger.addMessage(msg, alwaysLog=False) 1544 1545 self.runThreads(prodThreads) 1546 self.productProcessing(productIDs, productOfID, 'grout') 1547 1548 msg = msg.replace('ing', 'ed') if productIDs else '' 1549 progress.testForOutput(msg) 1550 shutil.rmtree(tmpGroutDir) 1551 # classify 1552 processLayers = self.calcProcessLayers('classify', newProducts, 1553 productOfID, componentsOfProd) 1554 1555 Logger.addMessage("Classifying...") 1556 progress = ForLoopMonitor(processLayers) 1557 for productInfo, _isParts in processLayers: 1558 productIDs = [prodID for prodID, _partNum in productInfo] 1559 prodThreads = [] 1560 for productID in productIDs: 1561 bestProduct = productOfID[productID] 1562 catName = bestProduct.catName() 1563 prodThreads.append(multiprocessing.Process( 1564 target=self.classifyCat, args=(catName, False))) 1565 1566 if productIDs: 1567 msg = "Classifying catalogues for products: " 1568 msg += utils.numberRange(productIDs) 1569 Logger.addMessage(msg, alwaysLog=False) 1570 1571 self.runThreads(prodThreads) 1572 self.productProcessing(productIDs, productOfID, 'classify') 1573 1574 msg = msg.replace('ying', 'ied') if productIDs else '' 1575 progress.testForOutput(msg) 1576 1577 # ZP 1578 products = self.hasProcessBeenRun(newProducts, productOfID, 'calcZP') 1579 if products: 1580 msg = ("all new products" if newProducts == products else 1581 "products " + utils.numberRange(products)) 1582 1583 Logger.addMessage("Calculating ZP and updating headers for " + msg) 1584 1585 progress = ForLoopMonitor(products) 1586 for productID in products: 1587 bestProduct = productOfID[productID] 1588 1589 # Remove temporary pre-grouted catalogues 1590 if bestProduct.isTile() \ 1591 and os.path.exists(bestProduct.preGroutCatName()): 1592 os.remove(bestProduct.preGroutCatName()) 1593 1594 Logger.addMessage("Calculating zero-point and updating 
headers...", 1595 alwaysLog=bestProduct.productType == 'mosaic') 1596 1597 fits.removeDuplicateKeywords(bestProduct.catName()) 1598 1599 # Update FITS headers 1600 updateHeaders(bestProduct, 1601 self.calcZP(bestProduct, componentsOfProd[productID]), 1602 pu.Provenance(self.archive, bestProduct.fileName(), 1603 componentsOfProd[productID])) 1604 1605 self.productProcessing([productID], productOfID, 'calcZP') 1606 progress.testForOutput("Completed productID %s" % productID) 1607 1608 self.finishOffProducts(newProducts, productOfID, componentsOfProd)
1609 1610 #-------------------------------------------------------------------------- 1611
1612 - def finishOffProducts(self, newProducts, productOfID, componentsOfProd):
1613 """ Last parts. 1614 """ 1615 products = self.hasProcessBeenRun(newProducts, productOfID, 'testProduct') 1616 if products: 1617 msg = ("all new products" if newProducts == products else 1618 "products " + utils.numberRange(products)) 1619 1620 Logger.addMessage("Testing products and updating headers for " + msg) 1621 1622 progress = ForLoopMonitor(products) 1623 for productID in products: 1624 bestProduct = productOfID[productID] 1625 self.testProduct(bestProduct, componentsOfProd[productID]) 1626 self.productProcessing([productID], productOfID, 'testProduct') 1627 progress.testForOutput("Completed productID %s" % productID) 1628 1629 prevProducts = products 1630 products = self.hasProcessBeenRun(newProducts, productOfID, 'complete') 1631 if products: 1632 msg = ("the same products" if prevProducts == products else 1633 "products " + utils.numberRange(products)) 1634 1635 Logger.addMessage("Compressing files and finishing off for " + msg) 1636 1637 progress = ForLoopMonitor(products) 1638 for productID in products: 1639 bestProduct = productOfID[productID] 1640 self.compressProd(bestProduct) 1641 self.productProcessing([productID], productOfID, isComplete=True) 1642 progress.testForOutput("Completed productID %s" % productID)
1643 1644 #-------------------------------------------------------------------------- 1645
1646 - def createMosaics(self, newProducts, productOfID, componentsOfProd):
1647 """ Create mosaics 1648 """ 1649 # Convert mosaics first (can this be parallised?) 1650 for productID in newProducts: 1651 Logger.addMessage("Creating mosaic, product %s" % productID) 1652 bestProduct = productOfID[productID] 1653 self.createMosaic(bestProduct, componentsOfProd[productID]) 1654 # Extract catalogues - (can this be parallised?) 1655 for productID in newProducts: 1656 bestProduct = productOfID[productID] 1657 dualProduct = None 1658 if bestProduct.getDualProductID(): 1659 dualProduct = self.getDualImageData(bestProduct) 1660 Logger.addMessage("Got dual image data", alwaysLog=False) 1661 components = bestProduct.intComponents 1662 Logger.addMessage("Extracting catalogue") 1663 self.createCatalogue(bestProduct, components, dualProduct)
1664 1665 #-------------------------------------------------------------------------- 1666
1667 - def fitWCS(self, newProducts, productOfID, componentsOfProd, 1668 inParallel=False, areZeroSizeExtsDict={}):
1669 """ Use wcsfit to do astrometry 1670 """ 1671 Logger.addMessage("Fitting astrometry...") 1672 processLayers = self.calcProcessLayers('wcsfit', newProducts, 1673 productOfID, componentsOfProd) 1674 progress = ForLoopMonitor(processLayers) 1675 for productInfo, _isParts in processLayers: 1676 productIDs = [prodID for prodID, _partNum in productInfo] 1677 msg = "Fitting astrometry to products: " 1678 msg += utils.numberRange(productIDs) 1679 Logger.addMessage(msg, alwaysLog=False) 1680 1681 prodThreads = [] 1682 threadNo = 1 1683 for productID in productIDs: 1684 bestProduct = productOfID[productID] 1685 if inParallel: 1686 prodThreads.append( 1687 multiprocessing.Process(target=self.wcsfit, 1688 args=(bestProduct, threadNo, areZeroSizeExtsDict[productID]))) 1689 threadNo += 1 1690 else: 1691 self.wcsfit(bestProduct, threadNo, areZeroSizeExtsDict[productID]) 1692 self.productProcessing([productID], productOfID, 'wcsfit') 1693 1694 if inParallel and productIDs: 1695 self.runThreads(prodThreads) 1696 self.productProcessing(productIDs, productOfID, 'wcsfit') 1697 for thdNo in range(threadNo - 1): 1698 # Delete temporary folders 1699 shutil.rmtree(os.path.join(os.getcwd(), "thread%s_%s" % 1700 (thdNo + 1, self.cuEventID))) 1701 1702 progress.testForOutput(msg.replace('ing', 'ed'))
1703 1704 #-------------------------------------------------------------------------- 1705
1706 - def testFits(self, fileName, processName):
1707 """ 1708 """ 1709 failed = False 1710 if os.path.exists(fileName): 1711 stdout = extp.run('fverify %s' % fileName, 1712 ignoreMsgs=['Problem saving parameter file', 1713 'for writing']) 1714 index = 0 1715 for ii, line in enumerate(stdout): 1716 if line.startswith('HDU#'): 1717 index = ii 1718 hduNo = 0 1719 1720 if ii > index and index > 0: 1721 if len(line) > 0 and line[0].isdigit(): 1722 hduNo += 1 1723 if int(line.split()[len(line.split()) - 1]) > 0: 1724 failed = True 1725 Logger.addMessage( 1726 "Corrupt file %s, ext %s in process %s " 1727 % (fileName, hduNo, 1728 processName if processName else 'NONE')) 1729 1730 if self.sysc.tileSuffix not in fileName \ 1731 and self.sysc.mosaicSuffix not in fileName \ 1732 and hduNo < self.sysc.maxHDUs: 1733 failed = True 1734 Logger.addMessage("Fewer extensions %s in process %s" 1735 % (fileName, 1736 processName if processName else 'NONE')) 1737 1738 return failed
1739 1740 #-------------------------------------------------------------------------- 1741
    def productProcessing(self, productIDList, productOfID, process=None,
                          initialise=False, isComplete=False):
        """ Record progress of each product in the ProductProcessing table.

            For every productID the product file is first verified with
            testFits(); a corrupt file aborts with Cu13.CuError.  Otherwise
            the product's bitProcessingFlag row is updated (or inserted if
            absent):

            - initialise=True: adds 0 to any existing flag (row creation).
            - isComplete=True: the flag is *replaced* by the 'complete' bit
              (doAdd stays False).
            - otherwise: the bit for the named process stage is *added* to
              the existing flag.

            @param productIDList: ProductIDs to record.
            @param productOfID:   Product object for each productID.
            @param process:       Name of the completed stage; key into
                                  self.bitWiseProcessingDict.
            @param initialise:    Create/refresh the row with no new bits.
            @param isComplete:    Mark product as fully complete.
        """
        doAdd = False
        for productID in productIDList:
            bestProduct = productOfID[productID]
            fileName = bestProduct.fileName()
            # @TODO: Does this slow it down too much
            hasFailed = self.testFits(fileName, process)
            productType = bestProduct.productType
            if initialise:
                bitWiseStatus = 0
                doAdd = True
            elif isComplete:
                # doAdd deliberately left False: the flag is overwritten,
                # not incremented, when a product completes.
                bitWiseStatus = 2 ** (self.bitWiseProcessingDict['complete'])
            else:
                bitWiseStatus = 2 ** (self.bitWiseProcessingDict[process])
                doAdd = True

            # Update the existing row if present, else insert a new one.
            # WHERE clauses are built by string formatting; values are
            # internal IDs and file paths, not user input.
            if not hasFailed and self.archive.queryEntriesExist("ProductProcessing",
                "programmeID=%s and productType='%s' and productID=%s "
                "and fileName='%s'" % (self.programmeID, productType, productID,
                fileName)):
                self.archive.update("ProductProcessing",
                    [("bitProcessingFlag", ("bitProcessingFlag+%s" % bitWiseStatus
                      if doAdd else bitWiseStatus))],
                    where="programmeID=%s AND productType='%s' AND productID=%s "
                    "AND fileName='%s'" % (self.programmeID, productType,
                    productID, fileName))
            elif not hasFailed:
                self.archive.insertData("ProductProcessing",
                    (self.programmeID, productType, productID, fileName,
                     bitWiseStatus), enforcePKC=True)
            else:
                raise Cu13.CuError("Corrupt file: " + fileName)
                # Alternative soft-failure handling kept for reference:
                #Logger.addMessage("<Warning> Corrupt file %s, will delete and redo later" % fileName)
                #if os.path.exists(fileName):
                #    os.remove(fileName)
                #if os.path.exists(fits.getConfMap(fileName)):
                #    os.remove(fits.getConfMap(fileName))
                #if os.path.exists(fits.getCatalogue(fileName)):
                #    os.remove(fits.getCatalogue(fileName))
                #self.archive.delete("ProductProcessing",
                #    whereStr="programmeID=%s AND productType='%s' AND productID=%s "
                #    "AND fileName='%s'" % (self.programmeID, productType,
                #    productID, fileName))
1792 - def runThreads(self, threads):
1793 """ Executes given list of thread processes. 1794 """ 1795 try: 1796 for thread in threads: 1797 thread.start() 1798 finally: 1799 threadFail = False 1800 for thread in threads: 1801 if threadFail: 1802 thread.terminate() 1803 thread.join() 1804 else: 1805 thread.join() 1806 threadFail = thread.exitcode 1807 if threadFail: 1808 Logger.addMessage("<ERROR> Thread failed. Terminating all" 1809 " other running threads.") 1810 1811 if threadFail: 1812 self.tidyUp() 1813 raise Cu13.CuError("Forked processes failed. Please check stdout.")
1814 1815 #-------------------------------------------------------------------------- 1816
1817 - def tidyUp(self):
1818 """ 1819 """ 1820 Logger.addMessage("Tidying up...THIS PART OF CODE STILL NEEDS TO BE COMPLETED")
1821 1822 #-------------------------------------------------------------------------- 1823 1894 1895 #-------------------------------------------------------------------------- 1896
    def createMosaic(self, bestProduct, components):
        """
        Creates the deep mosaic of given product ID in the output directory when
        supplied a list of component images (from L{queries.getStacks()}).

        Only externally produced mosaics are supported: the external image
        is converted to archive format via modifyExternalMosaic().

        @return: List of new deep product files created.
        @rtype: list(str)

        NOTE(review): despite the @return above, no value is explicitly
        returned by this method -- confirm against callers.

        """

        # @TODO: low priority make parallel.
        if bestProduct.external:
            productID = bestProduct.deepProductTable["productID"][bestProduct.index]
            prodOfID = {productID:bestProduct}
            # Single-product layer list for the 'imageproc' stage.
            processLayers = self.calcProcessLayers('imageproc', [productID], prodOfID, None)
            Logger.addMessage("Querying database for metadata...", alwaysLog=False)
            # Provenance supplies component type/list for the external image.
            provInfoObj = Provenance(self, bestProduct)
            bestProduct.compProductType = provInfoObj.intProdType
            bestProduct.intComponents = provInfoObj.getComponents()
#            nToBeConverted = len(set([prodID for productInfo, _isParts in processLayers for prodID, _nParts in productInfo]))
#            progress = ForLoopMonitor(processLayers)
#            Logger.addMessage("Converting %s stacks..." % nToBeStacked)
            Logger.addMessage("Mosaicing image: %s from %s"
                              % (bestProduct.fileName(), components), alwaysLog=False)
            for productInfo, _isParts in processLayers:
                productIDs = [prodID for prodID, _partNum in productInfo]
                if productIDs:
                    # Register the product row, convert, then flag the
                    # 'imageproc' stage as done.
                    self.productProcessing(productIDs, prodOfID, initialise=True)
                    self.modifyExternalMosaic(bestProduct, components, provInfoObj)
                    self.isCompressed = False
                    self.productProcessing(productIDs, prodOfID, 'imageproc')
        else:
            raise Cu13.CuError("No support for CASU tool yet.")
1930 1931 #-------------------------------------------------------------------------- 1932
1933 - def modifyExternalMosaic(self, bestProduct, components, provInfoObj):
1934 """ 1935 Sets up attributes to convert an external mosaic into the 1936 correct format: MEF, with the correct headers for CU3 to ingest 1937 into the science archive. 1938 1939 """ 1940 if not (components and os.path.exists(bestProduct.provFile())): 1941 raise Cu13.CuError("External mosaics have not been imported or " 1942 "set up correctly. If they have been imported " 1943 "add a line into ExternalProduct and run " 1944 "cu13.convertProvInfo.py") 1945 1946 bestProduct.imageName = components[0][1] 1947 bestProduct.weightName = components[0][2] 1948 prodID = bestProduct.deepProductTable["productID"][bestProduct.index] 1949 Logger.addMessage("Converting mosaic productID %s in %s" 1950 % (prodID, 1951 ','.join(map(str, bestProduct.deepProductTable["productID"])))) 1952 1953 1954 1955 # convert into mef if necessary and sort out headers 1956 outputFiles = [(bestProduct.fileName(), True), 1957 (bestProduct.confName(), False)] 1958 1959 # normalise weight image 1960 normalFactor = (1 if self.extractor != 'CASU' else 1961 getNormalisationConst(bestProduct)) 1962 1963 for imageName, isImage in outputFiles: 1964 self.convertMosaic(bestProduct, imageName, isImage, provInfoObj, 1965 normalFactor) 1966 1967 1968 1969 Logger.addMessage("Finished converting mosaic productID %s" 1970 % (prodID))
1971 1972 #-------------------------------------------------------------------------- 1973
    def convertMosaic(self, mosaicObj, outputFileName, isImage,
                      provObj, normalFactor):
        """
        Converts a mosaic to MEF and adds the correct primary and extension
        header.

        The input is the external image (isImage=True) or its weight map
        (isImage=False); nothing is done if the output file already exists.
        Under a trial run no files are written, only headers are read.

        """
        if not (os.path.exists(outputFileName)):
            Logger.addMessage("Converting mosaic %s %s..." %
                ("image" if isImage else "confidence map", outputFileName))
            # @TODO: Check to see if this part has already been done.
            # use ProductProcessing
            inputFileName = mosaicObj.imageName if isImage \
                else mosaicObj.weightName
            # Copy image and get header info
            if not mosaicObj.isMef(inputFileName):
                # Simple FITS: needs converting to multi-extension format.
                if not self.archive.isTrialRun:
                    if not self.redoHeader:
                        origImagHeader = [convertSFtoMEF(inputFileName,
                            outputFileName, isImage, self.extractor, normalFactor)]

                    os.chmod(outputFileName, 0644)
                else:
                    origImagHeader = [fits.open(inputFileName)[0].header]
            else:
                # Already MEF: copy across, optionally rescaling the weight
                # map into a uint8 CASU-style confidence map.
                if not self.archive.isTrialRun:
                    if not self.redoHeader:
                        shutil.copyfile(inputFileName, outputFileName)
                    hdulist = fits.open(outputFileName, 'update')
                    origImagHeader = [hdulist[0].header, hdulist[1].header]
                    if not isImage and self.extractor == 'CASU':
                        hdulist[1].data = numpy.multiply(
                            hdulist[1].data, normalFactor).astype(
                                numpy.uint8)
                    fits.checkForUnicodeError(hdulist, outputFileName)
                    hdulist.close()
                else:
                    hdulist = fits.open(inputFileName)
                    origImagHeader = [hdulist[0].header, hdulist[1].header]
                    hdulist.close()

            # Now fix up headers
            if not self.archive.isTrialRun:
                hdulist = fits.open(outputFileName, 'update')
                # Primary header is rebuilt from the original plus
                # provenance metadata.
                hdulist[0].header = self.getPrimaryHeader(mosaicObj,
                    hdulist[0].header, origImagHeader[0], isImage, provObj)

                fits.checkForUnicodeError(hdulist, outputFileName)
                if mosaicObj.isMef():
                    # Extension header starts from the last original header.
                    hdulist[1].header = self.getInitImageHeader(
                        hdulist[1].header, origImagHeader[len(origImagHeader) - 1],
                        provObj)

                    fits.checkForUnicodeError(hdulist, outputFileName)
                    hdulist[1].header = self.getImageHeader(mosaicObj,
                        hdulist[1].header, isImage, provObj)
                fits.checkForUnicodeError(hdulist, outputFileName)
                hdulist.close()
2032 2033 #-------------------------------------------------------------------------- 2034
    def findContents(self):
        """
        Checks ProductProcessing to see if there are partially processed
        products: sets directory date

        Fully complete products (with image, confidence map and catalogue
        on disk) are recorded in self.newProductsOfType; partially
        processed ones in self.startedProductsOfType; for nebulised tiles
        the filtered component stacks are checked on disk first.

        """
        # Products whose 'complete' bit is set.
        compProds = self.archive.query("productType,productID,fileName",
                                       "ProductProcessing",
                                       "programmeID=%s and bitProcessingFlag&%s!=0"
                                       % (self.programmeID,
                                          2 ** (Cu13.bitWiseProcessingDict['complete'])))

        # Products started but not complete.
        startProds = self.archive.query("productType,productID,fileName",
                                        "ProductProcessing",
                                        "programmeID=%s and bitProcessingFlag&%s!=1 "
                                        "and bitProcessingFlag>0"
                                        % (self.programmeID,
                                           2 ** (Cu13.bitWiseProcessingDict['complete'])))

        # Tiles whose only completed stage is nebulisation.
        filtTileProds = self.archive.query("productType,productID,fileName",
                                           "ProductProcessing",
                                           "programmeID=%s and bitProcessingFlag=%s and "
                                           "productType ='tile' "
                                           % (self.programmeID,
                                              2 ** (Cu13.bitWiseProcessingDict['nebuliser'])))

        for pType, productID, prodFilePath in compProds:
            # Complete only counts if all three files still exist on disk.
            if (os.path.exists(prodFilePath) and
                os.path.exists(fits.getConfMap(prodFilePath)) and
                os.path.exists(fits.getCatalogue(prodFilePath))):
                self.newProductsOfType[pType][productID] = \
                    Cu13.ProductDetails(productID,
                                        dbc.intDefault(), prodFilePath,
                                        fits.getConfMap(prodFilePath),
                                        fits.getCatalogue(prodFilePath))

                self.prodDirectory[pType][productID] = \
                    os.path.dirname(prodFilePath)

        for pType, productID, prodFilePath in startProds:
            if (os.path.exists(prodFilePath)):
                self.startedProductsOfType[pType][productID] = \
                    Cu13.ProductDetails(productID,
                                        dbc.intDefault(), prodFilePath,
                                        fits.getConfMap(prodFilePath),
                                        fits.getCatalogue(prodFilePath))

                self.prodDirectory[pType][productID] = \
                    os.path.dirname(prodFilePath)

        for pType, productID, prodFilePath in filtTileProds:
            # @TODO: have to repeat filtering this way
            # Use productLinks
            stackProdIDs = self.archive.query(
                "intProductID",
                "ProductLinks",
                "combiProductID=%s AND programmeID=%s AND combiProdType='tile'"
                " AND intProdType='stack'" % (productID, self.programmeID))
            # Find stacks -
            frameSel = queries.getFrameSelection('stack', noDeeps=False,
                                                 deepOnly=False, selType='%stack')
            fileNames = self.archive.query(
                "fileName,productID",
                "ProgrammeFrame as p,Multiframe as m",
                "p.programmeID=%s AND p.multiframeID=m.multiframeID AND %s AND"
                " %s AND productID IN (%s)" % (self.programmeID, frameSel,
                DepCodes.selectNonDeprecated, ','.join(map(str, stackProdIDs))))
            # A component stack counts as found when its nebuliser-filtered
            # counterpart (filtPrefix + basename) exists next to it.
            foundProds = [pID for fName, pID in fileNames if os.path.exists(
                os.path.join(os.path.dirname(fName),
                             self.sysc.filtPrefix + os.path.basename(fName)))]

            missingProds = [prodID for prodID in stackProdIDs
                            if prodID not in foundProds]

            # Second chance: look up still-missing stacks in
            # ProductProcessing as well.
            fileNames += self.archive.query(
                "fileName,productID",
                "ProductProcessing",
                "programmeID=%s AND productType='stack' AND productID IN (%s)"
                % (self.programmeID, ','.join(map(str, missingProds))))

            foundProds = [pID for fName, pID in fileNames if os.path.exists(
                os.path.join(os.path.dirname(fName),
                             self.sysc.filtPrefix + os.path.basename(fName)))]

            missingProds = [prodID for prodID in stackProdIDs
                            if prodID not in foundProds]

            # check stacks that make up tile???
            if not missingProds:
                self.startedProductsOfType[pType][productID] = \
                    Cu13.ProductDetails(productID,
                                        dbc.intDefault(), prodFilePath,
                                        fits.getConfMap(prodFilePath),
                                        fits.getCatalogue(prodFilePath))

                self.prodDirectory[pType][productID] = \
                    os.path.dirname(prodFilePath)
2132 2133 #-------------------------------------------------------------------------- 2134
2135 - def getPrimaryHeader(self, bestProd, prihdr, originalHeader, isImage, 2136 provObj, addOrig=True):
2137 """ 2138 This produces all the relevant info for the primary header. 2139 2140 @return: Primary header for mosaic 2141 @rtype: pyfits.header 2142 2143 """ 2144 # Add in original header values. 2145 prihdr = createNewHeader(prihdr, 'primary') 2146 2147 # Add in additional info, if need be. 2148 cards = originalHeader.ascard 2149 if addOrig: 2150 for card in cards: 2151 if checkList(card.key) == 1: 2152 try: 2153 prihdr.update(card.key, card.value, card.comment) 2154 except KeyError: 2155 pass 2156 2157 # Zeropoint keywords should be in image extension 2158 for keyword in ('MAGZPT', 'MAGZRR'): 2159 try: 2160 del prihdr[keyword] 2161 except KeyError: 2162 pass 2163 2164 provInfo = provObj.getProvInfo() 2165 prihdr.update('PROJECT', provObj.getConstDict()['PROJECT'], 2166 'Time-allocation code') 2167 2168 prihdr.update('OBSNUM', -9999, 'Observation number') 2169 prihdr.update('FILTER', provInfo.get('filterName'), 'Filter name') 2170 prihdr.update('UTDATE', provInfo.get('utDate'), 2171 'UT date as integer in yyyymmdd format') 2172 2173 prihdr.update('MJD-OBS', provInfo.get('mjdObs'), 2174 'DATE-OBS as Modified Julian Date') 2175 2176 prihdr.update('WSA_TIME', '', 'WSA time stamp') 2177 prihdr.update('WSA_MFID', 0, 'WSA MultiframeID') 2178 if self.sysc.hasMicroStep: 2179 prihdr.update('NUSTEP', provInfo.get('nustep'), 'Number of microsteps') 2180 2181 # add those used by CU3.py 2182 if 'DATE' in cards.keys(): 2183 prihdr.update('DATE', originalHeader['DATE'], 'Creation Date') 2184 2185 prihdr.update('TELESCOP', provObj.getConstDict()['TELESCOP'], 2186 'Telescope name') 2187 prihdr.update('INSTRUME', provObj.getConstDict()['INSTRUME'], 2188 'Instrument') 2189 prihdr.update('OBSERVER', provObj.getConstDict()['OBSERVER'], 2190 'Observers names') 2191 prihdr.update('RECIPE', provObj.getConstDict()['RECIPE'], 2192 'Data reduction recipe to be used') 2193 prihdr.update('OBSTYPE', 'OBJECT' if isImage else 'CONFIDENCEMAP', 2194 'BIAS|DARK|SKYFLAT|DOMEFLAT|OBJECT|SKY|FOCUS') 
2195 2196 if self.sysc.isWSA(): 2197 prihdr.update('DHSVER', provObj.getConstDict()['DHSVER'], 2198 'Data handling version') 2199 prihdr.update('HDTFILE', provObj.getConstDict()['HDTFILE'], 2200 'Name of global hdt file') 2201 prihdr.update('USERID', provObj.getConstDict()['USERID'], 2202 'Userid logged in as') 2203 prihdr.update('OBSREF', provObj.getConstDict()['OBSREF'], 2204 'PATT or other reference') 2205 prihdr.update('DATE-END', provInfo.get('dateEnd'), 2206 'Date time (UTC) of end of observation') 2207 prihdr.update('TRACKSYS', provObj.getConstDict()['TRACKSYS'], 2208 'Telescope tracking coordinate system') 2209 prihdr.update('READMODE', provObj.getConstDict()['READMODE'], 2210 'Name of camera readmode') 2211 prihdr.update(self.sysc.expTimeKeyWord, provInfo.get('expTime'), 2212 '[s] Integration time per exposure') 2213 prihdr.update('NEXP', provObj.getConstDict()['NEXP'], 2214 'Number of exposures in integration') 2215 prihdr.update('NINT', provObj.getConstDict()['NINT'], 2216 'Number of integrations in observation') 2217 prihdr.update('READINT', provObj.getConstDict()['READINT'], 2218 '[s] Interval between reads') 2219 prihdr.update('NREADS', provObj.getConstDict()['NREADS'], 2220 'Number of reads per exposure') 2221 prihdr.update('RABASE', provInfo.get('raBase'), 2222 '[h] Right ascension of base position') 2223 2224 prihdr.update('DECBASE', provInfo.get('decBase'), 2225 '[deg] Declination of base position') 2226 elif self.sysc.hasEsoRelease: 2227 prihdr.update("HIERARCH ESO OBS PROG ID", 2228 provObj.getConstDict()['HIERARCH ESO OBS PROG ID'], 2229 "ESO programme ID") 2230 prihdr.update("HIERARCH ESO INS FILT1 NAME", 2231 provInfo.get('filterName').title(), "ESO FILTER NAME") 2232 2233 prihdr.update('RA', 15. 
* provInfo.get('raBase'), 2234 '[deg] Right ascension of base position') 2235 prihdr.update('HIERARCH ESO TEL POSANG', 2236 provInfo.get('posAngle'), "ESO POSITION ANGLE") 2237 prihdr.update('DEC', provInfo.get('decBase'), 2238 '[deg] Declination of base position') 2239 # ALSO ESO HIERARCH KEYS ... 2240 # @FIXME: NOT CORRECT -- HHMMSS.TTT DDMMSS.TTT 2241 tileRaHms, tileDecDms = astro.convertCoordEso(15.*provInfo.get('raBase'), 2242 provInfo.get('decBase')) 2243 2244 prihdr.update('HIERARCH ESO OCS SADT TILE RA', tileRaHms, 2245 'SADT tile RA') 2246 prihdr.update('HIERARCH ESO OCS SADT TILE DEC', tileDecDms, 2247 'SADT tile dec') 2248 prihdr.update('HIERARCH ESO OCS SADT TILE OFFANGLE', 2249 provInfo.get('posAngle'), "SADT tile rotator offset angle") 2250 2251 prihdr.update('HIERARCH ESO DPR TECH', 'IMAGE;JITTER', 'Observation technique') 2252 prihdr.update('STANDARD', False, 2253 'Is the target a standard star observation?') 2254 2255 prihdr.update('DATE-OBS', provInfo.get('dateObs'), 2256 'Date time (UTC) of start of observation') 2257 2258 prihdr.update('WCS_AXES', 2, 2259 'Number of axes in world co-ordinate system') 2260 2261 radesysKey = 'RADESYS' if self.sysc.isWSA() else 'RADECSYS' 2262 try: 2263 prihdr.update(radesysKey, originalHeader['RADECSYS'], 2264 'Mean IAU 1984 equatorial co-ordinates') 2265 except KeyError: 2266 prihdr.update(radesysKey, originalHeader['RADESYS'], 2267 'Mean IAU 1984 equatorial co-ordinates') 2268 2269 prihdr.update('EQUINOX', originalHeader['EQUINOX'], 2270 '[yr] Equinox of object position') 2271 2272 prihdr.update('EXTINCT', 0.0, 'Extinction') 2273 prihdr.update(self.sysc.