1   
   2   
   3  """ 
   4     Framework for parallel-running curation sessions. Performs the set of 
   5     actions that CUs 1 to 4 are required to do. These curation tasks should 
   6     inherit from this class. 
   7   
   8     @author: E. Sutorius 
   9     @org:    WFAU, IfA, University of Edinburgh 
  10   
  11     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
  12     @contributors: R.S. Collins 
  13  """ 
  14   
  15  from future_builtins import zip 
  16   
  17  import csv 
  18  from   collections import defaultdict 
  19  import inspect 
  20  import mx.DateTime   as mxTime 
  21  import os 
  22  import re 
  23  import shutil 
  24  import signal 
  25  import socket 
  26  from   subprocess  import Popen, PIPE 
  27  import time 
  28   
  29  from   wsatools.CLI                 import CLI 
  30  import wsatools.DbConnect.DbConstants   as dbc 
  31  from   wsatools.DbConnect.DbSession import DbSession, odbc 
  32  from   wsatools.ExternalProcess     import Error as ExtError 
  33  from   wsatools.File                import File 
  34  import wsatools.FitsUtils               as fits 
  35  from   wsatools.Logger              import Logger 
  36  import wsatools.DbConnect.Schema        as schema 
  37  from   wsatools.SystemConstants     import SystemConstants 
  38  import wsatools.Utilities               as utils 
  42      """ A parallel curation session for transfer/ingest CUs 1 to 4. 
  43      """ 
  45          """ A curation error instead of a programming error. """ 
  46          pass 
   47   
  48      beginDateDef = 10000101 
  49      endDateDef = 99991231 
  50       
  51       
  52      reqWorkDir = None 
  53      keepWorkDir = False 
  54   
  55       
  56      _defComment = 'A useful comment'   
  57      _fileList = None          
  58      _isDbLocked = False       
  59      _numSessions = 0          
  60   
  61       
  62      sysc = SystemConstants() 
  63   
  64       
  65      CLI.progArgs += [CLI.Argument('comment', _defComment, isOptional=True)] 
  66      CLI.progOpts += [CLI.Option('c', 'curator', 'name of curator', 'NAME', 
  67                                  os.getenv('USER'))] 
  68   
  69       
  70   
  80          """ 
  81          Initialises data members for all database state flags and details for 
  82          this curation session. Writes a database lock and opens a connection. 
  83          Also initiates a Programme table object. 
  84   
  85          @param cuNum:       Curation task number. 
  86          @type  cuNum:       int 
  87          @param curator:     Name of curator. 
  88          @type  curator:     str 
  89          @param comment:     Descriptive comment as to why curation task is 
  90                              being performed. 
  91          @type  comment:     str 
  92          @param reqWorkDir:  If True, a working directory will be created. 
  93          @type  reqWorkDir:  bool 
  94          @param keepWorkDir: Don't remove working directory. 
  95          @type  keepWorkDir: bool 
  96          @param database:    Name of the database to connect to. 
  97          @type  database:    str 
  98          @param userName:    Optionally override default database username. 
  99          @type  userName:    str 
 100          @param autoCommit:  If True, do not auto-commit transactions. This 
 101                              speeds up transactions through minimal logging. 
 102          @type  autoCommit:  bool 
 103          @param isTrialRun:  If True, do not perform database modifications, 
 104                              just print the SQL statement to the terminal. 
 105          @type  isTrialRun:  bool 
 106   
 107          """ 
 108          self.attributesFromArguments( 
 109              inspect.getargspec(IngCuSession.__init__)[0], locals()) 
 110   
 111           
 112           
 113          IngCuSession.sysc = SystemConstants(database.rpartition('.')[2]) 
 114   
 115           
 116          self._cuEventID = None 
 117          self._cuTable = None 
 118          self.comment = ("Running CU" + str(cuNum) 
 119                          if comment is IngCuSession._defComment else comment) 
 120   
 121          self._lockPathName = \ 
 122              os.path.join(os.getenv('HOME'), "dblock-" + self.database) 
 123          self._logFile = None 
 124          self._rolledBack = None 
 125          self._shareFileID = '' 
 126   
 127           
 128          if self.reqWorkDir: 
 129              self._workPath = self.createWorkingDir() 
 130          else: 
 131              self._workPath = os.curdir 
 132          self._allFileName = os.path.join(self._workPath, 'filelist.dat') 
 133   
 134          self.dbMntPath = IngCuSession.sysc.dbMntPath() 
 135          self._hostName = socket.gethostbyaddr(socket.gethostname())[1][0] 
 136          self._success = dbc.yes() 
  137   
 138       
 139   
 149   
 150       
 151   
 166   
 167       
 168   
 170          """ 
 171          Create the translation dictionary of programme identification strings 
 172          to programme ID numbers for all programmes. It contains associations 
 173          between data flow system (DFS) programme identification strings (ie. as 
 174          supplied in ingest FITS files) and the WSA programme unique 
 175          identification numbers. 
 176   
 177          @todo: This method wouldn't be needed if a ProgrammeTable object always 
 178                 existed and had an equivalent method to perform this translation. 
 179                 Hence, the ProgrammeTable object would replace the dictionary 
 180                 here and there wouldn't be a need to query the database again. 
 181          """ 
 182           
 183          queryResult = \ 
 184            self.archive.query("dfsIDString, programmeID", "Programme") 
 185   
 186          self._progIDofName = \ 
 187            dict((name.upper(), progID) for name, progID in queryResult) 
 188   
 189          if self.sysc.isWSA(): 
 190              self._progIDofName["PTS"] = self._progIDofName["U/06A/52"] 
 191              self._progIDofName["COMM"] = self._progIDofName["U/EC/"] 
 192              self._progIDofName["UHS"] = self._progIDofName["U/UHS/"] 
 193          if self.sysc.isVSA(): 
 194              self._progIDofName["TECH"] = self._progIDofName["TECHNICAL"] 
 195              self._progIDofName["MAINT"] = self._progIDofName["MAINTENANCE"] 
 196          if self.sysc.isOSA(): 
 197              self._progIDofName["TECH"] = self._progIDofName["TECHNICAL"] 
 198   
 199          self._progNameOfID = dict(zip(self._progIDofName.itervalues(), 
 200                                        self._progIDofName.iterkeys())) 
 201          if self.sysc.isWSA(): 
 202              self._progNameOfID[self._progIDofName["U/06A/52"]] = "PTS" 
 203              self._progNameOfID[self._progIDofName["U/EC/"]] = "COMM" 
 204              self._progNameOfID[self._progIDofName["U/UHS/"]] = "UHS" 
 205          if self.sysc.isVSA(): 
 206              self._progNameOfID[self._progIDofName["TECHNICAL"]] = "TECH" 
 207              self._progNameOfID[self._progIDofName["MAINTENANCE"]] = "MAINT" 
 208          if self.sysc.isOSA(): 
 209              self._progNameOfID[self._progIDofName["TECHNICAL"]] = "TECH" 
 210   
 211          Logger.addMessage("Created the translation dictionary") 
  212   
 213       
 214   
 239   
 240       
 241   
 243          """ 
 244          Obtain the next available cuEventID, whilst putting a placeholder row 
 245          into the ArchiveCurationHistory table to reserve this cuEventID, 
 246          preventing it from being hijacked by a parallel process. 
 247   
 248          """ 
 249          cuEventID = self._getNextID(dbc.curationEventUIDAttributeName(), 
 250                                      dbc.archiveCurationHistoryTableName()) 
 251   
 252          if self.sysc.isVSA(): 
 253              dbID = 2 if self.archive.database == "VSAVVV" else 1 
 254              cuEventID = 10 * (cuEventID // 10 + 1) + dbID 
 255   
 256           
 257          try: 
 258              self.archive.insertData(dbc.archiveCurationHistoryTableName(), 
 259                [cuEventID, self.cuNum, dbc.charDefault(), dbc.charDefault(), 
 260                 utils.makeMssqlTimeStamp(), self.curator, 
 261                 "in progress: " + self.comment, dbc.yes()], 
 262                enforcePKC=True) 
 263              self.archive.commitTransaction() 
 264          except odbc.ProgrammingError as error: 
 265              if "permission was denied" in str(error): 
 266                  raise SystemExit("<ERROR> You do not have write " 
 267                                   "permission on database " + self.database) 
 268              else: 
 269                  raise 
 270   
 271          return cuEventID 
  272   
 273       
 274   
 275 -    def _getNextID(self, attrName, tableName, where=''): 
  276          """ 
 277          Obtain next available ID number in the db for a given ID attribute. 
 278   
 279          @param attrName:  Name of ID attribute. 
 280          @type  attrName:  str 
 281          @param tableName: Name of the table where attribute resides. 
 282          @type  tableName: str 
 283          @param where:     Optional SQL WHERE clause. 
 284          @type  where:     str 
 285   
 286          @return: Next available ID number. 
 287          @rtype:  int 
 288   
 289          """ 
 290          maxID = self.archive.queryAttrMax(attrName, tableName, where) 
 291   
 292          if maxID and maxID > 0: 
 293              return maxID + 1 
 294          else:   
 295              return 1 
  296   
 297       
 298   
 300          """ 
 301          Prefix for the comment in the ArchiveCurationHistory table. 
 302   
 303          @param success:    The success status of the CU. 
 304          @type  success:    int 
 305          @param rolledBack: Flag to indicate ingest roll back. 
 306          @type  rolledBack: int 
 307   
 308          @return: Prefix for the comment in the ArchiveCurationHistory table. 
 309          @rtype:  str 
 310   
 311          """ 
 312          if not success and rolledBack: 
 313              return "failed: " 
 314          elif success and rolledBack: 
 315              return "processed: " 
 316          elif success and not rolledBack: 
 317              return "ingested: " 
  318   
 319       
 320   
 322          """ Tidy up run tasks specific to this CU following an exception. 
 323          """ 
 324           
 325          signal.signal(signal.SIGINT, signal.SIG_IGN) 
 326          self._success = dbc.no() 
 327          try: 
 328              self.archive.rollbackTransaction() 
 329          except AttributeError: 
 330               
 331              Logger.addMessage( 
 332                "No database connection, so nothing to rollback.") 
 333          else: 
 334              self._rolledBack = dbc.yes() 
 335           
 336          signal.signal(signal.SIGINT, signal.SIG_DFL) 
  337   
 338       
 339   
 341          """ Performs all the tasks that define this curation session. """ 
 342          pass 
  343   
 344       
 345   
 346      @staticmethod 
 348          """ 
 349          Parse the given list of CU strings into a list of cuNum, cuEventID 
 350          and serverName. 
 351   
 352          @return: List of tuples characterising the CU events to be ingested. 
 353          @rtype:  list 
 354   
 355          """ 
 356          cnList = [] 
 357          for entry in cus: 
 358              try: 
 359                  cuNum, idservdate = entry.partition('-')[::2] 
 360              except AttributeError: 
 361                  cuNum = entry 
 362                  idservdate = '--' 
 363              searchTerms = (idservdate.split('-')+ ['', '', ''])[:3] 
 364              evtId = '' 
 365              serv = '' 
 366              date = '' 
 367              for x in searchTerms: 
 368                  if x.isalpha(): 
 369                      serv = x 
 370                  elif x.isdigit() and x.startswith("20") and len(x) == 6 \ 
 371                           and int(x[4:]) in range(1,13): 
 372                      date = x 
 373                  elif x.isdigit(): 
 374                      evtId = x 
 375              cnList.append(("cu%02d" % int(cuNum), "id" + evtId, serv, date)) 
 376          return cnList 
  377   
 378       
 379   
 381          """ 
 382          Writes database lock. If lock already present then pause and wait for 
 383          database to unlock. 
 384   
 385          """ 
 386          numTries = 0 
 387          midTries = 100 
 388          delay = 2 
 389          taskIdStr = ("[%d] " % self._cuEventID if self._cuEventID else '') 
 390          while os.path.exists(self._lockPathName): 
 391              time.sleep(delay) 
 392              numTries += 1 
 393              if numTries == 1: 
 394                  print("%sA lock file exists: %s" % (taskIdStr, 
 395                                                       self._lockPathName)) 
 396                  print("End the task that owns the lock and/or manually " 
 397                        "delete the lock file and this curation session will " 
 398                        "automatically start.") 
 399              elif maxTries < 1000 and numTries > maxTries: 
 400                  raise Exception("%sTimed out in accessing the" 
 401                                  " database" % taskIdStr) 
 402              elif maxTries >= 1000 and numTries >= midTries: 
 403                  delay = min(120, 10*int(1 + (numTries - midTries)/10)) 
 404                  Logger.addMessage("%sWaiting %d s..." % (taskIdStr, delay)) 
 405              elif numTries > 15000: 
 406                  raise Exception("%sTimed out after %d s in accessing the" 
 407                                  " database" % (taskIdStr, numTries)) 
 408   
 409          file(self._lockPathName, 'w').write("DB locked by CU%s. PID = %s.%s" % 
 410            (self.cuNum, os.getenv('HOST'), os.getpid())) 
 411   
 412          IngCuSession._isDbLocked = True 
  413   
 414       
 415   
 417          """ 
 418          Adds an entry for this curation session to the ArchiveCurationHistory 
 419          database table. 
 420   
 421          @param row: List of entries for the ArchiveCurationHistory table. 
 422          @type  row: list 
 423   
 424          """ 
 425          Logger.addMessage("Updating archive curation history with this row:") 
 426          Logger.addMessage(repr(row)) 
 427   
 428           
 429           
 430          self.archive.delete(dbc.archiveCurationHistoryTableName(), 
 431                              whereStr="cuEventID=%s" % row[0]) 
 432          self.archive.insertData(dbc.archiveCurationHistoryTableName(), row) 
  433   
 434       
 435   
 459   
 460       
 461   
 463          """ 
 464          Adds an entry for this curation session to the ProgrammeCurationHistory 
 465          database table. 
 466   
 467          @param row: List of entries for the ProgrammeCurationHistory table. 
 468          @type  row: list 
 469   
 470          """ 
 471          Logger.addMessage("Updating programme curation history with this row:") 
 472          Logger.addMessage(repr(row)) 
 473   
 474           
 475           
 476          self.archive.delete(dbc.programmeCurationHistoryTableName(), 
 477            whereStr="programmeID=%s AND cuEventID=%s" % (row[0], row[1])) 
 478          self.archive.insertData(dbc.programmeCurationHistoryTableName(), row) 
  479   
 480       
 481   
 484          """Initialize automatically instance variables from __init__ arguments. 
 485          If types dictionary is given, translate commandline str arguments into 
 486          correct types. 
 487   
 488          @param argumentNames: List of arguments of the class to be initialized. 
 489          @type  argumentNames: list[str] 
 490          @param argDict:       Dictionary of arguments and their values. 
 491          @type  argDict:       dict{arg:value} 
 492          @param prefix:        Prefix to be given to the argument names, eg. '_' 
 493          @type  prefix:        str 
 494          @param types:         Dictionary of arguments and their types. 
 495          @type  types:         dict{arg:type} 
 496          """ 
 497          if types: 
 498              argDict.pop("typeTranslation") 
 499              for arg in argDict: 
 500                  if arg in types: 
 501                      if type(argDict[arg]) is type(None): 
 502                          argDict[arg] = '' 
 503                      if types[arg] is not type(argDict[arg]): 
 504                          if types[arg] is int: 
 505                              argDict[arg] = int(argDict[arg]) 
 506                          if types[arg] in (list, set, tuple): 
 507                              for row in csv.reader([argDict[arg]]): 
 508                                  try: 
 509                                      argDict[arg] = types[arg](int(x) 
 510                                        for x in row if isinstance(x, str)) 
 511                                  except ValueError: 
 512                                      argDict[arg] = types[arg](row) 
 513                          if types[arg] is dict: 
 514                              tmpDict = defaultdict() 
 515                              for row in csv.reader([argDict[arg]]): 
 516                                  for x in row: 
 517                                      tmpDict.update(types[arg]( 
 518                                          [re.split('[:=]', x)])) 
 519                              argDict[arg] = tmpDict.copy() 
 520   
 521          self = argDict.pop('self') 
 522          for arg in argumentNames[1:]: 
 523              setattr(self, prefix + arg, argDict[arg]) 
  524   
 525       
 526   
 528          """ 
 529          Create a list of FITS files to be ingested into the database from the 
 530          list supplied in the file at the given path. The list is stored in the 
 531          L{_fileList} member variable as a pair of image and catalogue file 
 532          paths and also written to a file by L{writeIngestList()} for the 
 533          C++ code. 
 534   
 535          @param fileListPath: Full path to the file containing the list of FITS 
 536                               files to ingest, as an optional alternative to the 
 537                               command-line supplied option, xferLog. 
 538          @type  fileListPath: str 
 539   
 540          """ 
 541          img = {} 
 542          cat = {} 
 543          fixcat = {} 
 544          for filePath in utils.ParsedFile(fileListPath or self.xferlog): 
 545              fileDir, fileName = filePath.split(os.sep)[-2:] 
 546              fileSubPath = os.path.join(fileDir, fileName) 
 547               
 548              if fileName.startswith((self.sysc.casuPrefix, 
 549                                      self.sysc.wfauPrefix)): 
 550                   
 551                  catType = self.sysc.catSuffix + self.sysc.catType 
 552                  fixcatType = self.sysc.fixcatSuffix + self.sysc.catType 
 553                  if fileName.endswith(fixcatType): 
 554                      run = fileSubPath.replace(fixcatType, '') 
 555                      fixcat[run] = filePath 
 556                  elif fileName.endswith(catType): 
 557                      run = fileSubPath.replace(catType, '') 
 558                      cat[run] = filePath 
 559                   
 560                  elif fileName.endswith(self.sysc.mefType) \ 
 561                    and not fileName.endswith("_squish.fit"): 
 562                      run = fileSubPath.replace(self.sysc.mefType, '') 
 563                      img[run] = filePath 
 564              else: 
 565                  run = os.path.splitext(fileSubPath)[0] 
 566                  img[run] = filePath 
 567   
 568          orphanedCats = set(cat) - set(img) 
 569          if self.sysc.isOSA(): 
 570              orphanedFixCats = set(fixcat) - set(img) 
 571              if orphanedFixCats: 
 572                  Logger.addMessage( 
 573                      "<Warning> There are orphaned fixed catalogues in this list" 
 574                      " without corresponding image files for that run: " 
 575                      + ', '.join(orphanedFixCats)) 
 576   
 577          else: 
 578              if orphanedCats: 
 579                  Logger.addMessage( 
 580                      "<Warning> There are orphaned catalogues in this list " 
 581                      "without corresponding image files for that run: " 
 582                      + ', '.join(orphanedCats)) 
 583   
 584           
 585          self._dateList = set(os.path.dirname(img[run]) for run in img) 
 586          catDef = self.sysc.emptyFitsCataloguePathName() 
 587          if self.sysc.isOSA(): 
 588              self._fileList = [(img[run], fixcat.get(run, cat.get(run, catDef))) 
 589                                for run in img] 
 590          else: 
 591              self._fileList = [(img[run], cat.get(run, catDef)) for run in img] 
 592          self.writeIngestList() 
  593   
 594       
 595   
 596      @staticmethod 
 598          """Create a file prefix used for the ingest log and the data files. 
 599   
 600          @param cuNum:     The number of the CU. 
 601          @type  cuNum:     int 
 602          @param cuEventID: The cuEventID of this CU run. 
 603          @type  cuEventID: int 
 604          @param hostName:  The host name where the CU is running. 
 605          @type  hostName:  str 
 606          @param database:  The database where the data is ingested to. 
 607          @type  database:  str 
 608          """ 
 609          if not cuEventID: 
 610              cuEventID = -9999 
 611          return "cu%02did%d_%s_%s" % (cuNum, cuEventID, hostName, database) 
  612   
 613       
 614   
 616          """ 
 617          Create the standard log file name for the current cuEventID or the 
 618          given list of CU event IDs. 
 619   
 620          @param cuEventIDs: Optional list of CU eventIDs to record. 
 621          @type  cuEventIDs: list(int) 
 622   
 623          @return: A filename for the log. 
 624          @rtype:  str 
 625   
 626          """ 
 627          if cuEventIDs and not None in cuEventIDs: 
 628              eventIDStr = utils.numberRange(cuEventIDs, sep="_", 
 629                                             useTens=self.sysc.isVSA()) 
 630          elif cuEventIDs is not None: 
 631              eventIDStr = 'NONE' 
 632   
 633          return '%s_cu%sid%s.log' % (self.database.rpartition('.')[2], 
 634                                      self.cuNum, self._cuEventID 
 635                                      if cuEventIDs is None else eventIDStr) 
  636   
 637       
 638   
 665   
 666       
 667   
 669          words = line.split() 
 670          for word in words: 
 671              for detTable in detTableList: 
 672                  if detTable in word: 
 673                      return detTable 
 674          return '' 
  675   
 676       
 677   
 679          """ 
 680          Create a temporary working directory. Directories will have a 4 digit 
 681          extension number for multiple tmpdirs created by one top level script 
 682          calling different CUs. 
 683   
 684          @return: Path to the working directory. 
 685          @rtype:  str 
 686   
 687          """ 
 688          workSpacePath = self.sysc.tempWorkPath() 
 689          workSpaceDir, _workSpaceBase = os.path.split(workSpacePath) 
 690          if os.path.exists(workSpacePath): 
 691              os.rename(workSpacePath, workSpacePath + '.old') 
 692              Logger.addMessage( 
 693                "<Warning> curationClientWorkPath %s exists, moved to %s" % 
 694                (workSpacePath, workSpacePath + '.old')) 
 695   
 696          tmpList = sorted(d for d in os.listdir(workSpaceDir) 
 697            if os.path.isdir(os.path.join(workSpaceDir, d)) 
 698              and str(os.getpid()) in d and '.' in d and not d.endswith(".old")) 
 699   
 700          dirNum = 0 
 701          for tmpDirName in tmpList: 
 702              tmpDirNumMax = int(os.path.splitext(tmpList[-1])[1][1:]) 
 703              if tmpDirNumMax != len(tmpList) - 1: 
 704                  dirNum = 0 
 705                  for tmpDirName in tmpList: 
 706                      newTmpDirName = tmpDirName + ".old" 
 707                      tmpPath = os.path.join(workSpaceDir, tmpDirName) 
 708                      if os.path.exists(tmpPath): 
 709                          newPath = os.path.join(workSpaceDir, newTmpDirName) 
 710                          os.rename(tmpPath, newPath) 
 711              else: 
 712                  dirNum = tmpDirNumMax + 1 
 713   
 714          workSpace = workSpacePath + ".%04u" % dirNum 
 715   
 716          os.mkdir(workSpace) 
 717          return workSpace 
  718   
 719       
 720   
 722          """ 
 723          Determines the full set of programmes for a given list of FITS files. 
 724   
 725          @param fileListPath:    Full path to the file listing all FITS files 
 726                                  for which their programmes should be 
 727                                  determined. 
 728          @type  fileListPath:    str 
 729   
 730          @return: A dictionary of programme IDs referenced by FITS file name, 
 731                   and a dictionary of new project DFS ID strings referenced by 
 732                   FITS file name. 
 733          @rtype:  dict(str:int), dict(str:str) 
 734   
 735          """ 
 736          progOfFile = {} 
 737          newProgOfFile = {} 
 738   
 739           
 740          for fileName in utils.extractColumn(fileListPath, colNum=0): 
 741               
 742              if fileName.endswith(self.sysc.mefType): 
 743                   
 744                  try: 
 745                      fitsFile = fits.open(fileName) 
 746                  except: 
 747                      Logger.addMessage("Broken file:" + fileName) 
 748                      raise 
 749                  fitsFile.close() 
 750   
 751                   
 752                  try: 
 753                      hduNum = (1 if self.sysc.tileSuffix in fileName else 0) 
 754                      project = [fitsFile[hduNum].header.get(key, 
 755                                                             "Unknown").strip() 
 756                                 for key in self.sysc.projectKeys] 
 757   
 758                      if project[0] == "Example WFCAM survey": 
 759                          project[0] = "TEST" 
 760   
 761                      progName, self.knownSurveyTranslations = \ 
 762                        self.translateProgID(project, self._progIDofName, 
 763                                             self.sysc.surveyTranslations) 
 764   
 765                  except TypeError, details: 
 766                      print details 
 767                      Logger.addMessage(fileName + ". Probably not FITS") 
 768                  except KeyError: 
 769                      Logger.addMessage(fileName + ". Can't get program ID") 
 770                  except IndexError: 
 771                      Logger.addMessage(fileName + ". Probably not compressed") 
 772                  else: 
 773                      progID = self._progIDofName.get(progName.upper(), 
 774                                                      dbc.intDefault()) 
 775                       
 776                      if progID != dbc.intDefault(): 
 777                          progOfFile[fileName] = (progID, progName) 
 778                      else: 
 779                          if progName[0] == 'n': 
 780                              newProgOfFile[fileName] = \ 
 781                                          project[0].partition('(')[0].upper() 
 782                          else: 
 783                              newProgOfFile[fileName] = project[0].upper() 
 784   
 785          if newProgOfFile: 
 786              nplPath = os.path.join(self.sysc.corruptFilesPath(), 
 787                "nonPIDFiles_%s.dat" 
 788                % time.strftime("%Y%m%d_%H%M%S", time.gmtime())) 
 789   
 790              file(nplPath, 'w').writelines( 
 791                utils.joinDict(newProgOfFile, ',', '\n')) 
 792   
 793              Logger.addMessage("Files that don't have a valid programme ID are " 
 794                                "listed in " + nplPath) 
 795   
 796          return progOfFile, newProgOfFile 
  797   
 798       
 799   
 801          """ 
 802          Creates an error file name from the given name, including the details of 
 803          this CU run, e.g. database, cuEventID. 
 804   
 805          @param name: Unique name for this type of error file. 
 806          @type  name: str 
 807   
 808          @return: Full file name for error file. 
 809          @rtype:  str 
 810   
 811          """ 
 812          return "%s_%s_cu%sid%s_%s.dat" % (self.database.rpartition('.')[2], 
 813            name, self.cuNum, self._cuEventID, 
 814            mxTime.utc().strftime("%Y%m%d_%H%M%S")) 
  815   
 816       
 817   
 819          """ Get the multiframeIDs from FlatFileLookUp for a given dateVersStr. 
 820          """ 
 821          dataDict = {} 
 822           
 823          for mfID, fileName in self.archive.query("multiframeID, fileName", 
 824            "FlatFileLookUp", whereStr="DateVersStr=%r" % dateVersStr): 
 825              if "PixelFileNoLongerAvailable" not in fileName: 
 826                  dataDict[fileName.rpartition(':')[2]] = mfID 
 827          return dataDict 
  828   
 829       
 830   
 832          """ 
 833          Process programmes in given order, append/include all others from the 
 834          list of all programme IDs. 
 835   
 836          @param progIDs:   List of all (used) programme IDs. 
 837          @type  progIDs:   list(int) 
 838          @param include:   List of project names to be processed. 
 839          @type  include:   list(str) 
 840          @param progOrder: Ordered list of programme names, eg. [las,gps] or 
 841                            [las,others,gps]. 
 842          @type  progOrder: list(str) 
 843   
 844          @return: Rating enhanced dictionary. 
 845          @rtype:  Utilities.Ratings 
 846   
 847          @todo: Will become part of DataFactory.ProgrammeTable when programme 
 848                 translation object moved there. 
 849          """ 
 850          procProgs = [] 
 851          exclProgs = [] 
 852          for entry in include: 
 853               
 854              if entry.startswith('x-'): 
 855                  exclProgs.append(entry.lower().replace('x-', '', 1)) 
 856              else: 
 857                  procProgs.append(entry.lower()) 
 858          if not procProgs: 
 859              procProgs = ['all'] 
 860   
 861          progOrder = [x.lower() for x in progOrder] 
 862          try: 
 863              otherName = \ 
 864                list(set(["other", "others"]).intersection(progOrder))[0] 
 865          except IndexError: 
 866              otherName = '' 
 867   
 868          progIDofName = dict( 
 869            (name.lower().replace("u/ukidss/", ''), progID) 
 870            for name, progID in self._progIDofName.items()) 
 871   
 872           
 873          if 'all' in procProgs: 
 874              procProgs = progIDs 
 875          elif 'ns' in  procProgs: 
 876              procProgs = [pid for pid in progIDs if pid > 10000] 
 877          elif self.sysc.isWSA(): 
 878              if 'ukidss' in procProgs: 
 879                  procProgs = [pid for pid in progIDs if 10 < pid <= 105] 
 880              else: 
 881                  procProgs = [progIDofName[entry] for entry in procProgs 
 882                               if entry in progIDofName \ 
 883                               and progIDofName[entry] in progIDs] 
 884          elif self.sysc.isVSA() or self.sysc.isOSA(): 
 885              procProgs = [progIDofName[entry] for entry in procProgs 
 886                           if entry in progIDofName \ 
 887                           and progIDofName[entry] in progIDs] 
 888   
 889          if exclProgs: 
 890              exclProgs = [progIDofName[entry] for entry in exclProgs 
 891                                                if entry in progIDofName] 
 892              procProgs = list(set(procProgs).difference(exclProgs)) 
 893   
 894          pidOrder = [] 
 895          for progName in progOrder: 
 896              if progName != otherName: 
 897                  for trprName in progIDofName: 
 898                      if progName in trprName.lower() \ 
 899                        and progIDofName[trprName] in procProgs: 
 900                          pidOrder.append(progIDofName[trprName]) 
 901   
 902          progOrderRatedDict = utils.Ratings(zip(pidOrder, range(len(pidOrder)))) 
 903          othersIndex = (progOrder.index(otherName) if otherName else 
 904                         len(progIDofName) + 1) 
 905   
 906          nonOrderedProgs = \ 
 907            set(progIDofName.values()).intersection(procProgs) - set(pidOrder) 
 908   
 909          nonOrderDict = {} 
 910          for progID in nonOrderedProgs: 
 911              nonOrderDict[progID] = othersIndex - 0.5 
 912   
 913          progOrderRatedDict.update(nonOrderDict) 
 914          Logger.addMessage("Programme IDs to be processed: " 
 915                            + ','.join(repr(n) for n in progOrderRatedDict)) 
 916   
 917          return progOrderRatedDict 
  918   
 919       
 920   
 922          if serverName != os.getenv('HOST'): 
 923              return self.sysc.privNetCmd(serverName, cmnd) 
 924          else: 
 925              return cmnd 
  926   
 927       
 928   
 930          """ 
 931          Remove the temporary working directory. 
 932          """ 
 933          try: 
 934              shutil.rmtree(self._workPath) 
 935          except OSError as error: 
 936              Logger.addMessage( 
 937                  "<Warning> Unable to remove temporary working directory " 
 938                  "%s: %s" % (self._workPath, error)) 
  939   
 940       
 941   
 979   
 980       
 981   
 982 -    def runSysCmd(self, cmd, showErrors=True, isVerbose=False): 
  983          """ 
 984          Run a system command. 
 985   
 986          @param cmd:       The command to run. 
 987          @type  cmd:       str 
 988          @param isVerbose: If True, print error messages. 
 989          @type  isVerbose: bool 
 990   
 991          @return: List of lines returned. 
 992          @rtype:  list(str) 
 993   
 994          """ 
 995          proc = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) 
 996          stdOut = [x.rstrip() for x in proc.stdout] 
 997          if isVerbose: 
 998              for x in stdOut: 
 999                  print("[%s]> %s" % (str(mxTime.now()), x)) 
