Source Code for Module invocations.cu4.cu4

#! /usr/bin/env python
#------------------------------------------------------------------------------
#$Id: cu4.py 10181 2014-01-10 11:53:07Z EckhardSutorius $
"""
   Invoke CU4. Ingest single frame source detections.

   @author: E. Sutorius
   @org:    WFAU, IfA, University of Edinburgh

   @newfield contributors: Contributors, Contributors (Alphabetical Order)
   @contributors: I. Bond, J. Bryant, R.S. Collins, N.C. Hambly
"""
#------------------------------------------------------------------------------
from   collections import defaultdict
import dircache
import inspect
import itertools
import math
import mx.DateTime     as mxdt
from   operator    import itemgetter
import os
import string

from   wsatools.CLI                    import CLI
import wsatools.DataFactory                as df
import wsatools.DbConnect.DbConstants      as dbc
from   wsatools.DbConnect.DbSession    import DbSession
from   wsatools.DbConnect.IngCuSession import IngCuSession
from   wsatools.DbConnect.IngIngester  import IngestLogger as Logger, \
                                              IngestLogFile
from   wsatools.File                   import File
import wsatools.Utilities                  as utils
import wsatools.WfcamsrcInterface          as wfcamsrc
#------------------------------------------------------------------------------
class Cu4(IngCuSession):
37 """ 38 Invoke single frame source detections ingestion given a log of files 39 transferred from the data processing centre. 40 41 """
    class NoFilesError(Exception):
43 """ Exception in case the given programme has no catalogues to process. 44 """ 45 pass

    maxSplit = 2
    cuedFileExists = False  # Does CU4ED file exist?
    #--------------------------------------------------------------------------

    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 csvSharePath=CLI.getOptDef("outpath"),
                 programmeList=CLI.getOptDef("programmes"),
                 progOrder=CLI.getOptDef("progorder"),
                 detectionSubset=CLI.getOptDef("subset"),
                 excludeFiles=CLI.getOptDef("exclude"),
                 isTrialRun=DbSession.isTrialRun,
                 ReDo=CLI.getOptDef("redo"),
                 keepWorkDir=CLI.getOptDef("keepwork"),
                 xferlog=CLI.getArgDef("xferlog"),
                 comment=CLI.getArgDef("comment")):
        """
        @param curator: Name of curator.
        @type curator: str
        @param comment: Descriptive comment as to why curation task is
                        being performed.
        @type comment: str
        @param database: Name of the database to connect to.
        @type database: str
        @param detectionSubset: Process subset of [Astrometry table,
                                Photometry table, Raw table].
        @type detectionSubset: list(str)
        @param excludeFiles: Files to exclude from processing.
        @type excludeFiles: list(str)
        @param keepWorkDir: Don't remove working directory.
        @type keepWorkDir: bool
        @param programmeList: Only process data for given programmes (accepts
                              keywords 'all', 'ns' (non-survey),
                              'ukidss' (all 5 main surveys)).
        @type programmeList: list(str)
        @param progOrder: The order of processing of the programmes; all
                          programmes not explicitly named can be put
                          anywhere in the list as 'others'.
        @type progOrder: list(str)
        @param ReDo: If True, overwrite existing monthly detection schema.
        @type ReDo: bool
        @param xferlog: Logfile containing files to be ingested.
        @type xferlog: str
        @param isTrialRun: If True, do not perform database modifications.
        @type isTrialRun: bool

        """
        typeTranslation = {"curator": str,
                           "database": str,
                           "csvSharePath": str,
                           "programmeList": list,
                           "progOrder": list,
                           "detectionSubset": list,
                           "excludeFiles": list,
                           "isTrialRun": bool,
                           "xferlog": str,
                           "ReDo": bool,
                           "keepWorkDir": bool,
                           "comment": str}

        super(Cu4, self).attributesFromArguments(
            inspect.getargspec(Cu4.__init__)[0], locals(),
            types=typeTranslation)

        self.comment += '[' + ','.join(n for n in programmeList) + ']'

        # Initialize parent class
        super(Cu4, self).__init__(cuNum=4,
                                  curator=self.curator,
                                  comment=self.comment,
                                  reqWorkDir=True,
                                  keepWorkDir=self.keepWorkDir,
                                  database=self.database,
                                  autoCommit=False,
                                  isTrialRun=self.isTrialRun)

        # make sure detectionSubset conforms with the table naming scheme
        self.detectionSubset = tuple(x.title() for x in self.detectionSubset)

        # Other constants used within this script:
        self.createFileList()
        self.outPrefixList = []

        self._initFromDB()

        utils.ensureDirExist(self.csvSharePath)
        self.obsCal = self.sysc.obsCal

    #--------------------------------------------------------------------------

    def _initFromDB(self):
        """ Initialize constants from the DB.
        """
        # Connect to database
        self._connectToDb()

        try:
            if not self.isTrialRun:
                self._cuEventID = self._getNextCuEventID()

            if self.cuNum != dbc.smallIntDefault():
                try:
                    self._cuTable = df.CurationTaskTable(self.archive,
                                                         self.cuNum)
                    Logger.addMessage("Initialising Curation Use case:")
                    Logger.addMessage(self._cuTable.getDescription())
                    Logger.addMessage("on list: " + self.xferlog)
                except ValueError as details:
                    Logger.addMessage("<Warning> " + str(details))
                    self._cuTable = None

            # Create the translation dictionary of programme identification
            # strings to programme ID numbers for all programmes:
            self._createProgrammeTranslation()

            # Remove all items with empty FITS catalogue files, create a
            # catalogue file list from the remainder, and also check whether
            # CU3 has been run on these files before
            self._shortListFile = \
                File(os.path.join(self._workPath, "shortlist.dat"))
            self._shortListFile.wopen()

            notInDbLogFile = \
                File("notcu3ed_%s.log" % mxdt.utc().strftime("%Y%m%d_%H%M%S"))
            notInDbLogFile.wopen()

            # exclude all items containing the names given via exclude option
            tmpList = []
            if self.excludeFiles:
                Logger.addMessage("Excluding %s" % ','.join(self.excludeFiles))
                for excl in self.excludeFiles:
                    for entry in self._fileList:
                        if not (excl in entry[0] or excl in entry[1]):
                            tmpList.append(entry)
                self._fileList = tmpList[:]

            catFiles = []
            self.procDateSet = set()
            self.isWfauProduct = False
            notcu3ed = 0

            for entry in self._fileList:
                if entry[1] != self.sysc.emptyFitsCataloguePathName():
                    # Check that CU3 has been run on this file
                    fileEntry = self.sysc.pixelServerHostName + entry[0]
                    mfID = self.archive.query(
                        "multiframeID", "Multiframe",
                        whereStr="deprecated<128 AND filename LIKE %r" %
                        fileEntry)
                    if mfID:
                        catFiles.append(list(entry) + mfID)
                        self._shortListFile.writetheline(
                            entry[0] + ' ' + entry[1])

                        if "e20" in entry[1]:
                            self.isWfauProduct = True

                        if self.sysc.isVSA() and "e20" in entry[1] \
                                and self.database.rpartition('.')[2] \
                                == "VSAVVV":
                            month = os.path.basename(os.path.dirname(
                                entry[0])).partition('_v')[0][:4] + "0000"
                        else:
                            month = os.path.basename(os.path.dirname(
                                entry[0])).partition('_v')[0]

                        self.procDateSet.add(month)
                    else:
                        notcu3ed += 1
                        notInDbLogFile.writetheline(entry[0])

            self._fileList = catFiles

            self._shortListFile.close()
            notInDbLogFile.close()

            if notcu3ed > 0:
                Logger.addMessage("%s files not ingested by CU3 are in %s" %
                                  (notcu3ed, notInDbLogFile.name))
            else:
                try:
                    os.remove(notInDbLogFile.name)
                except OSError:
                    pass

            # Get the programme table schema
            self._progTable = df.ProgrammeTable(self.archive)

        finally:
            self._disconnectFromDb()
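
    # The shortlist written above holds one "<image> <catalogue>" filename
    # pair per line for every frame that CU3 has already ingested, e.g.
    # (hypothetical paths, for illustration only):
    #
    #   /disk01/wsa/20050101/w20050101_00123.fit /disk01/wsa/20050101/w20050101_00123_cat.fits
    #
    # getProgIDs() later re-reads this file via getProgramIDs() to map each
    # catalogue to its programme.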

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Do CU4.
        """
        Logger.addMessage("[%d] Started CU%s on list: %s in order: %r" %
            (self._cuEventID, self.cuNum, self.xferlog, self.progOrder))

        # Create a log file to hold the verbose record for this invocation
        log = Logger(self.createLogFileName())

        try:
            # Only invoke ingestion if needed:
            if not self._fileList:
                # @@TODO: Raise a CuError?
                Logger.addMessage(
                    "[%d] No new source data are required to be loaded."
                    % self._cuEventID)
                return

            # create the illumination table list
            self.createIlluminationTableList()

            # get months of processed data
            self.monthlyMonths = []
            for dateStr in sorted(self.procDateSet):
                self.monthlyMonths.append(dateStr[:6])

            # Get programme IDs for every catalogue
            catsForProg = self.getProgIDs()

            Logger.addMessage(
                "[%d] Number of files containing source detections to be "
                "ingested: %d" % (self._cuEventID, len(list(
                    utils.unpackList(catsForProg.values())))))

            # create schema dictionary for every table
            self.createSchemaDict()

            # process programmes in given order, otherwise from extractDict
            self._progOrderRatedDict = self.getProcessOrder(
                self._progIDs, include=self.programmeList,
                progOrder=self.progOrder)

            if not set(self._progIDs).intersection(self._progOrderRatedDict):
                raise Cu4.NoFilesError()

            # split the file list for easier processing
            extractDict = self.createSplitListFiles(catsForProg,
                                                    splitNum=self.maxSplit)

            if not self._progOrderRatedDict:
                Logger.addMessage("[%d] No data for %s." % (
                    self._cuEventID, ','.join(self.programmeList)))
            else:
                Logger.addMessage(
                    "[%d] Running exnumeric..." % self._cuEventID)
                print("Full log is no longer echoed to terminal, but "
                      "continues to be written to " + log.pathName)

                # write ingest info into the ingest log file
                timestamp = utils.makeMssqlTimeStamp()
                histories = {"DCH": [], "ACH": [], "PCH": []}
                # include processed dates
                histories["DCH"].extend(self._dateList)

                # for the update of the curation event history
                histories["ACH"].append(
                    [self._cuEventID, self.cuNum, log.pathName,
                     dbc.charDefault(), timestamp, self.curator,
                     self.comment, dbc.yes()])

                # check if CU4ED files exist
                for dateDirPath in histories["DCH"]:
                    cuedFileName = "CU%02dED_%s" % (
                        self.cuNum, self.database.rpartition('.')[2])
                    if os.path.exists(os.path.join(dateDirPath,
                                                   cuedFileName)):
                        self.cuedFileExists = True

                self._subNum = 0
                totalProcNum = 0
                for progID in self._progOrderRatedDict:
                    # Set up the monthly detection tables
                    if progID in self.sysc.monthlyDetSurveys:
                        self.setupDetectionTable(
                            self._progNameOfID[progID].lower().replace(
                                "u/ukidss/", '').replace('/', ''),
                            self.monthlyMonths, self.ReDo)

                    # for VVV split the file list into up to 10 sublists:
                    if progID == 120:
                        if len(extractDict[progID]) >= 50:
                            splitNum = int(len(extractDict[progID]) / 10.)
                        elif len(extractDict[progID]) >= 10:
                            splitNum = int(len(extractDict[progID]) / 5.)
                        else:
                            splitNum = len(extractDict[progID])
                    else:
                        splitNum = 1

                    self._progTable.setCurRow(programmeID=int(progID))
                    Logger.addMessage("[%d] Extracting data for %s" % (
                        self._cuEventID, self._progTable.getName()))

                    subLists = self.splitList(extractDict[progID], splitNum)

                    Logger.addMessage(
                        "[%d] Number of listfiles (cont. 2 FITS files) to be "
                        "processed: %d (split into %d sublist(s))" % (
                            self._cuEventID, len(extractDict[progID]),
                            len(subLists)))

                    for subFileList in subLists:
                        self._subNum += 1
                        Logger.addMessage("Processing sublist #%d.%d..." % (
                            self._cuEventID, self._subNum))
                        # output file prefix
                        hostID = "%s-%d" % (self._hostName, self._subNum)
                        if progID == 120:
                            hostID += "-%s" % self.monthlyMonths[0]

                        self._outPrefix = self.createFilePrefix(
                            self.cuNum, self._cuEventID, hostID,
                            self.database.rpartition('.')[2])
                        self.outPrefixList.append((self._outPrefix,
                                                   self._cuEventID))

                        # set the table variables that will be passed on to
                        # the ingester
                        self.setTableVariables()

                        # make sure there are no old .dat files in the
                        # dbSharePath
                        self.cleanUpFileShare()

                        # Create the ingest log file
                        ingestLogFile = IngestLogFile(os.path.join(
                            self.dbMntPath, self._outPrefix + ".log"))

                        self.runExtractions(progID, subFileList)

                        # check that all files are created
                        tmpTableDict, tmpIngestOrder = \
                            self.checkCreated(progID)

                        # update histories
                        histories["PCH"] = [[progID, self._cuEventID,
                                             timestamp, -1]]
                        ingestLogFile.pickleWrite(tmpIngestOrder,
                                                  tmpTableDict, histories)
                        Logger.addMessage(
                            "[%d] Curation History info written to %s" % (
                                self._cuEventID, ingestLogFile.name))

                        Logger.addMessage(
                            "Processing of %s listfiles of sublist #%d.%d"
                            " finished." % (len(subFileList),
                                            self._cuEventID, self._subNum))
                        totalProcNum += len(subFileList)
                        forceFlag = (" -F" if self.cuedFileExists else '')
                        Logger.addMessage(
                            "[%d] Please run ./IngestCUFiles.py -i -r %s%s %s"
                            % (self._cuEventID, ingestLogFile.name, forceFlag,
                               self.database))

                preComment = \
                    self._historyCommentPrefix(self._success, dbc.yes())
                row = (self._cuEventID, self.cuNum, log.pathName,
                       dbc.charDefault(), timestamp, self.curator,
                       preComment + self.comment, dbc.yes())

                self._connectToDb()
                self._updateArchiveCurationHistory(row)
                self.archive.commitTransaction()
                self._disconnectFromDb()
                Logger.addMessage("[%d] Processing of %s files finished." % (
                    self._cuEventID, totalProcNum * self.maxSplit))

        except Cu4.NoFilesError:
            Logger.addMessage(
                "[%d] No catalogue data available for programmes: %s" % (
                    self._cuEventID, ', '.join(self.programmeList)))
        except Exception as error:
            # @@TODO: If all IngCuSessions have this structure, then
            #         IngCuSession needs to have a new run() method.
            Logger.addExceptionDetails("[%d] %r" % (self._cuEventID, error))
        finally:
            Logger.addMessage("[%d] Log written to %s" % (
                self._cuEventID, log.pathName))
            Logger.dump(open(log.pathName, 'w'))
            Logger.reset()
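
    # Note that _onRun() stops short of touching the detection tables: it
    # writes the extracted detections to .dat files in csvSharePath and
    # pickles the table and history bookkeeping into the CU4 ingest log
    # file, so the actual database ingest is performed separately (see the
    # IngestCUFiles.py prompt logged above).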

    #--------------------------------------------------------------------------

    def checkCreated(self, progID):
        """ Check that all files are created.
        """
        tmpTableDict = {}
        tmpIngestOrder = []
        identity = string.maketrans('', '')
        # iterate over a copy, since tables may be removed from the original
        for table in list(self._ingestOrder):
            progToCheck = self._progNameOfID[progID].lower()
            progToCheck = progToCheck.replace("u/ukidss/", '')
            progToCheck = progToCheck.translate(identity, '/.-()')
            if progToCheck in table.partition("Detection")[0].lower():
                if not os.path.exists(os.path.join(
                        self.csvSharePath, self._tableDict[table]["ingfile"])):
                    self._ingestOrder.remove(table)
                    Logger.addMessage(
                        "[%d] <ERROR> Data for %s not available!" % (
                            self._cuEventID, table))
                else:
                    tmpTableDict[table] = self._tableDict[table]
                    tmpIngestOrder.append(table)

        return tmpTableDict, tmpIngestOrder
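
    # A missing .dat file therefore only drops the affected table from the
    # ingest (with an <ERROR> log entry) instead of aborting the whole run.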

    #--------------------------------------------------------------------------

    def cleanUpFileShare(self):
        """ Make sure there are no old .dat files in the dbSharePath.
        """
        for table in self._tableDict:
            csvFileName = os.path.join(
                self.csvSharePath, self._tableDict[table]["ingfile"])
            if os.path.exists(csvFileName):
                os.remove(csvFileName)

    #--------------------------------------------------------------------------

    def createIlluminationTableList(self):
        """ Create the list of used illumination tables and write it, for use
            with exnumeric, to the working directory.
        """
        illumDict = defaultdict(list)
        illumFiles = dircache.listdir(self.sysc.illuminationDir())

        for dateStr in self.procDateSet:
            # find existing illumination tables
            for illumFileName in illumFiles:
                if illumFileName.endswith(".table"):
                    _prefix, semester, month, _band = \
                        illumFileName.partition(".table")[0].split('_')

                    semester = ("20%s" % semester.upper() if self.sysc.isWSA()
                                else semester)

                    if not self.isWfauProduct and \
                            (semester == self.obsCal.checkDate(dateStr)
                             or dateStr.startswith(semester)) \
                            and month.lower() == self.obsCal.getMonth(dateStr):
                        illumDict[dateStr].append(os.path.join(
                            self.sysc.illuminationDir(), illumFileName))

            # fill missing entries with the default table
            if dateStr not in illumDict:
                illumDict[dateStr] = [
                    os.path.join(self.sysc.illuminationDir(), fn)
                    for fn in illumFiles
                    if any(k in fn for k in ['_00x_def_', '_0000_def_'])
                    and fn.endswith(".table")]
                Logger.addMessage("<Warning> No illumination correction table"
                                  " available, using default table!")

        # create the table list file and csv files for exnumeric
        self.illumFile = File(os.path.join(self._workPath, "illumTables.dat"))
        self.illumFile.wopen()
        for dateStr in self.procDateSet:
            for illumTableName in illumDict[dateStr]:
                if not os.path.exists(illumTableName + ".csv"):
                    self.parseIllumTable(illumTableName)
                self.illumFile.writetheline(illumTableName + ".csv")
        self.illumFile.close()
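
    # Illumination table names are assumed here to follow the CASU pattern
    # <prefix>_<semester>_<month>_<band>.table (the split('_') above expects
    # exactly four underscore-separated fields), with default tables marked
    # by '_00x_def_' or '_0000_def_' in the file name.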

    #--------------------------------------------------------------------------

    def createSchemaDict(self):
        """ Create schema dictionary for every table.
        """
        self._schemaFileDict = {}
        for progID in self._progIDs:
            self._progTable.setCurRow(programmeID=progID)

            rawTable = self._progTable.getDetectionTable()
            astroTable = self._progTable.getAstrometryTable()
            photoTable = self._progTable.getPhotometryTable()

            if progID in self.sysc.monthlyDetSurveys:
                rawTable += self.monthlyMonths[0]
                astroTable += self.monthlyMonths[0]
                photoTable += self.monthlyMonths[0]

            tables = [rawTable, astroTable, photoTable]

            # determine the tables wanted
            processTables = [table for table in tables
                             if any(x in table for x in self.detectionSubset)]

            self._schemaFileDict[progID] = [
                os.path.join(self._workPath, "schema_%s.dat" % progID),
                ','.join(processTables)]

            schema = self._progTable.getAttr('catalogueSchema')
            if progID in self.sysc.monthlyDetSurveys:
                schema = self.sysc.sqlMonthlyDetPath(
                    schema.replace(".sql", "%s.sql" % self.monthlyMonths[0]),
                    full=False)

            # include the raw table
            writeTables = (processTables if rawTable in processTables else
                           [rawTable] + processTables)
            open(self._schemaFileDict[progID][0], 'w').writelines(
                "%s %s\n" % (schema, table) for table in writeTables)
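
    # The schema file written above pairs a SQL schema with a destination
    # table, one pair per line, e.g. (illustrative names only):
    #
    #   WSA_NonSurveySchema.sql u05b3Detection
    #
    # runExtractions() hands this file to wfcamsrc.extractDetections()
    # together with the comma-separated list of tables to process.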

    #--------------------------------------------------------------------------

    def splitList(self, p, n):
        """ Split list p into n sublists, distributing the items round-robin.
        """
        result = [[] for _x in itertools.repeat(0, n)]
        resiter = itertools.cycle(result)
        for item, sublist in itertools.izip(p, resiter):
            sublist.append(item)
        return result
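
    # splitList() deals the items out round-robin, e.g. (illustrative
    # values):
    #
    #   splitList([1, 2, 3, 4, 5], 2)  ->  [[1, 3, 5], [2, 4]]
    #
    # so the n sublists never differ in length by more than one item.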

    #--------------------------------------------------------------------------

    def createSplitListFiles(self, catalogueDict, splitNum=2):
        """ Split the catalogues of each programme into splitNum sublists,
            where larger catalogues are paired with smaller catalogues.
        """
        extractDict = defaultdict(list)
        for progID in self._progOrderRatedDict:
            splitList = utils.npop(catalogueDict[progID], splitNum, mode="asc")
            for i, entry in enumerate(splitList):
                pidFileName = "filelist_%s_%s.dat" % (progID, i)
                pidFilePath = os.path.join(self._workPath, pidFileName)
                extractDict[progID].append(pidFilePath)
                pidFile = File(pidFilePath)
                pidFile.wopen()
                for fileName in entry:
                    if fileName[1] != self.sysc.emptyFitsCataloguePathName():
                        pidFile.writetheline(fileName[0] + ' ' + fileName[1])
                pidFile.close()

        return extractDict
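
    # utils.npop() is relied upon here to deal the size-sorted catalogue
    # list of each programme into splitNum bins such that large and small
    # catalogues are paired, keeping the per-sublist workload roughly even.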

    #--------------------------------------------------------------------------

    def getProgIDs(self):
        """
        Reads FITS files to determine the set of programme IDs for the
        catalogues that will be ingested in this curation event.

        @return: A programme ID -> catalogues dictionary.
        @rtype: defaultdict(int, list(str))

        """
        # get all programme IDs that are affected by this use case:
        Logger.addMessage("[%d] Getting all programme IDs." % self._cuEventID)
        progOfFile, newProgOfFile = \
            self.getProgramIDs(self._shortListFile.name)

        newProgFiles = newProgOfFile.keys()
        progIDs = set(progOfFile[fileName][0] for fileName in progOfFile)

        # Create a programme ID -> catalogues dictionary
        catsForProg = defaultdict(list)
        for imgName, catName, mfID in self._fileList:
            if imgName not in newProgFiles:
                catsForProg[progOfFile[imgName][0]].append(
                    [imgName, catName, mfID])

        # Merge together any programme IDs that are the same programme.
        tmpProgs = set()
        for entry in self.programmeList:
            if any(eql in entry for eql in ('+=', '==')):
                inProg, _eql, outProg = entry.rpartition('=')
                inProg, _met = inProg[:-1], inProg[-1:]
                inPID = self._progIDofName[inProg.upper()]
                if self.sysc.isWSA():
                    outPID = self._progIDofName[
                        outProg.upper() if '/' in outProg
                        else "U/UKIDSS/%s" % outProg.upper()]
                else:
                    outPID = self._progIDofName[outProg.upper()]

                progIDs.add(outPID)
                if _met == '+':
                    progIDs.add(inPID)
                    if outProg in self.programmeList:
                        catsForProg[outPID].extend(catsForProg[inPID])
                    else:
                        catsForProg[outPID] = catsForProg[inPID][:]
                    tmpProgs.add(inProg)
                    tmpProgs.add(outProg)
                elif _met == '=':
                    catsForProg[outPID] = catsForProg[inPID][:]
                    del catsForProg[inPID]
                    progIDs.discard(inPID)
                    tmpProgs.add(outProg)
                else:
                    tmpProgs.add(entry)
            else:
                tmpProgs.add(entry)
        self.programmeList = list(tmpProgs)[:]

        # Sort each programme's catalogue list by multiframeID, ascending
        for progID in catsForProg:
            catsForProg[progID][:] = \
                sorted(catsForProg[progID], key=itemgetter(2))

        self._progIDs = sorted(progIDs)
        Logger.addMessage(
            "[%d] Got all programme IDs: %s" % (
                self._cuEventID, ','.join(map(str, self._progIDs))))

        return catsForProg
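
    # Programme merging syntax accepted in programmeList, as parsed above
    # (hypothetical programme names for illustration):
    #
    #   -P "gpsx+=gps"   catalogues of gpsx are ingested into gps as well,
    #                    and gpsx itself is kept;
    #   -P "gpsx==gps"   catalogues of gpsx are reassigned to gps and gpsx
    #                    is dropped from the processing list.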

    #--------------------------------------------------------------------------

    def parseIllumTable(self, illumTableName):
        """ Parse the CASU illumination table file into a csv file.
        """
        casuIllumFile = File(illumTableName)
        casuIllumFile.ropen()

        illumDict = {}
        for line in casuIllumFile.readlines():
            if "Xi" not in line:
                xi, eta, delMag, _binNo = line.split()
                key = (math.radians(float(xi)), math.radians(float(eta)))
                illumDict[key] = float(delMag)

        filterName = casuIllumFile.root.rpartition('_')[2]
        casuIllumFile.close()

        xiList = sorted(set(k[0] for k in illumDict))
        etaList = sorted(set(k[1] for k in illumDict))

        illumDataFile = File(casuIllumFile.name + ".csv")
        illumDataFile.wopen()
        illumDataFile.writetheline('%s,%s,%s' %
                                   (len(xiList), len(etaList), filterName))
        for xi in xiList:
            for eta in etaList:
                illumDataFile.writetheline('%.16f,%.16f,%.3f' %
                                           (xi, eta, illumDict[(xi, eta)]))
        illumDataFile.close()
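
    # The resulting .csv starts with a header line '<nXi>,<nEta>,<filter>'
    # followed by one '<xi>,<eta>,<delMag>' row per grid point, with xi and
    # eta converted to radians by math.radians() above.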

    #--------------------------------------------------------------------------

    def runExtractions(self, progID, subFileList):
        """ Run the C++ code to extract the detections from the catalogues.
        """
        # negative object IDs to enable parallelisation
        nextObjID = -1

        for filePidPathName in subFileList:
            # Extract detections only if the file list contains files
            if not self.isTrialRun and os.path.getsize(filePidPathName) > 0:
                notIngList, lastObjID = wfcamsrc.extractDetections(
                    self._schemaFileDict[progID][0], filePidPathName,
                    self._cuEventID, self.csvSharePath, self._outPrefix,
                    self.illumFile.name, self._schemaFileDict[progID][1],
                    nextObjID, self.sysc)
                if notIngList:
                    self.writeErrorFiles(notIngList)

                # Object IDs are negative and lastObjID defaults to 0.
                nextObjID = min(nextObjID, lastObjID)
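
    # Object IDs are issued as negative numbers counting downwards, and each
    # extractDetections() call resumes from the minimum ID reached so far
    # (lastObjID defaults to 0); this is what makes the negative-ID scheme
    # safe for the parallelisation noted above.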

    #--------------------------------------------------------------------------

    def setTableVariables(self):
        """ Set the variables dependent on the tables used in this CU.
        """
        tableList = []
        processedTables = []
        for progID in self._progOrderRatedDict:
            tableList += zip(
                utils.extractColumn(self._schemaFileDict[progID][0], 1),
                utils.extractColumn(self._schemaFileDict[progID][0], 0))
            processedTables += self._schemaFileDict[progID][1].split(',')

        self._ingestOrder = []
        self._tableDict = {}
        for table, schema in tableList:
            if table in processedTables:
                self._ingestOrder.append(table)
                self._tableDict[table] = dict(
                    schema=schema,
                    ingfile="%s_%s.dat" % (self._outPrefix, table))

        self._ingestOrder = utils.orderedSet(self._ingestOrder)
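
    # Each table's detections end up in a flat file named
    # '<outPrefix>_<table>.dat' in csvSharePath, e.g. (hypothetical prefix):
    #
    #   cu04id1234_host-1_WSA_lasDetection.dat
    #
    # which is the "ingfile" that IngestCUFiles.py later bulk-loads.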

#------------------------------------------------------------------------------
# Entry point for CU4

if __name__ == '__main__':
    # Transfer log with 1 filename per line
    CLI.progArgs.append(CLI.Argument("xferlog", "xferlog.log"))
    CLI.progOpts += [
        CLI.Option('o', "outpath",
                   "directory where the csv file is written to",
                   "PATH"),
        CLI.Option('p', "progorder",
                   "the order of processing of the programmes; all programmes "
                   "not explicitly named can be put anywhere in the list as "
                   "'others'",
                   "LIST", ''),
        CLI.Option('r', "redo",
                   "enable overwriting of existing MfIDs"),
        CLI.Option('K', "keepwork", "Don't remove working dir."),
        CLI.Option('P', "programmes",
                   "only process data for given programmes (accepts keywords "
                   "'all', 'ns' (non-survey), 'ukidss' (all 5 main surveys); "
                   "one programme suffixed with 'x-' excludes this programme)",
                   "LIST", "all"),
        CLI.Option('S', "subset",
                   "process subset of Photometry table, Raw table and "
                   "Astrometry table",
                   "LIST", "Photometry,Raw,Astrometry"),
        CLI.Option('X', "exclude",
                   "exclude given files/file patterns from processing",
                   "LIST", '')]

    cli = CLI(Cu4.__name__, "$Revision: 10181 $", Cu4.__doc__)
    Logger.addMessage(cli.getProgDetails())

    cu4 = Cu4(cli.getOpt("curator"),
              cli.getArg("database"),
              cli.getOpt("outpath"),
              cli.getOpt("programmes"),
              cli.getOpt("progorder"),
              cli.getOpt("subset"),
              cli.getOpt("exclude"),
              cli.getOpt("test"),
              cli.getOpt("redo"),
              cli.getOpt("keepwork"),
              cli.getArg("xferlog"),
              cli.getArg("comment"))
    cu4.run()

#------------------------------------------------------------------------------
# Change log:
#
# 20-Apr-2003, NCH:  Added in statements to implement deprecation of
#                    detections from reingested frames (not tested!); added in
#                    this revision history stuff.
#                    TO DO: This needs tidying up along the lines of cu3.py;
#                    i.e. it should have the most recent DB connectivity etc.
#                    coded in. Note that the final paragraph of code at the
#                    end should be kept and put in the correct place (i.e. at
#                    the end of all ingest operations, but before the final
#                    curation history updates & DB commit etc.).
# 11-Aug-2004, ETWS: Refactoring to implement latest changes to the
#                    data model along the lines of CU3.
# 18-Jan-2005, NCH:  Reinstated deprecation of detections from reingested
#                    frames (with small refactoring; NB: this is still
#                    untested); tidied up some documentation inconsistencies.
# 10-Mar-2005, ETWS: Fixed some minor bugs.
# 11-Apr-2005, ETWS: Fixed the NextCuEventId DB update.
# 27-May-2005, ETWS: Included cuNum in getNextCurationEventID.
#  6-Jun-2005, ETWS: Included list sorting and splitting to avoid memory leak.
# 11-Jul-2005, ETWS: Included excluding of files without valid programmeID.
# 16-Aug-2005, ETWS: Set the number of files in a split list to 10.
# 28-Nov-2005, NCH:  Added dropping of table indexes prior to bulk load for
#                    speedier operation.
#  2-Dec-2005, ETWS: Included non-survey sql schema path.
#  5-Dec-2005, JB:   Commented out dropping indexes prior to bulk load.
# 20-Jan-2006, JB:   Added code so that the name of the database being used
#                    is prepended to the logfile filename.
# 22-Feb-2006, RSC:  Upgraded to support binary file ingest.
#  3-Mar-2006, JB:   Uncommented tmp file removal code.
# 10-Mar-2006, NCH:  Reinstated drop table index to see if it helps speed
#                    things up a bit.
# 24-Mar-2006, JB:   Added code to output the list of programme IDs before
#                    they are processed.
# 28-Mar-2006, JB:   Fixed the above code! :)
# 30-Mar-2006, JB:   Cosmetic change to a log message (added a space! :)
# 30-Mar-2006, JB:   Changed the above so it now looks the same as CU3...
# 26-Apr-2006, ETWS: Included check if files have been ingested by CU3 before.
# 28-Apr-2006, ETWS: Fixed indentation bug; update of ProgCurationHistory
#                    only if there is data ingested.
#  2-May-2006, ETWS: Included mainDb and cuEventId in partly and not ingested
#                    file names.
#  3-May-2006, ETWS: Added constraint check.
#  8-May-2006, ETWS: Added programme ID output to logger.
# 11-May-2006, ETWS: Added more error messages.
# 11-May-2006, JB:   Quick str/int changes and now check table integrity
#                    immediately after ingestion.
# 16-May-2006, RSC:  Replaced Metadata.makeTableList() with
#                    utils.extractColumn().
# 30-Nov-2006, ETWS: Refactored to use OO framework and parallelisation; this
#                    script now produces data files and an ingest log file,
#                    ingest has to be done separately with IngestCUFiles.py.
# 31-Jan-2007, ETWS: Bug fixes and improvements.
#  2-Feb-2007, ETWS: Included processing of programmes in a given order.
#  5-Feb-2007, ETWS: Moved class Ratings into wsatools.EnhancedDictionary.
#  7-Feb-2007, ETWS: ...and the dictionary functions are on the move again...
# 21-Mar-2007, ETWS: Added some more screen output.
# 08-Jun-2007, ETWS: Fixed bugs introduced by changes made to DataFactory.
# 14-Feb-2008, RSC:  Updated for new DbSession interface, and corrected log
#                    file name setting.
# 17-Apr-2008, ETWS: Included illumination correction.
#  5-May-2008, ETWS: Finalised processing of detection table subset.
# 28-May-2008, ETWS: Included switch to process specified programmes;
#                    excluded files deprecated due to reprocessing.
# 12-Jun-2008, ETWS: Included default illumination table.
#  1-Oct-2008, ETWS: Integrated new CASU illumination table design.
#  3-Oct-2008, ETWS: Fixed remaining illum table issues.
#  3-Nov-2008, ETWS: Fixed new non-survey sql path.
# 21-Nov-2008, ETWS: Fixed illumination table updates for dates outside of
#                    semester ranges.