Package invocations :: Package cu0 :: Module FinaliseCatalogueIngest
[hide private]

Source Code for Module invocations.cu0.FinaliseCatalogueIngest

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  # $Id: FinaliseCatalogueIngest.py 10069 2013-09-16 13:18:37Z EckhardSutorius $ 
  4  """ 
  5     Finalise the catalogue data ingests by creating the constraints for the 
  6     monthly VVV tables and deprecating old detections. 
  7   
  8     @author: Eckhard Sutorius 
  9     @org:    WFAU, IfA, University of Edinburgh 
 10  """ 
 11  #------------------------------------------------------------------------------ 
 12  from   collections import defaultdict 
 13  import os 
 14  import string 
 15   
 16  from   wsatools.CLI                 import CLI 
 17  import wsatools.DbConnect.DbConstants   as dbc 
 18  from   wsatools.DbConnect.DbSession import DbSession, odbc, SelectSQL 
 19  import wsatools.DbConnect.CommonQueries as queries 
 20  import wsatools.DbConnect.Schema        as schema 
 21  import wsatools.DataFactory             as df 
 22  from   wsatools.File                import File, PickleFile 
 23  from   wsatools.Logger              import Logger 
 24  from   wsatools.SystemConstants     import SystemConstants 
 25  import wsatools.Utilities               as utils 
 26  #------------------------------------------------------------------------------ 
 27  pickledDataFile = "DetData_%s.pcl" 
 28  sqlOutDir = "/mnt/ramses12/" 
 29  # constraint templates 
 30  constraintName = {"mf": "multiframeID%s%s", 
 31                    "obj": "objID%s"} 
 32  constraintStr = "ALTER TABLE %s%s WITH CHECK "\ 
 33                  "ADD CONSTRAINT %s CHECK " 
 34  constraintRange = {"mf": "(multiframeID >= %s AND multiframeID <= %s)", 
 35                     "obj": "(objID >= %d AND objID <= %d)"} 
 36  constraintAdd = {"mf": "(multiframeID = %s)", 
 37                   "obj": "(objID = %d)"} 
 38  #------------------------------------------------------------------------------ 
 39   
def _mfidConstraintsByMonth(mfidList):
    """
    Group the sorted multiframeIDs into runs of the same month and turn each
    run into a constraint condition.

    @param mfidList: List of (multiframeID, month string) tuples.
    @type  mfidList: list(tuple)

    @return: Dictionary of condition strings referenced by month.
    @rtype:  defaultdict(list)

    """
    constraints = defaultdict(list)
    runMonth = None
    runStart = runEnd = None
    for mfid, monthStr in sorted(mfidList):
        if monthStr == runMonth:
            runEnd = mfid
            continue

        if runMonth is not None:
            appendConstrain("mf", constraints, runMonth, runStart, runEnd)

        # A new run starts; the end has to be initialised too, otherwise a
        # first month holding a single multiframe produces a range ending
        # at the -1 sentinel.
        runMonth, runStart, runEnd = monthStr, mfid, mfid

    if runMonth is not None:
        # The final run is always written in range form (as before), so that
        # the text comparison against existing constraints stays stable.
        constraints[runMonth].append(
            constraintRange["mf"] % (runStart, runEnd))

    return constraints

#------------------------------------------------------------------------------

def _mergeAdjacentRanges(ranges):
    """
    Merge (min, max) ranges that directly follow each other.

    @param ranges: List of inclusive (min, max) tuples, in any order.
    @type  ranges: list(tuple)

    @return: Sorted list of merged (min, max) tuples; empty for no input.
    @rtype:  list(tuple)

    """
    ordered = sorted(ranges)
    merged = []
    if not ordered:
        return merged

    lower, upper = ordered[0]
    for nextLower, nextUpper in ordered[1:]:
        if nextLower == upper + 1:
            upper = nextUpper
        else:
            merged.append((lower, upper))
            lower, upper = nextLower, nextUpper

    merged.append((lower, upper))
    return merged

#------------------------------------------------------------------------------