1000   
1001          errors = proc.stderr.readlines() 
1002          if showErrors and errors: 
1003              print(''.join(errors).rstrip()) 
1004          return stdOut, errors 
 1005   
1006       
1007   
1009          """ 
1010          """ 
1011          schemaFileName = "%s_%sSchema.sql" % (self.sysc.loadDatabase, 
1012                                                surveyName.lower()) 
1013          detTableList = ["%sDetection%s" % (surveyName.lower(), splitTable) 
1014                          for splitTable in ['Raw', 'Astrometry', 'Photometry']] 
1015           
1016          for month in sorted(monthSet): 
1017              monthlySchemaFileName = self.createMonthlyDetSchema( 
1018                  schemaFileName, detTableList, month, ReDo) 
1019   
1020           
1021          try: 
1022              self._connectToDb(maxTries=4000) 
1023              for detTableName in detTableList: 
1024                  for month in sorted(monthSet): 
1025                      theDetectionMonth = schema.parseTables( 
1026                          monthlySchemaFileName, [detTableName + month])[0] 
1027                      if not self.archive.existsTable(theDetectionMonth.name): 
1028                           
1029                          Logger.addMessage( 
1030                              "[%d] Creating monthly detection table " 
1031                              "on %s for %s" % ( 
1032                                  self._cuEventID, self.database, month)) 
1033                          self.archive.copyTable( 
1034                              detTableName + "Default", theDetectionMonth, 
1035                              where="multiframeid=0", 
1036                              fileGroup="%sDetection_FG" % surveyName.lower(), 
1037                              attachForeignKeys=False) 
1038                          self.archive.commitTransaction() 
1039          except odbc.ProgrammingError as error: 
1040              if "permission was denied" in str(error): 
1041                  raise SystemExit( 
1042                      "<ERROR> You do not have write " 
1043                      "permission on database " + self.database) 
1044          finally: 
1045              self._disconnectFromDb() 
 1046   
