1   
   2   
   3   
   4  """ 
   5     Executes CUs in parallel-mode, preparing ingest files. 
   6   
   7     @author: E. Sutorius 
   8     @org:    WFAU, IfA, University of Edinburgh 
   9   
  10     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
  11     @contributors: R.S. Collins 
  12  """ 
  13   
  14  from   collections import defaultdict 
  15  import inspect 
  16  import math 
  17  import multiprocessing 
  18  import mx.DateTime     as mxTime 
  19  import os 
  20  import re 
  21  import shutil 
  22  import time 
  23   
  24  from invocations.cu1.cu1         import Cu1 
  25  from invocations.cu1.cu1Transfer import CASUQueries 
  26  from invocations.cu2.cu2         import Cu2 
  27  from invocations.cu3.cu3         import Cu3 
  28  from invocations.cu4.cu4         import Cu4 
  29   
  30  from   wsatools.CLI                    import CLI 
  31  import wsatools.CSV                        as CSV 
  32  import wsatools.DbConnect.DbConstants      as dbc 
  33  from   wsatools.DbConnect.DbSession    import DbSession 
  34  from   wsatools.File                   import File 
  35  import wsatools.FitsUtils                  as fits 
  36  from   wsatools.DbConnect.DbSession    import Ingester 
  37  from   wsatools.DbConnect.IngCuSession import IngCuSession 
  38  from   wsatools.DbConnect.IngIngester  import IngestLogger 
  39  import wsatools.DbConnect.Schema           as schema 
  40  from   wsatools.SystemConstants        import SystemConstants 
  41  import wsatools.Utilities                  as utils 
  44       
  45       
    # Default number of parallel worker processes; callers may override the
    # class attribute before instantiation (e.g. set to the CU4 thread count).
    numprocessors = 2
  47       
  48   
  50          """ 
  51          Run the CU threads. 
  52   
        @param _cuRuns:     The CU objects whose run() methods are invoked
                            sequentially in this worker process.
        @type  _cuRuns:     list
        @param resultQueue: The queue collecting the combined outPrefixList
                            entries of all CU runs.
        @type  resultQueue: Queue object
  57   
  58          """ 
  59          outList = [] 
  60          for _cu in _cuRuns: 
  61              _cu.run() 
  62              outList.extend(_cu.outPrefixList) 
  63          resultQueue.put(outList) 
   64   
  65       
  66   
  67 -    def run(self, cuList): 
   68          """ 
  69          Thread the CUs. 
  70   
  71          @param cuList:   The CUs to run in parallel. 
  72          @type  cuList:   list 
  73           
  74          @return: Dictionary containing results of the CU runs. 
  75          @rtype:  dict 
  76          """ 
  77          if self.numprocessors > len(cuList): 
  78              self.numprocessors = len(cuList) 
  79          IngestLogger.addMessage("Running %d CUs on %d processors." % ( 
  80              len(cuList), self.numprocessors)) 
  81          if self.numprocessors == 0: 
  82              raise SystemExit("<Error> No CUs available!") 
  83           
  84          outQueue = multiprocessing.Queue() 
  85          chunksize = int(math.ceil(len(cuList) / float(self.numprocessors))) 
  86          procs = [] 
  87          for i in xrange(self.numprocessors): 
  88              if i > 0: 
  89                  time.sleep(20) 
  90              p = multiprocessing.Process( 
  91                  target=self.runThreads, 
  92                  args=(cuList[chunksize * i:chunksize * (i + 1)], 
  93                        outQueue)) 
  94              procs.append(p) 
  95              p.start() 
  96   
  97           
  98           
  99          resultList = [] 
 100          for i in xrange(self.numprocessors): 
 101              resultList.extend(outQueue.get()) 
 102   
 103           
 104          for p in procs: 
 105              p.join() 
 106   
 107           
 108          return resultList 
   109   
 113      """ Executes CUs in parallel-mode, preparing ingest files. 
 114      """ 
    # Optional list of disks to use; not referenced in this view — presumably
    # set by callers before use (TODO confirm against callers).
    diskList = None
    # Optional directory at CASU holding (reprocessed) data; required for
    # 'otm' repro-mode runs when no transfer log is given.
    casuDisk = None
 117   
 118       
 119   
    def __init__(self,
                 cuNums=CLI.getOptDef("cunums"),
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 beginDate=CLI.getOptDef("begin"),
                 endDate=CLI.getOptDef("end"),
                 versionStr=CLI.getOptDef("version"),
                 outPath=CLI.getOptDef("outpath"),
                 noMfID=CLI.getOptDef("nomfid"),
                 ReDo=CLI.getOptDef("redo"),
                 programmeList=CLI.getOptDef("programmes"),
                 progOrder=CLI.getOptDef("progorder"),
                 isTrialRun=DbSession.isTrialRun,
                 xferlog=CLI.getOptDef("xferlog"),
                 timeCheck=CLI.getOptDef("timecheck"),
                 numThreads=CLI.getOptDef("threads"),
                 writeMfID=CLI.getOptDef("writemfid"),
                 subDir=CLI.getOptDef("subdir"),
                 janet=CLI.getOptDef("janet"),
                 forceXfer=CLI.getOptDef("force"),
                 excludeData=CLI.getOptDef("exclude"),
                 deprecation=CLI.getOptDef("deprecation"),
                 reproMode=CLI.getOptDef("repromode"),
                 forceMosaic=CLI.getOptDef("mosaic"),
                 detectionSubset=CLI.getOptDef("subset"),
                 keepWorkDir=CLI.getOptDef("keepwork"),
                 ffluOnly=CLI.getOptDef("ffluonly"),
                 getHeader=CLI.getOptDef("getheader"),
                 gesOptions=CLI.getOptDef("gesoptions"),
                 comment=CLI.getArgDef("comment")):
        """
        Initialise the builder session: set up the base ingest session,
        cast and validate all CLI-supplied arguments, and derive defaults
        for version, output path and CU1 thread counts.

        @param beginDate:  First date to process, eg. 20050101.
        @type  beginDate:  str
        @param comment:    Descriptive comment as to why curation task is
                           being performed.
        @type  comment:    str
        @param cuNums:     Curation task numbers.
        @type  cuNums:     list[int]
        @param curator:    Name of curator.
        @type  curator:    str
        @param database:   Name of the database to connect to.
        @type  database:   str
        @param detectionSubset: Process subset of [Astrometry table,
                                Photometry table, Raw table].
        @type  detectionSubset: list(str)
        @param endDate:    Last date to process, eg. 20050131.
        @type  endDate:    str
        @param excludeData: Exclude given files from processing (CU4).
        @type  excludeData: list(str)
        @param ffluOnly:   Don't transfer but update FlatfileLookup table.
        @type  ffluOnly:   bool
        @param forceXfer:  Force the data transfer.
        @type  forceXfer:  bool
        @param gesOptions: Options for GES transfers.
        @type  gesOptions: str
        @param getHeader:  Only transfer FITS headers.
        @type  getHeader:  bool
        @param isTrialRun: If True, do not perform database modifications.
        @type  isTrialRun: bool
        @param janet:      Use JANET instead of UKLight.
        @type  janet:      bool
        @param forceMosaic: Create jpg for mosaic.
        @type  forceMosaic: bool
        @param keepWorkDir: Don't remove working directory.
        @type  keepWorkDir: bool
        @param noMfID:     If True, don't write new MfID into FITS file.
        @type  noMfID:     bool
        @param numThreads: Number of scp transfer or processing threads.
        @type  numThreads: int
        @param outPath:    Directory where data is written to.
        @type  outPath:    str
        @param programmeList: Only process data for given programmes (accepts
                           keywords 'all', 'ns' (non-survey),
                           'ukidss' (all 5 main surveys)).
        @type  programmeList: list(str)
        @param progOrder:  The order of processing of the programmes, all
                           programmes not explicitly named can be put
                           anywhere in the list as 'others'.
        @type  progOrder:  list(str)
        @param ReDo:       If True, overwrite existing MfID (CU1/3) or jpg
                           (CU2).
        @type  ReDo:       bool
        @param reproMode:  Mode for reprocessed data: otm (one-to-many dirs)
                           or datedir suffix.
        @type  reproMode:  str
        @param subDir:     The subdirectory containing FITS files.
        @type  subDir:     str
        @param timeCheck:  Determines if existing files should be checked for
                           their timestamp against the same file at CASU;
        @type  timeCheck:  bool
        @param versionStr: Version number of the data.
        @type  versionStr: str
        @param writeMfID:  If True, only write MfIDs into FITS files.
        @type  writeMfID:  bool
        @param xferlog:    Logfile containing files to be ingested.
        @type  xferlog:    str

        """
        # Initialise the base ingest-curation session as a CU0 (driver) run;
        # this provides self.sysc (system constants) used below.
        super(DataBuilder, self).__init__(cuNum=0,
                                          curator=curator,
                                          comment=comment,
                                          reqWorkDir=True,
                                          keepWorkDir=keepWorkDir,
                                          database=database,
                                          autoCommit=False,
                                          isTrialRun=isTrialRun)

        # Normalise the supplied date range via the observation calendar.
        beginDate, endDate = \
          self.sysc.obsCal.getDatesFromInput(beginDate, endDate)

        # Expected type for each constructor argument; consumed by
        # attributesFromArguments below to cast the CLI-supplied values.
        typeTranslation = {"curator":str,
                           "database":str,
                           "cuNums":list,
                           "beginDate":str,
                           "endDate":str,
                           "versionStr":str,
                           "outPath":str,
                           "noMfID":bool,
                           "ReDo":bool,
                           "programmeList":list,
                           "progOrder":list,
                           "isTrialRun":bool,
                           "xferlog":str,
                           "timeCheck":bool,
                           "numThreads":type(self.sysc.scpThreads),
                           "writeMfID":bool,
                           "subDir":type(self.sysc.fitsDir),
                           "janet":bool,
                           "forceXfer":bool,
                           "excludeData":list,
                           "deprecation":str,
                           "reproMode":str,
                           "forceMosaic":bool,
                           "detectionSubset":list,
                           "keepWorkDir":bool,
                           "ffluOnly":bool,
                           "getHeader":bool,
                           "gesOptions":str,
                           "comment":str}

        # Copy every constructor argument from locals() onto self with a
        # leading underscore (e.g. cuNums -> self._cuNums), cast per
        # typeTranslation. NOTE: this reads locals() by argument name, so the
        # local variable names above must stay identical to the signature.
        super(DataBuilder, self).attributesFromArguments(
            inspect.getargspec(DataBuilder.__init__)[0], locals(), prefix='_',
            types=typeTranslation)

        self.curator = self._curator
        self.comment = self._comment
        # UKLight transfer link is the default; JANET is the opt-in fallback.
        self._UKLight = not self._janet

        # Default the version to the highest version available at either end
        # of the date range.
        if not self._versionStr:
            self._versionStr = str(max(
                self.sysc.obsCal.maxVersOfDate(self._beginDate),
                self.sysc.obsCal.maxVersOfDate(self._endDate)))

        # Default output location is the database share path.
        if not self._outPath:
            self._outPath = self.sysc.dbSharePath()

        # Fail fast if none of the requested CUs can be run by this driver.
        implementedCUs = [1, 2, 3, 4]
        if not set(self._cuNums).intersection(implementedCUs):
            raise SystemExit("No CU possible for %r.\nImplemented CUs are %r"
                             % (self._cuNums, implementedCUs))

        # Warn about requested CUs that will simply be ignored.
        if set(self._cuNums) - set(implementedCUs):
            IngestLogger.addMessage(
              "<Warning> The following CUs are not implemented: " +
              ', '.join(map(str, set(self._cuNums) - set(implementedCUs))))

        # The CUs need either a transfer log or an explicit date range,
        # except in one-to-many ('otm') reprocessing mode.
        if set(self._cuNums).intersection([1, 2, 3, 4]) and not self._xferlog \
          and (int(self._beginDate) == int(CLI.getOptDef("begin")) or
               int(self._endDate) == int(CLI.getOptDef("end"))) \
          and 'otm' not in self._reproMode:
            raise SystemExit("The CU needs either a xferlog or a date range "
                             "specified!")

        # One-to-many reprocessing needs a source: a transfer log or a CASU
        # disk/directory.
        if 'otm' in self._reproMode and not self._xferlog and not self.casuDisk:
            raise SystemExit("Either a transfer log or a directory at CASU "
                             "need to be specified for reprocessed data "
                             "transfers.")

        if 1 in self._cuNums:
            # NOTE(review): CU1 thread counts appear to be encoded as two
            # digits; values below 10 are lifted into the 1x range — confirm
            # the intended encoding.
            if self._numThreads < 10:
                self._numThreads += 10
            if self._ffluOnly or self._xferlog:
                newThreads = 11
            # NOTE(review): '%', '*' and '//' share precedence (left to
            # right), so this evaluates as
            # ((numThreads % 10) * numThreads) // 10, possibly not the
            # intended (numThreads % 10) * (numThreads // 10) — confirm.
            elif self._numThreads % 10 * self._numThreads // 10 > 4:
                newThreads = (22 if self._endDate != self._beginDate else 12)
            else:
                newThreads = self._numThreads
            if self._numThreads != newThreads:
                IngestLogger.addMessage(
                    "<Warning> Too many threads chosen: %d => Resetting to %d" \
                    % (self._numThreads, newThreads))
                self._numThreads = newThreads

        # Normalise the transfer log to an absolute path.
        if self._xferlog:
            self._xferlog = os.path.abspath(self._xferlog)
  316   
 317       
 318   
 320          """ Run each CU requested. 
 321          """ 
 322          self._cuRuns = [] 
 323          self._cuedFiles = defaultdict(bool) 
 324          cuEventIDs = defaultdict(list) 
 325          if 1 in self._cuNums: 
 326              if self._writeMfID: 
 327                   
 328                  IngestLogger.addMessage("Starting updating multiframeIDs.") 
 329                  fitsDirs = self.createDailyLists(self._xferlog) 
 330                  self.updateMfIDs(fitsDirs.dailyFileListDict) 
 331                  IngestLogger.addMessage("Finished updating multiframeIDs.") 
 332                   
 333                  cuEventIDs[1].append(self._cuEventID) 
 334                  cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[1])) 
 335                  IngestLogger.append(cu0Log.pathName) 
 336              else: 
 337                  fileDict = defaultdict(list) 
 338                  if self._numThreads == 0: 
 339                      self._numThreads = SystemConstants.scpThreads 
 340                   
 341                  if self._xferlog and 'otm' not in self._reproMode: 
 342                      fileDict, notAvailableFiles = self.parseXferList() 
 343                      if notAvailableFiles: 
 344                          IngestLogger.addMessage( 
 345                            "<Warning> The following files are not available at " 
 346                            "CASU:") 
 347                          for fileName in notAvailableFiles: 
 348                              IngestLogger.addMessage(fileName) 
 349   
 350                      mjdList = [int(mxTime.strptime(str(date), "%Y%m%d").mjd) 
 351                                  for date in fileDict if fileDict[date]] 
 352                      outMessageSuf = ''.join([" from ", self._xferlog]) 
 353                  else: 
 354                      startMJD = int(mxTime.strptime(self._beginDate, 
 355                                                     "%Y%m%d").mjd) 
 356                      endMJD = int(mxTime.strptime(self._endDate, 
 357                                                   "%Y%m%d").mjd) 
 358                       
 359                       
 360                      if self._reproMode.startswith(('p:', 's:')): 
 361                          CASUQueries.sysc = self.sysc 
 362                          presufCasu = (self._reproMode 
 363                                        if self._reproMode.startswith(('p', 's')) 
 364                                        else '') 
 365                          casuDateDict = CASUQueries.getCasuDateDirs( 
 366                            dirList=[self.casuDisk] if self.casuDisk else [], 
 367                            UKLight=self._UKLight, 
 368                            mode="STR", presuf=presufCasu) 
 369                          reproFix = self._reproMode.partition("::")[2] 
 370                          mjdList = [] 
 371                          for dateDir in casuDateDict: 
 372                              if reproFix in dateDir: 
 373                                  date = int(mxTime.strptime(dateDir.replace( 
 374                                      reproFix, ''), "%Y%m%d").mjd) 
 375                                  if date in range(startMJD, endMJD + 1): 
 376                                      mjdList.append(date) 
 377                      elif self.sysc.isGES(): 
 378                          mjdList = self.getGesMJDs(startMJD, endMJD) 
 379                      elif 'otm' not in self._reproMode and not self.sysc.isGES(): 
 380                           
 381                          CASUQueries.sysc = self.sysc 
 382                          casuDateDict = CASUQueries.getCasuDateDirs( 
 383                            dirList=[self.casuDisk] if self.casuDisk else [], 
 384                            UKLight=self._UKLight, mode="MJD") 
 385                          mjdList = [x for x in range(startMJD, endMJD + 1) 
 386                                     if x in casuDateDict] 
 387                      else: 
 388                          mjdList = range(startMJD, endMJD + 1) 
 389                      outMessageSuf = '' 
 390   
 391                  if not mjdList: 
 392                      IngestLogger.addMessage("No data to be transferred!") 
 393   
 394                   
 395                  if 'otm' in self._reproMode: 
 396                      IngestLogger.addMessage(''.join([ 
 397                          "Preparing CU run for Cu1 for re-processed data in ", 
 398                          self.casuDisk])) 
 399                      self._cuRuns.append( 
 400                          Cu1(curator=self.curator, 
 401                              database=self.database, 
 402                              versionStr=self._versionStr, 
 403                              casuDisk=self.casuDisk, 
 404                              janet=self._janet, 
 405                              deprecation=self._deprecation, 
 406                              xferList=self._xferlog, 
 407                              isTrialRun=self._isTrialRun, 
 408                              forceXfer=self._forceXfer, 
 409                              reproMode=self._reproMode, 
 410                              comment=self.comment)) 
 411                      IngestLogger.addMessage("CUEventID: %s" % 
 412                                              self._cuRuns[-1]._cuEventID) 
 413                  else: 
 414                      dateList = [] 
 415                      for mjd in sorted(mjdList): 
 416                          date = mxTime.DateTimeFromMJD(mjd) 
 417                          dateStr = "%04d%02d%02d" % (date.year, date.month, 
 418                                                     date.day) 
 419                          dateList.append(dateStr) 
 420   
 421                       
 422                      dateSubLists = self.splitDateList(dateList) 
 423   
 424                      for dateGroup in dateSubLists: 
 425                          xferList = [] 
 426                          for date in dateGroup: 
 427                              xferList.extend(fileDict[date]) 
 428                          IngestLogger.addMessage( 
 429                              "Preparing CU run for Cu1 for %r %s" % ( 
 430                                  dateGroup, outMessageSuf)) 
 431                          self._cuRuns.append( 
 432                              Cu1(curator=self.curator, 
 433                                  database=self.database, 
 434                                  runDate=dateGroup, 
 435                                  versionStr=self._versionStr, 
 436                                  casuDisk=self.casuDisk, 
 437                                  timeCheck=self._timeCheck, 
 438                                  numThreads=self._numThreads, 
 439                                  janet=self._janet, 
 440                                  deprecation=self._deprecation, 
 441                                  xferList=xferList, 
 442                                  isTrialRun=self._isTrialRun, 
 443                                  forceXfer=self._forceXfer, 
 444                                  reproMode=self._reproMode, 
 445                                  ffluOnly=self._ffluOnly, 
 446                                  onlyHeader=self._getHeader, 
 447                                  gesOptions=self._gesOptions, 
 448                                  comment=self.comment)) 
 449   
 450                          dateVersStr = ','.join([ 
 451                              ("%s_v%s") % (dateStr, self._versionStr) 
 452                              for dateStr in dateGroup]) 
 453                          self.updateComment(dateVersStr, cuNum=1) 
 454                          IngestLogger.addMessage( 
 455                              "Transferring from %s" % 
 456                                  self.sysc.scpServer(light=not self._janet)) 
 457                          IngestLogger.addMessage("CUEventID: %s" % 
 458                                                  self._cuRuns[-1]._cuEventID) 
 459                   
 460                  for _cu in self._cuRuns: 
 461                      cuEventIDs[1].append(_cu._cuEventID) 
 462                  cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[1])) 
 463                  IngestLogger.append(cu0Log.pathName) 
 464                   
 465                  for _cu in self._cuRuns: 
 466                      _cu.run() 
 467                      if 'mv' in self._deprecation and not self._isTrialRun: 
 468                          self.deprecateJpgs(_cu.deprFileList) 
 469                      if _cu.allXferLogs and not self._isTrialRun \ 
 470                                         and 'otm' not in self._reproMode \ 
 471                                         and not self.sysc.isGES(): 
 472                          self.updateMfIDs(_cu.allXferLogs) 
 473                      del _cu 
 474                  IngestLogger.addMessage("Finished CU1.") 
 475   
 476          if 2 in self._cuNums: 
 477              cu2OutPath = ('' if self._outPath == self.sysc.dbSharePath() 
 478                            else self._outPath) 
 479              fitsDirs = self.createDailyLists(self._xferlog) 
 480   
 481              if self._xferlog: 
 482                  _dateList = sorted(fitsDirs.dailyFileListDict.keys()) 
 483                  self._beginDate = _dateList[0].partition("_v")[0] 
 484                  self._endDate = _dateList[-1].partition("_v")[0] 
 485                  self._versionStr = _dateList[0].partition("_v")[2] 
 486              startMJD = int(mxTime.strptime(self._beginDate, "%Y%m%d").mjd) 
 487              endMJD = int(mxTime.strptime(self._endDate, "%Y%m%d").mjd) 
 488   
 489              for mjd in xrange(startMJD, endMJD + 1): 
 490                  date = mxTime.DateTimeFromMJD(mjd) 
 491                  dateStr = "%04d%02d%02d" % (date.year, date.month, date.day) 
 492                  dateVersStr = "%s_v%s" % (dateStr, self._versionStr) 
 493                  if dateVersStr in fitsDirs.invFitsDateDict: 
 494                      cu02edFlagList = [] 
 495                      try: 
 496                          cu02edFlagList = [os.path.join( 
 497                              fitsDirs.invFitsDateDict[dateVersStr], dateVersStr, 
 498                              "CU02ED_" + self._database.rpartition('.')[2])] 
 499                      except AttributeError: 
 500                          for path in fitsDirs.invFitsDateDict[dateVersStr]: 
 501                              cu02edFlagList.append(os.path.join( 
 502                                  path, dateVersStr, 
 503                                  "CU02ED_" + self._database.rpartition('.')[2])) 
 504                      for cu02edFlag in cu02edFlagList: 
 505                          if os.path.exists(cu02edFlag): 
 506                              os.remove(cu02edFlag) 
 507                      IngestLogger.addMessage( 
 508                        "Preparing CU run for Cu2 for " + dateStr) 
 509   
 510                      xferlog = (fitsDirs.dailyFileListDict[dateVersStr][0] 
 511                                 if self._xferlog else None) 
 512   
 513                      self._cuRuns.append( 
 514                          Cu2(curator=self.curator, 
 515                              database=self.database, 
 516                              beginDate=int(dateStr), 
 517                              endDate=int(dateStr), 
 518                              versionStr=self._versionStr, 
 519                              xferLog=xferlog, 
 520                              subDir=self._subDir, 
 521                              jpgPath=cu2OutPath, 
 522                              forceMosaic=self._forceMosaic, 
 523                              reDo=self._ReDo, 
 524                              numThreads=self._numThreads, 
 525                              deprecation=self._deprecation, 
 526                              isTrialRun=self._isTrialRun, 
 527                              comment=self.comment)) 
 528   
 529                      self.updateComment(dateVersStr, cuNum=2) 
 530   
 531               
 532              cuEventIDs[2] = [_cu._cuEventID for _cu in self._cuRuns] 
 533              cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[2])) 
 534              IngestLogger.append(cu0Log.pathName) 
 535               
 536              for _cu in self._cuRuns: 
 537                  _cu.run() 
 538              IngestLogger.addMessage("Finished CU2.") 
 539   
 540          if 3 in self._cuNums: 
 541              fitsDirs = self.createDailyLists(self._xferlog) 
 542              for dateKey in sorted(fitsDirs.dailyFileListDict): 
 543                  for filePath in fitsDirs.dailyFileListDict[dateKey]: 
 544                      listFile = File(filePath) 
 545                      dateVersStr = listFile.root[listFile.root.find("_v") - 8:] 
 546                      IngestLogger.addMessage( 
 547                        "Preparing CU run for Cu3 for " + dateVersStr) 
 548   
 549                      self._cuRuns.append( 
 550                          Cu3(curator=self.curator, 
 551                              database=self.database, 
 552                              csvSharePath=self._outPath, 
 553                              programmeList=self._programmeList, 
 554                              noMfID=self._noMfID, 
 555                              ReDo=self._ReDo, 
 556                              onlyMfID=self._writeMfID, 
 557                              excludeFiles=self._excludeData, 
 558                              isTrialRun=self._isTrialRun, 
 559                              keepWorkDir=self._keepWorkDir, 
 560                              forceComp=self._forceXfer, 
 561                              isWfauProduct=("products" in self._subDir), 
 562                              xferlog=filePath, 
 563                              comment=self.comment)) 
 564                      self.updateComment(dateVersStr, cuNum=3) 
 565               
 566              cuEventIDs[3] = [_cu._cuEventID for _cu in self._cuRuns] 
 567              cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[3])) 
 568              IngestLogger.append(cu0Log.pathName) 
 569               
 570              for _cu in self._cuRuns: 
 571                  if not self._writeMfID: 
 572                      _cu.run() 
 573                      self._cuedFiles[_cu._cuEventID] = _cu.cuedFileExists 
 574              IngestLogger.addMessage("Finished CU3.") 
 575   
 576          self._cuRuns = [] 
 577          self._ingLogList = [] 
 578          if 4 in self._cuNums: 
 579              fitsDirs = self.createDailyLists(self._xferlog) 
 580              for dateKey in sorted(fitsDirs.dailyFileListDict): 
 581                  for filePath in fitsDirs.dailyFileListDict[dateKey]: 
 582                      listFile = File(filePath) 
 583                      dateVersStr = listFile.root[listFile.root.find("_v") - 8:] 
 584                      cu03edFlagList = [] 
 585                      flatFileIngFlagList = [] 
 586                      parentArchive = (self.database.rpartition('.')[2] 
 587                                       if "VSA_v1_3" in self.database else 
 588                                       "VSA" if sysc.isVSA() else 
 589                                       ("OSA" if sysc.isOSA() else "WSA")) 
 590                      try: 
 591                          flatFileIngFlagList = [os.path.join( 
 592                              fitsDirs.invFitsDateDict[dateVersStr], dateVersStr, 
 593                              "FFFIED_" + parentArchive)] 
 594                          cu03edFlagList = [os.path.join( 
 595                              fitsDirs.invFitsDateDict[dateVersStr], dateVersStr, 
 596                              "CU03ED_" + parentArchive)] 
 597                      except AttributeError: 
 598                          for path in fitsDirs.invFitsDateDict[dateVersStr]: 
 599                              flatFileIngFlagList.append(os.path.join( 
 600                                  path, dateVersStr, 
 601                                  "FFFIED_" + parentArchive)) 
 602                              cu03edFlagList.append(os.path.join( 
 603                                  path, dateVersStr, 
 604                                  "CU03ED_" + parentArchive)) 
 605   
 606                      if not self._forceXfer \ 
 607                             and not all(os.path.exists(fffiedFlag) 
 608                                         for fffiedFlag in flatFileIngFlagList): 
 609   
 610                          IngestLogger.addMessage("<IMPORTANT> " 
 611                            "FinaliseFlatFileIngest hasn't been run for %s. " 
 612                            "For WFAU products just re-run this command with the" 
 613                            " -f/--force option. For new CASU products you must " 
 614                            "first run FinaliseFlatFileIngest." % dateVersStr) 
 615   
 616                      elif self._forceXfer or all(os.path.exists(cu03edFlag) 
 617                                              for cu03edFlag in cu03edFlagList): 
 618                          IngestLogger.addMessage( 
 619                            "Preparing CU run for Cu4 for " + dateVersStr) 
 620   
 621                          self._cuRuns.append( 
 622                              Cu4(curator=self.curator, 
 623                                  database=self.database, 
 624                                  csvSharePath=self._outPath, 
 625                                  programmeList=self._programmeList, 
 626                                  progOrder=self._progOrder, 
 627                                  detectionSubset=self._detectionSubset, 
 628                                  excludeFiles=self._excludeData, 
 629                                  isTrialRun=self._isTrialRun, 
 630                                  ReDo=self._ReDo, 
 631                                  keepWorkDir=self._keepWorkDir, 
 632                                  xferlog=filePath, 
 633                                  comment=self.comment)) 
 634                          self.updateComment(dateVersStr, cuNum=4) 
 635                      else: 
 636                          IngestLogger.addMessage( 
 637                            "No data available for Cu4 for " + dateVersStr) 
 638   
 639              if self._numThreads == 0: 
 640                  self._numThreads = 1 
 641               
 642              cuEventIDs[4] = [_cu._cuEventID for _cu in self._cuRuns] 
 643              cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[4])) 
 644              IngestLogger.append(cu0Log.pathName) 
 645               
 646              ThreadCU.numprocessors = self._numThreads 
 647              threadCUs = ThreadCU() 
 648              resList = threadCUs.run(self._cuRuns) 
 649              del threadCUs 
 650   
 651              IngestLogger.addMessage("Finished CU4.") 
 652   
 653          IngestLogger.addMessage("Overview of all the ingests to run:") 
 654          for cuNum in cuEventIDs: 
 655              if cuNum == 4: 
 656                  for outPrefix, cuEvID in resList: 
 657                      self._ingLogList.append(( 
 658                          os.path.join(self.dbMntPath, outPrefix), 
 659                          self._cuedFiles[cuEvID])) 
 660              else: 
 661                  for cuEventID in cuEventIDs[cuNum]: 
 662                      outPrefix = self.createFilePrefix( 
 663                          cuNum, cuEventID, self._hostName, 
 664                          self.database.rpartition('.')[2]) 
 665                      self._ingLogList.append(( 
 666                          os.path.join(self.dbMntPath, outPrefix), 
 667                          self._cuedFiles[cuEventID])) 
 668              for entry in sorted(set(self._ingLogList)): 
 669                  forceFlag = (" -F" if entry[1] else '') 
 670                  IngestLogger.addMessage("./IngestCUFiles.py -i -r %s%s %s" % ( 
 671                      os.path.join(self.dbMntPath, entry[0] + ".log"), forceFlag, 
 672                      self.database)) 
 673   
 674           
 675          IngestLogger.append(cu0Log.pathName) 
  676   
 677       
 678   
    @staticmethod
        """ Yield successive n-sized chunks from l.

        @param l: Sequence to split.
        @param n: Chunk size; 0 is coerced to 1 (a step of 0 would make
                  xrange raise ValueError).

        @return: Generator of slices of l, each of length n (the last one
                 possibly shorter).
        """
        # Guard against a zero step for xrange below.
        if n == 0:
            n = 1
        for i in xrange(0, len(l), n):
            yield l[i:i+n]
  687   
 688       
 689   
 690      @staticmethod 
 704   
 705      
 706   
 708          """ Get the list of MJDs for the GES transfer. 
 709          """ 
 710          if self._gesOptions.endswith(('a','s')): 
 711              mjdList = range(startMJD, startMJD + 1) 
 712          else: 
 713              CASUQueries.sysc = self.sysc 
 714              mjdList = [] 
 715              casuSubDir = ("uves/proc_%s" \ 
 716                            if self._gesOptions.startswith('u') 
 717                            else "proccasu_%s") 
 718              casuDirs = CASUQueries.getCasuStagingDirs( 
 719                  UKLight=self._UKLight, 
 720                  casuSubDirPath=casuSubDir % self._versionStr) 
 721   
 722              for cd in casuDirs: 
 723                  for line in CASUQueries.getDirListing(cd, self._UKLight, "-p"): 
 724                      if line.strip().startswith('d'): 
 725                          subDir = line.strip().rpartition(' ')[2].replace('/', '') 
 726                          if subDir.isdigit(): 
 727                              date = int(mxTime.strptime(subDir, "%Y%m%d").mjd) 
 728                          if date in range(startMJD, endMJD + 1): 
 729                              mjdList.append(date) 
 730          return mjdList 
  731   
 732      
 733   
 735          """ Split the dateList into multiple similar sized threads 
 736          """ 
 737          CASUQueries.sysc = self.sysc 
 738          try: 
 739              if len(dateList) < self._numThreads // 10: 
 740                  newThreads  = len(dateList) * 10 + self._numThreads % 10 
 741                  IngestLogger.addMessage( 
 742                      "<Warning> Short date list. Too many threads chosen:" 
 743                      " %d => Resetting to %d" % ( 
 744                          self._numThreads, newThreads)) 
 745                  self._numThreads = newThreads 
 746              if (self._numThreads // 10) == 1: 
 747                  newDateList = sorted(dateList) 
 748              else: 
 749                  casuDict = self.getCasuSizeDict(dateList, self._UKLight) 
 750                  newDateList = casuDict.keys() 
 751   
 752          except Exception as error: 
 753               
 754              IngestLogger.addExceptionDetails(error) 
 755              newDateList = dateList 
 756   
 757          return self.chunks(newDateList, self._numThreads // 10) 
  758   
 759       
 760   
 762          """ 
 763          Create the daily filelists. 
 764   
 765          @param xferLog: Logfile containing files to be ingested. 
 766          @type  xferLog: str 
 767   
 768          @return: A FitsList object holding daily file information. 
 769          @rtype:  FitsList 
 770   
 771          """ 
 772          fitsDirs = fits.FitsList(self.sysc, prefix="cu0_") 
 773          fitsDirs.createFitsDateDict(ingestDirectory=self._subDir, 
 774                                      trafoLog=xferLog) 
 775          IngestLogger.addMessage("Creating daily filelists.") 
 776          fitsDirs.createDailyFileLists(self._workPath, int(self._beginDate), 
 777                                        int(self._endDate), self._versionStr, 
 778                                        xferLog) 
 779          return fitsDirs 
  780   
 781       
 782   
 784          """ 
 785          Deprecate jpgs of deprecated FITS files. 
 786   
 787          @param deprFitsList: List of deprecated FITS files. 
 788          @type  deprFitsList: list 
 789   
 790          @return: A FitsList object holding daily file information. 
 791          @rtype:  FitsList 
 792   
 793          """ 
 794           
 795          deprTime = utils.makeTimeStamp().split()[0].replace('-', '') 
 796          spacePerDisk = utils.getDiskSpace(self.sysc.deprecatedDataStorage) 
 797          deprJpgDir = os.path.join(utils.getNextDisk(self.sysc, spacePerDisk), 
 798            '%s_%s' % (self.sysc.deprecatedComprImDir, deprTime)) 
 799   
 800          IngestLogger.addMessage("Deprecating JPGs to %s" % deprJpgDir) 
 801          counter = 0 
 802          self._connectToDb() 
 803          dateList = \ 
 804              set(File(fitsFileName).subdir for fitsFileName in deprFitsList) 
 805   
 806          mfIDDict = {} 
 807          for dateStr in dateList: 
 808              results = self.archive.query( 
 809                  "fileName, multiframeID", 
 810                  "Multiframe", 
 811                  "fileName LIKE'%" + dateStr + "%'") 
 812              for fileName, multiframeID in results: 
 813                  if fileName.rpartition(':')[2] in deprFitsList: 
 814                      mfIDDict[fileName] = str(multiframeID) 
 815   
 816          if mfIDDict: 
 817              oldJpgFiles = self.archive.query( 
 818                selectStr="multiframeID, extNum, compFile", 
 819                  fromStr="MultiframeDetector", 
 820                 whereStr="multiframeID IN (%s)" % ','.join(mfIDDict.values())) 
 821   
 822              for dateStr in dateList: 
 823                  deprJpgPath = os.path.join(deprJpgDir, dateStr) 
 824                  utils.ensureDirExist(deprJpgPath) 
 825                  for mfID, extNum, jpgFileName in oldJpgFiles: 
 826                      jpgFile = File(jpgFileName.replace( 
 827                          self.sysc.pixelServerHostName, '')) 
 828                      if dateStr in jpgFile.name \ 
 829                        and os.path.exists(jpgFile.name): 
 830                          deprJpgName = os.path.join(deprJpgPath, jpgFile.base) 
 831                          if jpgFile.name != deprJpgName: 
 832                              shutil.copy2(jpgFile.name, deprJpgName) 
 833                              os.remove(jpgFile.name) 
 834                          else: 
 835                              IngestLogger.addMessage( 
 836                                "Deprecated file %s exists already!" % 
 837                                deprJpgName) 
 838                          compEntry = \ 
 839                            "'%s%s'" % (self.sysc.pixelServerHostName, 
 840                                         deprJpgName) 
 841                          counter += self.archive.update("MultiframeDetector", 
 842                            [('compFile', compEntry)], 
 843                            "multiframeID=%s AND extNum=%s" % (mfID, extNum)) 
 844   
 845          self._disconnectFromDb() 
 846          IngestLogger.addMessage( 
 847            "%s JPGs deprecated to %s." % (counter, deprJpgDir)) 
  848   
 849       
 850   
 852          """ Parse the transfer list and check for files. 
 853          """ 
 854          CASUQueries.sysc = self.sysc 
 855          casuDateDict = {} 
 856          casuFileSystem = CASUQueries.getCasuStagingDirs(self._janet) 
 857          if self.casuDisk and not "reprocessed" in self.casuDisk: 
 858              if self.casuDisk: 
 859                  fileSystem = [self.casuDisk] 
 860              else: 
 861                  fileSystem = casuFileSystem 
 862              casuDateDict = CASUQueries.getCasuDateDirs(fileSystem, self._janet) 
 863          elif not self.casuDisk: 
 864              casuDateDict = CASUQueries.getCasuDateDirs( 
 865                  casuFileSystem, self._janet) 
 866   
 867          notAvailFiles = [] 
 868          xferDict = defaultdict(list) 
 869          for line in file(self._xferlog): 
 870              entry = line.rstrip() 
 871               
 872              try: 
 873                  if '/' in entry: 
 874                      dateStr, fileName = entry.split('/') 
 875                  else: 
 876                      pattern = CASUQueries.re_digits 
 877                      dateStr = pattern.search(entry).group() 
 878                      fileName = entry 
 879   
 880                  if self._beginDate <= dateStr <= self._endDate: 
 881                      if dateStr in casuDateDict: 
 882                          basePath = casuDateDict[dateStr] 
 883                          if not isinstance(basePath, basestring) \ 
 884                                 and len(basePath) > 1: 
 885                              raise AttributeError 
 886                          else: 
 887                              basePath = casuDateDict[dateStr][0] 
 888                      else: 
 889                          basePath = self.casuDisk 
 890                      xferDict[dateStr].append( 
 891                          os.path.join(basePath, dateStr, fileName)) 
 892              except KeyError: 
 893                  notAvailFiles.append(os.path.join(dateStr, fileName)) 
 894              except AttributeError: 
 895                  if isinstance(basePath, (list, tuple)) \ 
 896                         and not isinstance(basePath, basestring): 
 897                      IngestLogger.addMessage( 
 898                          "Date %s found in multiple locations: %r" % ( 
 899                              dateStr, basePath)) 
 900                  elif not basePath: 
 901                      IngestLogger.addMessage( 
 902                          "Date not found on CASU disks: %r" % casuFileSystem) 
 903                  notAvailFiles.append(os.path.join(dateStr, fileName)) 
 904   
 905          return xferDict, notAvailFiles 
  906   
 907       
 908   
 947   
 948       
 949   
 951          """ 
 952          Update the FlatFileLookUp table. 
 953   
 954          @param csvFileName: CSV file where data is written to. 
 955          @type  csvFileName: str 
 956   
 957          """ 
 958          _ingestSchema = "WSA_CurationLogsSchema.sql" 
 959          _tableName = 'FlatFileLookUp' 
 960          fileList = [x[0] for x in self._fileList] 
 961          dateList = sorted(set(os.path.basename(os.path.dirname(entry)) 
 962                                for entry in fileList)) 
 963          for dateVersStr in dateList: 
 964               
 965              dataDict = ({} if self._ReDo and self._forceXfer \ 
 966                          and self._writeMfID else \ 
 967                          self.getMultiframeIDs(dateVersStr)) 
 968               
 969              mfidName = dbc.multiframeUIDAttributeName() 
 970              _nextMfID = self._getNextID(mfidName, "FlatFileLookUp", 
 971                                          where="%s<1000000000000" % mfidName) 
 972              _nextWfauMfID = max(self._getNextID(mfidName, "FlatFileLookUp", 
 973                                          where="%s>1000000000000" % mfidName), 
 974                                  1000000000001) 
 975               
 976              nrCasu = 0 
 977              nrWfau = 0 
 978              csvList = [] 
 979              directUpdate = False 
 980              for entry in sorted(fileList): 
 981                  if entry.endswith(self.sysc.mefType): 
 982                      fName = File(entry) 
 983                      partName = os.path.join(fName.subdir, fName.base) 
 984                      if partName in dataDict: 
 985                          self.archive.update("FlatFileLookUp", 
 986                              [('cuEventID', self._cuEventID), ('fileName', 
 987                              "'%s%s'" % (self.sysc.pixelServerHostName, entry))], 
 988                              "multiframeID=%s" % dataDict[partName]) 
 989                          self.archive.commitTransaction() 
 990                          nrWfau += 1 
 991                          directUpdate = True 
 992                      elif entry not in dataDict: 
 993                          if os.path.basename(entry).startswith("e20"): 
 994                              newMfID = _nextWfauMfID + nrWfau 
 995                              nrWfau += 1 
 996                          else: 
 997                              newMfID = _nextMfID + nrCasu 
 998                              nrCasu += 1 
 999   
1000                          csvList.append( 
1001                              (newMfID, self._cuEventID, 
1002                               os.path.basename(os.path.dirname(entry)), 
1003                               '%s%s' % (self.sysc.pixelServerHostName, entry))) 
1004   
1005              if csvList:   
1006                  CSV.File(csvFileName, 'w').writelines(csvList) 
1007                  time.sleep(20) 
1008                  try: 
1009                      ingester = Ingester(self.archive, 
1010                        schema.parseTables(_ingestSchema, [_tableName])) 
1011                  except schema.MismatchError as error: 
1012                      raise IngCuSession.IngCuError(error) 
1013   
1014                  IngestLogger.addMessage("Ingesting from %s" % csvFileName) 
1015                  ingester.ingestTable(_tableName, csvFileName, 
1016                                       isCsv=True, deleteFile=False) 
1017                  ingester.commitTransaction() 
1018                  newPath = self.sysc.ingestedFilesPath(self.archive.sharePath()) 
1019                  newLocation = os.path.join( 
1020                      newPath, os.path.basename(csvFileName)) 
1021                  os.rename(ingester.sharePath(csvFileName), 
1022                            newLocation) 
1023                  del ingester 
1024                  time.sleep(20) 
1025   
1026                  IngestLogger.addMessage( 
1027                    "FlatFileLookUp table updated with %s listed files." % len(csvList)) 
1028              elif directUpdate: 
1029                  self.archive.commitTransaction() 
1030                  IngestLogger.addMessage( 
1031                    "FlatFileLookUp table updated with %s files." % nrWfau) 
1032              else: 
1033                  IngestLogger.addMessage("No new data to update.") 
 1034   
1035       
1036   
1038          """ 
1039          Write multiframe IDs into FlatFileLookUp. 
1040   
1041          @param dailyFileListDict: Dictionary containing daily file lists. 
1042          @type  dailyFileListDict: dict 
1043   
1044          """ 
1045          IngestLogger.addMessage("Updating multiframeIDs in FlatFileLookUp.") 
1046          try: 
1047              self._cuNum = 1 
1048              self._connectToDb() 
1049              self._cuEventID = \ 
1050                self._getNextCuEventID() if not self._isTrialRun else -9999 
1051   
1052               
1053              self._outPrefix = self.createFilePrefix( 
1054                  self._cuNum, self._cuEventID, self._hostName, 
1055                  self.archive.database) 
1056               
1057              csvFileName = self.archive.sharePath( 
1058                  "%s_%s.csv" % (self._outPrefix, 'FlatFileLookUp')) 
1059   
1060              for dateKey in sorted(dailyFileListDict): 
1061                  for filePath in dailyFileListDict[dateKey]: 
1062                      self.createFileList(filePath) 
1063                      if not self._isTrialRun: 
1064                          self.updateFlatFileLookUp(csvFileName) 
1065                      else: 
1066                          IngestLogger.addMessage( 
1067                            "<TEST> Update for " + filePath) 
1068          finally: 
1069              self._disconnectFromDb() 
  1070   
1071   
1072   
1073   
1074  if __name__ == "__main__": 
1075       
1076      CLI.progArgs["comment"] = "Running CU0" 
1077      CLI.progOpts += [ 
1078        CLI.Option('b', "begin", 
1079                   "first date to process, eg. 20050101", 
1080                   "DATE", str(IngCuSession.beginDateDef)), 
1081        CLI.Option('e', "end", 
1082                  "last date to process, eg. 20050131", 
1083                  "DATE", str(IngCuSession.endDateDef)), 
1084        CLI.Option('f', "force", 
1085                   "CU1: transfer regardless any checks; " 
1086                   "CU3: compress uncompressed FITS files; " 
1087                   "CU4: process regardless any FlatFileIngest/CU3 checks."), 
1088        CLI.Option('j', "janet", 
1089                   "CU1: use JANET instead of UKLight"), 
1090        CLI.Option('k', "disklist", 
1091                   "CU2: list of RAID disk paths including the archive name, " 
1092                   "eg. '/disk01/wsa,/disk02/wsa' (depending on given archive).", 
1093                   "LIST", ''), 
1094        CLI.Option('l', "xferlog", 
1095                   "xferlog of files to be processed", 
1096                   "LOGFILE", ''), 
1097        CLI.Option('m', "writemfid", 
1098                   "CU1: only update MfIDs in FlatFileLookUp table"), 
1099        CLI.Option('n', "nomfid", 
1100                   "CU1/3: don't write new MfID into FITS file"), 
1101        CLI.Option('o', "outpath", 
1102                   "directory where the output data is written to, " 
1103                   "defaults to /mnt of given database", "PATH", ''), 
1104        CLI.Option('p', "progorder", 
1105                   "CU4: the order of processing the programmes (accepts " 
1106                   "keywords 'other','others' for , all programmes not " 
1107                   "explicitely named)", "LIST", ''), 
1108        CLI.Option('r', "redo", 
1109                   "CU3: enable overwriting of existing MfIDs, use only on a " 
1110                   "single day basis! CU2: redo existing jpegs. CU4: Redo " 
1111                   "monthly detection schema."), 
1112        CLI.Option('s', "subdir", 
1113                   "CU2/3: subdirectory containing FITS date directories", 
1114                   "DIR", SystemConstants.fitsDir), 
1115        CLI.Option('w', "casudisk", 
1116                   "CU1: data disk at CASU (depending on given archive)", 
1117                   "PATH"), 
1118        CLI.Option('x', "cunums", 
1119                   "csv list of CU numbers to run", 
1120                   "LIST", ""), 
1121        CLI.Option('y', "threads", 
1122                   "CU1: number of threads: [a]b (a: daywise threads, " 
1123                   "b: scp threads); CU2: number of processors to use; " 
1124                   "CU4: number of processors to use (daywise split)", 
1125                   "INT", '0'), 
1126        CLI.Option('v', "version", 
1127                   "version number of the data", 
1128                   "STR", ''), 
1129        CLI.Option('F', "ffluonly", 
1130                   "CU1: don't transfer but update FlatfileLookup."), 
1131        CLI.Option('G', "gesoptions", 
1132                   "CU1: GES instrument and type, ie " 
1133                   "g[iraffe]/u[ves],n[ightly],s[tacked],a[rchived]", 
1134                   "STR", '', isValOK=lambda x: ',' in x \ 
1135                   and x.startswith(('g','u')) and x.endswith(('a', 'n', 's'))), 
1136        CLI.Option('H', "getheader", 
1137                   "Only transfer FITS headers instead of full file."), 
1138        CLI.Option('K', "keepwork", 
1139                   "Don't remove working dir."), 
1140        CLI.Option('M', "mosaic", 
1141                   "CU2: force making mosaics. Don't use it for large files!"), 
1142        CLI.Option('P', "programmes", "CU3/CU4: only process data for given " 
1143                   "programmes (accepts keywords 'all', 'ns' (non-survey), " 
1144                   "'ukidss' (all 5 main surveys)); one programme prefixed " 
1145                   "with 'x-' excludes this programme.", "LIST", 'all'), 
1146        CLI.Option('R', "repromode", 
1147                   "mode for reprocessed data: 'otm' (one-to-many dirs) " 
1148                   "or 's::suffix' (CASU datedir suffix)." 
1149                   "or 'v::version' (supply version if not given at CASU)", 
1150                   "STR", ''), 
1151        CLI.Option('S', "subset", "CU4: process subset of [Raw table," 
1152                   " Astrometry table, Photometry table]", "LIST", 
1153                   "Raw,Astrometry,Photometry"), 
1154        CLI.Option('T', "timecheck", 
1155                   "CU1: check files against CASU file timestamps"), 
1156        CLI.Option('X', "exclude", 
1157                   "CU3/4: exclude given files/file patterns from processing", 
1158                   "LIST", ""), 
1159        CLI.Option('Z', "deprecation", 
1160                   "CU1: modus for deprecating files: 'mv', 'rm', 'ip' " 
1161                   "(leave in place); CU2: 'modus,deprecated FITS file list'", 
1162                   "MODE", '')] 
1163   
1164      cli = CLI(DataBuilder, "$Revision: 10233 $") 
1165      IngestLogger.addMessage(cli.getProgDetails()) 
1166      sysc = SystemConstants(cli.getArg("database").split('.')[-1]) 
1167      DataBuilder.casuDisk = cli.getOpt("casudisk") 
1168      DataBuilder.diskList = (cli.getOpt("disklist") if cli.getOpt("disklist") 
1169                              else sysc.availableRaidFileSystem()) 
1170   
1171      beginDate, endDate = sysc.obsCal.getDatesFromInput(cli.getOpt("begin"), 
1172                                                         cli.getOpt("end")) 
1173      if not sysc.isGES() and int(endDate) < int(beginDate): 
1174          raise SystemExit("<ERROR> The given end date %s lies before the begin " 
1175                           "date %s." % (cli.getOpt("end"), cli.getOpt("begin"))) 
1176   
1177      builder = DataBuilder(cli.getOpt("cunums"), 
1178                            cli.getOpt("curator"), 
1179                            cli.getArg("database"), 
1180                            cli.getOpt("begin"), 
1181                            cli.getOpt("end"), 
1182                            cli.getOpt("version"), 
1183                            cli.getOpt("outpath"), 
1184                            cli.getOpt("nomfid"), 
1185                            cli.getOpt("redo"), 
1186                            cli.getOpt("programmes"), 
1187                            cli.getOpt("progorder"), 
1188                            cli.getOpt("test"), 
1189                            cli.getOpt("xferlog"), 
1190                            cli.getOpt("timecheck"), 
1191                            cli.getOpt("threads"), 
1192                            cli.getOpt("writemfid"), 
1193                            cli.getOpt("subdir"), 
1194                            cli.getOpt("janet"), 
1195                            cli.getOpt("force"), 
1196                            cli.getOpt("exclude"), 
1197                            cli.getOpt("deprecation"), 
1198                            cli.getOpt("repromode"), 
1199                            cli.getOpt("mosaic"), 
1200                            cli.getOpt("subset"), 
1201                            cli.getOpt("keepwork"), 
1202                            cli.getOpt("ffluonly"), 
1203                            cli.getOpt("getheader"), 
1204                            cli.getOpt("gesoptions"), 
1205                            cli.getArg("comment")) 
1206      builder.run() 
1207   
1208   
1209   
1210   
1211   
1212   
1213   
1214   
1215   
1216   
1217   
1218   
1219   
1220   
1221   
1222   
1223   
1224   
1225   
1226   
1227   
1228   
1229   
1230   
1231   
1232   
1233   
1234   
1235   
1236   
1237   
1238   
1239   
1240   
1241   
1242   
1243   
1244   
1245   
1246   
1247   
1248   
1249   
1250   
1251   
1252   
1253   
1254   
1255