def createConstraints(archive, readFromFile=False, month=None, writeAll=False):
    """
    Create script to attach constraints to monthly VVV detection tables.

    For each of the Raw, Astrometry and Photometry split tables the
    multiframeID constraints are written; for the Raw tables also the objID
    constraints. Astrometry and Photometry always re-use the multiframeID
    data pickled while processing Raw.

    @param archive:      The DB connection.
    @type  archive:      DBSession
    @param readFromFile: If True read from existing data file else query DB.
    @type  readFromFile: bool
    @param month:        Only process monthly tables of this month (YYYYMM);
                         if not given all tables starting with "20" are used.
    @type  month:        str
    @param writeAll:     If True write every constraint, else only those that
                         are missing in or differ from the database.
    @type  writeAll:     bool

    """
    detSuffix = month or "20"

    # get existing constraints, normalised by removing dots, blanks and
    # brackets so they can be compared with the generated text
    identity = string.maketrans('', '')
    deleteTrans = '. []()'
    dbConstraints = {}
    for name, text in archive.query(
            "name, text", "sysobjects o, syscomments c", " AND ".join([
                "o.id=c.id", "xtype = 'C'",
                "(name like 'multiframeID%20%' or name like 'objID%20%')"])):
        dbConstraints[name] = text.translate(identity, deleteTrans)

    # open constraint sql file
    timestamp = utils.makeMssqlTimeStamp()
    constraintFile = File(os.path.join(
        sqlOutDir, "MakeConstraints_%s.sql" % timestamp[:timestamp.find(':')]))
    constraintFile.wopen()

    def writeConstraint(tableName, conMonth, conName, conditions):
        """ Write the ALTER TABLE statement if it is new or was changed.
        """
        statement = constraintStr % (tableName, conMonth, conName)
        statement += '(' + " OR ".join(conditions) + ')'
        consCheck = statement.rpartition("CHECK")[2].translate(
            identity, deleteTrans)
        if writeAll or dbConstraints.get(conName) != consCheck:
            constraintFile.writetheline(statement)

    try:
        # NOTE(review): the database name is hard-coded although the script
        # entry point may connect to a test database -- confirm.
        constraintFile.writetheline("use VSAVVV")
        for splitTable in ["Raw", "Astrometry", "Photometry"]:
            if splitTable in ["Astrometry", "Photometry"]:
                readFromFile = True

            detTableName = "%sDetection%s" % ("vvv", splitTable)
            constraintFile.writetheline(
                '--- %s mfID constraints ---' % splitTable)

            # get all monthly detection tables
            Logger.addMessage(
                "Getting monthly %sDetection tables..." % splitTable)
            detTabs = archive.query(
                "name", "sysobjects", "name like '%s%s%%'" % (detTableName,
                                                              detSuffix))
            mfidList = []
            detCounts = {}

            # read the mfids either from a file or the DB
            Logger.addMessage("Reading multiframeIDs")
            fileName = "Mfid%s" % (pickledDataFile % archive.database)

            if readFromFile:
                Logger.addMessage(" from %s..." % fileName)
                dataFile = PickleFile(fileName)
                mfidList, detCounts = list(dataFile.pickleRead())
            else:
                for table in sorted(detTabs):
                    Logger.addMessage(" from %s..." % table)
                    monthStr = table.replace(detTableName, '')
                    if monthStr:
                        results = archive.query(
                            "multiframeID, count(*)", table,
                            "objID>=0", groupBy="MultiframeID",
                            orderBy="MultiframeID")
                        for mfid, counts in results:
                            mfidList.append((mfid, monthStr))
                            detCounts[mfid] = counts

                # write data into pickle file
                dataFile = PickleFile(fileName)
                dataFile.pickleWrite(mfidList, detCounts)

            Logger.addMessage("%d mfIDs found" % len(mfidList))

            # get the mfid constraint ranges
            Logger.addMessage("Getting the multiframeID constraint ranges...")
            mfidConstraints = _mfidConstraintsByMonth(mfidList)
            del mfidList

            # write the multiframeID constraints into the sql file
            for conMonth in sorted(mfidConstraints):
                writeConstraint(
                    detTableName, conMonth,
                    constraintName["mf"] % (splitTable, conMonth),
                    mfidConstraints[conMonth])

            # read the objids either from a file or the DB
            if splitTable == "Raw":
                objidDict = defaultdict(list)
                objidConstraints = defaultdict(list)
                constraintFile.writetheline('--- objID constraints ---')
                fileName = "Objid%s" % (pickledDataFile % archive.database)
                Logger.addMessage("Reading objIDs")
                if readFromFile:
                    Logger.addMessage(" from %s..." % fileName)
                    dataFile = PickleFile(fileName)
                    objidDict = list(dataFile.pickleRead())[0]
                else:
                    for table in sorted(x for x in detTabs if "Raw" in x):
                        Logger.addMessage(" from %s..." % table)
                        monthStr = table.replace(detTableName, '')
                        if monthStr:
                            results = archive.query(
                                "multiframeID, MIN(objID), MAX(objID)", table,
                                "objID>=0", groupBy="multiframeID",
                                orderBy="MIN(objID)")
                            for mfid, minobj, maxobj in results:
                                objidDict[monthStr].append(
                                    (mfid, minobj, maxobj))

                    # write data into pickle file
                    dataFile = PickleFile(fileName)
                    dataFile.pickleWrite(objidDict)

                Logger.addMessage("%d objIDs found" % sum(
                    len(val) for val in objidDict.values()))

                for monthStr in objidDict:
                    table = detTableName + monthStr
                    monthRange = []
                    if table in detTabs:
                        for mfid, minobj, maxobj in objidDict[monthStr]:
                            # a gap-free multiframe is fully described by its
                            # min/max; otherwise (or if the count is unknown)
                            # the exact ranges are queried
                            if maxobj - minobj + 1 == detCounts.get(mfid):
                                monthRange.append((minobj, maxobj))
                            else:
                                for begObjid, endObjid in getObjIDRanges(
                                        archive, mfid, table):
                                    monthRange.append((begObjid, endObjid))

                    # months without a table in the current selection (e.g.
                    # read from a pickle file covering more months) have no
                    # ranges and get no constraint
                    if not monthRange:
                        continue

                    for objidMin, objidMax in _mergeAdjacentRanges(monthRange):
                        appendConstrain("obj", objidConstraints, monthStr,
                                        objidMin, objidMax)

                # write the data into the sql file
                for conMonth in sorted(objidConstraints):
                    # always allow the default objID value
                    appendConstrain("obj", objidConstraints, conMonth,
                                    dbc.intDefault(), dbc.intDefault())
                    writeConstraint(
                        detTableName, conMonth,
                        constraintName["obj"] % conMonth,
                        sorted(objidConstraints[conMonth]))

            # NOTE(review): batch separator assumed to be written once per
            # split table -- confirm against the original layout.
            constraintFile.writetheline("GO")
    finally:
        # make sure the script file is closed even if a query fails
        constraintFile.close()

    Logger.addMessage("Updates written to %s" % constraintFile.name)