1047       
1048   
1050          """ Test if programme is one of the main VISTA surveys. 
1051          """ 
1052          for prog in self.sysc.scienceProgs: 
1053              if prog.upper() in obsName.upper(): 
1054                  return prog 
1055   
1056          return None 
 1057   
1058       
1059   
1060 -    def translateProgID(self, pIDkeys, progTranslation, knownTranslations={}): 
 1061          """ 
1062          Translates a programme ID string into a programme ID number. 
1063   
1064          @param pIDkeys:           List of Programme ID strings. For VISTA the 
1065                                    order is: OBS PROG ID, OBS NAME, DPR CATG 
1066          @type  pIDkeys:           list 
1067          @param progTranslation:   A dictionary of upper-case programme 
1068                                    identification string keys and programme IDs 
1069                                    values. 
1070          @type  progTranslation:   dict(str:int) 
1071          @param knownTranslations: A dictionary of programme identification keys 
1072                                    and dfsIDStrings. 
1073          @type  knownTranslations: dict(str:int) 
1074   
1075          @return: Programme ID number. Default int value if unknown programme. 
1076          @rtype:  int 
1077   
1078          @todo: Tie this up with L{DataFactory.ProgrammeTable} somehow, so that 
1079                 progTranslation isn't required. 
1080   
1081          """ 
1082          if self.sysc.isWSA(): 
1083              reqProgName = pIDkeys[0].upper() 
1084               
1085               
1086              if reqProgName.startswith("U/UKIDSS"): 
1087                   
1088                   
1089                  if "_SV" in reqProgName: 
1090                      reqProgName = reqProgName[:reqProgName.rfind("_SV") + 3] 
1091                  else: 
1092                      for progName in progTranslation: 
1093                          if (progName.startswith("U/UKIDSS") 
1094                              and progName in reqProgName): 
1095                              reqProgName = progName 
1096              elif reqProgName.startswith("U/EC/"): 
1097                  for progName in progTranslation: 
1098                      if (progName.startswith("U/EC/") 
1099                          and progName in reqProgName): 
1100                          reqProgName = progName 
1101               
1102              elif reqProgName.startswith("U/UHS/"): 
1103                  reqProgName = "U/UHS/" 
1104               
1105              elif reqProgName == "UKIRTCAL": 
1106                  reqProgName = "CAL" 
1107          else: 
1108              reqProgName = '' 
1109               
1110              progInObsName = self.testForProgs(pIDkeys[1]) 
1111   
1112              if pIDkeys[0].upper().startswith("60.A-9"): 
1113                   
1114                  if pIDkeys[0].upper() in knownTranslations: 
1115                       
1116                      reqProgName = knownTranslations[pIDkeys[0].upper()] 
1117                  elif progInObsName: 
1118                       
1119                      reqProgName = progInObsName 
1120                       
1121                  else: 
1122                       
1123                      if pIDkeys[2].upper() in ["CALIB"]: 
1124                          reqProgName = "CAL" 
1125                           
1126                      else: 
1127                          reqProgName = "TECHNICAL" 
1128                      knownTranslations[pIDkeys[0].upper()] = reqProgName 
1129              else: 
1130                   
1131                  if pIDkeys[0].upper().startswith('1'): 
1132                      if pIDkeys[0].partition('(')[0].upper() \ 
1133                             in knownTranslations: 
1134                           
1135                          reqProgName = knownTranslations[ 
1136                              pIDkeys[0].partition('(')[0].upper()] 
1137                      elif ''.join(pIDkeys[0].partition('.')[:2]).upper() \ 
1138                             in knownTranslations: 
1139                           
1140                          reqProgName = knownTranslations[ 
1141                              ''.join(pIDkeys[0].partition('.')[:2]).upper()] 
1142                          knownTranslations[pIDkeys[0].upper()] = reqProgName 
1143                      elif progInObsName: 
1144                           
1145                          reqProgName = progInObsName 
1146                           
1147                      else: 
1148                          reqProgName = pIDkeys[0].upper() 
1149                  elif pIDkeys[0].partition('(')[0].upper() in progTranslation: 
1150                      reqProgName = pIDkeys[0].partition('(')[0].upper() 
1151                      knownTranslations[pIDkeys[0].partition('(')[0].upper()] = \ 
1152                                          pIDkeys[0].partition('(')[0].upper() 
1153                  elif pIDkeys[0].upper() in progTranslation: 
1154                      reqProgName = pIDkeys[0].upper() 
1155                      knownTranslations[pIDkeys[0].upper()] = pIDkeys[0].upper() 
1156                  elif not self.sysc.isOSA() and progInObsName: 
1157                      reqProgName = progInObsName 
1158                       
1159                  elif pIDkeys[0].upper()[:3].isdigit(): 
1160                      reqProgName = 'n' + pIDkeys[0].partition('(')[0].upper() 
1161                  else: 
1162                      if pIDkeys[2].upper() in ["CALIB"]: 
1163                          reqProgName = "CAL" 
1164                      else: 
1165                          reqProgName = pIDkeys[0].upper() 
1166   
1167          return reqProgName, knownTranslations 
 1168   
