
Source Code for Module invocations.cu0.DataBuilder

#! /usr/bin/env python
#------------------------------------------------------------------------------
#$Id: DataBuilder.py 10233 2014-03-05 16:35:35Z EckhardSutorius $
"""
   Executes CUs in parallel mode, preparing ingest files.

   @author: E. Sutorius
   @org:    WFAU, IfA, University of Edinburgh

   @newfield contributors: Contributors, Contributors (Alphabetical Order)
   @contributors: R.S. Collins
"""
#------------------------------------------------------------------------------
from   collections import defaultdict
import inspect
import math
import multiprocessing
import mx.DateTime     as mxTime
import os
import re
import shutil
import time

from invocations.cu1.cu1         import Cu1
from invocations.cu1.cu1Transfer import CASUQueries
from invocations.cu2.cu2         import Cu2
from invocations.cu3.cu3         import Cu3
from invocations.cu4.cu4         import Cu4

from   wsatools.CLI                    import CLI
import wsatools.CSV                        as CSV
import wsatools.DbConnect.DbConstants      as dbc
from   wsatools.DbConnect.DbSession    import DbSession, Ingester
from   wsatools.File                   import File
import wsatools.FitsUtils                  as fits
from   wsatools.DbConnect.IngCuSession import IngCuSession
from   wsatools.DbConnect.IngIngester  import IngestLogger
import wsatools.DbConnect.Schema           as schema
from   wsatools.SystemConstants        import SystemConstants
import wsatools.Utilities                  as utils

#------------------------------------------------------------------------------
class ThreadCU(object):
    """ Run CU instances in parallel worker processes.
    """
    #--------------------------------------------------------------------------
    # Define class constants (access as ThreadCU.varName)
    numprocessors = 2

    #--------------------------------------------------------------------------

    def runThreads(self, _cuRuns, resultQueue):
        """
        Run the CU threads.

        @param _cuRuns:     The CU instances to run in this worker.
        @type  _cuRuns:     list
        @param resultQueue: The results queue.
        @type  resultQueue: Queue object

        """
        outList = []
        for _cu in _cuRuns:
            _cu.run()
            outList.extend(_cu.outPrefixList)
        resultQueue.put(outList)

    #--------------------------------------------------------------------------

    def run(self, cuList):
        """
        Thread the CUs.

        @param cuList: The CUs to run in parallel.
        @type  cuList: list

        @return: List containing the results of the CU runs.
        @rtype:  list
        """
        if self.numprocessors > len(cuList):
            self.numprocessors = len(cuList)
        IngestLogger.addMessage("Running %d CUs on %d processors." % (
            len(cuList), self.numprocessors))
        if self.numprocessors == 0:
            raise SystemExit("<Error> No CUs available!")

        outQueue = multiprocessing.Queue()
        chunksize = int(math.ceil(len(cuList) / float(self.numprocessors)))
        procs = []
        for i in xrange(self.numprocessors):
            if i > 0:
                time.sleep(20)
            p = multiprocessing.Process(
                target=self.runThreads,
                args=(cuList[chunksize * i:chunksize * (i + 1)],
                      outQueue))
            procs.append(p)
            p.start()

        # Collect all results into a single result list. We know how many
        # lists of results to expect.
        resultList = []
        for i in xrange(self.numprocessors):
            resultList.extend(outQueue.get())

        # Wait for all worker processes to finish
        for p in procs:
            p.join()

        # Return the results
        return resultList

#------------------------------------------------------------------------------

class DataBuilder(IngCuSession):
    """ Executes CUs in parallel mode, preparing ingest files.
    """
    diskList = None
    casuDisk = None

    #--------------------------------------------------------------------------

    def __init__(self,
                 cuNums=CLI.getOptDef("cunums"),
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 beginDate=CLI.getOptDef("begin"),
                 endDate=CLI.getOptDef("end"),
                 versionStr=CLI.getOptDef("version"),
                 outPath=CLI.getOptDef("outpath"),
                 noMfID=CLI.getOptDef("nomfid"),
                 ReDo=CLI.getOptDef("redo"),
                 programmeList=CLI.getOptDef("programmes"),
                 progOrder=CLI.getOptDef("progorder"),
                 isTrialRun=DbSession.isTrialRun,
                 xferlog=CLI.getOptDef("xferlog"),
                 timeCheck=CLI.getOptDef("timecheck"),
                 numThreads=CLI.getOptDef("threads"),
                 writeMfID=CLI.getOptDef("writemfid"),
                 subDir=CLI.getOptDef("subdir"),
                 janet=CLI.getOptDef("janet"),
                 forceXfer=CLI.getOptDef("force"),
                 excludeData=CLI.getOptDef("exclude"),
                 deprecation=CLI.getOptDef("deprecation"),
                 reproMode=CLI.getOptDef("repromode"),
                 forceMosaic=CLI.getOptDef("mosaic"),
                 detectionSubset=CLI.getOptDef("subset"),
                 keepWorkDir=CLI.getOptDef("keepwork"),
                 ffluOnly=CLI.getOptDef("ffluonly"),
                 getHeader=CLI.getOptDef("getheader"),
                 gesOptions=CLI.getOptDef("gesoptions"),
                 comment=CLI.getArgDef("comment")):
150 """ 151 @param beginDate: First date to process, eg. 20050101. 152 @type beginDate: str 153 @param comment: Descriptive comment as to why curation task is 154 being performed. 155 @type comment: str 156 @param cuNums: Curation task numbers. 157 @type cuNums: list[int] 158 @param curator: Name of curator. 159 @type curator: str 160 @param database: Name of the database to connect to. 161 @type database: str 162 @param detectionSubset: Process subset of [Astrometry table, 163 Photometry table, Raw table]. 164 @type detectionSubset: list(str) 165 @param endDate: Last date to process, eg. 20050131. 166 @type endDate: str 167 @param excludeData: Exclude given files from processing (CU4). 168 @type excludeData: list(str) 169 @param ffluOnly: Don't transfer but update FlatfileLookup table. 170 @type ffluOnly: bool 171 @param forceXfer: Force the data transfer. 172 @type forceXfer: bool 173 @param gesOptions: Options for GES transfers. 174 @type gesOptions: str 175 @param getHeader: Only transfer FITS headers. 176 @type getHeader: bool 177 @param isTrialRun: If True, do not perform database modifications. 178 @type isTrialRun: bool 179 @param janet: Use JANET instead of UKLight. 180 @type janet: bool 181 @param forceMosaic: Create jpg for mosaic. 182 @type forceMosaic: bool 183 @param keepWorkDir: Don't remove working directory. 184 @type keepWorkDir: bool 185 @param noMfID: If True, don't write new MfID into FITS file. 186 @type noMfID: bool 187 @param numThreads: Number of scp transfer or processing threads. 188 @type numThreads: int 189 @param outPath: Directory where data is written to. 190 @type outPath: str 191 @param programmeList: Only process data for given programmes (accepts 192 keywords 'all', 'ns' (non-survey), 193 'ukidss' (all 5 main surveys)). 194 @type programmeList: list(str) 195 @param progOrder: The order of processing of the programmes, all 196 programmes not explicitely named can be put 197 anywhere in the list as 'others'. 198 @type progOrder: list(str) 199 @param ReDo: If True, overwrite existing MfID (CU1/3) or jpg 200 (CU2). 201 @type ReDo: bool 202 @param reproMode: Mode for reprocessed data: otm (one-to-many dirs) 203 or datedir suffix. 204 @type reproMode: str 205 @param subDir: The subdirectory containing FITS files. 206 @type subDir: str 207 @param timeCheck: Determines if existing files should be checked for 208 their timestamp against the same file at CASU; 209 @type timeCheck: bool 210 @param versionStr: Version number of the data. 211 @type versionStr: str 212 @param writeMfID: If True, only write MfIDs into FITS files. 213 @type writeMfID: bool 214 @param xferlog: Logfile containing files to be ingested. 
215 @type xferlog: str 216 217 """ 218 # Initialize parent class 219 super(DataBuilder, self).__init__(cuNum=0, 220 curator=curator, 221 comment=comment, 222 reqWorkDir=True, 223 keepWorkDir=keepWorkDir, 224 database=database, 225 autoCommit=False, 226 isTrialRun=isTrialRun) 227 228 beginDate, endDate = \ 229 self.sysc.obsCal.getDatesFromInput(beginDate, endDate) 230 231 typeTranslation = {"curator":str, 232 "database":str, 233 "cuNums":list, 234 "beginDate":str, 235 "endDate":str, 236 "versionStr":str, 237 "outPath":str, 238 "noMfID":bool, 239 "ReDo":bool, 240 "programmeList":list, 241 "progOrder":list, 242 "isTrialRun":bool, 243 "xferlog":str, 244 "timeCheck":bool, 245 "numThreads":type(self.sysc.scpThreads), 246 "writeMfID":bool, 247 "subDir":type(self.sysc.fitsDir), 248 "janet":bool, 249 "forceXfer":bool, 250 "excludeData":list, 251 "deprecation":str, 252 "reproMode":str, 253 "forceMosaic":bool, 254 "detectionSubset":list, 255 "keepWorkDir":bool, 256 "ffluOnly":bool, 257 "getHeader":bool, 258 "gesOptions":str, 259 "comment":str} 260 261 super(DataBuilder, self).attributesFromArguments( 262 inspect.getargspec(DataBuilder.__init__)[0], locals(), prefix='_', 263 types=typeTranslation) 264 265 self.curator = self._curator 266 self.comment = self._comment 267 self._UKLight = not self._janet 268 269 if not self._versionStr: 270 self._versionStr = str(max( 271 self.sysc.obsCal.maxVersOfDate(self._beginDate), 272 self.sysc.obsCal.maxVersOfDate(self._endDate))) 273 274 if not self._outPath: 275 self._outPath = self.sysc.dbSharePath() 276 277 implementedCUs = [1, 2, 3, 4] 278 if not set(self._cuNums).intersection(implementedCUs): 279 raise SystemExit("No CU possible for %r.\nImplemented CUs are %r" 280 % (self._cuNums, implementedCUs)) 281 282 if set(self._cuNums) - set(implementedCUs): 283 IngestLogger.addMessage( 284 "<Warning> The following CUs are not implemented: " + 285 ', '.join(map(str, set(self._cuNums) - set(implementedCUs)))) 286 287 if set(self._cuNums).intersection([1, 2, 3, 4]) and not self._xferlog \ 288 and (int(self._beginDate) == int(CLI.getOptDef("begin")) or 289 int(self._endDate) == int(CLI.getOptDef("end"))) \ 290 and 'otm' not in self._reproMode: 291 raise SystemExit("The CU needs either a xferlog or a date range " 292 "specified!") 293 294 if 'otm' in self._reproMode and not self._xferlog and not self.casuDisk: 295 raise SystemExit("Either a transfer log or a directory at CASU " 296 "need to be specified for reprocessed data " 297 "transfers.") 298 299 if 1 in self._cuNums: 300 if self._numThreads < 10: 301 self._numThreads += 10 302 if self._ffluOnly or self._xferlog: 303 newThreads = 11 304 elif self._numThreads % 10 * self._numThreads // 10 > 4: 305 newThreads = (22 if self._endDate != self._beginDate else 12) 306 else: 307 newThreads = self._numThreads 308 if self._numThreads != newThreads: 309 IngestLogger.addMessage( 310 "<Warning> Too many threads chosen: %d => Resetting to %d" \ 311 % (self._numThreads, newThreads)) 312 self._numThreads = newThreads 313 314 if self._xferlog: 315 self._xferlog = os.path.abspath(self._xferlog)
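
    #--------------------------------------------------------------------------
    # Editor's note: the two-digit -y/--threads encoding handled above packs
    # two counts into one integer -- the tens digit is the number of daywise
    # (per-date) CU1 workers, the ones digit the number of scp threads per
    # worker. A worked example (values hypothetical):
    #
    #   numThreads = 23
    #   dayThreads, scpThreads = numThreads // 10, numThreads % 10   # (2, 3)
    #   # __init__ resets any combination whose digit product exceeds 4:
    #   if scpThreads * dayThreads > 4:
    #       numThreads = 22 if endDate != beginDate else 12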

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Run each CU requested.
        """
        self._cuRuns = []
        self._cuedFiles = defaultdict(bool)
        cuEventIDs = defaultdict(list)
        if 1 in self._cuNums:
            if self._writeMfID:
                # only write MultiframeIDs
                IngestLogger.addMessage("Starting updating multiframeIDs.")
                fitsDirs = self.createDailyLists(self._xferlog)
                self.updateMfIDs(fitsDirs.dailyFileListDict)
                IngestLogger.addMessage("Finished updating multiframeIDs.")
                # update cu0 log
                cuEventIDs[1].append(self._cuEventID)
                cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[1]))
                IngestLogger.append(cu0Log.pathName)
            else:
                fileDict = defaultdict(list)
                if self._numThreads == 0:
                    self._numThreads = SystemConstants.scpThreads
                # prepare cu1 date list
                if self._xferlog and 'otm' not in self._reproMode:
                    fileDict, notAvailableFiles = self.parseXferList()
                    if notAvailableFiles:
                        IngestLogger.addMessage(
                            "<Warning> The following files are not available"
                            " at CASU:")
                        for fileName in notAvailableFiles:
                            IngestLogger.addMessage(fileName)

                    mjdList = [int(mxTime.strptime(str(date), "%Y%m%d").mjd)
                               for date in fileDict if fileDict[date]]
                    outMessageSuf = ''.join([" from ", self._xferlog])
                else:
                    startMJD = int(mxTime.strptime(self._beginDate,
                                                   "%Y%m%d").mjd)
                    endMJD = int(mxTime.strptime(self._endDate,
                                                 "%Y%m%d").mjd)

                    # create date list
                    if self._reproMode.startswith(('p:', 's:')):
                        CASUQueries.sysc = self.sysc
                        presufCasu = (self._reproMode
                                      if self._reproMode.startswith(('p', 's'))
                                      else '')
                        casuDateDict = CASUQueries.getCasuDateDirs(
                            dirList=[self.casuDisk] if self.casuDisk else [],
                            UKLight=self._UKLight,
                            mode="STR", presuf=presufCasu)
                        reproFix = self._reproMode.partition("::")[2]
                        mjdList = []
                        for dateDir in casuDateDict:
                            if reproFix in dateDir:
                                date = int(mxTime.strptime(dateDir.replace(
                                    reproFix, ''), "%Y%m%d").mjd)
                                if date in range(startMJD, endMJD + 1):
                                    mjdList.append(date)
                    elif self.sysc.isGES():
                        mjdList = self.getGesMJDs(startMJD, endMJD)
                    elif 'otm' not in self._reproMode \
                            and not self.sysc.isGES():
                        # get available dates at CASU
                        CASUQueries.sysc = self.sysc
                        casuDateDict = CASUQueries.getCasuDateDirs(
                            dirList=[self.casuDisk] if self.casuDisk else [],
                            UKLight=self._UKLight, mode="MJD")
                        mjdList = [x for x in range(startMJD, endMJD + 1)
                                   if x in casuDateDict]
                    else:
                        mjdList = range(startMJD, endMJD + 1)
                    outMessageSuf = ''

                if not mjdList:
                    IngestLogger.addMessage("No data to be transferred!")

                # prepare cu1 transfers
                if 'otm' in self._reproMode:
                    IngestLogger.addMessage(''.join([
                        "Preparing CU run for Cu1 for re-processed data in ",
                        self.casuDisk]))
                    self._cuRuns.append(
                        Cu1(curator=self.curator,
                            database=self.database,
                            versionStr=self._versionStr,
                            casuDisk=self.casuDisk,
                            janet=self._janet,
                            deprecation=self._deprecation,
                            xferList=self._xferlog,
                            isTrialRun=self._isTrialRun,
                            forceXfer=self._forceXfer,
                            reproMode=self._reproMode,
                            comment=self.comment))
                    IngestLogger.addMessage("CUEventID: %s" %
                                            self._cuRuns[-1]._cuEventID)
                else:
                    dateList = []
                    for mjd in sorted(mjdList):
                        date = mxTime.DateTimeFromMJD(mjd)
                        dateStr = "%04d%02d%02d" % (date.year, date.month,
                                                    date.day)
                        dateList.append(dateStr)

                    # Split the date list into daily threads
                    dateSubLists = self.splitDateList(dateList)

                    for dateGroup in dateSubLists:
                        xferList = []
                        for date in dateGroup:
                            xferList.extend(fileDict[date])
                        IngestLogger.addMessage(
                            "Preparing CU run for Cu1 for %r %s" % (
                                dateGroup, outMessageSuf))
                        self._cuRuns.append(
                            Cu1(curator=self.curator,
                                database=self.database,
                                runDate=dateGroup,
                                versionStr=self._versionStr,
                                casuDisk=self.casuDisk,
                                timeCheck=self._timeCheck,
                                numThreads=self._numThreads,
                                janet=self._janet,
                                deprecation=self._deprecation,
                                xferList=xferList,
                                isTrialRun=self._isTrialRun,
                                forceXfer=self._forceXfer,
                                reproMode=self._reproMode,
                                ffluOnly=self._ffluOnly,
                                onlyHeader=self._getHeader,
                                gesOptions=self._gesOptions,
                                comment=self.comment))

                        dateVersStr = ','.join([
                            "%s_v%s" % (dateStr, self._versionStr)
                            for dateStr in dateGroup])
                        self.updateComment(dateVersStr, cuNum=1)
                        IngestLogger.addMessage(
                            "Transferring from %s" %
                            self.sysc.scpServer(light=not self._janet))
                        IngestLogger.addMessage("CUEventID: %s" %
                                                self._cuRuns[-1]._cuEventID)
                # update cu0 log
                for _cu in self._cuRuns:
                    cuEventIDs[1].append(_cu._cuEventID)
                cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[1]))
                IngestLogger.append(cu0Log.pathName)
                # run cu1s
                for _cu in self._cuRuns:
                    _cu.run()
                    if 'mv' in self._deprecation and not self._isTrialRun:
                        self.deprecateJpgs(_cu.deprFileList)
                    if _cu.allXferLogs and not self._isTrialRun \
                            and 'otm' not in self._reproMode \
                            and not self.sysc.isGES():
                        self.updateMfIDs(_cu.allXferLogs)
                    del _cu
                IngestLogger.addMessage("Finished CU1.")

        if 2 in self._cuNums:
            cu2OutPath = ('' if self._outPath == self.sysc.dbSharePath()
                          else self._outPath)
            fitsDirs = self.createDailyLists(self._xferlog)

            if self._xferlog:
                _dateList = sorted(fitsDirs.dailyFileListDict.keys())
                self._beginDate = _dateList[0].partition("_v")[0]
                self._endDate = _dateList[-1].partition("_v")[0]
                self._versionStr = _dateList[0].partition("_v")[2]
            startMJD = int(mxTime.strptime(self._beginDate, "%Y%m%d").mjd)
            endMJD = int(mxTime.strptime(self._endDate, "%Y%m%d").mjd)

            for mjd in xrange(startMJD, endMJD + 1):
                date = mxTime.DateTimeFromMJD(mjd)
                dateStr = "%04d%02d%02d" % (date.year, date.month, date.day)
                dateVersStr = "%s_v%s" % (dateStr, self._versionStr)
                if dateVersStr in fitsDirs.invFitsDateDict:
                    cu02edFlagList = []
                    try:
                        cu02edFlagList = [os.path.join(
                            fitsDirs.invFitsDateDict[dateVersStr], dateVersStr,
                            "CU02ED_" + self._database.rpartition('.')[2])]
                    except AttributeError:
                        for path in fitsDirs.invFitsDateDict[dateVersStr]:
                            cu02edFlagList.append(os.path.join(
                                path, dateVersStr,
                                "CU02ED_" + self._database.rpartition('.')[2]))
                    for cu02edFlag in cu02edFlagList:
                        if os.path.exists(cu02edFlag):
                            os.remove(cu02edFlag)
                    IngestLogger.addMessage(
                        "Preparing CU run for Cu2 for " + dateStr)

                    xferlog = (fitsDirs.dailyFileListDict[dateVersStr][0]
                               if self._xferlog else None)

                    self._cuRuns.append(
                        Cu2(curator=self.curator,
                            database=self.database,
                            beginDate=int(dateStr),
                            endDate=int(dateStr),
                            versionStr=self._versionStr,
                            xferLog=xferlog,
                            subDir=self._subDir,
                            jpgPath=cu2OutPath,
                            forceMosaic=self._forceMosaic,
                            reDo=self._ReDo,
                            numThreads=self._numThreads,
                            deprecation=self._deprecation,
                            isTrialRun=self._isTrialRun,
                            comment=self.comment))

                    self.updateComment(dateVersStr, cuNum=2)

            # update cu0 log
            cuEventIDs[2] = [_cu._cuEventID for _cu in self._cuRuns]
            cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[2]))
            IngestLogger.append(cu0Log.pathName)
            # run cu2s
            for _cu in self._cuRuns:
                _cu.run()
            IngestLogger.addMessage("Finished CU2.")

        if 3 in self._cuNums:
            fitsDirs = self.createDailyLists(self._xferlog)
            for dateKey in sorted(fitsDirs.dailyFileListDict):
                for filePath in fitsDirs.dailyFileListDict[dateKey]:
                    listFile = File(filePath)
                    dateVersStr = listFile.root[listFile.root.find("_v") - 8:]
                    IngestLogger.addMessage(
                        "Preparing CU run for Cu3 for " + dateVersStr)

                    self._cuRuns.append(
                        Cu3(curator=self.curator,
                            database=self.database,
                            csvSharePath=self._outPath,
                            programmeList=self._programmeList,
                            noMfID=self._noMfID,
                            ReDo=self._ReDo,
                            onlyMfID=self._writeMfID,
                            excludeFiles=self._excludeData,
                            isTrialRun=self._isTrialRun,
                            keepWorkDir=self._keepWorkDir,
                            forceComp=self._forceXfer,
                            isWfauProduct=("products" in self._subDir),
                            xferlog=filePath,
                            comment=self.comment))
                    self.updateComment(dateVersStr, cuNum=3)
            # update cu0 log
            cuEventIDs[3] = [_cu._cuEventID for _cu in self._cuRuns]
            cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[3]))
            IngestLogger.append(cu0Log.pathName)
            # run cu3s
            for _cu in self._cuRuns:
                if not self._writeMfID:
                    _cu.run()
                    self._cuedFiles[_cu._cuEventID] = _cu.cuedFileExists
            IngestLogger.addMessage("Finished CU3.")

        self._cuRuns = []
        self._ingLogList = []
        if 4 in self._cuNums:
            fitsDirs = self.createDailyLists(self._xferlog)
            for dateKey in sorted(fitsDirs.dailyFileListDict):
                for filePath in fitsDirs.dailyFileListDict[dateKey]:
                    listFile = File(filePath)
                    dateVersStr = listFile.root[listFile.root.find("_v") - 8:]
                    cu03edFlagList = []
                    flatFileIngFlagList = []
                    parentArchive = (self.database.rpartition('.')[2]
                                     if "VSA_v1_3" in self.database else
                                     "VSA" if self.sysc.isVSA() else
                                     ("OSA" if self.sysc.isOSA() else "WSA"))
                    try:
                        flatFileIngFlagList = [os.path.join(
                            fitsDirs.invFitsDateDict[dateVersStr], dateVersStr,
                            "FFFIED_" + parentArchive)]
                        cu03edFlagList = [os.path.join(
                            fitsDirs.invFitsDateDict[dateVersStr], dateVersStr,
                            "CU03ED_" + parentArchive)]
                    except AttributeError:
                        for path in fitsDirs.invFitsDateDict[dateVersStr]:
                            flatFileIngFlagList.append(os.path.join(
                                path, dateVersStr,
                                "FFFIED_" + parentArchive))
                            cu03edFlagList.append(os.path.join(
                                path, dateVersStr,
                                "CU03ED_" + parentArchive))

                    if not self._forceXfer \
                            and not all(os.path.exists(fffiedFlag)
                                        for fffiedFlag in flatFileIngFlagList):

                        IngestLogger.addMessage("<IMPORTANT> "
                            "FinaliseFlatFileIngest hasn't been run for %s. "
                            "For WFAU products just re-run this command with"
                            " the -f/--force option. For new CASU products"
                            " you must first run FinaliseFlatFileIngest."
                            % dateVersStr)

                    elif self._forceXfer \
                            or all(os.path.exists(cu03edFlag)
                                   for cu03edFlag in cu03edFlagList):
                        IngestLogger.addMessage(
                            "Preparing CU run for Cu4 for " + dateVersStr)

                        self._cuRuns.append(
                            Cu4(curator=self.curator,
                                database=self.database,
                                csvSharePath=self._outPath,
                                programmeList=self._programmeList,
                                progOrder=self._progOrder,
                                detectionSubset=self._detectionSubset,
                                excludeFiles=self._excludeData,
                                isTrialRun=self._isTrialRun,
                                ReDo=self._ReDo,
                                keepWorkDir=self._keepWorkDir,
                                xferlog=filePath,
                                comment=self.comment))
                        self.updateComment(dateVersStr, cuNum=4)
                    else:
                        IngestLogger.addMessage(
                            "No data available for Cu4 for " + dateVersStr)

            if self._numThreads == 0:
                self._numThreads = 1
            # update cu0 log
            cuEventIDs[4] = [_cu._cuEventID for _cu in self._cuRuns]
            cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[4]))
            IngestLogger.append(cu0Log.pathName)
            # run cu4s
            ThreadCU.numprocessors = self._numThreads
            threadCUs = ThreadCU()
            resList = threadCUs.run(self._cuRuns)
            del threadCUs

            IngestLogger.addMessage("Finished CU4.")

        IngestLogger.addMessage("Overview of all the ingests to run:")
        for cuNum in cuEventIDs:
            if cuNum == 4:
                for outPrefix, cuEvID in resList:
                    self._ingLogList.append((
                        os.path.join(self.dbMntPath, outPrefix),
                        self._cuedFiles[cuEvID]))
            else:
                for cuEventID in cuEventIDs[cuNum]:
                    outPrefix = self.createFilePrefix(
                        cuNum, cuEventID, self._hostName,
                        self.database.rpartition('.')[2])
                    self._ingLogList.append((
                        os.path.join(self.dbMntPath, outPrefix),
                        self._cuedFiles[cuEventID]))
        for entry in sorted(set(self._ingLogList)):
            forceFlag = (" -F" if entry[1] else '')
            IngestLogger.addMessage("./IngestCUFiles.py -i -r %s%s %s" % (
                os.path.join(self.dbMntPath, entry[0] + ".log"), forceFlag,
                self.database))

        # update the cu0 log
        IngestLogger.append(cu0Log.pathName)
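
    #--------------------------------------------------------------------------
    # Editor's note: _onRun() coordinates the CUs through empty marker files
    # in each date directory -- CU02ED_/CU03ED_<archive> record a finished
    # curation task, FFFIED_<archive> records that FinaliseFlatFileIngest has
    # run. A sketch of the CU4 gate above, with a hypothetical path:
    #
    #   dateDir = "/disk01/wsa/fits/20050101_v1"
    #   cu3Done = os.path.exists(os.path.join(dateDir, "CU03ED_WSA"))
    #   fffied  = os.path.exists(os.path.join(dateDir, "FFFIED_WSA"))
    #   readyForCu4 = forceXfer or (fffied and cu3Done)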

    #--------------------------------------------------------------------------

    @staticmethod
    def chunks(l, n):
        """ Yield successive n-sized chunks from l.
        """
        if n == 0:
            n = 1
        for i in xrange(0, len(l), n):
            yield l[i:i + n]

    #--------------------------------------------------------------------------

    @staticmethod
    def getCasuSizeDict(dateList, UKLight):
        """ Get a rated dictionary of CASU date dirs ordered by size.
        """
        casuDict = utils.Ratings()
        for date in dateList:
            scpCasuDirs = CASUQueries.getCasuStagingDirs(
                UKLight, "processed/%s" % date)
            for direc in scpCasuDirs:
                casuDict[date] = -int(re.findall(
                    r'\d+', CASUQueries.getDirSizes(
                        direc, UKLight,
                        returnList=True, dateRE='')[0][0])[0])
        return casuDict

    #--------------------------------------------------------------------------

    def getGesMJDs(self, startMJD, endMJD):
        """ Get the list of MJDs for the GES transfer.
        """
        if self._gesOptions.endswith(('a', 's')):
            mjdList = range(startMJD, startMJD + 1)
        else:
            CASUQueries.sysc = self.sysc
            mjdList = []
            casuSubDir = ("uves/proc_%s"
                          if self._gesOptions.startswith('u')
                          else "proccasu_%s")
            casuDirs = CASUQueries.getCasuStagingDirs(
                UKLight=self._UKLight,
                casuSubDirPath=casuSubDir % self._versionStr)

            for cd in casuDirs:
                for line in CASUQueries.getDirListing(cd, self._UKLight, "-p"):
                    if line.strip().startswith('d'):
                        subDir = \
                            line.strip().rpartition(' ')[2].replace('/', '')
                        if subDir.isdigit():
                            date = int(mxTime.strptime(subDir, "%Y%m%d").mjd)
                            if date in range(startMJD, endMJD + 1):
                                mjdList.append(date)
        return mjdList

    #--------------------------------------------------------------------------

    def splitDateList(self, dateList):
        """ Split the dateList into multiple similarly sized thread groups.
        """
        CASUQueries.sysc = self.sysc
        try:
            if len(dateList) < self._numThreads // 10:
                newThreads = len(dateList) * 10 + self._numThreads % 10
                IngestLogger.addMessage(
                    "<Warning> Short date list. Too many threads chosen:"
                    " %d => Resetting to %d" % (
                        self._numThreads, newThreads))
                self._numThreads = newThreads
            if (self._numThreads // 10) == 1:
                newDateList = sorted(dateList)
            else:
                casuDict = self.getCasuSizeDict(dateList, self._UKLight)
                newDateList = casuDict.keys()

        except Exception as error:
            # Log the details of the exception:
            IngestLogger.addExceptionDetails(error)
            newDateList = dateList

        return self.chunks(newDateList, self._numThreads // 10)

    #--------------------------------------------------------------------------

    def createDailyLists(self, xferLog=None):
        """
        Create the daily file lists.

        @param xferLog: Logfile containing files to be ingested.
        @type  xferLog: str

        @return: A FitsList object holding daily file information.
        @rtype:  FitsList

        """
        fitsDirs = fits.FitsList(self.sysc, prefix="cu0_")
        fitsDirs.createFitsDateDict(ingestDirectory=self._subDir,
                                    trafoLog=xferLog)
        IngestLogger.addMessage("Creating daily filelists.")
        fitsDirs.createDailyFileLists(self._workPath, int(self._beginDate),
                                      int(self._endDate), self._versionStr,
                                      xferLog)
        return fitsDirs

    #--------------------------------------------------------------------------

    def deprecateJpgs(self, deprFitsList):
        """
        Deprecate jpgs of deprecated FITS files.

        @param deprFitsList: List of deprecated FITS files.
        @type  deprFitsList: list

        """
        # path to deprecated jpgs
        deprTime = utils.makeTimeStamp().split()[0].replace('-', '')
        spacePerDisk = utils.getDiskSpace(self.sysc.deprecatedDataStorage)
        deprJpgDir = os.path.join(utils.getNextDisk(self.sysc, spacePerDisk),
                                  '%s_%s' % (self.sysc.deprecatedComprImDir,
                                             deprTime))

        IngestLogger.addMessage("Deprecating JPGs to %s" % deprJpgDir)
        counter = 0
        self._connectToDb()
        dateList = \
            set(File(fitsFileName).subdir for fitsFileName in deprFitsList)

        mfIDDict = {}
        for dateStr in dateList:
            results = self.archive.query(
                "fileName, multiframeID",
                "Multiframe",
                "fileName LIKE '%" + dateStr + "%'")
            for fileName, multiframeID in results:
                if fileName.rpartition(':')[2] in deprFitsList:
                    mfIDDict[fileName] = str(multiframeID)

        if mfIDDict:
            oldJpgFiles = self.archive.query(
                selectStr="multiframeID, extNum, compFile",
                fromStr="MultiframeDetector",
                whereStr="multiframeID IN (%s)" % ','.join(mfIDDict.values()))

            for dateStr in dateList:
                deprJpgPath = os.path.join(deprJpgDir, dateStr)
                utils.ensureDirExist(deprJpgPath)
                for mfID, extNum, jpgFileName in oldJpgFiles:
                    jpgFile = File(jpgFileName.replace(
                        self.sysc.pixelServerHostName, ''))
                    if dateStr in jpgFile.name \
                            and os.path.exists(jpgFile.name):
                        deprJpgName = os.path.join(deprJpgPath, jpgFile.base)
                        if jpgFile.name != deprJpgName:
                            shutil.copy2(jpgFile.name, deprJpgName)
                            os.remove(jpgFile.name)
                        else:
                            IngestLogger.addMessage(
                                "Deprecated file %s exists already!" %
                                deprJpgName)
                        compEntry = \
                            "'%s%s'" % (self.sysc.pixelServerHostName,
                                        deprJpgName)
                        counter += self.archive.update("MultiframeDetector",
                            [('compFile', compEntry)],
                            "multiframeID=%s AND extNum=%s" % (mfID, extNum))

        self._disconnectFromDb()
        IngestLogger.addMessage(
            "%s JPGs deprecated to %s." % (counter, deprJpgDir))

    #--------------------------------------------------------------------------

    def parseXferList(self):
        """ Parse the transfer list and check for files.
        """
        CASUQueries.sysc = self.sysc
        casuDateDict = {}
        casuFileSystem = CASUQueries.getCasuStagingDirs(self._janet)
        if self.casuDisk and "reprocessed" not in self.casuDisk:
            fileSystem = [self.casuDisk]
            casuDateDict = CASUQueries.getCasuDateDirs(fileSystem, self._janet)
        elif not self.casuDisk:
            casuDateDict = CASUQueries.getCasuDateDirs(
                casuFileSystem, self._janet)

        notAvailFiles = []
        xferDict = defaultdict(list)
        for line in file(self._xferlog):
            entry = line.rstrip()
            # filename is either dateDir/fileName or just fileName
            try:
                if '/' in entry:
                    dateStr, fileName = entry.split('/')
                else:
                    pattern = CASUQueries.re_digits
                    dateStr = pattern.search(entry).group()
                    fileName = entry

                if self._beginDate <= dateStr <= self._endDate:
                    if dateStr in casuDateDict:
                        basePath = casuDateDict[dateStr]
                        if not isinstance(basePath, basestring) \
                                and len(basePath) > 1:
                            raise AttributeError
                        else:
                            basePath = casuDateDict[dateStr][0]
                    else:
                        basePath = self.casuDisk
                    xferDict[dateStr].append(
                        os.path.join(basePath, dateStr, fileName))
            except KeyError:
                notAvailFiles.append(os.path.join(dateStr, fileName))
            except AttributeError:
                if isinstance(basePath, (list, tuple)) \
                        and not isinstance(basePath, basestring):
                    IngestLogger.addMessage(
                        "Date %s found in multiple locations: %r" % (
                            dateStr, basePath))
                elif not basePath:
                    IngestLogger.addMessage(
                        "Date not found on CASU disks: %r" % casuFileSystem)
                notAvailFiles.append(os.path.join(dateStr, fileName))

        return xferDict, notAvailFiles

    #--------------------------------------------------------------------------

    def updateComment(self, dateVersStr, cuNum):
        """
        Update the ArchiveCurationHistory comment.

        @param dateVersStr: The date directory name.
        @type  dateVersStr: str
        @param cuNum:       Curation task number.
        @type  cuNum:       int

        """
        if not self._isTrialRun:
            IngestLogger.addMessage(
                "Updating archive curation history comment")
            reComment = re.compile(r'Running CU[0-4]\Z')
            reCommentCu0 = re.compile(r'Running CU0\[')
            if reComment.search(self.comment) \
                    or reCommentCu0.search(self.comment):
                self._connectToDb()
                oldComment = self.archive.query("comment",
                    "ArchiveCurationHistory", whereStr="cuEventID=%s" %
                    self._cuRuns[-1]._cuEventID, firstOnly=True)

                if reComment.search(oldComment) \
                        or reCommentCu0.search(oldComment):
                    if reComment.search(oldComment):
                        _prefix, _comment = \
                            oldComment.partition("Running CU")[:2]
                        newComment = ''.join([_comment, "0/CU", str(cuNum),
                                              " ", dateVersStr])
                    else:
                        _prefix, _comment, _suffix = \
                            oldComment.partition("Running CU0")
                        newComment = ''.join([_comment, "/CU", str(cuNum),
                                              " ", dateVersStr, _suffix])
                    self._updateACHComment(_prefix + newComment,
                                           self._cuRuns[-1]._cuEventID)
                    self._cuRuns[-1].comment = newComment
                self._disconnectFromDb()

    #--------------------------------------------------------------------------

    def updateFlatFileLookUp(self, csvFileName):
        """
        Update the FlatFileLookUp table.

        @param csvFileName: CSV file where data is written to.
        @type  csvFileName: str

        """
        _ingestSchema = "WSA_CurationLogsSchema.sql"
        _tableName = 'FlatFileLookUp'
        fileList = [x[0] for x in self._fileList]
        dateList = sorted(set(os.path.basename(os.path.dirname(entry))
                              for entry in fileList))
        for dateVersStr in dateList:
            # check for existing data in FlatFileLookUp
            dataDict = ({} if self._ReDo and self._forceXfer
                        and self._writeMfID else
                        self.getMultiframeIDs(dateVersStr))
            # get next Multiframe ID
            mfidName = dbc.multiframeUIDAttributeName()
            _nextMfID = self._getNextID(mfidName, "FlatFileLookUp",
                where="%s<1000000000000" % mfidName)
            _nextWfauMfID = max(self._getNextID(mfidName, "FlatFileLookUp",
                where="%s>1000000000000" % mfidName),
                1000000000001)
            # write the data into a csv file
            nrCasu = 0
            nrWfau = 0
            csvList = []
            directUpdate = False
            for entry in sorted(fileList):
                if entry.endswith(self.sysc.mefType):
                    fName = File(entry)
                    partName = os.path.join(fName.subdir, fName.base)
                    if partName in dataDict:
                        self.archive.update("FlatFileLookUp",
                            [('cuEventID', self._cuEventID), ('fileName',
                             "'%s%s'" % (self.sysc.pixelServerHostName,
                                         entry))],
                            "multiframeID=%s" % dataDict[partName])
                        self.archive.commitTransaction()
                        nrWfau += 1
                        directUpdate = True
                    elif entry not in dataDict:
                        if os.path.basename(entry).startswith("e20"):
                            newMfID = _nextWfauMfID + nrWfau
                            nrWfau += 1
                        else:
                            newMfID = _nextMfID + nrCasu
                            nrCasu += 1

                        csvList.append(
                            (newMfID, self._cuEventID,
                             os.path.basename(os.path.dirname(entry)),
                             '%s%s' % (self.sysc.pixelServerHostName, entry)))

            if csvList:  # ingest the data
                CSV.File(csvFileName, 'w').writelines(csvList)
                time.sleep(20)
                try:
                    ingester = Ingester(self.archive,
                        schema.parseTables(_ingestSchema, [_tableName]))
                except schema.MismatchError as error:
                    raise IngCuSession.IngCuError(error)

                IngestLogger.addMessage("Ingesting from %s" % csvFileName)
                ingester.ingestTable(_tableName, csvFileName,
                                     isCsv=True, deleteFile=False)
                ingester.commitTransaction()
                newPath = \
                    self.sysc.ingestedFilesPath(self.archive.sharePath())
                newLocation = os.path.join(
                    newPath, os.path.basename(csvFileName))
                os.rename(ingester.sharePath(csvFileName),
                          newLocation)
                del ingester
                time.sleep(20)

                IngestLogger.addMessage(
                    "FlatFileLookUp table updated with %s listed files."
                    % len(csvList))
            elif directUpdate:
                self.archive.commitTransaction()
                IngestLogger.addMessage(
                    "FlatFileLookUp table updated with %s files." % nrWfau)
            else:
                IngestLogger.addMessage("No new data to update.")

    #--------------------------------------------------------------------------

    def updateMfIDs(self, dailyFileListDict):
        """
        Write multiframe IDs into FlatFileLookUp.

        @param dailyFileListDict: Dictionary containing daily file lists.
        @type  dailyFileListDict: dict

        """
        IngestLogger.addMessage("Updating multiframeIDs in FlatFileLookUp.")
        try:
            self._cuNum = 1
            self._connectToDb()
            self._cuEventID = \
                self._getNextCuEventID() if not self._isTrialRun else -9999

            # output file prefix
            self._outPrefix = self.createFilePrefix(
                self._cuNum, self._cuEventID, self._hostName,
                self.archive.database)
            # update the FlatFileLookUp table
            csvFileName = self.archive.sharePath(
                "%s_%s.csv" % (self._outPrefix, 'FlatFileLookUp'))

            for dateKey in sorted(dailyFileListDict):
                for filePath in dailyFileListDict[dateKey]:
                    self.createFileList(filePath)
                    if not self._isTrialRun:
                        self.updateFlatFileLookUp(csvFileName)
                    else:
                        IngestLogger.addMessage(
                            "<TEST> Update for " + filePath)
        finally:
            self._disconnectFromDb()

#------------------------------------------------------------------------------
# Entry point for DataBuilder

if __name__ == "__main__":
    # Define additional command-line options
    CLI.progArgs["comment"] = "Running CU0"
    CLI.progOpts += [
        CLI.Option('b', "begin",
                   "first date to process, e.g. 20050101",
                   "DATE", str(IngCuSession.beginDateDef)),
        CLI.Option('e', "end",
                   "last date to process, e.g. 20050131",
                   "DATE", str(IngCuSession.endDateDef)),
        CLI.Option('f', "force",
                   "CU1: transfer regardless of any checks; "
                   "CU3: compress uncompressed FITS files; "
                   "CU4: process regardless of any FlatFileIngest/CU3"
                   " checks."),
        CLI.Option('j', "janet",
                   "CU1: use JANET instead of UKLight"),
        CLI.Option('k', "disklist",
                   "CU2: list of RAID disk paths including the archive name, "
                   "e.g. '/disk01/wsa,/disk02/wsa' (depending on given"
                   " archive).",
                   "LIST", ''),
        CLI.Option('l', "xferlog",
                   "xferlog of files to be processed",
                   "LOGFILE", ''),
        CLI.Option('m', "writemfid",
                   "CU1: only update MfIDs in FlatFileLookUp table"),
        CLI.Option('n', "nomfid",
                   "CU1/3: don't write new MfID into FITS file"),
        CLI.Option('o', "outpath",
                   "directory where the output data is written to, "
                   "defaults to /mnt of given database", "PATH", ''),
        CLI.Option('p', "progorder",
                   "CU4: the order of processing the programmes (accepts "
                   "keywords 'other'/'others' for all programmes not "
                   "explicitly named)", "LIST", ''),
        CLI.Option('r', "redo",
                   "CU3: enable overwriting of existing MfIDs, use only on a "
                   "single day basis! CU2: redo existing jpegs. CU4: redo "
                   "monthly detection schema."),
        CLI.Option('s', "subdir",
                   "CU2/3: subdirectory containing FITS date directories",
                   "DIR", SystemConstants.fitsDir),
        CLI.Option('w', "casudisk",
                   "CU1: data disk at CASU (depending on given archive)",
                   "PATH"),
        CLI.Option('x', "cunums",
                   "csv list of CU numbers to run",
                   "LIST", ""),
        CLI.Option('y', "threads",
                   "CU1: number of threads: [a]b (a: daywise threads, "
                   "b: scp threads); CU2: number of processors to use; "
                   "CU4: number of processors to use (daywise split)",
                   "INT", '0'),
        CLI.Option('v', "version",
                   "version number of the data",
                   "STR", ''),
        CLI.Option('F', "ffluonly",
                   "CU1: don't transfer but update FlatfileLookup."),
        CLI.Option('G', "gesoptions",
                   "CU1: GES instrument and type, i.e. "
                   "g[iraffe]/u[ves],n[ightly],s[tacked],a[rchived]",
                   "STR", '', isValOK=lambda x: ',' in x
                   and x.startswith(('g', 'u'))
                   and x.endswith(('a', 'n', 's'))),
        CLI.Option('H', "getheader",
                   "Only transfer FITS headers instead of full files."),
        CLI.Option('K', "keepwork",
                   "Don't remove working dir."),
        CLI.Option('M', "mosaic",
                   "CU2: force making mosaics. Don't use it for large"
                   " files!"),
        CLI.Option('P', "programmes",
                   "CU3/CU4: only process data for given "
                   "programmes (accepts keywords 'all', 'ns' (non-survey), "
                   "'ukidss' (all 5 main surveys)); one programme prefixed "
                   "with 'x-' excludes this programme.", "LIST", 'all'),
        CLI.Option('R', "repromode",
                   "mode for reprocessed data: 'otm' (one-to-many dirs), "
                   "'s::suffix' (CASU datedir suffix), "
                   "or 'v::version' (supply version if not given at CASU)",
                   "STR", ''),
        CLI.Option('S', "subset",
                   "CU4: process subset of [Raw table, Astrometry table,"
                   " Photometry table]", "LIST",
                   "Raw,Astrometry,Photometry"),
        CLI.Option('T', "timecheck",
                   "CU1: check files against CASU file timestamps"),
        CLI.Option('X', "exclude",
                   "CU3/4: exclude given files/file patterns from processing",
                   "LIST", ""),
        CLI.Option('Z', "deprecation",
                   "CU1: mode for deprecating files: 'mv', 'rm', 'ip' "
                   "(leave in place); CU2: 'mode,deprecated FITS file list'",
                   "MODE", '')]

    cli = CLI(DataBuilder, "$Revision: 10233 $")
    IngestLogger.addMessage(cli.getProgDetails())
    sysc = SystemConstants(cli.getArg("database").split('.')[-1])
    DataBuilder.casuDisk = cli.getOpt("casudisk")
    DataBuilder.diskList = (cli.getOpt("disklist") if cli.getOpt("disklist")
                            else sysc.availableRaidFileSystem())

    beginDate, endDate = sysc.obsCal.getDatesFromInput(cli.getOpt("begin"),
                                                       cli.getOpt("end"))
    if not sysc.isGES() and int(endDate) < int(beginDate):
        raise SystemExit("<ERROR> The given end date %s lies before the begin "
                         "date %s." % (cli.getOpt("end"), cli.getOpt("begin")))

    builder = DataBuilder(cli.getOpt("cunums"),
                          cli.getOpt("curator"),
                          cli.getArg("database"),
                          cli.getOpt("begin"),
                          cli.getOpt("end"),
                          cli.getOpt("version"),
                          cli.getOpt("outpath"),
                          cli.getOpt("nomfid"),
                          cli.getOpt("redo"),
                          cli.getOpt("programmes"),
                          cli.getOpt("progorder"),
                          cli.getOpt("test"),
                          cli.getOpt("xferlog"),
                          cli.getOpt("timecheck"),
                          cli.getOpt("threads"),
                          cli.getOpt("writemfid"),
                          cli.getOpt("subdir"),
                          cli.getOpt("janet"),
                          cli.getOpt("force"),
                          cli.getOpt("exclude"),
                          cli.getOpt("deprecation"),
                          cli.getOpt("repromode"),
                          cli.getOpt("mosaic"),
                          cli.getOpt("subset"),
                          cli.getOpt("keepwork"),
                          cli.getOpt("ffluonly"),
                          cli.getOpt("getheader"),
                          cli.getOpt("gesoptions"),
                          cli.getArg("comment"))
    builder.run()

#------------------------------------------------------------------------------
# Change log:
#
# 30-Nov-2006, ETWS: First version
#  9-Jan-2007, ETWS: Included processing in daily batches
# 30-Jan-2007, ETWS: Bug fixes
#  2-Feb-2007, ETWS: Included programme order for CU4
#  5-Feb-2007, ETWS: Moved Ratings class to wsatools.EnhancedDictionary;
#                    fixed usage of transferlogs
#  7-Feb-2007, ETWS: ...and the dictionary functions are on the move again...
# 12-Mar-2007, ETWS: Included MultiframeID update for CU1.
# 14-Mar-2007, ETWS: Included ArchiveCurationHistory update for CU1.
# 27-Apr-2007, ETWS: Included CU1 downloads.
#                    Included number of download threads for CU1.
#  3-May-2007, ETWS: Included CU2 and upgraded other CUs.
# 11-May-2007, ETWS: Included UKLight/JANET switch.
# 15-May-2007, ETWS: Included date into comment/log.
# 24-May-2007, ETWS: Fixed bug in file type count.
# 08-Jun-2007, ETWS: Included handling of non-standard file paths
#                    in transfer logs; updated documentation.
# 21-Jun-2007, ETWS: Fixed bug in processing from transferlogs.
# 22-Jun-2007, ETWS: Included exclusion of files without 4 extensions.
# 27-Aug-2007, ETWS: Included transfer method switch; fixed bugs.
# 13-Nov-2007, ETWS: Updated log file writing.
# 16-Nov-2007, NJC:  Changed wsaMaxHDUs for the case of mosaics.
# 27-Nov-2007, ETWS: Enhanced date range usage for CU1 and 2.
# 28-Nov-2007, ETWS: Updated help info.
# 23-Jan-2008, ETWS: Fixed bug in setting of wsaMaxHDUs;
#                    changed transfer default to 'scp'.
# 29-Jan-2008, ETWS: Included possibility to force mosaic jpeg creation
#                    and to re-process existing jpegs.
#  1-Feb-2008, ETWS: Fixed minor bug for non-existing dates in CU2.
#  5-Feb-2008, ETWS: Included usage of FlatFileLookUp for better parallel
#                    performance.
#  6-Feb-2008, ETWS: Fixed possibility that files can't be verified by pyfits.
#  7-Feb-2008, ETWS: Fixed CU1 for immediate multiframeID update.
# 14-Feb-2008, RSC:  Updated for new DbSession interface, and corrected log
#                    file name setting.
#  3-Mar-2008, ETWS: Updated to use only the availableRaidFileSystem.
#  7-Apr-2008, ETWS: Upgraded to use new detection table layout.
# 24-Apr-2008, ETWS: Included comment character in readlines.
#  5-May-2008, ETWS: Finalised processing of detection table subset in CU4.
# 28-May-2008, ETWS: Included switch to process specified programmes in CU4.
# 30-May-2008, ETWS: Fixed comment for new CU4 layout.
# 11-Aug-2008, ETWS: Updated for list driven deprecation in CU1 and 2.
# 12-Jan-2008, ETWS: Fixed CU2/CU4 for case when data of the same date is on
#                    several disks.