236 237 #------------------------------------------------------------------------------ 238
def createViews(archive, surveyName):
    """
    Create the views for the monthly split tables.

    For each of the Raw, Astrometry and Photometry detection tables of the
    survey a view is (re-)created as UNION ALL over all monthly tables,
    unless every monthly table is already mentioned in the view definition.

    @param archive:    The DB connection.
    @type  archive:    DBSession
    @param surveyName: Name of the survey; used in lower case as prefix of
                       the detection table names.
    @type  surveyName: str

    """
    detTableList = ["%sDetection%s" % (surveyName.lower(), splitTable)
                    for splitTable in ['Raw', 'Astrometry', 'Photometry']]

    try:
        for detTableName in detTableList:
            Logger.addMessage("Creating VIEW %s" % detTableName)
            viewID = archive.query(
                "id", "sysobjects",
                "xtype = 'v' and name like '%s%%'" % detTableName,
                firstOnly=True, default=None)
            # NOTE(review): only the first syscomments row is read; a very
            # long view definition may be split over several rows -- confirm.
            viewText = (archive.query(
                "text", "syscomments", "id=%d" % viewID, firstOnly=True)
                        if viewID else '')
            dettabs = archive.query(
                "name", "sysobjects", "name like '%s%%'" % detTableName)
            detMonths = []
            viewExists = True
            for table in dettabs:
                monthStr = table.replace(detTableName, '')
                # an empty remainder is the un-suffixed name, not a monthly
                # table
                if monthStr:
                    detMonths.append(monthStr)
                    if table not in viewText:
                        viewExists = False
            if not viewExists:
                view = schema.View()
                view.name = detTableName
                view.definition = " UNION ALL ".join(
                    str(SelectSQL('*', detTableName + detMon))
                    for detMon in sorted(detMonths) if detMon)
                archive.createObjects([view], overWrite=True)
                archive.commitTransaction()
            else:
                Logger.addMessage("View already exists.")

    except odbc.ProgrammingError as error:
        if "permission was denied" in str(error):
            raise SystemExit("<ERROR> You do not have write permission on "
                             "database %s" % archive)
        # any other programming error must not be swallowed silently
        raise