1169       
1170   
1172          """ 
1173          Writes the list of not ingestable files into a list in the corrupt 
1174          directory for further investigation. 
1175   
1176          @param notIngList: List of files that cannot be ingested. 
1177          @type  notIngList: list(str) 
1178   
1179          """ 
1180          errorFileName = self.getErrorFileName("notIngFiles") 
1181          errorFilePath = os.path.join(self.sysc.corruptFilesPath(), 
1182                                       errorFileName) 
1183          file(errorFilePath, 'w').writelines(f + '\n' for f in notIngList) 
1184   
1185          Logger.addMessage("%s files that are not ingested are listed in %s" % 
1186            (len(notIngList), errorFilePath)) 
1187   
1188          pctNotIng = 100 * len(notIngList) / len(self._fileList) 
1189          message = \ 
1190            "There are %s%% files not ingested! Check the log files!" % pctNotIng 
1191          if pctNotIng > 90: 
1192              Logger.addMessage("<ERROR> " + message) 
1193          elif pctNotIng > 50: 
1194              Logger.addMessage("<Warning> " + message) 
 1195   
1196       
1197   
1199          """ 
1200          Writes the list of FITS files to ingest to file "filelist.dat" in the 
1201          work directory in a 'imagefile catfile num' format for the C++ code. 
1202   
1203          """ 
1204          lines = ('%s %s %s\n' % (imgFile, catFile, i) 
1205                   for i, (imgFile, catFile) in enumerate(self._fileList)) 
1206          file(self._allFileName, 'w').writelines(lines) 
1207          Logger.addMessage("Ingest list written to %s" % self._allFileName) 
  1208   
1209   
1210   
1211   
1212   
1213   
1214   
1215   
1216   
1217   
1218   
1219   
1220   
1221   
1222   
1223   
1224   
1225   
1226   
1227   
1228   
1229   
1230   
1231   
1232   
1233   
1234   
1235   
1236   
1237   
1238   
1239   
1240