Package helpers :: Module ProductAssigner
[hide private]

Source Code for Module helpers.ProductAssigner

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  # $Id: ProductAssigner.py 9515 2012-11-24 13:24:57Z NicholasCross $ 
  4  """ 
  5     Utility class for assigning a single multiframe to a product to 
  6     override deep stacking. 
  7   
  8     @author: N.J.G. Cross
  9     @org:    WFAU, IfA, University of Edinburgh 
 10  """ 
 11  #------------------------------------------------------------------------------ 
 12  from __future__      import division, print_function 
 13  from future_builtins import map 
 14  import time 
 15  from   operator    import itemgetter 
 16  import numpy 
 17  from   collections import defaultdict 
 18   
 19  import wsatools.DbConnect.DbConstants   as dbc 
 20  from   wsatools.CLI                 import CLI 
 21  from   wsatools.DbConnect.CuSession import CuSession 
 22  from   wsatools.Logger              import Logger 
 23  import wsatools.Utilities               as utils 
 24  import wsatools.CSV                     as csv 
 25  import wsatools.DataFactory             as df 
 26  import wsatools.DbConnect.CommonQueries as queries 
 27  import wsatools.DbConnect.Schema        as schema 
 28  from   wsatools.SystemConstants     import DepCodes 
 29  #------------------------------------------------------------------------------ 
 30   