279 280 #------------------------------------------------------------------------------ 281
def getObjIDRanges(archive, mfID, tableName):
    """
    Get the ranges of consecutive objIDs of one multiframe in a table.

    The query groups the positive objIDs of the multiframe by the difference
    between their dense rank and their value, which is constant within a run
    of consecutive objIDs, and selects the minimum and maximum of each group
    ordered by the minimum.

    @param archive:   The DB connection.
    @type  archive:   DBSession object
    @param mfID:      The MultiframeID.
    @type  mfID:      int
    @param tableName: Name of the processed table.
    @type  tableName: str

    @return: The result of the query as returned by the DB connection;
             callers iterate it as (min objID, max objID) pairs.

    """
    sqlTemplate = (
        "WITH objidCTE AS ("
        "SELECT objID, DENSE_RANK() OVER (ORDER BY objID) - objID AS grp "
        "FROM %s WHERE multiframeID=%d and objid>0) "
        "SELECT MIN(objID), MAX(objID) FROM objidCTE "
        "GROUP BY grp order by MIN(objID)")
    script = sqlTemplate % (tableName, mfID)
    return archive._executeScript(script, wantResult=True)
302 303 #------------------------------------------------------------------------------ 304
def appendConstrain(attr, conDict, month, conOn, conOff):
    """
    Append a constraint condition to the month's entry of the constraint
    dictionary: a single value condition if start and end are equal, else a
    range condition.

    @param attr:    Attribute short for which the constraint is built
                    (key of the constraint templates).
    @type  attr:    str
    @param conDict: The constraint dictionary; its entry for the month must
                    be a list (or be created on access).
    @type  conDict: dict
    @param month:   The processed month.
    @type  month:   str
    @param conOn:   Start value of the constraint.
    @type  conOn:   int
    @param conOff:  End value of the constraint.
    @type  conOff:  int

    """
    monthEntries = conDict[month]
    if conOn != conOff:
        monthEntries.append(constraintRange[attr] % (conOn, conOff))
    else:
        monthEntries.append(constraintAdd[attr] % conOn)
327 328 #------------------------------------------------------------------------------ 329
def deprecateOldDetections(archive):
    """
    Deprecate old detections in the detection tables of all programmes with
    a positive programmeID.

    @param archive: The DB connection.
    @type  archive: DBSession object

    """
    programmes = archive.query("programmeID, dfsIDString", "Programme",
                               "programmeID>0")
    progTable = df.ProgrammeTable(archive)
    Logger.addMessage("Deprecating old detections... ")
    for progId, idStr in programmes:
        tablePrefix = progTable.getDetectionTable(programmeID=progId)
        if progId in archive.sysc.monthlyDetSurveys:
            # monthly tables carry a date suffix starting with "20"
            tablePrefix = tablePrefix + "20"

        # NOTE(review): the table lookup is assumed to run for every
        # programme, not only the monthly surveys -- confirm.
        tableNames = archive.query(
            "name", "sysobjects",
            "xtype = 'U' and name like '%s%%'" % tablePrefix)

        if tableNames:
            Logger.addMessage("...for %s[%d]" % (idStr, progId))

        for tableName in sorted(tableNames):
            numDeprecated = queries.deprecateOldDetections(archive, tableName)
            Logger.addMessage(
                "No. of detections deprecated in %s: %s" % (
                    tableName, numDeprecated))
358 359 #------------------------------------------------------------------------------ 360
def updateVVVObjIDs(archive):
    """
    Since the objectIDs couldn't be calculated beforehand they have to be
    updated separately based on their value and the maximal value found in
    the database.

    All tables with "Raw20" in their name are inspected; in every table that
    holds negative objIDs these are replaced by consecutive values following
    the largest objID found so far.

    @param archive: The DB connection.
    @type  archive: DBSession object

    """
    Logger.addMessage("Updating objIDs...")
    maxObjId = 0
    minObjIdOfTable = {}
    for detTable in sorted(archive.query("name", "sys.tables",
                                         "name like '%%%s%%'" % "Raw20")):
        minObj, maxObj = archive.query("MIN(objID), MAX(objID)", detTable,
                                       "seqNum>-99999990",
                                       firstOnly=True, default=(0, 0))
        # MIN/MAX are NULL for a table without matching rows; treat such a
        # table explicitly as "nothing to update" instead of relying on the
        # ordering of None relative to integers
        minObjIdOfTable[detTable] = minObj if minObj is not None else 0
        if maxObj is not None:
            maxObjId = max(maxObjId, maxObj)

    for detTable in sorted(minObjIdOfTable):
        if minObjIdOfTable[detTable] < 0:
            # the variable is incremented and assigned row by row
            # NOTE(review): the numbering presumably relies on the rows being
            # updated in the ORDER BY sequence of the CTE -- confirm.
            archive._executeScript(
                "DECLARE @myvar bigint; set @myvar = %d; "
                "WITH updCTE as (SELECT TOP 9223372036854775807 "
                "multiframeID, extNum, seqNum, objID "
                "FROM %s WHERE objID<0 AND seqNum>-99999990 "
                "ORDER BY multiframeID, extNum, seqNum)"
                "UPDATE updCTE SET @myvar = objID = @myvar + 1" % (
                    maxObjId, detTable), wantRowCount=True)
            archive.commitTransaction()
            Logger.addMessage("...%s updated." % detTable)
            # continue numbering after the values just assigned
            maxObjId = max(maxObjId, archive.queryAttrMax("objID", detTable))
