
Source Code for Module wsatools.DbConnect.IngCuSession

   1  #------------------------------------------------------------------------------ 
   2  #$Id: IngCuSession.py 10208 2014-02-07 11:24:01Z EckhardSutorius $ 
   3  """ 
   4     Framework for parallel-running curation sessions. Performs the set of 
   5     actions that CUs 1 to 4 are required to do. These curation tasks should 
   6     inherit from this class. 
   7   
   8     @author: E. Sutorius 
   9     @org:    WFAU, IfA, University of Edinburgh 
  10   
  11     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
  12     @contributors: R.S. Collins 
  13  """ 
  14  #------------------------------------------------------------------------------ 
  15  from future_builtins import zip 
  16   
  17  import csv 
  18  from   collections import defaultdict 
  19  import inspect 
  20  import mx.DateTime   as mxTime 
  21  import os 
  22  import re 
  23  import shutil 
  24  import signal 
  25  import socket 
  26  from   subprocess  import Popen, PIPE 
  27  import time 
  28   
  29  from   wsatools.CLI                 import CLI 
  30  import wsatools.DbConnect.DbConstants   as dbc 
  31  from   wsatools.DbConnect.DbSession import DbSession, odbc 
  32  from   wsatools.ExternalProcess     import Error as ExtError 
  33  from   wsatools.File                import File 
  34  import wsatools.FitsUtils               as fits 
  35  from   wsatools.Logger              import Logger 
  36  import wsatools.DbConnect.Schema        as schema 
  37  from   wsatools.SystemConstants     import SystemConstants 
  38  import wsatools.Utilities               as utils 
39 #------------------------------------------------------------------------------ 40 41 -class IngCuSession(object):
42 """ A parallel curation session for transfer/ingest CUs 1 to 4. 43 """
44 - class IngCuError(Exception):
45 """ A curation error instead of a programming error. """ 46 pass
47 48 beginDateDef = 10000101 49 endDateDef = 99991231 50 # @@GOTCHA: This needs to be defined to prevent exceptions on __del__ if 51 # __init__ fails. 52 reqWorkDir = None 53 keepWorkDir = False 54 55 # Class-scope member data 56 _defComment = 'A useful comment' #: Default comment placeholder. 57 _fileList = None #: File list. 58 _isDbLocked = False #: Does a database lock file exist? 59 _numSessions = 0 #: Counter to track number of concurrent sessions. 60 61 #: System constants for this CuSession. 62 sysc = SystemConstants() 63 64 # Define generic command-line interface settings for a CuSession. 65 CLI.progArgs += [CLI.Argument('comment', _defComment, isOptional=True)] 66 CLI.progOpts += [CLI.Option('c', 'curator', 'name of curator', 'NAME', 67 os.getenv('USER'))] 68 69 #-------------------------------------------------------------------------- 70
71 - def __init__(self, cuNum=dbc.smallIntDefault(), 72 curator=CLI.getOptDef('curator'), 73 comment=CLI.getArgDef('comment'), 74 reqWorkDir=False, 75 keepWorkDir=False, 76 database=DbSession.database, 77 userName=DbSession.userName, 78 autoCommit=False, 79 isTrialRun=DbSession.isTrialRun):
  80          """
  81          Initialises data members for all database state flags and details for
  82          this curation session. The database lock and connection are deferred
  83          to L{_connectToDb()}.
  84  
  85          @param cuNum: Curation task number.
  86          @type cuNum: int
  87          @param curator: Name of curator.
  88          @type curator: str
  89          @param comment: Descriptive comment as to why curation task is
  90                          being performed.
  91          @type comment: str
  92          @param reqWorkDir: If True, a working directory will be created.
  93          @type reqWorkDir: bool
  94          @param keepWorkDir: Don't remove working directory.
  95          @type keepWorkDir: bool
  96          @param database: Name of the database to connect to.
  97          @type database: str
  98          @param userName: Optionally override default database username.
  99          @type userName: str
 100          @param autoCommit: If True, auto-commit every database transaction;
 101                             batching them (the default) allows minimal logging.
 102          @type autoCommit: bool
 103          @param isTrialRun: If True, do not perform database modifications,
 104                             just print the SQL statement to the terminal.
 105          @type isTrialRun: bool
 106  
 107          """
 108          self.attributesFromArguments(
 109              inspect.getargspec(IngCuSession.__init__)[0], locals())
 110  
 111          # @@TODO: Make this initialise from database command-line argument,
 112          #         which probably requires merger with CuSession.
 113          IngCuSession.sysc = SystemConstants(database.rpartition('.')[2])
 114  
 115          # Initialise all object-scope member data
 116          self._cuEventID = None
 117          self._cuTable = None
 118          self.comment = ("Running CU" + str(cuNum)
 119                          if comment is IngCuSession._defComment else comment)
 120  
 121          self._lockPathName = \
 122              os.path.join(os.getenv('HOME'), "dblock-" + self.database)
 123          self._logFile = None
 124          self._rolledBack = None
 125          self._shareFileID = ''
 126  
 127          # Create a working directory if required
 128          if self.reqWorkDir:
 129              self._workPath = self.createWorkingDir()
 130          else:
 131              self._workPath = os.curdir
 132          self._allFileName = os.path.join(self._workPath, 'filelist.dat')
 133  
 134          self.dbMntPath = IngCuSession.sysc.dbMntPath()
 135          self._hostName = socket.gethostbyaddr(socket.gethostname())[1][0]
 136          self._success = dbc.yes()
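A minimal usage sketch (illustrative only, not part of the module source): concrete curation tasks subclass IngCuSession, override _onRun(), and drive the session through run(). The class name MyIngestTask, the CU number and the connection values below are hypothetical, and running this requires the wsatools environment plus a reachable archive database.

    from wsatools.DbConnect.IngCuSession import IngCuSession
    from wsatools.Logger import Logger

    class MyIngestTask(IngCuSession):
        """ Hypothetical curation task; the real work goes in _onRun(). """
        def _onRun(self):
            self._connectToDb()
            try:
                Logger.addMessage("Connected to " + self.archive.database)
                # ... transfer/ingest work and curation-history updates here ...
            finally:
                self._disconnectFromDb()

    task = MyIngestTask(cuNum=3, curator="esutorius", comment="Test ingest",
                        database="WSA", reqWorkDir=True)
    task.run()   # run() wraps _onRun() with the standard exception handling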
137 138 #-------------------------------------------------------------------------- 139
140 - def __del__(self):
141 """ Tidy-up IngCuSession. 142 """ 143 if IngCuSession._numSessions > 0 or IngCuSession._isDbLocked: 144 self._disconnectFromDb() 145 146 # Clean up working folder: 147 if self.reqWorkDir and not self.keepWorkDir and self._workPath: 148 self.removeWorkingDir()
149 150 #-------------------------------------------------------------------------- 151
152 - def _connectToDb(self, maxTries=100):
153 """ Connect to the database. 154 """ 155 try: 156 # Wait until database access is unlocked before proceeding 157 if not IngCuSession._isDbLocked and not self.isTrialRun: 158 self._prepareDbLock(maxTries) 159 160 IngCuSession._numSessions += 1 161 self.archive = DbSession(self.database, self.autoCommit, 162 self.isTrialRun, self.userName) 163 except Exception, details: 164 Logger.addExceptionMessage(details) 165 raise
166 167 #-------------------------------------------------------------------------- 168
 170          """
 171          Create the translation dictionary of programme identification strings
 172          to programme ID numbers for all programmes. It contains associations
 173          between data flow system (DFS) programme identification strings (i.e.
 174          as supplied in ingest FITS files) and the WSA programme unique
 175          identification numbers.
 176  
 177          @todo: This method wouldn't be needed if a ProgrammeTable object always
 178                 existed and had an equivalent method to perform this translation.
 179                 Hence, the ProgrammeTable object would replace the dictionary
 180                 here and there wouldn't be a need to query the database again.
 181          """
 182          # Query the database for the required data:
 183          queryResult = \
 184              self.archive.query("dfsIDString, programmeID", "Programme")
 185  
 186          self._progIDofName = \
 187              dict((name.upper(), progID) for name, progID in queryResult)
 188  
 189          if self.sysc.isWSA():
 190              self._progIDofName["PTS"] = self._progIDofName["U/06A/52"]
 191              self._progIDofName["COMM"] = self._progIDofName["U/EC/"]
 192              self._progIDofName["UHS"] = self._progIDofName["U/UHS/"]
 193          if self.sysc.isVSA():
 194              self._progIDofName["TECH"] = self._progIDofName["TECHNICAL"]
 195              self._progIDofName["MAINT"] = self._progIDofName["MAINTENANCE"]
 196          if self.sysc.isOSA():
 197              self._progIDofName["TECH"] = self._progIDofName["TECHNICAL"]
 198  
 199          self._progNameOfID = dict(zip(self._progIDofName.itervalues(),
 200                                        self._progIDofName.iterkeys()))
 201          if self.sysc.isWSA():
 202              self._progNameOfID[self._progIDofName["U/06A/52"]] = "PTS"
 203              self._progNameOfID[self._progIDofName["U/EC/"]] = "COMM"
 204              self._progNameOfID[self._progIDofName["U/UHS/"]] = "UHS"
 205          if self.sysc.isVSA():
 206              self._progNameOfID[self._progIDofName["TECHNICAL"]] = "TECH"
 207              self._progNameOfID[self._progIDofName["MAINTENANCE"]] = "MAINT"
 208          if self.sysc.isOSA():
 209              self._progNameOfID[self._progIDofName["TECHNICAL"]] = "TECH"
 210  
 211          Logger.addMessage("Created the translation dictionary")
212 213 #-------------------------------------------------------------------------- 214
215 - def _disconnectFromDb(self):
216 """Disconnect from the database. 217 """ 218 try: 219 del self.archive 220 except AttributeError: 221 pass # archive connection never created 222 finally: 223 IngCuSession._numSessions -= 1 224 225 # Remove database lock if last session for this database 226 if IngCuSession._isDbLocked and IngCuSession._numSessions <= 0: 227 if os.path.exists(self._lockPathName): 228 thisTask = '%s.%s' % (os.getenv('HOST'), os.getpid()) 229 lockTask = file(self._lockPathName).read().split()[-1] 230 if thisTask == lockTask: 231 os.remove(self._lockPathName) 232 print("unlocked " + 233 os.path.basename(self._lockPathName)) 234 elif not self.isTrialRun: 235 Logger.addMessage("<Warning> Tried to remove DbLock, " 236 "but the file doesn't exist.") 237 238 IngCuSession._isDbLocked = False
239 240 #-------------------------------------------------------------------------- 241
242 - def _getNextCuEventID(self):
 243          """
 244          Obtain the next available cuEventID, whilst putting a placeholder row
 245          into the ArchiveCurationHistory table to reserve this cuEventID and
 246          prevent it from being hijacked by a parallel process.
 247  
 248          """
 249          cuEventID = self._getNextID(dbc.curationEventUIDAttributeName(),
 250                                      dbc.archiveCurationHistoryTableName())
 251  
 252          if self.sysc.isVSA():
 253              dbID = 2 if self.archive.database == "VSAVVV" else 1
 254              cuEventID = 10 * (cuEventID // 10 + 1) + dbID
 255  
 256          # Write it back in immediately to reserve this UID.
 257          try:
 258              self.archive.insertData(dbc.archiveCurationHistoryTableName(),
 259                  [cuEventID, self.cuNum, dbc.charDefault(), dbc.charDefault(),
 260                   utils.makeMssqlTimeStamp(), self.curator,
 261                   "in progress: " + self.comment, dbc.yes()],
 262                  enforcePKC=True)
 263              self.archive.commitTransaction()
 264          except odbc.ProgrammingError as error:
 265              if "permission was denied" in str(error):
 266                  raise SystemExit("<ERROR> You do not have write "
 267                                   "permission on database " + self.database)
 268              else:
 269                  raise
 270  
 271          return cuEventID
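For VSA databases the reserved ID is pushed up to the next decade and tagged with a database digit (2 for VSAVVV, 1 otherwise), so event IDs issued against the two databases always end in different digits. A worked example of the expression above:

    cuEventID = 1234
    dbID = 2                                  # VSAVVV
    print(10 * (cuEventID // 10 + 1) + dbID)  # prints 1242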
272 273 #-------------------------------------------------------------------------- 274
275 - def _getNextID(self, attrName, tableName, where=''):
276 """ 277 Obtain next available ID number in the db for a given ID attribute. 278 279 @param attrName: Name of ID attribute. 280 @type attrName: str 281 @param tableName: Name of the table where attribute resides. 282 @type tableName: str 283 @param where: Optional SQL WHERE clause. 284 @type where: str 285 286 @return: Next available ID number. 287 @rtype: int 288 289 """ 290 maxID = self.archive.queryAttrMax(attrName, tableName, where) 291 292 if maxID and maxID > 0: 293 return maxID + 1 294 else: # Either first ID does not exist or is default. 295 return 1
296 297 #-------------------------------------------------------------------------- 298
299 - def _historyCommentPrefix(self, success, rolledBack):
300 """ 301 Prefix for the comment in the ArchiveCurationHistory table. 302 303 @param success: The success status of the CU. 304 @type success: int 305 @param rolledBack: Flag to indicate ingest roll back. 306 @type rolledBack: int 307 308 @return: Prefix for the comment in the ArchiveCurationHistory table. 309 @rtype: str 310 311 """ 312 if not success and rolledBack: 313 return "failed: " 314 elif success and rolledBack: 315 return "processed: " 316 elif success and not rolledBack: 317 return "ingested: "
318 319 #-------------------------------------------------------------------------- 320
321 - def _onException(self):
322 """ Tidy up run tasks specific to this CU following an exception. 323 """ 324 # Block future keyboard interrupts until rollback is complete 325 signal.signal(signal.SIGINT, signal.SIG_IGN) 326 self._success = dbc.no() 327 try: 328 self.archive.rollbackTransaction() 329 except AttributeError: 330 # archive connection never created 331 Logger.addMessage( 332 "No database connection, so nothing to rollback.") 333 else: 334 self._rolledBack = dbc.yes() 335 # Allow keyboard interrupts again 336 signal.signal(signal.SIGINT, signal.SIG_DFL)
337 338 #-------------------------------------------------------------------------- 339
340 - def _onRun(self):
341 """ Performs all the tasks that define this curation session. """ 342 pass
343 344 #-------------------------------------------------------------------------- 345 346 @staticmethod
347 - def _parseCuNums(cus):
 348          """
 349          Parse the given list of CU strings into tuples of cuNum, cuEventID,
 350          serverName and date.
 351  
 352          @return: List of tuples characterising the CU events to be ingested.
 353          @rtype: list
 354  
 355          """
 356          cnList = []
 357          for entry in cus:
 358              try:
 359                  cuNum, idservdate = entry.partition('-')[::2]
 360              except AttributeError:
 361                  cuNum = entry
 362                  idservdate = '--'
 363              searchTerms = (idservdate.split('-') + ['', '', ''])[:3]
 364              evtId = ''
 365              serv = ''
 366              date = ''
 367              for x in searchTerms:
 368                  if x.isalpha():
 369                      serv = x
 370                  elif x.isdigit() and x.startswith("20") and len(x) == 6 \
 371                          and int(x[4:]) in range(1, 13):
 372                      date = x
 373                  elif x.isdigit():
 374                      evtId = x
 375              cnList.append(("cu%02d" % int(cuNum), "id" + evtId, serv, date))
 376          return cnList
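Illustrative behaviour (the CU numbers, event ID, server and date are made up): each entry of the form 'cuNum-eventID-server-date' splits into its four components, with missing parts left empty.

    >>> IngCuSession._parseCuNums(["3-1234-ramses-201203", 4])
    [('cu03', 'id1234', 'ramses', '201203'), ('cu04', 'id', '', '')]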
377 378 #-------------------------------------------------------------------------- 379
380 - def _prepareDbLock(self, maxTries=100):
381 """ 382 Writes database lock. If lock already present then pause and wait for 383 database to unlock. 384 385 """ 386 numTries = 0 387 midTries = 100 388 delay = 2 389 taskIdStr = ("[%d] " % self._cuEventID if self._cuEventID else '') 390 while os.path.exists(self._lockPathName): 391 time.sleep(delay) 392 numTries += 1 393 if numTries == 1: 394 print("%sA lock file exists: %s" % (taskIdStr, 395 self._lockPathName)) 396 print("End the task that owns the lock and/or manually " 397 "delete the lock file and this curation session will " 398 "automatically start.") 399 elif maxTries < 1000 and numTries > maxTries: 400 raise Exception("%sTimed out in accessing the" 401 " database" % taskIdStr) 402 elif maxTries >= 1000 and numTries >= midTries: 403 delay = min(120, 10*int(1 + (numTries - midTries)/10)) 404 Logger.addMessage("%sWaiting %d s..." % (taskIdStr, delay)) 405 elif numTries > 15000: 406 raise Exception("%sTimed out after %d s in accessing the" 407 " database" % (taskIdStr, numTries)) 408 409 file(self._lockPathName, 'w').write("DB locked by CU%s. PID = %s.%s" % 410 (self.cuNum, os.getenv('HOST'), os.getpid())) 411 412 IngCuSession._isDbLocked = True
413 414 #-------------------------------------------------------------------------- 415
416 - def _updateArchiveCurationHistory(self, row):
417 """ 418 Adds an entry for this curation session to the ArchiveCurationHistory 419 database table. 420 421 @param row: List of entries for the ArchiveCurationHistory table. 422 @type row: list 423 424 """ 425 Logger.addMessage("Updating archive curation history with this row:") 426 Logger.addMessage(repr(row)) 427 428 # First scrub the placeholder default row in the ArchiveCurationHistory 429 # table, generated by the data file builder. 430 self.archive.delete(dbc.archiveCurationHistoryTableName(), 431 whereStr="cuEventID=%s" % row[0]) 432 self.archive.insertData(dbc.archiveCurationHistoryTableName(), row)
433 434 #-------------------------------------------------------------------------- 435
436 - def _updateACHComment(self, newComment, cuEventID):
 437          """
 438          Updates the comment of an existing entry in ArchiveCurationHistory.
 439  
 440          @param newComment: The new comment.
 441          @type newComment: str
 442          @param cuEventID: The cuEventID of the event which is to be updated.
 443          @type cuEventID: int
 444  
 445          """
 446          try:
 447              self.archive.update(dbc.archiveCurationHistoryTableName(),
 448                  "comment='%s'" % newComment, where="cuEventID=%s" % cuEventID)
 449              self.archive.commitTransaction()
 450          except odbc.ProgrammingError as error:
 451              # @@TODO: Will this ever be called prior to the ACH entry being
 452              #         created by getNextCuEventID()? If not, then no need for
 453              #         permission check.
 454              if "permission was denied" in str(error):
 455                  raise SystemExit("<ERROR> You do not have write permission on "
 456                                   "database " + self.database)
 457              else:
 458                  raise
459 460 #-------------------------------------------------------------------------- 461
462 - def _updateProgrammeCurationHistory(self, row):
463 """ 464 Adds an entry for this curation session to the ProgrammeCurationHistory 465 database table. 466 467 @param row: List of entries for the ProgrammeCurationHistory table. 468 @type row: list 469 470 """ 471 Logger.addMessage("Updating programme curation history with this row:") 472 Logger.addMessage(repr(row)) 473 474 # First scrub the placeholder default row in the 475 # ProgrammeCurationHistory table, generated by the data file builder. 476 self.archive.delete(dbc.programmeCurationHistoryTableName(), 477 whereStr="programmeID=%s AND cuEventID=%s" % (row[0], row[1])) 478 self.archive.insertData(dbc.programmeCurationHistoryTableName(), row)
479 480 #-------------------------------------------------------------------------- 481
482 - def attributesFromArguments(self, argumentNames, argDict, prefix='', 483 types=None):
484 """Initialize automatically instance variables from __init__ arguments. 485 If types dictionary is given, translate commandline str arguments into 486 correct types. 487 488 @param argumentNames: List of arguments of the class to be initialized. 489 @type argumentNames: list[str] 490 @param argDict: Dictionary of arguments and their values. 491 @type argDict: dict{arg:value} 492 @param prefix: Prefix to be given to the argument names, eg. '_' 493 @type prefix: str 494 @param types: Dictionary of arguments and their types. 495 @type types: dict{arg:type} 496 """ 497 if types: 498 argDict.pop("typeTranslation") 499 for arg in argDict: 500 if arg in types: 501 if type(argDict[arg]) is type(None): 502 argDict[arg] = '' 503 if types[arg] is not type(argDict[arg]): 504 if types[arg] is int: 505 argDict[arg] = int(argDict[arg]) 506 if types[arg] in (list, set, tuple): 507 for row in csv.reader([argDict[arg]]): 508 try: 509 argDict[arg] = types[arg](int(x) 510 for x in row if isinstance(x, str)) 511 except ValueError: 512 argDict[arg] = types[arg](row) 513 if types[arg] is dict: 514 tmpDict = defaultdict() 515 for row in csv.reader([argDict[arg]]): 516 for x in row: 517 tmpDict.update(types[arg]( 518 [re.split('[:=]', x)])) 519 argDict[arg] = tmpDict.copy() 520 521 self = argDict.pop('self') 522 for arg in argumentNames[1:]: 523 setattr(self, prefix + arg, argDict[arg])
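A minimal sketch of the intended calling pattern (MyTask and its arguments are hypothetical, and the parent __init__ is deliberately not chained so that only this call is shown). Note that when a types dictionary is passed, the caller is expected to hold it in a local named typeTranslation, which this method then discards from the argument dictionary.

    import inspect
    from wsatools.DbConnect.IngCuSession import IngCuSession

    class MyTask(IngCuSession):
        def __init__(self, cuNums="1,2,3", curator="esutorius"):
            typeTranslation = {"cuNums": list}   # local, popped by this method
            self.attributesFromArguments(
                inspect.getargspec(MyTask.__init__)[0], locals(),
                types=typeTranslation)
            # self.cuNums is now [1, 2, 3]; self.curator is "esutorius"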
524 525 #-------------------------------------------------------------------------- 526
527 - def createFileList(self, fileListPath=None):
528 """ 529 Create a list of FITS files to be ingest into the database from the 530 list supplied in the file at the given path. The list is stored in the 531 L{_fileList} member variable as a pair of image and catalogue file 532 paths and also written to a file by L{writeIngestList()} for the 533 C++ code. 534 535 @param fileListPath: Full path to the file containing the list of FITS 536 files to ingest, as an optional alternative to the 537 command-line supplied option, xferLog. 538 @type fileListPath: str 539 540 """ 541 img = {} 542 cat = {} 543 fixcat = {} 544 for filePath in utils.ParsedFile(fileListPath or self.xferlog): 545 fileDir, fileName = filePath.split(os.sep)[-2:] 546 fileSubPath = os.path.join(fileDir, fileName) 547 # Expected files 548 if fileName.startswith((self.sysc.casuPrefix, 549 self.sysc.wfauPrefix)): 550 # Catalogues 551 catType = self.sysc.catSuffix + self.sysc.catType 552 fixcatType = self.sysc.fixcatSuffix + self.sysc.catType 553 if fileName.endswith(fixcatType): 554 run = fileSubPath.replace(fixcatType, '') 555 fixcat[run] = filePath 556 elif fileName.endswith(catType): 557 run = fileSubPath.replace(catType, '') 558 cat[run] = filePath 559 # Images 560 elif fileName.endswith(self.sysc.mefType) \ 561 and not fileName.endswith("_squish.fit"): 562 run = fileSubPath.replace(self.sysc.mefType, '') 563 img[run] = filePath 564 else: 565 run = os.path.splitext(fileSubPath)[0] 566 img[run] = filePath 567 568 orphanedCats = set(cat) - set(img) 569 if self.sysc.isOSA(): 570 orphanedFixCats = set(fixcat) - set(img) 571 if orphanedFixCats: 572 Logger.addMessage( 573 "<Warning> There are orphaned fixed catalogues in this list" 574 " without corresponding image files for that run: " 575 + ', '.join(orphanedFixCats)) 576 577 else: 578 if orphanedCats: 579 Logger.addMessage( 580 "<Warning> There are orphaned catalogues in this list " 581 "without corresponding image files for that run: " 582 + ', '.join(orphanedCats)) 583 584 #: Set of full paths to date directories of files being ingested. 585 self._dateList = set(os.path.dirname(img[run]) for run in img) 586 catDef = self.sysc.emptyFitsCataloguePathName() 587 if self.sysc.isOSA(): 588 self._fileList = [(img[run], fixcat.get(run, cat.get(run, catDef))) 589 for run in img] 590 else: 591 self._fileList = [(img[run], cat.get(run, catDef)) for run in img] 592 self.writeIngestList()
593 594 #-------------------------------------------------------------------------- 595 596 @staticmethod
597 - def createFilePrefix(cuNum, cuEventID, hostName, database):
598 """Create a file prefix used for the ingest log and the data files. 599 600 @param cuNum: The number of the CU. 601 @type cuNum: int 602 @param cuEventID: The cuEventID of this CU run. 603 @type cuEventID: int 604 @param hostName: The host name where the CU is running. 605 @type hostName: str 606 @param database: The database where the data is ingested to. 607 @type database: str 608 """ 609 if not cuEventID: 610 cuEventID = -9999 611 return "cu%02did%d_%s_%s" % (cuNum, cuEventID, hostName, database)
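Illustrative values (the host and database names are made up):

    >>> IngCuSession.createFilePrefix(3, 1234, "ramses9", "WSA")
    'cu03id1234_ramses9_WSA'
    >>> IngCuSession.createFilePrefix(3, None, "ramses9", "WSA")
    'cu03id-9999_ramses9_WSA'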
612 613 #-------------------------------------------------------------------------- 614
615 - def createLogFileName(self, cuEventIDs=None):
616 """ 617 Create the standard log file name for the current cuEventID or the 618 given list of CU event IDs. 619 620 @param cuEventIDs: Optional list of CU eventIDs to record. 621 @type cuEventIDs: list(int) 622 623 @return: A filename for the log. 624 @rtype: str 625 626 """ 627 if cuEventIDs and not None in cuEventIDs: 628 eventIDStr = utils.numberRange(cuEventIDs, sep="_", 629 useTens=self.sysc.isVSA()) 630 elif cuEventIDs is not None: 631 eventIDStr = 'NONE' 632 633 return '%s_cu%sid%s.log' % (self.database.rpartition('.')[2], 634 self.cuNum, self._cuEventID 635 if cuEventIDs is None else eventIDStr)
636 637 #-------------------------------------------------------------------------- 638
639 - def createMonthlyDetSchema(self, schemaName, detTableList, month, 640 ReDo=False):
641 """ Copy original schema to monthly schema. 642 """ 643 schemaMonthFileName = schemaName.replace(".sql", "%s.sql" % month) 644 inFile = File(os.path.join(self.sysc.sqlScriptPath, schemaName)) 645 outFile = File(self.sysc.sqlMonthlyDetPath(schemaMonthFileName, 646 full=True)) 647 monthlyDetFileName = self.sysc.sqlMonthlyDetPath(schemaMonthFileName, 648 full=False) 649 if not outFile.exists() or ReDo: 650 Logger.addMessage("Creating monthly detection schema %s" % 651 monthlyDetFileName) 652 utils.ensureDirExist(self.sysc.sqlMonthlyDetPath()) 653 inFile.ropen() 654 outFile.wopen() 655 for line in inFile.readlines(): 656 for detTableName in detTableList: 657 line = line.replace(detTableName, 658 detTableName + month) 659 outFile.writetheline(line) 660 inFile.close() 661 outFile.close() 662 shutil.copy2(outFile.name, os.path.join(self.dbMntPath, 663 monthlyDetFileName)) 664 return monthlyDetFileName
665 666 #-------------------------------------------------------------------------- 667
668 - def getDetTable(self, detTableList, line):
669 words = line.split() 670 for word in words: 671 for detTable in detTableList: 672 if detTable in word: 673 return detTable 674 return ''
675 676 #-------------------------------------------------------------------------- 677
678 - def createWorkingDir(self):
679 """ 680 Create a temporary working directory. Directories will have a 4 digit 681 extension number for multiple tmpdirs created by one top level script 682 calling different CUs. 683 684 @return: Path to the working directory. 685 @rtype: str 686 687 """ 688 workSpacePath = self.sysc.tempWorkPath() 689 workSpaceDir, _workSpaceBase = os.path.split(workSpacePath) 690 if os.path.exists(workSpacePath): 691 os.rename(workSpacePath, workSpacePath + '.old') 692 Logger.addMessage( 693 "<Warning> curationClientWorkPath %s exists, moved to %s" % 694 (workSpacePath, workSpacePath + '.old')) 695 696 tmpList = sorted(d for d in os.listdir(workSpaceDir) 697 if os.path.isdir(os.path.join(workSpaceDir, d)) 698 and str(os.getpid()) in d and '.' in d and not d.endswith(".old")) 699 700 dirNum = 0 701 for tmpDirName in tmpList: 702 tmpDirNumMax = int(os.path.splitext(tmpList[-1])[1][1:]) 703 if tmpDirNumMax != len(tmpList) - 1: 704 dirNum = 0 705 for tmpDirName in tmpList: 706 newTmpDirName = tmpDirName + ".old" 707 tmpPath = os.path.join(workSpaceDir, tmpDirName) 708 if os.path.exists(tmpPath): 709 newPath = os.path.join(workSpaceDir, newTmpDirName) 710 os.rename(tmpPath, newPath) 711 else: 712 dirNum = tmpDirNumMax + 1 713 714 workSpace = workSpacePath + ".%04u" % dirNum 715 716 os.mkdir(workSpace) 717 return workSpace
718 719 #-------------------------------------------------------------------------- 720
721 - def getProgramIDs(self, fileListPath):
722 """ 723 Determines the full set of programmes for a given list of FITS files. 724 725 @param fileListPath: Full path to the file listing all FITS files 726 for which their programmes should be 727 determined. 728 @type fileListPath: str 729 730 @return: A dictionary of programme IDs referenced by FITS file name, 731 and a dictionary of new project DFS ID strings referenced by 732 FITS file name. 733 @rtype: dict(str:int), dict(str:str) 734 735 """ 736 progOfFile = {} 737 newProgOfFile = {} 738 739 # Go through the list of science files to ingest from the transfer log 740 for fileName in utils.extractColumn(fileListPath, colNum=0): 741 # Only interested in the programme assigned to image files 742 if fileName.endswith(self.sysc.mefType): 743 # Extract list of key-value dictionaries, one for each HDU 744 try: 745 fitsFile = fits.open(fileName) 746 except: 747 Logger.addMessage("Broken file:" + fileName) 748 raise 749 fitsFile.close() 750 751 # Get program ID string and convert into program ID object. 752 try: 753 hduNum = (1 if self.sysc.tileSuffix in fileName else 0) 754 project = [fitsFile[hduNum].header.get(key, 755 "Unknown").strip() 756 for key in self.sysc.projectKeys] 757 758 if project[0] == "Example WFCAM survey": 759 project[0] = "TEST" 760 761 progName, self.knownSurveyTranslations = \ 762 self.translateProgID(project, self._progIDofName, 763 self.sysc.surveyTranslations) 764 765 except TypeError, details: 766 print details 767 Logger.addMessage(fileName + ". Probably not FITS") 768 except KeyError: 769 Logger.addMessage(fileName + ". Can't get program ID") 770 except IndexError: 771 Logger.addMessage(fileName + ". Probably not compressed") 772 else: 773 progID = self._progIDofName.get(progName.upper(), 774 dbc.intDefault()) 775 # Append this to the list of progIDs 776 if progID != dbc.intDefault(): 777 progOfFile[fileName] = (progID, progName) 778 else: 779 if progName[0] == 'n': 780 newProgOfFile[fileName] = \ 781 project[0].partition('(')[0].upper() 782 else: 783 newProgOfFile[fileName] = project[0].upper() 784 785 if newProgOfFile: 786 nplPath = os.path.join(self.sysc.corruptFilesPath(), 787 "nonPIDFiles_%s.dat" 788 % time.strftime("%Y%m%d_%H%M%S", time.gmtime())) 789 790 file(nplPath, 'w').writelines( 791 utils.joinDict(newProgOfFile, ',', '\n')) 792 793 Logger.addMessage("Files that don't have a valid programme ID are " 794 "listed in " + nplPath) 795 796 return progOfFile, newProgOfFile
797 798 #-------------------------------------------------------------------------- 799
800 - def getErrorFileName(self, name):
 801          """
 802          Creates an error file name from the given name, including the details
 803          of this CU run, e.g. database, cuEventID.
 804  
 805          @param name: Unique name for this type of error file.
 806          @type name: str
 807  
 808          @return: Full file name for error file.
 809          @rtype: str
 810  
 811          """
 812          return "%s_%s_cu%sid%s_%s.dat" % (self.database.rpartition('.')[2],
 813                                            name, self.cuNum, self._cuEventID,
 814                                            mxTime.utc().strftime("%Y%m%d_%H%M%S"))
815 816 #-------------------------------------------------------------------------- 817
818 - def getMultiframeIDs(self, dateVersStr):
819 """ Get the multiframeIDs from FlatFileLookUp for a given dateVersStr. 820 """ 821 dataDict = {} 822 # check for existing data in FlatFileLookUp 823 for mfID, fileName in self.archive.query("multiframeID, fileName", 824 "FlatFileLookUp", whereStr="DateVersStr=%r" % dateVersStr): 825 if "PixelFileNoLongerAvailable" not in fileName: 826 dataDict[fileName.rpartition(':')[2]] = mfID 827 return dataDict
828 829 #-------------------------------------------------------------------------- 830
831 - def getProcessOrder(self, progIDs, include=['all'], progOrder=[]):
832 """ 833 Process programmes in given order, append/include all others from the 834 list of all programme IDs. 835 836 @param progIDs: List of all (used) programme IDs. 837 @type progIDs: list(int) 838 @param include: List of project names to be processed. 839 @type include: list(str) 840 @param progOrder: Ordered list of programme names, eg. [las,gps] or 841 [las,others,gps]. 842 @type progOrder: list(str) 843 844 @return: Rating enhanced dictionary. 845 @rtype: Utilities.Ratings 846 847 @todo: Will become part of DataFactory.ProgrammeTable when programme 848 translation object moved there. 849 """ 850 procProgs = [] 851 exclProgs = [] 852 for entry in include: 853 # exclude surveys 854 if entry.startswith('x-'): 855 exclProgs.append(entry.lower().replace('x-', '', 1)) 856 else: 857 procProgs.append(entry.lower()) 858 if not procProgs: 859 procProgs = ['all'] 860 861 progOrder = [x.lower() for x in progOrder] 862 try: 863 otherName = \ 864 list(set(["other", "others"]).intersection(progOrder))[0] 865 except IndexError: 866 otherName = '' 867 868 progIDofName = dict( 869 (name.lower().replace("u/ukidss/", ''), progID) 870 for name, progID in self._progIDofName.items()) 871 872 # create progID list of processed programmes 873 if 'all' in procProgs: 874 procProgs = progIDs 875 elif 'ns' in procProgs: 876 procProgs = [pid for pid in progIDs if pid > 10000] 877 elif self.sysc.isWSA(): 878 if 'ukidss' in procProgs: 879 procProgs = [pid for pid in progIDs if 10 < pid <= 105] 880 else: 881 procProgs = [progIDofName[entry] for entry in procProgs 882 if entry in progIDofName \ 883 and progIDofName[entry] in progIDs] 884 elif self.sysc.isVSA() or self.sysc.isOSA(): 885 procProgs = [progIDofName[entry] for entry in procProgs 886 if entry in progIDofName \ 887 and progIDofName[entry] in progIDs] 888 889 if exclProgs: 890 exclProgs = [progIDofName[entry] for entry in exclProgs 891 if entry in progIDofName] 892 procProgs = list(set(procProgs).difference(exclProgs)) 893 894 pidOrder = [] 895 for progName in progOrder: 896 if progName != otherName: 897 for trprName in progIDofName: 898 if progName in trprName.lower() \ 899 and progIDofName[trprName] in procProgs: 900 pidOrder.append(progIDofName[trprName]) 901 902 progOrderRatedDict = utils.Ratings(zip(pidOrder, range(len(pidOrder)))) 903 othersIndex = (progOrder.index(otherName) if otherName else 904 len(progIDofName) + 1) 905 906 nonOrderedProgs = \ 907 set(progIDofName.values()).intersection(procProgs) - set(pidOrder) 908 909 nonOrderDict = {} 910 for progID in nonOrderedProgs: 911 nonOrderDict[progID] = othersIndex - 0.5 912 913 progOrderRatedDict.update(nonOrderDict) 914 Logger.addMessage("Programme IDs to be processed: " 915 + ','.join(repr(n) for n in progOrderRatedDict)) 916 917 return progOrderRatedDict
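An illustrative call from within a session (the programme selection is made up and the resulting ordering depends on the archive's Programme table): entries prefixed with 'x-' are excluded, and 'others' marks where all remaining programmes slot into the requested order.

    # LAS first, then everything else, then GPS last, excluding DXS.
    rated = self.getProcessOrder(progIDs,
                                 include=["ukidss", "x-dxs"],
                                 progOrder=["las", "others", "gps"])
    for progID in sorted(rated, key=rated.get):
        pass  # process programmes in the requested order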
918 919 #-------------------------------------------------------------------------- 920
921 - def makeSysCmd(self, serverName, cmnd):
922 if serverName != os.getenv('HOST'): 923 return self.sysc.privNetCmd(serverName, cmnd) 924 else: 925 return cmnd
926 927 #-------------------------------------------------------------------------- 928
929 - def removeWorkingDir(self):
930 """ 931 Remove the temporary working directory. 932 """ 933 try: 934 shutil.rmtree(self._workPath) 935 except OSError as error: 936 Logger.addMessage( 937 "<Warning> Unable to remove temporary working directory " 938 "%s: %s" % (self._workPath, error))
939 940 #-------------------------------------------------------------------------- 941
942 - def run(self):
943 """ Run the curation session. """ 944 try: 945 self._onRun() 946 except KeyboardInterrupt as details: 947 # Block future keyboard interrupts until CU has finished cleanly 948 signal.signal(signal.SIGINT, signal.SIG_IGN) 949 self._onException() 950 # ..re-apply just to be safe (onException reverts to default) 951 signal.signal(signal.SIGINT, signal.SIG_IGN) 952 if os.getenv("USER") != "scos": 953 answer = None # Don't accept another Ctrl-C as an answer! 954 while answer is None: 955 try: 956 answer = \ 957 raw_input("Would you like to see trace (yes/no)? >") 958 except KeyboardInterrupt: 959 print('') # Just output a carriage return 960 else: # No user-input for scos runs in case it's a background job 961 answer = 'y' 962 if not answer or 'y' in answer.lower(): 963 Logger.addExceptionDetails(details) 964 else: 965 Logger.addExceptionMessage(details) 966 signal.signal(signal.SIGINT, signal.SIG_DFL) 967 except odbc.DatabaseError as details: 968 Logger.addExceptionMessage(details, eType="SQL or Database Error") 969 self._onException() 970 except IngCuSession.IngCuError as details: 971 Logger.addExceptionMessage(details, eType="Curation Failure") 972 self._onException() 973 except ExtError as details: 974 Logger.addExceptionMessage(details, eType="External-process Error") 975 self._onException() 976 except Exception as details: 977 Logger.addExceptionDetails(details, eType="Programming Error") 978 self._onException()
979 980 #-------------------------------------------------------------------------- 981
982 - def runSysCmd(self, cmd, showErrors=True, isVerbose=False):
 983          """
 984          Run a system command.
 985  
 986          @param cmd: The command to run.
 987          @type cmd: str
 988          @param isVerbose: If True, print the command's standard output.
 989          @type isVerbose: bool
 990  
 991          @return: Lists of stdout lines and of stderr lines.
 992          @rtype: tuple(list(str), list(str))
 993  
 994          """
 995          proc = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
 996          stdOut = [x.rstrip() for x in proc.stdout]
 997          if isVerbose:
 998              for x in stdOut:
 999                  print("[%s]> %s" % (str(mxTime.now()), x))
1000  
1001          errors = proc.stderr.readlines()
1002          if showErrors and errors:
1003              print(''.join(errors).rstrip())
1004          return stdOut, errors
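Typical use from within a session (a sketch; the command is arbitrary):

    stdOut, errors = self.runSysCmd("df -h " + self.dbMntPath, isVerbose=True)
    if errors:
        Logger.addMessage("<Warning> df reported: " + ''.join(errors).rstrip())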
1005 1006 #-------------------------------------------------------------------------- 1007
1008 - def setupDetectionTable(self, surveyName, monthSet, ReDo):
1009          """ Create the monthly detection schema files and, if not present,
1010              the corresponding monthly detection tables in the database. """
1011          schemaFileName = "%s_%sSchema.sql" % (self.sysc.loadDatabase,
1012                                                surveyName.lower())
1013          detTableList = ["%sDetection%s" % (surveyName.lower(), splitTable)
1014                          for splitTable in ['Raw', 'Astrometry', 'Photometry']]
1015          # create the monthly schema file
1016          for month in sorted(monthSet):
1017              monthlySchemaFileName = self.createMonthlyDetSchema(
1018                  schemaFileName, detTableList, month, ReDo)
1019  
1020          # create the monthly table in the DB
1021          try:
1022              self._connectToDb(maxTries=4000)
1023              for detTableName in detTableList:
1024                  for month in sorted(monthSet):
1025                      theDetectionMonth = schema.parseTables(
1026                          monthlySchemaFileName, [detTableName + month])[0]
1027                      if not self.archive.existsTable(theDetectionMonth.name):
1028                          # create monthly detection table
1029                          Logger.addMessage(
1030                              "[%d] Creating monthly detection table "
1031                              "on %s for %s" % (
1032                                  self._cuEventID, self.database, month))
1033                          self.archive.copyTable(
1034                              detTableName + "Default", theDetectionMonth,
1035                              where="multiframeid=0",
1036                              fileGroup="%sDetection_FG" % surveyName.lower(),
1037                              attachForeignKeys=False)
1038                          self.archive.commitTransaction()
1039          except odbc.ProgrammingError as error:
1040              if "permission was denied" in str(error):
1041                  raise SystemExit(
1042                      "<ERROR> You do not have write "
1043                      "permission on database " + self.database)
1044          finally:
1045              self._disconnectFromDb()
1046 1047 #-------------------------------------------------------------------------- 1048
1049 - def testForProgs(self, obsName):
1050 """ Test if programme is one of the main VISTA surveys. 1051 """ 1052 for prog in self.sysc.scienceProgs: 1053 if prog.upper() in obsName.upper(): 1054 return prog 1055 1056 return None
1057 1058 #-------------------------------------------------------------------------- 1059
1060 - def translateProgID(self, pIDkeys, progTranslation, knownTranslations={}):
1061 """ 1062 Translates a programme ID string into a programme ID number. 1063 1064 @param pIDkeys: List of Programme ID strings. For VISTA the 1065 order is: OBS PROG ID, OBS NAME, DPR CATG 1066 @type pIDkeys: list 1067 @param progTranslation: A dictionary of upper-case programme 1068 identification string keys and programme IDs 1069 values. 1070 @type progTranslation: dict(str:int) 1071 @param knownTranslations: A dictionary of programme identification keys 1072 and dfsIDStrings. 1073 @type knownTranslations: dict(str:int) 1074 1075 @return: Programme ID number. Default int value if unknown programme. 1076 @rtype: int 1077 1078 @todo: Tie this up with L{DataFactory.ProgrammeTable} somehow, so that 1079 progTranslation isn't required. 1080 1081 """ 1082 if self.sysc.isWSA(): 1083 reqProgName = pIDkeys[0].upper() 1084 # Trim off any nonsense from the end of the supplied programme 1085 # name. 1086 if reqProgName.startswith("U/UKIDSS"): 1087 # _SV are special case as they are a superset of other 1088 # programmes. 1089 if "_SV" in reqProgName: 1090 reqProgName = reqProgName[:reqProgName.rfind("_SV") + 3] 1091 else: 1092 for progName in progTranslation: 1093 if (progName.startswith("U/UKIDSS") 1094 and progName in reqProgName): 1095 reqProgName = progName 1096 elif reqProgName.startswith("U/EC/"): 1097 for progName in progTranslation: 1098 if (progName.startswith("U/EC/") 1099 and progName in reqProgName): 1100 reqProgName = progName 1101 # UKIRT Hemisphere Survey 1102 elif reqProgName.startswith("U/UHS/"): 1103 reqProgName = "U/UHS/" 1104 # UKIRTCAL is really just CAL 1105 elif reqProgName == "UKIRTCAL": 1106 reqProgName = "CAL" 1107 else: 1108 reqProgName = '' 1109 # get the programme from the OBS NAME key 1110 progInObsName = self.testForProgs(pIDkeys[1]) 1111 1112 if pIDkeys[0].upper().startswith("60.A-9"): 1113 # special period (Period 60) programmes 1114 if pIDkeys[0].upper() in knownTranslations: 1115 # is programme known? 1116 reqProgName = knownTranslations[pIDkeys[0].upper()] 1117 elif progInObsName: 1118 # is programme name in ESO OBS NAME? 1119 reqProgName = progInObsName 1120 #knownTranslations[pIDkeys[0].upper()] = reqProgName 1121 else: 1122 # define programme by ESO DPR CATG 1123 if pIDkeys[2].upper() in ["CALIB"]: 1124 reqProgName = "CAL" 1125 #knownTranslations[pIDkeys[0].upper()] = reqProgName 1126 else: 1127 reqProgName = "TECHNICAL" 1128 knownTranslations[pIDkeys[0].upper()] = reqProgName 1129 else: 1130 # is Large Programme 1131 if pIDkeys[0].upper().startswith('1'): 1132 if pIDkeys[0].partition('(')[0].upper() \ 1133 in knownTranslations: 1134 # is programme known? 1135 reqProgName = knownTranslations[ 1136 pIDkeys[0].partition('(')[0].upper()] 1137 elif ''.join(pIDkeys[0].partition('.')[:2]).upper() \ 1138 in knownTranslations: 1139 # is programme known? 1140 reqProgName = knownTranslations[ 1141 ''.join(pIDkeys[0].partition('.')[:2]).upper()] 1142 knownTranslations[pIDkeys[0].upper()] = reqProgName 1143 elif progInObsName: 1144 # is programme name in ESO OBS NAME? 
1145 reqProgName = progInObsName 1146 #knownTranslations[pIDkeys[0].upper()] = reqProgName 1147 else: 1148 reqProgName = pIDkeys[0].upper() 1149 elif pIDkeys[0].partition('(')[0].upper() in progTranslation: 1150 reqProgName = pIDkeys[0].partition('(')[0].upper() 1151 knownTranslations[pIDkeys[0].partition('(')[0].upper()] = \ 1152 pIDkeys[0].partition('(')[0].upper() 1153 elif pIDkeys[0].upper() in progTranslation: 1154 reqProgName = pIDkeys[0].upper() 1155 knownTranslations[pIDkeys[0].upper()] = pIDkeys[0].upper() 1156 elif not self.sysc.isOSA() and progInObsName: 1157 reqProgName = progInObsName 1158 #knownTranslations[pIDkeys[0].upper()] = reqProgName 1159 elif pIDkeys[0].upper()[:3].isdigit(): 1160 reqProgName = 'n' + pIDkeys[0].partition('(')[0].upper() 1161 else: 1162 if pIDkeys[2].upper() in ["CALIB"]: 1163 reqProgName = "CAL" 1164 else: 1165 reqProgName = pIDkeys[0].upper() 1166 1167 return reqProgName, knownTranslations
1168 1169 #-------------------------------------------------------------------------- 1170
1171 - def writeErrorFiles(self, notIngList):
1172          """
1173          Writes the list of files that could not be ingested to a file in the
1174          corrupt-files directory for further investigation.
1175  
1176          @param notIngList: List of files that cannot be ingested.
1177          @type notIngList: list(str)
1178  
1179          """
1180          errorFileName = self.getErrorFileName("notIngFiles")
1181          errorFilePath = os.path.join(self.sysc.corruptFilesPath(),
1182                                       errorFileName)
1183          file(errorFilePath, 'w').writelines(f + '\n' for f in notIngList)
1184  
1185          Logger.addMessage("%s files that were not ingested are listed in %s" %
1186                            (len(notIngList), errorFilePath))
1187  
1188          pctNotIng = 100 * len(notIngList) / len(self._fileList)
1189          message = \
1190              "%s%% of the files were not ingested! Check the log files!" % pctNotIng
1191          if pctNotIng > 90:
1192              Logger.addMessage("<ERROR> " + message)
1193          elif pctNotIng > 50:
1194              Logger.addMessage("<Warning> " + message)
1195 1196 #-------------------------------------------------------------------------- 1197
1198 - def writeIngestList(self):
1199          """
1200          Writes the list of FITS files to ingest to the file "filelist.dat" in
1201          the work directory in an 'imagefile catfile num' format for the C++ code.
1202  
1203          """
1204          lines = ('%s %s %s\n' % (imgFile, catFile, i)
1205                   for i, (imgFile, catFile) in enumerate(self._fileList))
1206          file(self._allFileName, 'w').writelines(lines)
1207          Logger.addMessage("Ingest list written to %s" % self._allFileName)
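The resulting filelist.dat contains one whitespace-separated line per image/catalogue pair plus a running index, for example (file names invented; runs without a catalogue are paired with the empty-catalogue placeholder file):

    /path/20060401/w20060401_00123.fit /path/20060401/w20060401_00123_cat.fits 0
    /path/20060401/w20060401_00124.fit /path/empty_catalogue.fits 1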
1208 1209 #------------------------------------------------------------------------------ 1210 # Change log: 1211 # 1212 # 30-Nov-2006, ETWS: First version, based on CuSession.py. 1213 # 9-Jan-2007, ETWS: Included type translation of command line arguments. 1214 # 30-Jan-2007, ETWS: Bug fixes. 1215 # 2-Feb-2007, ETWS: Included getProcessOrder, fixed createWorkingDir. 1216 # 5-Feb-2007, ETWS: Moved class Ratings into wsatools.EnhancedDictionary; 1217 # fixed bug in updateProgrammeHistory 1218 # 12-Mar-2007, ETWS: Included an optional transferlog name in createFileList. 1219 # 27-Apr-2007, ETWS: Bug fix. 1220 # 03-May-2007, ETWS: Included date list. 1221 # 15-May-2007, ETWS: Included ArchiveCurationHistory comment update. 1222 # 15-Aug-2007, ETWS: Replaced NoneType with empty string in type translation. 1223 # 27-Aug-2007, ETWS: Enhanced getProcessOrder. 1224 # 6-Sep-2007, ETWS: Included updating of existing ProgrammeCurationHistory 1225 # entries 1226 # 2-Nov-2007, ETWS: Enhancement to take care of any case versions of 'other'. 1227 # 24-Jan-2008, ETWS: Taking care of empty list for createCuEventIDStr. 1228 # 5-Feb-2008, ETWS: Included usage of FlatFileLookUp for better parallel 1229 # performance. 1230 # 14-Feb-2008, RSC: Updated for new DbSession interface, and corrected log 1231 # file name setting. 1232 # 7-Apr-2008, ETWS: Upgraded to use new detection table layout. 1233 # 27-May-2008, ETWS: Updated getProcessOrder to only process a given 1234 # programme subset. 1235 # 6-Jun-2008, ETWS: Included PTS in programme translation dictionary. 1236 # 3-Oct-2008, ETWS: Included function to update illunination file names in DB. 1237 # 3-Nov-2008, ETWS: Included possibility to insert data into tables other 1238 # than the actual observed survey. 1239 # 10-Nov-2008, ETWS: Fixed illumFileName. 1240