class ProductAssigner(CuSession):
    """ Finds best multiframeID for a product.

        Instead of creating a new deep stack, each required product is
        assigned a single existing frame by updating ProgrammeFrame.productID
        and ProgrammeFrame.releaseNum for the chosen multiframeID. A product
        with exactly one candidate frame is assigned that frame; a product
        with several candidates is resolved by L{assignProducts} according to
        L{selType}. Unless L{assignSingOnly} is set, only the last product
        type returned by the programme is processed directly and the products
        linked below it are assigned by L{propagateDown}.
    """
    #--------------------------------------------------------------------------
    # Class constants (access as ProductAssigner.varName)
    cuNum = 27

    #--------------------------------------------------------------------------
    # OB matching
    useOBs = True

    dateRange = CuSession.sysc.obsCal.dateRange()  #: Nights to stack.
    dirDate = time.strftime("%Y%m%d", time.gmtime())  #: Output dir date.
    maxProdID = None  #: Maximum product ID of stack to be produced.
    minProdID = None  #: Minimum product ID of stack to be produced.
    releaseNum = None  #: Release number of stacks to create.
    reqProdIDs = None  #: List of product IDs for stacks to be created.
    #: Default selection type: contains 'OB' for OB-based selection, or
    #: 'see' (any case) for selection by seeing.
    selType = 'OB'
    #: If True, every product type is processed directly, multi-candidate
    #: products that already have a frame for this release are left alone
    #: and lower layers are not propagated.
    assignSingOnly = False
    # Necessary for ProductDefinition
    notDepOnly = False
    numStack = dbc.intDefault()
    reprocess = False
    useDeep = False
    _tmpWorkDir = None

    # Working variables
    #: A DataFactory.View of the contents of the database table
    #: Required{Product} for current programme and product type.
    deepProductTable = None
    #: List of details of products already ingested in the database.
    existingProducts = None
    #: The contents of the database table Required{Product} for current
    #: programme and product type.
    prodInfo = None
    #: Product type currently being curated.
    pType = None
    #: Dictionary of lists of product IDs referenced by product type, giving
    #: the products required to be created. This may be preset by the calling
    #: task or else is automatically determined from the database.
    reqProdsOfType = None

    # Outputs
    #: Dictionary of ProductDetails for each type of product successfully
    #: created.
    newProductsOfType = None

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Assign products.

            For each product type processed: products with a single candidate
            frame are assigned it directly, products with several candidates
            are passed to L{assignProducts}, and (unless L{assignSingOnly})
            linked lower-layer products are assigned via L{propagateDown}.
        """
        Logger.addMessage("Checking for existing %s products..."
                          % self.programme.getAcronym().upper())

        self.newProductsOfType = defaultdict(dict)
        if not self.reqProdsOfType:
            self.reqProdsOfType = \
                queries.findRequiredDeepProducts(self.programme, self.minProdID,
                                                 self.maxProdID, self.reqProdIDs)

        # Do tiles first... and then use ProductLinks/Provenance to sort out
        # pawprints: unless assignSingOnly, only the last type returned by
        # getProductTypes() (presumably the highest layer) is processed here.
        allProdTypes = self.programme.getProductTypes()
        if self.assignSingOnly:
            productTypes = allProdTypes
        else:
            productTypes = [allProdTypes[-1]]

        for prodType in productTypes:
            self.pType = prodType

            # Check membership BEFORE indexing: indexing a missing type would
            # raise KeyError before the "nothing to process" message.
            if self.pType in self.reqProdsOfType:
                reqProdIDs = self.reqProdsOfType[self.pType]
            else:
                reqProdIDs = None

            if not reqProdIDs:
                Logger.addMessage("No %s products to process" % self.pType)
                continue

            self.deepProductTable = \
                self._getRequiredTable(self.pType, reqProdIDs)

            # Find any existing ones If restart.
            # @@TODO: Reset instead??
            self.existingProducts = self.archive.query(
                selectStr="productID, M.frameType, releaseNum, "
                          "M.multiframeID, M.fileName AS imgName, "
                          "M2.fileName AS confName, M.catName",
                fromStr="ProgrammeFrame P, Multiframe M, Multiframe M2",
                whereStr="M.multiframeID=P.multiframeID AND programmeID=%s"
                         % self.programmeID + " AND M.deprecated=0"
                         " AND productID>0 AND M.frameType LIKE '%stack' "
                         " AND M2.multiframeID=M.confID")

            Logger.addMessage("Processing %s products" % self.pType.upper())

            # Check to see if all need reprocessing
            completeProducts = []
            newProducts = []
            componentsOfProd = {}
            productOfID = {}
            Logger.addMessage("Checking %s %ss..."
                              % (len(reqProdIDs), self.pType))

            rowOfProdID = self._rowIndexOfProdID(self.deepProductTable)
            frameTypeSel = queries.getFrameSelection(self.pType)
            for productID in sorted(reqProdIDs):
                if productID not in rowOfProdID:
                    raise ProductAssigner.CuError(
                        "Required%s has no entry for productID=%s"
                        % (self.pType.title(), productID))

                # self.index is kept up to date for backward compatibility.
                self.index = rowOfProdID[productID]
                components = self._findComponents(self.deepProductTable,
                                                  self.index, self.pType)
                if len(components) == 1:
                    completeProducts.append((productID, list(components)[0]))

                elif len(components) == 0:
                    Logger.addMessage("<Info> No data found for this %s: "
                                      "productID=%s" % (self.pType, productID))

                # Several candidates: select one later, unless only missing
                # products are being filled in and this product already has a
                # non-deprecated frame for this release.
                elif not self.assignSingOnly \
                        or not self.archive.queryEntriesExist(
                            "ProgrammeFrame as p,Multiframe as m",
                            "p.programmeID=%s and p.productID=%s and "
                            "p.releaseNum=%s "
                            "and p.multiframeID=m.multiframeID and %s"
                            " and deprecated=0" % (self.programmeID,
                            productID, self.releaseNum, frameTypeSel)):
                    newProducts.append(productID)
                    componentsOfProd[productID] = components
                    productOfID[productID] = (self.deepProductTable,
                                              self.index)

            if not completeProducts:
                Logger.addMessage("ProgrammeFrame.productID is up-to-date for"
                                  " completed %s products." % self.pType)
            else:
                Logger.addMessage("Updating ProgrammeFrame.productID for "
                                  "complete %ss..." % self.pType)
                total = self._updateProgrammeFrames(completeProducts)
                Logger.addMessage("...assigned %s completed products." % total)

            if not newProducts:
                Logger.addMessage("<Info> No new data available to create new"
                                  " required %s products." % self.pType)
            else:
                assignedProducts = self.assignProducts(productOfID,
                                                       componentsOfProd)
                total = self._updateProgrammeFrames(assignedProducts)
                Logger.addMessage("...assigned %s best frames." % total)

            # If ProductLinks, use Provenance and ProductLinks to propagate...
            if not self.assignSingOnly:
                total = self._updateProgrammeFrames(self.propagateDown())
                Logger.addMessage("...assigned %s lower level products."
                                  % total)

    #--------------------------------------------------------------------------

    def assignProducts(self, productOfID, componentsOfProd):
        """ Selects one component frame for each product that has several
            candidate frames, according to L{selType}.

            If selType contains 'OB', the frame is chosen by
            L{findOBmultiframe}. Otherwise, if it contains 'see' (any case),
            the candidate with the smallest positive average seeing is
            chosen. Any other selection type selects nothing.

            @param productOfID: Maps productID to a (Required table view,
                                row index) pair for that product.
            @type  productOfID: dict
            @param componentsOfProd: Maps productID to its candidate
                                     component multiframeIDs.
            @type  componentsOfProd: dict

            @return: (productID, multiframeID) pairs for every product where
                     a frame was selected.
            @rtype:  list(tuple)
        """
        useOBs = 'OB' in self.selType
        useSeeing = 'see' in self.selType.lower()
        if not (useOBs or useSeeing):
            return []

        frameTypeSel = queries.getFrameSelection(self.pType, noDeeps=True)
        productMfIDList = []
        for productID in productOfID:
            # Needed by both selection methods. It was previously only fetched
            # in the OB branch, so 'see' selection failed on an unbound name.
            compInfo = self._queryComponentInfo(componentsOfProd[productID])
            if useOBs:
                # Use obsName and then seeing.
                selectInfo = self._queryPointingInfo(productID, frameTypeSel)
                # use sysc.obFiltDict to find filters to compare OB to.
                mfID = self.findOBmultiframe(compInfo, selectInfo,
                                             productOfID[productID])
            else:
                mfID = self._bestSeeingFrame(compInfo)

            if mfID:
                productMfIDList.append((productID, mfID))

        return productMfIDList

    #--------------------------------------------------------------------------

    def findOBmultiframe(self, compInfo, selectInfo, prod):
        """ Selects a candidate frame using the OBs (obsID) of frames already
            assigned in the same field. The filters compared are those grouped
            with the product's filter in sysc.obFiltDict, including its own.

            Selection order:
              1. If no such assigned frame supplies an obsID: the last
                 candidate whose obsName does not contain 'v-'. Failing that,
                 if selectInfo is non-empty, the candidate whose mjdObs is
                 nearest the median mjdObs of selectInfo.
              2. Otherwise: the last candidate sharing one of those obsIDs.
                 If candidates exist in several OBs, this is logged.

            @param compInfo: Candidate frame rows (multiframeID, obsID,
                             obsName, mjdObs, seeing), ordered by
                             multiframeID.
            @param selectInfo: Rows (multiframeID, filterID, obsID, obsName,
                               mjdObs) of frames already assigned in the same
                               field.
            @param prod: (Required table view, row index) of the product.

            @return: Selected multiframeID, or None if nothing is suitable.

            @raise CuError: If the product's filter occurs in more than one
                            OB filter group for this programme.
        """
        deepProductTable, index = prod
        filterTable = df.Table("Filter", self.archive)
        filterTable.setCurRow(filterID=deepProductTable["filterID"][index])
        shortName = filterTable.getAttr("shortName").title()
        obFiltersList = [obFilts for obFilts
                         in self.sysc.obFiltDict[
                             self.programme.getAcronym().upper()]
                         if shortName in obFilts]
        if len(obFiltersList) > 1:
            raise ProductAssigner.CuError(
                "Code is not set up for multiple OB filter assignment")

        obFilterIDs = []
        for filtNameTuple in obFiltersList:
            for filtName in filtNameTuple:
                filterTable.setCurRow(shortName=filtName.lower())
                obFilterIDs.append(filterTable.getAttr("filterID"))

        obsIDs = set(sInfo.obsID for sInfo in selectInfo
                     if sInfo.filterID in obFilterIDs)

        if not obsIDs:
            # No filters in the same OB: prefer frames whose obsName lacks
            # 'v-' (the last such, by multiframeID)...
            posMfIDs = [cInfo.multiframeID for cInfo in compInfo
                        if 'v-' not in cInfo.obsName]
            if posMfIDs:
                return posMfIDs[-1]

            if not selectInfo or not compInfo:
                return None

            # ...else the candidate NEAREST in mjdObs to the frames already
            # assigned (previously the sort was popped from the far end).
            medianMjdObs = numpy.median([sInfo.mjdObs for sInfo in selectInfo])
            nearest = min(compInfo,
                          key=lambda cInfo: abs(cInfo.mjdObs - medianMjdObs))
            return nearest.multiframeID

        # Collect candidates per matching OB. Set iteration order decides
        # which OB wins when several match.
        posMfIDs = []
        goodObsIDs = []
        for obsID in obsIDs:
            good = [cInfo.multiframeID for cInfo in compInfo
                    if cInfo.obsID == obsID]
            if good:
                goodObsIDs.append(obsID)
                posMfIDs += good

        if not posMfIDs:
            return None

        if len(goodObsIDs) > 1:
            # Other filters in same pointing in multiple OBs - how to select
            Logger.addMessage(
                "More than one possible obsID for prodID=%s, obsIDs: %s" %
                (deepProductTable["productID"][index],
                 ','.join(map(str, goodObsIDs))))
            # The frame returned below belongs to the last OB collected.
            Logger.addMessage("Selecting product from obsID %s"
                              % goodObsIDs[-1])

        return posMfIDs[-1]

    #--------------------------------------------------------------------------

    def propagateDown(self):
        """ Assigns the lower-layer products linked to the current product
            type in ProductLinks.

            For each linked lower-layer type, the Provenance components of
            the upper-layer frames recorded in ProgrammeFrame are compared
            with the candidate frames of each linked lower-layer product. A
            lower product is assigned only when exactly one frame matches.

            @return: (intProductID, multiframeID) pairs to assign.
            @rtype:  list(tuple)
        """
        assignedProducts = []
        productLinks = self.archive.query(
            selectStr="intProductID,intProdType,combiProductID",
            fromStr="ProductLinks",
            whereStr="programmeID=%s and combiProdType='%s'" %
                     (self.programmeID, self.pType))
        if not productLinks:
            return assignedProducts

        uframeTypeSel = queries.getFrameSelection(self.pType, noDeeps=True,
                                                  alias='m')

        # Group links by lower-layer type, then by upper productID, so that
        # each link is only looked up in the table of its own product type.
        linksOfType = defaultdict(lambda: defaultdict(list))
        for pl in productLinks:
            linksOfType[pl.intProdType][pl.combiProductID].append(pl)

        for lpType, linksOfUpperProd in linksOfType.items():
            if lpType not in self.reqProdsOfType \
                    or not self.reqProdsOfType[lpType]:
                Logger.addMessage("<Info> No required %s products to "
                                  "propagate to" % lpType)
                continue

            lowerProductTable = \
                self._getRequiredTable(lpType, self.reqProdsOfType[lpType])
            rowOfProdID = self._rowIndexOfProdID(lowerProductTable)

            lframeTypeSel = queries.getFrameSelection(lpType, noDeeps=True,
                                                      alias='ml')
            upperProdFrames = self.archive.query(
                selectStr="productID,pv.multiframeID",
                fromStr="ProgrammeFrame as p,Multiframe as m,"
                        "Provenance as pv,Multiframe as ml",
                whereStr="p.programmeID=%d and p.multiframeID=m.multiframeID "
                         "and m.multiframeID=pv.combiframeID and %s and "
                         "pv.multiframeID=ml.multiframeID and %s"
                         % (self.programmeID, uframeTypeSel, lframeTypeSel))

            # Provenance components of the upper-layer frames per productID.
            framesOfProd = defaultdict(set)
            for upProdF in upperProdFrames:
                framesOfProd[upProdF.productID].add(upProdF.multiframeID)

            for productID, prodFrames in framesOfProd.items():
                # For each link find possible input frames
                for pl in linksOfUpperProd.get(productID, []):
                    index = rowOfProdID.get(pl.intProductID)
                    if index is None:
                        Logger.addMessage("<Info> intProdID=%s is not a "
                                          "required %s product; skipped"
                                          % (pl.intProductID, lpType))
                        continue

                    components = self._findComponents(lowerProductTable,
                                                      index, lpType)
                    bestComps = [comp for comp in components
                                 if comp in prodFrames]
                    if len(bestComps) != 1:
                        Logger.addMessage("Expecting one match for "
                            "intProdID=%d, got %s, %s" % (pl.intProductID,
                            len(bestComps), ','.join(map(str, bestComps))))
                    else:
                        assignedProducts.append((pl.intProductID,
                                                 bestComps[0]))

        return assignedProducts

    #--------------------------------------------------------------------------
    # Private helpers

    def _getRequiredTable(self, prodType, prodIDs):
        """ Returns a DataFactory.View of the Required{prodType} rows of the
            given productIDs for the current programme.
        """
        output = queries.getProductInfo(self.archive, self.programmeID,
                                        prodType, prodIDs)
        tableName = "Required" + prodType.title()
        curSchema = self.sysc.curationSchema()
        columns = schema.parseTables(curSchema, [tableName])[0].columns
        return df.View(map(str, columns), output)

    @staticmethod
    def _rowIndexOfProdID(productTable):
        """ Maps each productID in productTable to the index of its first row.
        """
        rowOfProdID = {}
        for row, productID in enumerate(productTable["productID"]):
            rowOfProdID.setdefault(productID, row)
        return rowOfProdID

    def _findComponents(self, productTable, index, prodType):
        """ Returns the candidate frames of the product in row index of
            productTable (a Required{prodType} view), via
            queries.getAllFieldObs().
        """
        offsetPos = None
        if self.sysc.isVSA() and prodType == 'stack':
            offsetPos = productTable["offsetPos"][index]

        # NOTE(review): 60 * stackRadius presumably converts arcmin to the
        # unit getAllFieldObs expects - confirm against CommonQueries.
        return queries.getAllFieldObs(self.archive,
            productTable["ra"][index], productTable["dec"][index],
            60 * productTable["stackRadius"][index],
            productTable["filterID"][index], self.programmeID,
            frameType=prodType, dateRange=self.dateRange,
            nustep=productTable["nustep"][index], offsetPos=offsetPos,
            deprecated=DepCodes.selectNonDeprecated, noDeeps=True,
            onlyTileComps=self.programme.isShallowTiled())

    def _updateProgrammeFrames(self, prodFramePairs):
        """ Sets ProgrammeFrame.productID, and releaseNum to L{releaseNum},
            for each (productID, multiframeID) pair in this programme.

            @return: Sum of the values returned by the archive.update() calls
                     (presumably the number of rows updated).
        """
        total = 0
        for productID, multiframeID in prodFramePairs:
            total += self.archive.update("ProgrammeFrame",
                entryList=[("productID", productID),
                           ("releaseNum", self.releaseNum)],
                where="programmeID=%s AND multiframeID=%s"
                      % (self.programmeID, multiframeID))
        return total

    def _queryPointingInfo(self, productID, frameTypeSel):
        """ Returns (multiframeID, filterID, obsID, obsName, mjdObs) rows,
            ordered by multiframeID, for the frames of this release assigned
            to products in the same field as productID. Returns an empty
            list if there are none.
        """
        pointingMfIDs = self.archive.query(
            selectStr="m.multiframeID,p.productID,m.filterID",
            fromStr="Required%s as r1,Required%s as r2,ProgrammeFrame as p,"
                    "Multiframe as m" % (self.pType, self.pType),
            whereStr="r1.programmeID=%s and r1.productID=%s and "
                "r1.fieldID=r2.fieldID and r1.programmeID=r2.programmeID and "
                "r2.programmeID=p.programmeID and r2.productID=p.productID and"
                " p.multiframeID=m.multiframeID and p.releaseNum=%s and "
                "%s and m.%s order by m.multiframeID" %
                (self.programmeID, productID,
                 self.releaseNum, frameTypeSel, DepCodes.selectNonDeprecated))
        if not pointingMfIDs:
            # Use non v-??
            return []

        return self.archive.query(
            selectStr="m.multiframeID,filterID,obsID,obsName,mjdObs",
            fromStr="Multiframe as m,MultiframeEsoKeys as e",
            whereStr="m.multiframeID=e.multiframeID and "
                     "m.multiframeID in (%s) order by m.multiframeID "
                     % ','.join(map(str, [pmf.multiframeID
                                          for pmf in pointingMfIDs])))

    def _queryComponentInfo(self, multiframeIDs):
        """ Returns (multiframeID, obsID, obsName, mjdObs, seeing) rows,
            ordered by multiframeID, for the given candidate frames. seeing
            is seeing*xPixSize averaged over the frame's detector rows.
        """
        return self.archive.query(
            selectStr="m.multiframeID,obsID,obsName,mjdObs,"
                      "avg(seeing*xPixSize) as seeing",
            fromStr="Multiframe as m,MultiframeEsoKeys as e,"
                    "MultiframeDetector as md,CurrentAstrometry as c",
            whereStr="m.multiframeID=e.multiframeID and "
                "m.multiframeID=md.multiframeID and md.multiframeID="
                "c.multiframeID and md.extNum=c.extNum and "
                "m.multiframeID in (%s) group by m.multiframeID,obsID,"
                "obsName,mjdObs order by m.multiframeID" %
                ','.join(map(str, multiframeIDs)))

    @staticmethod
    def _bestSeeingFrame(compInfo):
        """ Returns the multiframeID of the candidate with the smallest
            positive seeing, or None if no candidate has one.
        """
        # Non-positive or NULL seeing is treated as unmeasured (presumably
        # database default values) and must not win the minimum.
        measured = [cInfo for cInfo in compInfo
                    if cInfo.seeing is not None and cInfo.seeing > 0]
        if not measured:
            return None
        return min(measured, key=lambda cInfo: cInfo.seeing).multiframeID
#------------------------------------------------------------------------------
# Entry point for ProductAssigner

# Allow module to be imported as well as executed from the command line
if __name__ == "__main__":

    def isDigitString(val):
        """ CLI validator: True when val consists only of digits. """
        return val.isdigit()

    # Positional arguments required by ProductAssigner
    assignerArgs = [
        CLI.Argument("programmeID", "DXS"),
        CLI.Argument("begin_date", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("end_date", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("release_number", '3', isValOK=isDigitString),
    ]

    # Options specific to ProductAssigner
    assignerOpts = [
        CLI.Option('l', "products",
                   "comma separated list of highest layer of products e.g. mosaic "
                   "products if stacks and mosaics, tile products if stacks and tiles or "
                   "stacks if just stacks",
                   "LIST"),
        CLI.Option('n', "minProdID",
                   "start at this productID in highest layer of products",
                   "NUMBER", isValOK=isDigitString),
        CLI.Option('s', "selectionType",
                   "type of selection for assigned frame e.g. OBsee",
                   "NAME"),
        CLI.Option('v', "verbose",
                   "more verbose logging"),
        CLI.Option('x', "maxProdID",
                   "finish at this productID in highest layer of products",
                   "NUMBER", isValOK=isDigitString),
        CLI.Option('z', 'assignSingOnly',
                   'Only update where missing single products'),
    ]

    CLI.progArgs += assignerArgs
    CLI.progOpts += assignerOpts

    cli = CLI(ProductAssigner, "$Revision: 9515 $")
    Logger.isVerbose = cli.getOpt("verbose")
    Logger.addMessage(cli.getProgDetails())
    CLI.check64bitServer()

    curator = ProductAssigner(cli.getArg("programmeID"), cli=cli)
    try:
        curator.dateRange = curator.sysc.obsCal.dateRange(
            cli.getArg("begin_date"), cli.getArg("end_date"))
    except Exception as error:
        errorType = "Invalid Option"
        Logger.addExceptionMessage(error, errorType)
        raise SystemExit(errorType + ": see log " + curator._log.pathName)

    dirDateTime = utils.makeDateTime(cli.getOpt("dir_date"))
    curator.dirDate = dirDateTime.date.replace('-', '')

    # Optional integer limits on productIDs of the highest product layer
    for limitName in ("maxProdID", "minProdID"):
        limitValue = cli.getOpt(limitName)
        if limitValue:
            setattr(curator, limitName, int(limitValue))

    productList = cli.getOpt("products")
    if productList:
        curator.reqProdIDs = csv.values(productList)

    selectionType = cli.getOpt("selectionType")
    if selectionType:
        curator.selType = selectionType

    curator.assignSingOnly = cli.getOpt("assignSingOnly")
    curator.releaseNum = int(cli.getArg("release_number"))

    curator.run()

#------------------------------------------------------------------------------