#------------------------------------------------------------------------------
# Entry point for script.

# Allow module to be imported as well as executed from the command line
if __name__ == "__main__":
    CLI.progOpts += [
        CLI.Option('C', "constraint",
                   "create constraint script"),
        CLI.Option('D', "deprecate",
                   "deprecate old detections"),
        CLI.Option('F', "force",
                   "write constraints even when they are already in the DB"),
        CLI.Option('O', "objid",
                   "update objIDs for VVV"),
        CLI.Option('V', "doviews",
                   "create views for VVV"),
        CLI.Option('m', "month",
                   "create objID constraints only for this month "
                   "(format: YYYYMM)", "NUMBER",
                   isValOK=lambda x: x.isdigit() and len(x) == 6),
        CLI.Option('r', "readfile",
                   "read data from file instead DB"),
        CLI.Option('v', "version", "version number of the data", "STR")]

    cli = CLI("FinaliseCatalogueIngest", "$Revision: 10069 $", __doc__)
    database = cli.getArg("database").split('.')[-1]
    Logger.isVerbose = False
    #log = Logger("%s_FCI_%s.log" % (database,
    #             utils.makeTimeStamp().replace(' ', '_')))

    Logger.addMessage(cli.getProgDetails())
    sysc = SystemConstants(database)
    obsCal = sysc.obsCal

    if not cli.getOpt("semester"):
        semesterList = obsCal.getSemList()[-3:]
    elif cli.getOpt("semester").lower() == "all":
        semesterList = obsCal.getSemList()
    else:
        semesterList = cli.getOpt("semester").split(',')

    # NOTE(review): the version is determined but not used by any of the
    # tasks below -- kept as in the original.
    version = (cli.getOpt("version") or
               str(obsCal.maxVersOfDate(obsCal.getDates(semesterList[0],
                                                        "%Y%m%d")[0])))

    # without any task option all tasks are run
    doAll = not any([cli.getOpt("deprecate"), cli.getOpt("constraint"),
                     cli.getOpt("objid"), cli.getOpt("doviews")])

    def runOnDb(dbName, task):
        """ Connect to the given database and run the task on the
            connection; errors are logged and re-raised.
        """
        try:
            # create a DB connection
            archive = DbSession(dbName, False, cli.getOpt("test"),
                                cli.getOpt("user"))
            task(archive)
            del archive
        except Exception as details:
            Logger.addExceptionMessage(details)
            raise

    # the VVV tasks run on the given database only if it is a test database
    if "test" in database.lower():
        vvvDb = cli.getArg("database")
    else:
        vvvDb = "ramses12.VSAVVV"

    if doAll or cli.getOpt("deprecate"):
        for server in sorted(sysc.ingestDbForServer):
            runOnDb('.'.join([server, sysc.ingestDbForServer[server]]),
                    deprecateOldDetections)

    if sysc.isVSA() and (doAll or cli.getOpt("doviews")):
        runOnDb(vvvDb, lambda archive: createViews(archive, "VVV"))

    if sysc.isVSA() and (doAll or cli.getOpt("constraint")):
        runOnDb(vvvDb, lambda archive: createConstraints(
            archive, readFromFile=cli.getOpt("readfile"),
            month=cli.getOpt("month"), writeAll=cli.getOpt("force")))

    if sysc.isVSA() and (doAll or cli.getOpt("objid")):
        runOnDb(vvvDb, updateVVVObjIDs)