Package invocations :: Package cu1 :: Module cu1Transfer
[hide private]

Source Code for Module invocations.cu1.cu1Transfer

   1  #------------------------------------------------------------------------------ 
   2  #$Id: cu1Transfer.py 10196 2014-01-21 15:15:41Z EckhardSutorius $ 
   3  """ 
   4     Python classes used in CU1. 
   5   
   6     @author: E. Sutorius 
   7     @org:    WFAU, IfA, University of Edinburgh 
   8  """ 
   9  #------------------------------------------------------------------------------ 
  10  from   collections import defaultdict, namedtuple 
  11  import dircache 
  12  import hashlib 
  13  import inspect 
  14  import math 
  15  import Queue 
  16  import mx.DateTime as mxTime 
  17  import os 
  18  import operator 
  19  import re 
  20  import shutil 
  21  import StringIO 
  22  from   subprocess  import call, Popen, PIPE, STDOUT 
  23  from   threading   import Thread, Event 
  24  import time 
  25   
  26  import wsatools.ExternalProcess            as extp 
  27  from   wsatools.File                   import File 
  28  from   wsatools.DbConnect.IngCuSession import IngCuSession 
  29  from   wsatools.DbConnect.IngIngester  import IngestLogger as Logger 
  30  from   wsatools.Utilities              import Ratings 
  31  from   wsatools.SystemConstants        import SystemConstants 
  32  import wsatools.Utilities                  as utils 
33 #------------------------------------------------------------------------------ 34 35 -def _attributesFromArguments(self, argumentNames, argDict, prefix=''):
36 """Initialize automatically instance variables from __init__ arguments. 37 38 @param argumentNames: List of arguments of the class to be initialized. 39 @type argumentNames: list[str] 40 @param argDict: Dictionary of arguments and their values. 41 @type argDict: dict{arg:value} 42 @param prefix: Prefix to be given to the argument names, eg. '_' 43 @type prefix: str 44 """ 45 self = argDict.pop('self') 46 for arg in argumentNames[1:]: 47 setattr(self, prefix + arg, argDict[arg])
48
#------------------------------------------------------------------------------

def getScpScriptName(sysc, dateStr, thread, scpSuffix=''):
    """Build the full path of the scp script of one transfer thread.

    The file name is "scpcam" followed by sysc.loadDatabase, scpSuffix and
    dateStr, then an underscore and the thread number zero-padded to two
    digits; it is placed in sysc.sysPath.

    @param sysc:      Object providing sysPath and loadDatabase.
    @param dateStr:   Date string embedded in the script name.
    @type  dateStr:   str
    @param thread:    Number of the transfer thread.
    @type  thread:    int
    @param scpSuffix: Optional suffix placed before the date string.
    @type  scpSuffix: str

    @return: Path of the scp script.
    @rtype:  str
    """
    scriptName = "scpcam%s%s%s_%02d" % (
        sysc.loadDatabase, scpSuffix, dateStr, thread)
    return os.path.join(sysc.sysPath, scriptName)
54
#------------------------------------------------------------------------------

class GESTransfer(object):
    """
    Transfer GES data.

    For every CASU staging directory a tcsh script of scp commands is
    written and, unless this is a test run, executed. The files the scripts
    fetch are recorded in the CASU transfer log.
    """
    sysc = SystemConstants()
    gesPointings = ["m15", "mw", "ngc1851", "ngc2808", "ngc4372", "ngc4833",
                    "ngc6752", "solar", "trumpler20"]

    def __init__(self, instrument='', mode='', casuDir='',
                 xferList='', versStr='', forceTransfer=False,
                 deprecationMode="mv",
                 database=None, UKLight=True, outPrefix=None, isTestRun=False):
        """
        @param instrument: GES instrument, ie. giraffe or uves.
        @type  instrument: str
        @param mode: GES mode, ie. nightly or stacked.
        @type  mode: str
        @param casuDir: Directory at CASU, where FITS files are hosted.
        @type  casuDir: str
        @param database: The database.
        @type  database: str
        @param xferList: File containing list of files to transfer.
        @type  xferList: str
        @param forceTransfer: Force the data transfer.
        @type  forceTransfer: bool
        @param deprecationMode: Modus for deprecating files;
                                with forceTransfer: file suffix for overwriting.
        @type  deprecationMode: str
        @param UKLight: Use UKLight instead of JANET.
        @type  UKLight: bool
        @param outPrefix: Prefix of output file(s).
        @type  outPrefix: str
        @param versStr: Version number of data.
        @type  versStr: str
        @param isTestRun: If True, do not perform database modifications.
        @type  isTestRun: bool
        """
        # Must stay the first statement: locals() has to hold exactly the
        # constructor arguments.
        _attributesFromArguments(
            self, inspect.getargspec(GESTransfer.__init__)[0], locals())

        # create transfer log file
        CASUTransferLog.createTransferLogfile(self.sysc, self.outPrefix)

    #--------------------------------------------------------------------------

    def _scpScriptPath(self):
        """Return the path of the scp shell script for the current mode."""
        return os.path.join(self.sysc.sysPath, "scpcam%s%s" % (
            self.sysc.loadDatabase, self.mode[:1]))

    #--------------------------------------------------------------------------

    def _isWantedFile(self, entry, subDir):
        """Tell whether a CASU listing entry is a FITS file to be fetched."""
        if ".fit" not in entry:
            return False
        if self.mode == "stacked":
            return True
        if self.mode == "archived" and subDir == "stacked":
            return True
        if self.instrument == "uves" and self.mode == "nightly":
            return True
        return (self.instrument == "giraffe"
                and self.mode in ("nightly", "archived")
                and "_final" in entry)

    #--------------------------------------------------------------------------

    def transfer(self, runDate):
        """
        Transfer data from CASU.

        @param runDate: Date directory to be fetched in nightly mode; it is
                        compared to the CASU sub-directory names.

        @return: Name of the transfer log file, status (1 if every scp
                 script ran and exited cleanly, -1 otherwise, including test
                 runs and the case of no staging directory), list of WFAU
                 output directories.
        @rtype:  tuple(str, int, list)
        """
        CASUQueries.sysc = self.sysc
        Logger.addMessage("Creating script for %s downloads of %s data." % (
            self.mode, self.instrument))

        # get dir listing from CASU
        casuSubDir = ("uves/proc_%s" if self.instrument == "uves" else
                      "proccasu_%s")
        if self.mode == "archived":
            casuSubDir = os.path.join("archdata/%s", casuSubDir)
        if self.mode == "stacked":
            casuSubDir = os.path.join(casuSubDir, self.mode)

        if self.mode == "archived":
            casuDirs = []
            for point in self.gesPointings:
                casuDirs.extend(CASUQueries.getCasuStagingDirs(
                    UKLight=self.UKLight,
                    casuSubDirPath=casuSubDir % (point, self.versStr)))
        else:
            casuDirs = CASUQueries.getCasuStagingDirs(
                UKLight=self.UKLight, casuSubDirPath=casuSubDir % self.versStr)

        # Results are accumulated over all staging directories so that the
        # log and the return value cover every directory, and so that an
        # empty directory list no longer ends in a NameError.
        xferDict = {}
        outDirs = {}
        allRunsOk = bool(casuDirs) and not self.isTestRun
        for cd in casuDirs:
            Logger.addMessage("Reading CASU dir: %s ..." % cd)
            casuDirDict, inDirs = self.getCASUDirs(cd, runDate)

            Logger.addMessage("Checking WFAU outdirs ...")
            dirOutDirs = self.checkOutDirs(inDirs)
            outDirs.update(dirOutDirs)

            Logger.addMessage("Writing scp scripts ...")
            xferDict.update(self.writeScpScripts(casuDirDict, dirOutDirs))

            # The script file is rewritten for every directory, so it has to
            # be run before the next directory is processed.
            if not self.isTestRun:
                Logger.addMessage("Starting transfer.")
                if self.runScpScripts() != 1:
                    allRunsOk = False
            Logger.addMessage("Finished transfer.")
        status = 1 if allRunsOk else -1

        # write transferred files into log
        CASUTransferLog.updateTransferLog(list(utils.unpackList(
            xferDict.values())))
        CASUTransferLog.closeTransferLogfile()

        return CASUTransferLog.logFile.name, status, list(outDirs.values())

    #--------------------------------------------------------------------------

    def checkOutDirs(self, inDirs):
        """
        Ensure the output directories are available.

        @param inDirs: CASU sub-directories that need a WFAU counterpart.
        @type  inDirs: list(str)

        @return: Dictionary mapping directory keys to WFAU output paths.
        @rtype:  dict(str: str)
        """
        wfauSubDir = os.path.join(self.instrument,
                                  "%s_v%s" % (self.mode, self.versStr))

        availMainDirs = [os.path.join(x, self.sysc.fitsDir, wfauSubDir)
                         for x in sorted(utils.unpackList(
                             self.sysc.serverRaidArray.values()))]

        for direc in availMainDirs:
            utils.ensureDirExist(direc)

        outDirs = {}
        # get existing out dirs
        versTag = "%s_v%s/" % (self.mode, self.versStr)
        for md in availMainDirs:
            if self.mode == "archived":
                for dirname, dirnames, _filenames in os.walk(md):
                    for subdirname in dirnames:
                        outPath = os.path.join(dirname, subdirname)
                        outDirs[outPath.partition(versTag)[2]] = outPath
            else:
                for sd in sorted(os.listdir(md)):
                    outDirs[sd] = os.path.join(md, sd)

        # check if new CASU dirs exist
        for cdir in inDirs:
            # NOTE(review): this compares cdir with the stored *paths*, not
            # the keys, as the original did; it looks like it is true for
            # every relative cdir -- confirm whether a key lookup was meant.
            if cdir not in outDirs.values():
                wdir = ("%s_v%s" % (cdir, self.versStr)
                        if self.mode == "nightly" else cdir)
                outPath = os.path.join(availMainDirs[-1], wdir)
                utils.ensureDirExist(outPath)
                outDirs[cdir] = outPath
        return outDirs

    #--------------------------------------------------------------------------

    def getCASUDirs(self, casuDir, runDate):
        """
        Get the listing for the given CASU directory.

        @param casuDir: CASU directory to be listed.
        @type  casuDir: str
        @param runDate: Date directory name wanted in nightly mode.

        @return: Dictionary of CASU paths and the file names to fetch from
                 them, list of sub-directories needing an output directory.
        @rtype:  tuple(defaultdict(list), list)
        """
        inDirs = []
        casuDirDict = defaultdict(list)
        for line in CASUQueries.getDirListing(casuDir, self.UKLight, "-p"):
            line = line.strip()
            if not line.startswith('d'):
                continue
            subDir = line.rpartition(' ')[2].replace('/', '')
            isWantedDir = (
                self.mode in ("stacked", "archived")
                or (self.mode == "nightly"
                    and subDir.isdigit() and subDir == runDate))
            if not isWantedDir:
                continue

            casuPath = os.path.join(casuDir, subDir)
            if self.mode == "archived":
                if subDir.isdigit() or subDir == "stacked":
                    inDirs.append(casuPath.partition("archdata/")[2])
            else:
                inDirs.append(subDir)

            for entry in CASUQueries.getDirListing(
                    casuPath, self.UKLight, "-p"):
                if self._isWantedFile(entry, subDir):
                    casuDirDict[casuPath].append(
                        entry.strip().rpartition(' ')[2])
        return casuDirDict, inDirs

    #--------------------------------------------------------------------------

    def runScpScripts(self):
        """
        Run the scp scripts.

        @return: 1 if the script ran and exited with status 0, else -1.
        @rtype:  int
        """
        scpFile = File(self._scpScriptPath())

        # os.system reports failure through its return value rather than by
        # raising, so the exit status has to be inspected.
        try:
            exitStatus = os.system(scpFile.name)
        except Exception:
            exitStatus = -1
        if exitStatus != 0:
            Logger.addMessage("Problem running %s" % scpFile.name)
            return -1
        return 1

    #--------------------------------------------------------------------------

    def writeScpScripts(self, casuDirDict, outDirs):
        """
        Write shell scp script based on the file list comparison.

        @param casuDirDict: Dict of transferable files.
        @type  casuDirDict: dict(list)
        @param outDirs: Dictionary of date-version strings and their
                        download directories.
        @type  outDirs: dict(str: str)

        @return: Dictionary of downloaded file names and their paths.
        @rtype:  dict(str: str)

        """
        fileDict = {}
        scpCmd = CASUQueries.scpCmd(self.UKLight)

        outFile = File(self._scpScriptPath())
        outFile.wopen()
        try:
            outFile.writetheline("#! /usr/bin/tcsh")
            for inPath in sorted(casuDirDict):
                for fileName in casuDirDict[inPath]:
                    xferFile = File(os.path.join(inPath, fileName))
                    if self.mode == "archived":
                        theSubdir = inPath.partition("archdata/")[2]
                    else:
                        theSubdir = xferFile.subdir
                    outPath = os.path.join(outDirs[theSubdir], fileName)
                    # only files not yet present locally are fetched
                    if not os.path.exists(outPath):
                        outFile.writetheline("%s:%s %s" % (
                            scpCmd, xferFile.name, outPath))
                        fileDict[os.path.join(theSubdir, fileName)] = outPath
        finally:
            outFile.close()

        # make file executable
        os.chmod(outFile.name, 0o764)
        return fileDict
310
#------------------------------------------------------------------------------

class TransferCASUList(object):
    """
    Transfer FITS files of mixed dates from a single directory and distribute
    the data accordingly.
    Data at CASU:
    GPS: /data/apm22_e/wfcam/reprocessed/nebulosity

    """
    sysc = SystemConstants()

    def __init__(self, fitsList=None, casuDir='',
                 xferList='', forceTransfer=False, deprecationMode="mv",
                 database=None, UKLight=True, outPrefix=None, isTestRun=False):
        """
        @param fitsList: Initialised FitsList object.
        @type  fitsList: FitsList
        @param casuDir: Directory at CASU, where FITS files are hosted.
        @type  casuDir: str
        @param database: The database.
        @type  database: str
        @param xferList: File containing list of files to transfer.
        @type  xferList: str
        @param forceTransfer: Force the data transfer.
        @type  forceTransfer: bool
        @param deprecationMode: Modus for deprecating files;
                                with forceTransfer: file suffix for overwriting.
        @type  deprecationMode: str
        @param UKLight: Use UKLight instead of JANET.
        @type  UKLight: bool
        @param outPrefix: Prefix of output file(s).
        @type  outPrefix: str
        @param isTestRun: If True, do not perform database modifications.
        @type  isTestRun: bool
        """
        # Must stay the first statement: locals() has to hold exactly the
        # constructor arguments.
        _attributesFromArguments(
            self, inspect.getargspec(TransferCASUList.__init__)[0], locals())

        if self.xferList:
            self.fromFile = True
            self.casuListFileName = self.xferList
        else:
            self.fromFile = False
            self.casuListFileName = "%s.list" % os.path.basename(casuDir)

        # create a dictionary containing all dates
        self.fitsList.createFitsDateDict()
        self.transferList = []
        self.deprFileList = []
        self.dateSet = set()
        # create transfer log file
        CASUTransferLog.createTransferLogfile(self.sysc, self.outPrefix)

    #--------------------------------------------------------------------------

    def _destPath(self, dateStr, fileName):
        """Return the local path for fileName, or '' without a date dir."""
        destDir = self.getDestDir(dateStr)
        return os.path.join(destDir, fileName) if destDir else ''

    #--------------------------------------------------------------------------

    def transfer(self):
        """
        Write the copy script for the listed CASU files and deprecate local
        files that will be replaced.

        @return: Name of the transfer log file, status, sorted list of
                 destination directories, list of deprecated files.
        @rtype:  tuple(str, int, list, list)

        @raise ValueError: if neither the CASU directory nor the list file
                           name identifies a supported data set.
        """
        CASUQueries.sysc = self.sysc
        DeprecateFiles.sysc = self.sysc
        sources = (self.casuDir, self.casuListFileName)
        if any("nebulosity" in src for src in sources):
            mode = "GPSneb"
        elif any("gps_astrom" in src for src in sources):
            mode = "GPSastron"
        else:
            # formerly an UnboundLocalError on the first use of 'mode'
            raise ValueError(
                "Cannot determine transfer mode from %r / %r" % sources)

        Logger.addMessage("Creating script for %s downloads." % mode)

        # get dir listing from CASU
        if not self.fromFile:
            cmd = "ls -R"
            copyDict = {self.casuDir: [self.casuDir]}
            casuDataFilePath = os.path.join(self.sysc.rem_camlist_dir,
                                            self.casuListFileName)
            casuListFile = CASUQueries.getCasuDataFile(
                cmd, casuDataFilePath, copyDict, False)
        else:
            casuListFile = File(os.path.join(self.sysc.sysPath,
                                             self.casuListFileName))
        casuListFile.ropen()
        casuFileList = casuListFile.readlines()
        casuListFile.close()

        Logger.addMessage("Creating transfer script...")
        parsedDict = self.parseList(casuFileList)
        copyScript = File("./copy%s_%s.sh" % (
            mode, utils.makeTimeStamp().replace(' ', '_')))
        copyScript.wopen()
        for topDir in parsedDict:
            scpCmd = ':'.join([CASUQueries.scpCmd(self.UKLight), topDir])
            for dateStr in sorted(parsedDict[topDir]):
                deprecateList = []
                for fileName in sorted(parsedDict[topDir][dateStr]):
                    copyCmd = os.path.join(scpCmd, fileName)
                    if mode == "GPSneb":
                        if "_two" in fileName:
                            newFileName = self._destPath(dateStr, fileName)
                        elif "_conf" in fileName \
                                and "nebulosity" in self.casuDir:
                            newFileName = self._destPath(
                                dateStr,
                                "%s_two%s%s" % fileName.partition('_conf'))
                        else:
                            newFileName = ''
                        if not newFileName:
                            continue
                        if not os.path.exists(newFileName) \
                                or (self.forceTransfer
                                    and self.deprecationMode in newFileName):
                            copyScript.writetheline(
                                '%s %s' % (copyCmd, newFileName))
                            self.transferList.append(newFileName)
                        else:
                            Logger.addMessage("File already exists: %s" % \
                                              newFileName)
                    elif mode == "GPSastron":
                        newFileName = self._destPath(dateStr, fileName)
                        if not newFileName:
                            # no date directory; getDestDir logged a warning
                            continue
                        copyScript.writetheline(
                            '%s %s' % (copyCmd, newFileName))
                        self.transferList.append(newFileName)
                        if os.path.exists(newFileName):
                            deprecateList.append(newFileName)
                if deprecateList and self.deprecationMode.lower() != "no":
                    self.deprFileList.extend(DeprecateFiles._mvDeprecatedFiles(
                        self.database, self.outPrefix, deprecateList,
                        self.deprecationMode, self.isTestRun))
                    Logger.addMessage(
                        "Nr. of deprecated files: %s" % len(deprecateList))

        copyScript.close()
        Logger.addMessage("Finished writing %s." % copyScript.name)

        if not self.isTestRun:
            Logger.addMessage("Starting transfer.")
            try:
                copyScript.chmod(0o744)
                # NOTE(review): running the script is disabled in the
                # original source; it is only made executable here.
                #etws#os.system(copyScript.name)
                status = 1
            except Exception:
                Logger.addMessage("Problem running %s" % copyScript.name)
                status = -1
            Logger.addMessage("Finished transfer.")
        else:
            status = -1
        # write transferred files into log
        CASUTransferLog.updateTransferLog(self.transferList)
        CASUTransferLog.closeTransferLogfile()

        return CASUTransferLog.logFile.name, status, \
               sorted(self.dateSet), self.deprFileList

    #--------------------------------------------------------------------------

    def getDestDir(self, dateStr):
        """ Find the destination directory.

        The versions known for the date are tried in reverse order; the
        first date-version directory found in fitsList.invFitsDateDict is
        added to self.dateSet and returned.

        @param dateStr: Date string of the files.
        @type  dateStr: str

        @return: Destination directory, or None if none is known.
        @rtype:  str
        """
        versNos = self.sysc.obsCal.versNums(self.sysc.obsCal.checkDate(dateStr))
        # list.reverse() works in place and returns None, so iterate over
        # reversed() instead.
        for versNum in reversed(versNos):
            dateVersStr = '%s_v%s' % (dateStr, versNum)
            try:
                parentDir = self.fitsList.invFitsDateDict[dateVersStr]
            except KeyError:
                continue
            destDir = os.path.join(parentDir, dateVersStr)
            self.dateSet.add(destDir)
            return destDir

        Logger.addMessage(
            "<WARNING> Directory not found (versions %r): %s" % (
                versNos, dateStr))
        return None

    #--------------------------------------------------------------------------

    def parseList(self, fileList):
        """
        Sort the lines of a CASU directory listing by directory and date.

        A line containing self.casuDir starts a new top directory; later
        FITS file names starting with 'w' are filed under the date string in
        front of their first underscore.

        @param fileList: Lines of the listing.
        @type  fileList: list(str)

        @return: Dictionary of top directories holding dictionaries of date
                 strings and their file names.
        @rtype:  dict(str: defaultdict(list))
        """
        dataDict = {}
        topDir = None
        for entry in fileList:
            fields = entry.split()
            if not fields:
                # blank separator line in the listing
                continue
            fileName = fields[-1].replace('@', '')
            if self.casuDir in entry:
                topDir = fileName.replace(':', '')
                dataDict[topDir] = defaultdict(list)
            elif topDir is not None \
                    and fileName.endswith((".fit", ".fits")) \
                    and fileName.startswith('w'):
                dateStr = fileName.partition('_')[0].replace('w', '')
                dataDict[topDir][dateStr].append(fileName)
        return dataDict
502
#------------------------------------------------------------------------------

class DeprecateFiles(object):
    """Deprecate files for CASU transfers.
    """
    sysc = IngCuSession.sysc

    #--------------------------------------------------------------------------

    @staticmethod
    def _mvDeprecatedFiles(database, outPrefix, fileList, deprMode="mv",
                           isTestRun=False):
        """
        Move files in fileList to the directory for deprecated files.

        In the modes 'mv' and 'ip' every file is copied to its deprecated
        name, the pairs are written to a list file that is handed to
        UpdateFitsFileNames.py, and the originals are removed. In mode 'rm'
        the files are only removed. Nothing is copied, removed or run
        during a test run.

        @param database: Database passed to UpdateFitsFileNames.py.
        @type  database: str
        @param outPrefix: Prefix of the deprecation list file.
        @type  outPrefix: str
        @param fileList: list of filenames(file, datedir, path(incl. datedir))
        @type  fileList: list(str)
        @param deprMode: Mode for handling the deprecated files: move ('mv'),
                         remove ('rm'), leave in place ('ip'); 'mv' may carry
                         a suffix after an underscore, which replaces the
                         version part of the date directory.
        @type  deprMode: str
        @param isTestRun: If True, only log what would be done.
        @type  isTestRun: bool

        @return: List of the deprecated file names.
        @rtype:  list(str)

        @raise ValueError: for an 'ip' mode carrying a suffix, which has no
                           defined target name.
        """
        sysc = DeprecateFiles.sysc
        baseMode, _sep, modeSuffix = deprMode.partition('_')
        keepsCopy = baseMode in ("mv", "ip")
        if baseMode == "ip" and deprMode != "ip":
            # formerly a NameError on the undefined target name
            raise ValueError("Unsupported deprecation mode: %r" % deprMode)

        deprFileList = []
        Logger.addMessage("Deprecating files:")
        deprLogTime = utils.makeTimeStamp()
        deprecatedListFile = None
        if keepsCopy:
            deprecatedListFile = File(
                os.path.join(sysc.xferLogPath(), ''.join(
                    [outPrefix or '', "_deprecated_", deprMode, '_',
                     deprLogTime.replace(' ', '_'), ".log"])))
            deprecatedListFile.wopen()
        deprTime = utils.makeTimeStamp().split()[0].replace('-', '')

        try:
            for fileName in fileList:
                deprFile = File(fileName)
                deprName = "---"
                if baseMode == "mv":
                    if deprMode == "mv":
                        outDateDir = deprFile.subdir
                    else:
                        outDateDir = ''.join(
                            deprFile.subdir.partition("_v")[:2]) + modeSuffix

                    spacePerDisk = utils.getDiskSpace(
                        sysc.deprecatedDataStorage)
                    deprPath = os.path.join(
                        utils.getNextDisk(sysc, spacePerDisk),
                        '%s_%s' % (sysc.deprecatedDir, deprTime),
                        outDateDir)

                    utils.ensureDirExist(deprPath)
                    deprName = os.path.join(deprPath, deprFile.base)
                elif deprMode == "ip":
                    deprName = "%s.deprecated_%s" % (deprFile.name, deprTime)

                try:
                    if keepsCopy:
                        if not isTestRun:
                            shutil.copy2(deprFile.name, deprName)
                        deprFileList.append(deprName)
                        Logger.addMessage(
                            ' '.join([deprMode, deprFile.name, deprName]))
                        deprecatedListFile.writetheline(
                            ','.join([deprFile.name, deprName]))
                    elif deprMode == "rm":
                        if not isTestRun:
                            os.remove(fileName)
                        Logger.addMessage("rm " + deprFile.name)
                except Exception:
                    Logger.addMessage(' '.join(
                        ["Can't", deprMode, deprFile.name, "//", deprName]))
                    raise
        finally:
            # close the list file even if a copy failed
            if deprecatedListFile is not None:
                deprecatedListFile.close()

        if keepsCopy:
            cmd = "%s/UpdateFitsFileNames.py -d %s -D -f %s %s" % \
                  (sysc.helpersPath(), database,
                   deprecatedListFile.name, sysc.loadDatabase)

            Logger.addMessage("Running '%s'" % cmd)
            if not isTestRun:
                # The exit status used to be overwritten with 0 right after
                # the call, so a failure was never reported.
                status = os.system(cmd)
                if status:
                    Logger.addMessage("<ERROR> UpdateFitsFileNames.py failed!")
                # NOTE(review): as before, the originals are removed whether
                # or not the helper succeeded -- confirm this is intended.
                for fileName in fileList:
                    os.remove(fileName)

            Logger.addMessage(
                "Deprecated file names are listed in " + deprecatedListFile.name)
            Logger.addMessage("<INFO> Run 'helpers/SyncFlatFileLookUp.py'")
        return deprFileList
597
#------------------------------------------------------------------------------

class CASUTransferLog(object):
    """Transfer log for CASU transfers.

    The currently open log is kept in the class attribute logFile, which is
    set by createTransferLogfile and used by the other methods.
    """

    @staticmethod
    def closeTransferLogfile():
        """
        Close transfer logfile, write in the END time stamp.
        """
        logFile = CASUTransferLog.logFile
        logFile.writetheline(
            "# END transfer at " + utils.makeTimeStamp() + " UTC")
        logFile.close()

    #--------------------------------------------------------------------------

    @staticmethod
    def createTransferLogfile(sysc, outPrefix=''):
        """
        Create the transfer log file, write in the START time stamp.

        @param sysc: Object whose xferLogPath() gives the log directory.
        @param outPrefix: Prefix placed in the log file name; None is
                          treated like an empty string.
        @type  outPrefix: str
        """
        logTime = utils.makeTimeStamp()
        # Callers pass outPrefix=None by default, which ''.join rejects.
        logFileName = os.path.join(
            sysc.xferLogPath(), ''.join(
                ["transfer_", outPrefix or '', '_',
                 logTime.replace(' ', '_'), ".log"]))
        CASUTransferLog.logFile = File(logFileName)
        CASUTransferLog.logFile.aopen()
        # NOTE(review): the reason for this pause is not visible in this
        # file -- confirm before changing it.
        time.sleep(3)
        CASUTransferLog.logFile.writetheline(
            "# START transfer at " + logTime + " UTC")

    #--------------------------------------------------------------------------

    @staticmethod
    def updateTransferLog(fileList):
        """
        Write new full file path names into transfer log.

        @param fileList: List of transferred file names.
        @type  fileList: list(str)

        """
        writeLine = CASUTransferLog.logFile.writetheline
        for entry in fileList:
            writeLine(entry)
645
646 #------------------------------------------------------------------------------ 647 648 -class CASUTransfer(object):
649 """ 650 Contains methods and functions related to CU1 transfers. 651 """ 652 maxRetries = 10 653 remLsOpt = ".fit" 654 remExclPattern = ["dqc", "xsky", "chan", "bpm", "tmp", "orig", "jrf", 655 "mos", "squish", "list", '~'] 656 transferMethod = "scp" 657 sysc = SystemConstants() 658 verbose = 0 # >1 cmd info; >3 dir info; >4 file info 659
660 - def __init__(self, scpMainDir=None, scpThreads=1, 661 checkTimeFlag=False, checkMd5Flag=False, 662 UKLight=True, forceTransfer=False, 663 deprecationMode="mv", database=None, outPrefix=None, 664 reproMethod='', reproVersStr=None, ffluOnly=False, 665 onlyHeaderTransfer=False, 666 isTestRun=False):
667 """ 668 @param scpMainDir: Disk at CASU where the data is stored. 669 @type scpMainDir: str 670 @param scpThreads: Number of scp transfer threads. 671 @type scpThreads: int 672 @param checkTimeFlag: Determines if the timestamp of existing files 673 should be checked against the file at CASU. 674 @type checkTimeFlag: bool 675 @param checkMd5Flag: Check the md5 checksums at CASU and WFAU. 676 @type checkMd5Flag: bool 677 @param UKLight: Use UKLight instead of JANET. 678 @type UKLight: bool 679 @param ffluOnly: Don't transfer but update FlatfileLookup table. 680 @type ffluOnly: bool 681 @param forceTransfer: Force the data transfer. 682 @type forceTransfer: bool 683 @param deprecationMode: Modus for deprecating files. 684 @type deprecationMode: str 685 @param database: The database. 686 @type database: str 687 @param onlyHeaderTransfer: Transfer only FITS headers. 688 @type onlyHeaderTransfer: bool 689 @param outPrefix: Prefix of output file(s). 690 @type outPrefix: str 691 @param reproMethod: Method for reprocessed data: s::suffix (CASU 692 datedir suffix) or v::version. 693 @type reproMethod: str 694 @param reproVersStr: Version number of reprocessed data. 695 @type reproVersStr: str 696 @param isTestRun: If True, do not perform database modifications. 
697 @type isTestRun: bool 698 699 """ 700 _attributesFromArguments( 701 self, inspect.getargspec(CASUTransfer.__init__)[0], locals()) 702 703 self.scpSuffix = ("head" if self.onlyHeaderTransfer else '') 704 # restrict number of threads for header transfers 705 if self.onlyHeaderTransfer: 706 self.scpThreads = 1 707 708 # set versions for reprocessed data 709 if self.reproVersStr: 710 if "=>" in self.reproVersStr: 711 self.oldVersStr, self.newVersStr = \ 712 self.reproVersStr.partition("=>")[::2] 713 elif ',' in self.reproVersStr: 714 self.oldVersStr, self.newVersStr = \ 715 self.reproVersStr.partition(',')[::2] 716 else: 717 self.oldVersStr, self.newVersStr = [ 718 self.reproVersStr, self.reproVersStr] 719 if ',' in self.oldVersStr: 720 self.versStr, self.oldVersStr = self.oldVersStr.split(',') 721 else: 722 self.versStr = self.oldVersStr
723 724 #-------------------------------------------------------------------------- 725
726 - def initTransfer(self, runDate):
727 """ 728 Initialise the transfer. 729 """ 730 CASUQueries.sysc = self.sysc 731 self.runDateStr = str(runDate) 732 733 # check output directory 734 utils.ensureDirExist(self.sysc.sysPath) 735 utils.ensureDirExist(self.sysc.xferLogPath()) 736 utils.ensureDirExist(self.sysc.corruptFilesPath()) 737 for disk in sorted(set(self.sysc.availableRaidFileSystem())\ 738 .difference(self.sysc.developDisks)): 739 utils.ensureDirExist(os.path.join(disk, self.sysc.fitsDir)) 740 utils.ensureDirExist(os.path.join(disk, self.sysc.downloadDir)) 741 742 # get free memory on available disks here 743 self.spacePerDisk = \ 744 utils.getDiskSpace(self.sysc.availableRaidFileSystem()) 745 746 # create transfer log file 747 try: 748 CASUTransferLog.createTransferLogfile(self.sysc, self.outPrefix) 749 except: 750 raise 751 752 # setup CASU reprocessing 753 Reprocessing = namedtuple('Reprocessing', 'mode, value, replace') 754 self.reproMode = Reprocessing( 755 self.reproMethod.partition('::')[0], 756 self.reproMethod.partition('::')[2], 757 (self.reproMethod.partition('::')[2] 758 if self.reproMethod.partition('::')[0].startswith(('p','s')) 759 else ''))
760 761 #-------------------------------------------------------------------------- 762
763 - def transfer(self, runDate, xferList=[]):
764 """ 765 Main script to transfer data from CASU. 766 767 @param runDate: The date for the transfer. 768 @type runDate: int 769 @param xferList: Optional list containing files to be downloaded. 770 @type xferList: list 771 772 @return: xferlogfile name, download status, log text, 773 list of transferred dates. 774 @rtype: tuple(str, int, StringIO, list) 775 776 """ 777 Logger.addMessage("Initialising transfer: %s..." % runDate) 778 self.initTransfer(runDate) 779 780 CASUQueries.camListFile = File(os.path.join(self.sysc.sysPath, 781 "%s_camlist_%s" % (self.sysc.loadDatabase, self.runDateStr))) 782 783 if xferList: 784 checkSRFlag = False 785 self.scpMainDir = File(xferList[0]).topdir 786 else: 787 checkSRFlag = not self.forceTransfer 788 if not self.scpMainDir: 789 self.scpMainDir = CASUQueries.getCasuDateDirs( 790 UKLight=self.UKLight)[self.runDateStr] 791 # if there is more than one date dir try to find the correct one 792 if type(self.scpMainDir) == type([]): 793 if len(self.scpMainDir) > 1: 794 # check if any of the date dirs has the OTC tag 795 OTCList = [] 796 for cd in self.scpMainDir: 797 OTCList.extend(CASUQueries.getOTCDirs( 798 runDate, runDate, [], {}, 799 os.path.join(cd, self.runDateStr), 800 returnList=True)) 801 OTCList = [x.partition(self.runDateStr)[0] 802 for x in OTCList] 803 # intersect OTC dirs with found date dirs 804 cpList = list(set(OTCList).intersection( 805 self.scpMainDir)) 806 if len(cpList) > 1: 807 Logger.addMessage( 808 "There are multiple directories for %s: %r" % ( 809 self.runDateStr, self.scpMainDir)) 810 raise NoDataError 811 else: 812 self.scpMainDir = cpList[0] 813 else: 814 self.scpMainDir = self.scpMainDir[0] 815 try: 816 casuDict, hereDict, casuStats, hereStats, outDateDirs = \ 817 self.getListings(runDate, xferList, checkSR=checkSRFlag) 818 except NoDataError: 819 logText = self._writeLoggerMessages() 820 return CASUTransferLog.logFile.name, False, logText, [], [], [] 821 822 fileList, totalDLSize, deprFileList = 
self._checkLocalExist( 823 casuDict, hereDict, casuStats, hereStats) 824 825 # sort the filelist 826 fileList.sort(key=operator.itemgetter(5)) 827 828 # build a list of #-of-threads sublists 829 loadList = self._buildLoadList(fileList, totalDLSize) 830 831 # count remote locked dirs 832 lockedDirs = 0 833 for x in casuStats.itervalues(): 834 lockedDirs += x[0] 835 836 # transfer data 837 if totalDLSize > 0: 838 dlStatus = self.transferFiles(loadList, totalDLSize, casuStats, 839 hereStats, casuDict, outDateDirs) 840 else: 841 text = "# No files downloaded." 842 if lockedDirs > 0: 843 text = "\n".join([text, 844 "# %d dir(s) read locked: " % (lockedDirs)]) 845 for k, v in casuStats.iteritems(): 846 if v[0] > 0: 847 text = "\n".join([text, "# %s" % k]) 848 else: 849 text = "\n".join([text, "# Directories are identical."]) 850 Logger.addMessage(text) 851 dlStatus = True 852 853 # write success to successful downloaded dirs 854 tmpTransferList = self._writeLocSuccess(hereStats, outDateDirs) 855 856 transferList = self._cleanupTransferList(tmpTransferList) 857 858 # move files to final destination 859 newFileList = self._mvToFin(transferList) 860 861 if self.ffluOnly: 862 newFileList = [] 863 for thread in xrange(self.scpThreads % 10): 864 loadList[thread].sort() 865 for fileName in loadList[thread]: 866 newFileList.append(fileName) 867 868 CASUTransferLog.updateTransferLog(newFileList) 869 if not self.isTestRun: 870 if not self.forceTransfer and not self.ffluOnly: 871 self._writeRemSuccess(casuStats) 872 self._cleanUp() 873 else: 874 self._cleanUp() 875 876 # write log messages to logText 877 logText = self._writeLoggerMessages() 878 879 dateList = [] 880 for oDir in outDateDirs: 881 fDir = outDateDirs[oDir].replace(self.sysc.downloadDir, 882 self.sysc.fitsDir) 883 884 dateList.append(os.path.join(fDir, oDir)) 885 886 # clean-up 887 #CASUQueries.camListFile.remove() 888 889 if self.ffluOnly: 890 for dirPath in dateList: 891 try: 892 os.rmdir(dirPath) 893 except OSError: 
894 Logger.addMessage("<Warning> Can't remove %s" % dirPath) 895 896 return CASUTransferLog.logFile.name, dlStatus, logText, dateList, \ 897 newFileList, deprFileList
898 899 #-------------------------------------------------------------------------- 900
901 - def getListings(self, runDate, xferList=[], checkSR=True):
902 """ 903 Get the remote and local listing (remote first to create same 904 dirs here). 905 906 @param runDate: The date for the transfer. 907 @type runDate: int 908 @param xferList: Optional list containing files to be downloaded. 909 @type xferList: list 910 @param checkSR: Check if the SUCCESSFULLY_READ tag is set. 911 @type checkSR: bool 912 913 @return: dictionaries containing data and stats for directories 914 at CASU and WFAU. 915 @rtype: dict 916 917 """ 918 # get CASU listings 919 try: 920 transferableDict = self._getTransferableDirs(runDate, checkSR) 921 remStats = [self.sysc.rem_lock_name, self.sysc.rem_success_name, 922 self.sysc.rem_ready2copy_name] 923 casuDict, casuSizes, casuStats = self._listRemoteDir( 924 transferableDict, remStats, CASUTransfer.remLsOpt, xferList) 925 # create new dirs 926 outDateDirs = self._checkNewDirs(transferableDict, casuSizes) 927 except extp.Error: 928 print '\n'.join(["\n The server you seek", 929 " cannot be located, but", 930 " endless others exist.\n"]) 931 self._cleanUp(outDateDirs) 932 raise 933 except NoDataError: 934 self._cleanUp() 935 raise 936 # get WFAU listings 937 try: 938 hereDict, hereStats = \ 939 self._listHomeDir(transferableDict, 940 ['', self.sysc.rem_success_name, ''], xferList) 941 except extp.Error: 942 print '\n'.join(["\n The list you seek", 943 " cannot be located, but", 944 " endless others exist.\n"]) 945 self._cleanUp(outDateDirs) 946 raise 947 948 return casuDict, hereDict, casuStats, hereStats, outDateDirs
949 950 #-------------------------------------------------------------------------- 951
def transferFiles(self, fileList, totalDLSize, casuStats, hereStats,
                  casuDict, outDirs):
    """
    Transfer the files in the given list, retrying failed downloads.

    Every pass downloads the current load list, looks for files that did
    not arrive and, if checkMd5Flag is set, runs the md5 checks. Files
    that have to be reloaded are turned into the load list of the next
    pass. The loop ends on success or once CASUTransfer.maxRetries
    retries have been used up.

    @param fileList:    The list of files to transfer.
    @type  fileList:    list
    @param totalDLSize: The estimated total size to transfer.
    @type  totalDLSize: int
    @param casuStats:   Statistics of CASU directories.
    @type  casuStats:   dict(str: list(int))
    @param hereStats:   Statistics of WFAU directories.
    @type  hereStats:   dict(str: list(int))
    @param casuDict:    Dictionary of CASU data.
    @type  casuDict:    dict
    @param outDirs:     Dictionary of date-version strings and their
                        download directories.
    @type  outDirs:     dict(str: str)

    @return: True if the transfer completed successfully, otherwise False.
    @rtype:  bool

    """
    retries = 0
    dlStatus = False
    while not dlStatus and retries <= CASUTransfer.maxRetries:
        # clear the per-directory "non-success" marker (index 2)
        for k in casuStats:
            casuStats[k][2] = 0
        for k in hereStats:
            hereStats[k][2] = 0

        # download files
        try:
            transferDict = self._downloadFiles(fileList, outDirs,
                                               totalDLSize)
        except extp.Error:
            print('\n'.join(["\n  There is a chasm",
                             "  of carbon and silicon",
                             "  the software can't bridge.\n"]))
            self._cleanUp()
            raise

        # look for missing files
        missingFiles = self._missingFileCheck(transferDict.values())

        # md5 checksum tests
        if self.checkMd5Flag:
            try:
                md5Status, md5FilesMissing = \
                    MD5Checker().md5Run(casuStats, transferDict, outDirs)
            except extp.Error:
                print('\n'.join(["\n  Chaos reigns within.",
                                 "  Reflect, repent, and reload.",
                                 "  Order shall return.\n"]))
                self._cleanUp()
                raise

            # md5 sums missing at CASU
            if md5Status[0] > 0:
                Logger.addMessage(' '.join([
                    "# md5sum must be checked manually",
                    "in these directories."]))
            if sum(md5Status) == 0:
                Logger.addMessage("# md5sum successfully checked.")
            missingFiles.extend(md5FilesMissing)
        else:
            md5Status = [0, 0]
            md5FilesMissing = []

        # Now check if everything went well; if so, write success
        # message to CASU input dirs
        retryList = []
        if not self.isTestRun and not self.ffluOnly \
               and (md5Status[1] > 0 or missingFiles):
            Logger.addMessage("# The following files have to be reloaded:")
            for fileName in missingFiles:
                misFile = File(fileName)
                Logger.addMessage(misFile.name)
                shortName = os.path.join(misFile.subdir, misFile.base)
                del transferDict[shortName]
                retryList.append([misFile.base, misFile.subdir,
                                  casuDict[shortName][2], '-',
                                  shortName])
                # dir with missing file set to non-success
                casuStats[misFile.subdir][2] = 1
                hereStats[misFile.subdir][2] = 1
        else:
            Logger.addMessage("# Transfer completed successfully!")
            dlStatus = True

        if retryList:
            reloadSize = sum(entry[2] for entry in retryList)
            # The rebuilt load list used to be thrown away, so every retry
            # re-ran the original list with the original size estimate.
            # Use it (and its size) for the next pass instead.
            fileList = self._buildLoadList(retryList, reloadSize)
            totalDLSize = reloadSize
        # counted on every pass so that a failing md5 check without any
        # missing file cannot loop forever
        retries += 1

    return dlStatus
1049 1050 #-------------------------------------------------------------------------- 1051 # log file related 1052 #-------------------------------------------------------------------------- 1053
def _writeLoggerMessages(self):
    """
    Dump the collected Logger messages into an in-memory text buffer and
    call Logger.reset() afterwards.

    @return: Buffer the log messages were dumped into.
    @rtype:  StringIO

    """
    messageBuffer = StringIO.StringIO()
    Logger.dump(messageBuffer)
    Logger.reset()
    return messageBuffer
1066 1067 #-------------------------------------------------------------------------- 1068 # general routines 1069 #-------------------------------------------------------------------------- 1070
def _cleanUp(self, rmDirs=None):
    """
    Remove the given download/FITS directories (if they are empty) and
    close the transfer log file.

    @param rmDirs: Directories that should be removed, given as a mapping
                   of directory name to the download path it lives in.
                   The matching FITS directory is derived from that path
                   (two levels up, then sysc.fitsDir). Nothing is removed
                   if omitted or empty.
    @type  rmDirs: dict(str: str)

    """
    # None instead of a shared mutable default; an empty value of any
    # kind (including the former default []) still means "remove nothing"
    for directory, dlRoot in (rmDirs or {}).items():
        dlDir = os.path.join(dlRoot, directory)
        fiDir = os.path.join(
            os.path.dirname(os.path.dirname(dlRoot)),
            self.sysc.fitsDir, directory)
        for candidate in (dlDir, fiDir):
            try:
                # os.rmdir only succeeds on empty directories
                os.rmdir(candidate)
            except OSError:
                print("Directory not empty: %s" % (candidate,))
    CASUTransferLog.closeTransferLogfile()
1094 1095 #-------------------------------------------------------------------------- 1096
1097 - def _dictFromList(self, list):
1098 """ 1099 Build dict from file list. 1100 1101 @param list: File name list (fileName, path, values). 1102 @type list: list 1103 1104 @return: Dictionary of full file paths and values. 1105 @rtype: dict 1106 1107 """ 1108 dictionary = {} 1109 for s in list: 1110 dictionary[os.path.join(s[1], s[0])] = s[:] 1111 return dictionary
1112 1113 #-------------------------------------------------------------------------- 1114
def _getTimeDiff(self, dateTimeTpl):
    """
    Compute the time elapsed between 2001-01-01 00:00:00 and the given
    date and time.

    @param dateTimeTpl: Date time tuple: ('yyyy-mm-dd', 'hh:mm:ss(.ss)')
    @type  dateTimeTpl: (str, str)

    @return: Time difference tuple: (days, seconds)
    @rtype:  tuple(int, float)

    """
    referenceTime = mxTime.mktime((2001, 1, 1, 0, 0, 0, 0, 0, 0))
    dayText, clockText = dateTimeTpl[0], dateTimeTpl[1]
    ymd = [int(part) for part in dayText.split('-')]
    # fractional seconds are truncated
    hms = [int(float(part)) for part in clockText.split(':')]
    givenTime = mxTime.mktime(ymd + hms + [0, 0, 0])
    return (givenTime - referenceTime).absvalues()
1129 1130 #-------------------------------------------------------------------------- 1131
def _readDirList(self, aListFile, lsNames=['','',''], lsOpt=''):
    """
    Read directory listing from file and transform it into internal list.

    The listing is expected to consist of directory header lines
    followed by "ls -o" style file lines. A header line containing
    scpMainDir is treated as a remote directory, one containing
    "/<loadDatabase>/ingest/" as a local directory; every other line is
    split into eight whitespace separated fields and treated as a file of
    the most recently seen directory.

    NOTE(review): the nesting below was reconstructed from a flattened
    listing; in particular the placement of the two lsNames checks
    relative to the lsOpt filter should be confirmed against the
    repository.

    @param aListFile: File containing the file system listings.
    @type  aListFile: File class object
    @param lsNames:   Names of the locked and the successfully read file;
                      a file named lsNames[0] or lsNames[1] sets the lock
                      flag of its directory, lsNames[2] clears it.
    @type  lsNames:   list(str, str, str)
    @param lsOpt:     listing option; only file names containing this
                      string are listed.
    @type  lsOpt:     str

    @return: List of file entries
             [name, dir, size, access[:1], filetime, dirpath], and a
             dictionary of directory statistics, i.e.
             [locked,used,success]. Both are empty if the list file does
             not exist.
    @rtype:  list(list), dict(str: list(int, int, int))

    """
    dirList = []
    dirStats = {}
    # line prefixes to be dropped from the listing
    lineType = (('d', "total") if self.sysc.isOSA() else ('d', 'l', "total"))
    if aListFile.exists():
        aListFile.ropen()
        casuListing = [l for l in aListFile.readlines(
            commentChar='#',
            findValues=[".fit", ".fits", "ingest",
                        self.scpMainDir] + lsNames)
                       if not l.startswith(lineType)]
        aListFile.close()

        for line in casuListing:
            # an empty line ends the processing of the listing
            if line == '':
                break
            if self.scpMainDir in line:
                # remote dirs
                listedDir = File(os.path.normpath(
                    line.replace(':','').strip()))
                # tracing dirs, flags are [locked,used,success]
                # remote dir is locked until unlock file is found
                if CASUQueries.re_date.match(listedDir.base.replace(
                    self.reproMode.replace, '')):
                    dirStats[listedDir.base.replace(
                        self.reproMode.replace, '')] = [1,0,0]
            elif "/" + self.sysc.loadDatabase.lower() + "/ingest/" in line:
                # local dirs
                listedDir = File(line.replace(':',''))
                dirStats[listedDir.base] = [0,0,0]
            else:
                # file line; listedDir is the directory header seen last
                access, _nol, _uid, size, yyyymmdd, \
                    ftime, _timezone, name = line.split(None, 7)
                # symbolic link: keep only the link name before "->"
                if self.sysc.isOSA() and "->" in name and access[0] == 'l':
                    name = name.split(None, 1)[0]
                yyyy, mon, dd = yyyymmdd.split('-')
                name = name.replace('*', '')
                # dir should have the format "yyyymmdd" in Camb
                # and "yyymmdd_v#" here
                if lsOpt in name \
                       and not any(ex in name
                                   for ex in CASUTransfer.remExclPattern) \
                       and (CASUQueries.re_date.match(
                    listedDir.base.replace(self.reproMode.replace, '')) \
                            or CASUQueries.re_datevers.match(listedDir.base)):
                    # fractional seconds of the file time are dropped
                    hh, mm, ss = ftime.partition('.')[0].split(':')
                    fileTime = mxTime.DateTime(int(yyyy), int(mon),
                                               int(dd), int(hh),
                                               int(mm), int(ss)).jdn
                    dirList.append([name, listedDir.base, int(size),
                                    access[:1], fileTime, listedDir.name])
                if name == lsNames[0] or name == lsNames[1]:
                    # dir is locked or successfully read
                    dirStats[listedDir.base.replace(
                        self.reproMode.replace, '')][0] = 1
                if name == lsNames[2]:
                    # presumably the unlock file: clears the lock flag
                    dirStats[listedDir.base.replace(
                        self.reproMode.replace, '')][0] = 0
    else:
        Logger.addMessage("%s doesn't exist!" % aListFile.name)
    return dirList, dirStats
1208 1209 #-------------------------------------------------------------------------- 1210 # pre-transfer CASU 1211 #-------------------------------------------------------------------------- 1212
1213 - def _cleanupTransferList(self, tflin):
1214 """ 1215 Scans through the transfer list to see which files match the 1216 delete conditions and deletes them. 1217 1218 @param tflin: Transfer file list. 1219 @type tflin: list 1220 1221 @return: Updated list of transferable files. 1222 @rtype: list 1223 1224 """ 1225 filesToDelete = ["t_w", "chan", "lin_", "squish", "bpm", "dqcdark", 1226 "_psf", "xsky", "dqcbias"] 1227 filesToIngest = [self.sysc.catSuffix, self.sysc.stackSuffix, 1228 self.sysc.skyPrefix, self.sysc.darkPrefix, 1229 self.sysc.flatSuffix, self.sysc.confSuffix, 1230 self.sysc.cpmSuffix, self.sysc.mosaicSuffix, 1231 self.sysc.tileSuffix] 1232 if self.sysc.isOSA(): 1233 filesToIngest.extend([self.sysc.biasPrefix, self.sysc.fringeSuffix]) 1234 1235 r1 = re.compile('|'.join(map(re.escape, filesToIngest))) 1236 r2 = re.compile('|'.join(map(re.escape, filesToDelete))) 1237 r3 = re.compile(r'[ovw]20') 1238 tflout = [] 1239 for entry in tflin: 1240 fname = os.path.basename(entry) 1241 if r1.search(fname): 1242 tflout.append(entry) 1243 elif r2.search(fname): 1244 os.remove(entry) 1245 elif r3.match(fname,0,3): 1246 tflout.append(entry) 1247 else: 1248 Logger.addMessage( 1249 "<Warning> No match found in ingest list: " + entry) 1250 return tflout
1251 1252 #-------------------------------------------------------------------------- 1253
def _createCasuListFile(self, copyDict):
    """
    Request a listing of the given directories at CASU by handing the
    "ls" command templates, CASUQueries.camListFile and the directories
    to CASUQueries.getCasuFileListing. Nothing is returned.

    @param copyDict: Dictionary of transferable dates and CASU paths.
    @type  copyDict: dict

    """
    # servers whose name contains "29" get the "-E" flag instead of the
    # long --time-style/--indicator-style options
    if "29" in self.sysc.scpServer(self.UKLight):
        lsFlags = "-E"
    else:
        lsFlags = "--time-style=full-iso --indicator-style=none"
    listCmds = ["ls -d %s", "ls -o " + lsFlags + " %s"]
    CASUQueries.getCasuFileListing(listCmds, CASUQueries.camListFile,
                                   copyDict, self.UKLight)
1271 1272 #-------------------------------------------------------------------------- 1273
def _getTransferableDirs(self, runDate, checkSR=True):
    """
    Query CASU for the directories of the given date which are ready to
    copy and not yet successfully downloaded, including their version
    number as a combination of <date>_v<#>.

    @param runDate: Transfer date.
    @type  runDate: int
    @param checkSR: Check if the SUCCESSFULLY_READ tag is set.
    @type  checkSR: bool

    @return: Dictionary of CASU path and date-version number strings
             for each date.
    @rtype:  dict(str: list(str, str))

    @raise NoDataError: If checkSR is set and no directory was found.

    """
    Logger.addMessage("Getting transferable directories...")
    dateText = str(runDate)

    srDirs = CASUQueries.getSRDirs(self.scpMainDir, self.UKLight,
                                   dateStr=dateText)

    if self.reproVersStr:
        reproVers = self.newVersStr
    else:
        reproVers = None

    dirVersions = CASUQueries.getDirVersions(
        self.scpMainDir, reproVers, self.forceTransfer,
        self.UKLight, dateStr=dateText)

    # start and end date of the query are both the run date
    otcDirs = CASUQueries.getOTCDirs(
        runDate, runDate, srDirs, dirVersions, self.scpMainDir,
        self.reproMode, reproVers, self.forceTransfer,
        checkSR, self.UKLight)

    if checkSR and not otcDirs:
        text = "<Warning> There are no directories to copy."
        Logger.addMessage(text)
        raise NoDataError(text)
    return otcDirs
1310 1311 #-------------------------------------------------------------------------- 1312
def _listRemoteDir(self, copyDict, lsNames, lsOpt, xferList=[]):
    """
    List remote directory 'scpMainDir'.

    @param copyDict: Dictionary of transferable directories.
    @type  copyDict: dict
    @param lsNames:  Names of the locked and the successfully read file.
    @type  lsNames:  list(str, str, str)
    @param lsOpt:    listing option (regexp for name.find).
    @type  lsOpt:    str
    @param xferList: Optional list containing files to be downloaded;
                     if given, only listed files whose
                     "<dirpath>/<filename>" is in it are kept.
    @type  xferList: list

    @return: Dictionary of the listed files, dictionary of the summed
             file sizes (in units of sysc.one_kilobyte) per date-version
             directory, and a dictionary of directory statistics keyed by
             date-version string.
    @rtype:  dict, dict, dict

    """
    Logger.addMessage("Listing CASU dir...")
    self._createCasuListFile(copyDict)
    listing, dateStats = self._readDirList(CASUQueries.camListFile,
                                           lsNames, lsOpt)
    # every row is [filename,dir,size,access[:1],filetime,dirpath]
    requested = []
    for row in listing:
        # replace the date dir by its date-version string
        row[1] = copyDict[row[1].replace(self.reproMode.replace, '')][1]
        if xferList and os.path.join(row[-1], row[0]) in xferList:
            requested.append(row)
    if xferList:
        listing = requested[:]

    sizePerDir = {}
    for row in listing:
        sizePerDir[row[1]] = sizePerDir.get(row[1], 0) \
                             + row[2]/self.sysc.one_kilobyte
    dirDict = self._dictFromList(listing)

    # copy date based dict to date_version based dict
    versionStats = dict((copyDict[date][1], dateStats[date])
                        for date in dateStats)

    return dirDict, sizePerDir, versionStats
1357 1358 #-------------------------------------------------------------------------- 1359 # pre-transfer WFAU 1360 #-------------------------------------------------------------------------- 1361
1362 - def _buildLoadList(self, fileList, totalSize):
1363 """ 1364 Build a list of #-of-threads sublists. 1365 1366 The filelist is split into #-of-threads roughly equal bytesized 1367 sublists. 1368 1369 @param fileList: List of filenames. 1370 @type fileList: list(str) 1371 @param totalSize: Total size of files in filelist. 1372 @type totalSize: int 1373 1374 @return: List of sublists of files to download in different threads. 1375 @rtype: list(list(str)) 1376 """ 1377 loadList = [] 1378 i = 0 1379 size = totalSize 1380 tts = 0 1381 subThreads = self.scpThreads % 10 1382 for x in xrange(subThreads): 1383 quart = int(size/(subThreads - x)) 1384 sizeList = 0 1385 loadList.append([]) 1386 while sizeList < quart and tts < totalSize: 1387 fileName = os.path.join(fileList[i][1], fileList[i][0]) 1388 loadList[-1].append(fileName) 1389 sizeList += fileList[i][2] 1390 tts += fileList[i][2] 1391 i += 1 1392 size -= quart 1393 return loadList
1394 1395 #-------------------------------------------------------------------------- 1396
1397 - def _checkLocalExist(self, casuDict, hereDict, casuStats, hereStats):
1398 """ 1399 Check if remote file is in local dict. If so, deprecate. 1400 1401 @param casuDict: Dictionary of CASU data. 1402 @type casuDict: dict 1403 @param hereDict: Dictionary of WFAU data. 1404 @type hereDict: dict 1405 @param casuStats: Statistics of CASU directories. 1406 @type casuStats: int 1407 @param hereStats: Statistics of WFAU directories. 1408 @type hereStats: int 1409 1410 @return: file list of transferrable files and their total size. 1411 @rtype: list, int 1412 """ 1413 fileList = [] 1414 deprecateList = [] 1415 totalSize = 0 1416 for casuPath in casuDict: 1417 if self.reproVersStr and self.newVersStr != self.oldVersStr: 1418 herePath = casuPath.replace( 1419 "_v%s" % self.newVersStr, "_v%s" % self.oldVersStr) 1420 if herePath in hereDict: 1421 hereStats[casuDict[casuPath][1]] = \ 1422 hereStats[hereDict[herePath][1]][:] 1423 else: 1424 herePath = casuPath.replace( 1425 "_v%s" % self.versStr, "_v%s" % self.oldVersStr) 1426 if herePath in hereDict: 1427 hereStats[casuDict[casuPath][1]] = \ 1428 hereStats[hereDict[herePath][1]][:] 1429 1430 else: 1431 herePath = casuPath 1432 1433 if self.isTestRun and self.verbose > 4: 1434 print casuPath,casuDict[casuPath] 1435 if herePath in hereDict: 1436 print ">>>>",hereDict[herePath] 1437 else: 1438 print ">>>> %s doesn't exist locally" % casuPath 1439 print "CS<<:",casuStats 1440 print "HS<<:",hereStats 1441 1442 if self.forceTransfer \ 1443 or casuStats.get(casuDict[casuPath][1], 1)[0] == 0: 1444 if herePath not in hereDict: 1445 if not self.reproVersStr or ( 1446 self.reproVersStr and not ',' in self.reproVersStr): 1447 fileList.append(casuDict[casuPath]) 1448 casuStats[casuDict[casuPath][1]][1] = 1 1449 hereStats[casuDict[casuPath][1]][1] = 1 1450 totalSize += casuDict[casuPath][2] 1451 elif (self.forceTransfer or self.checkTimeFlag) \ 1452 and hereDict[herePath][4] < casuDict[casuPath][4]: 1453 fileList.append(casuDict[casuPath]) 1454 deprecateList.append( 1455 os.path.join(hereDict[herePath][5], 1456 
casuDict[casuPath][0])) 1457 casuStats[casuDict[casuPath][1]][1] = 1 1458 hereStats[hereDict[herePath][1]][1] = 1 1459 totalSize += casuDict[casuPath][2] 1460 1461 if self.isTestRun and self.verbose > 4: 1462 print "CS>>:",casuStats 1463 print "HS>>:",hereStats 1464 1465 # move deprecated files to deprecate dir 1466 if deprecateList and self.deprecationMode.lower() != "no" \ 1467 and not self.ffluOnly: 1468 deprFileList = self._mvDeprecatedFiles(deprecateList, 1469 self.deprecationMode) 1470 Logger.addMessage( 1471 "Nr. of deprecated files: %s" % len(deprecateList)) 1472 else: 1473 deprFileList = [] 1474 1475 return fileList, totalSize, deprFileList
1476 1477 #-------------------------------------------------------------------------- 1478
def _checkNewDirs(self, copyDict, casuSizes):
    """
    Check if there is enough space to transfer the whole dir,
    otherwise change outdir to a free disk.

    A date-version directory that already exists on one of the RAID
    disks is downloaded to that disk. For a new directory a disk with
    enough room (beyond the space to be left free) is chosen; the
    bookkeeping in self.spacePerDisk is updated on the way. Finally the
    download and FITS directories are created if necessary.

    NOTE(review): the nesting below was reconstructed from a flattened
    listing and should be confirmed against the repository.

    @param copyDict:  Dictionary containing the CASU path and the
                      date-version number string for every date.
    @type  copyDict:  dict(str:list(str, str))
    @param casuSizes: Dictionary of directory names and their sizes.
    @type  casuSizes: dict(str: int)

    @return: Dictionary of date-version strings and their download
             directory.
    @rtype:  dict(str: str)

    @raise MemoryError: If no disk has enough space for a new directory.

    """
    transferableDates = sorted(copyDict[k][1] for k in copyDict)

    # allFitsDirs: dict of existing dirs and their sizes
    allFitsDirs = {}
    for disk in self.sysc.availableRaidFileSystem():
        fitsDirs = dircache.listdir(os.path.join(disk, self.sysc.fitsDir))
        for dateDir in set(transferableDates).intersection(fitsDirs):
            fitsPath = os.path.join(disk, self.sysc.fitsDir)
            size = self._getDirsize(os.path.join(fitsPath, dateDir))
            if dateDir in allFitsDirs:
                allFitsDirs[dateDir].append((fitsPath, size))
                Logger.addMessage(' '.join(["<Warning>",
                    dateDir, "exists in more than one place:",
                    repr(allFitsDirs[dateDir])]))
            else:
                allFitsDirs[dateDir] = [(fitsPath, size)]

    # create outdirs dict
    outDirs = {}
    # Percentage of free space to be left on each disk
    freePerc = self.sysc.leaveSpaceOnMSRFS()

    # get free disks in size sorted order and the latest access
    # to a date dir
    freeDiskSpace = Ratings(dict((d, -self.spacePerDisk[d][1])
                                 for d in self.spacePerDisk
                                 if self.spacePerDisk[d][1] > freePerc[d]))

    freeDisks = Ratings(dict((key, freeDiskSpace.rating(key))
                             for key in freeDiskSpace))
    latestAccess = self._getLatestAccess(freeDisks)

    if self.isTestRun and self.verbose > 3:
        print "freePerc:", freePerc
        print "spacePerDisk:", self.spacePerDisk
        print "freeDisks:", freeDisks.items()
        print "latestAccess:", latestAccess.items()

    for xferDir in transferableDates:
        if xferDir in allFitsDirs:
            # if dir already exists use it
            # (disk path up to and including the lower-case database name;
            # only the first location found is considered)
            outDisk = ''.join(allFitsDirs[xferDir][0][0].partition(
                self.sysc.loadDatabase.lower())[:2])
            outDirs[xferDir] = os.path.join(outDisk, self.sysc.downloadDir)
            # only the difference to the existing size is booked
            self.spacePerDisk[outDisk][1] -= \
                casuSizes[xferDir] - allFitsDirs[xferDir][0][1]
            if self.isTestRun and self.verbose > 3:
                print "xferDir: ",xferDir
                print "outDirs: ",outDirs
        else:
            # find disks with enough space to transfer whole dir
            isSpace = False
            for _outDisk in sorted(self.spacePerDisk):
                # list of ?sa disks ('/disk0?/?sa')
                if self.spacePerDisk[_outDisk][1] - freePerc[_outDisk] > \
                       casuSizes[xferDir]:
                    # outDirs[xferDir] temporarily holds the candidates
                    outDirs.setdefault(xferDir, []).append(_outDisk)
                    isSpace = True

            if self.isTestRun and self.verbose > 3:
                print "xferDir: ",xferDir
                print "outDirs: ",outDirs

            if not isSpace:
                raise MemoryError(
                    "There is no space left on the RAID array.")

            # use disk which wasn't used lately
            for _fitsDir in freeDisks:
                if _fitsDir in outDirs[xferDir]:
                    finalDir = _fitsDir
                    if _fitsDir != latestAccess.keys()[-1]:
                        break

            # replace the candidate list by the final download directory
            outDirs[xferDir] = \
                os.path.join(finalDir, self.sysc.downloadDir)

            self.spacePerDisk[finalDir][1] -= casuSizes[xferDir]
            freeDisks[finalDir] += len(freeDisks)
            # if there are more than 1 dir to be processed simulate
            # data transfer so data is spread over the available disks.
            time.sleep(1)
            latestAccess[finalDir] = \
                self._getTimeDiff(utils.makeTimeStamp().split())

    for _outDir in outDirs:
        # check if outDir exist
        utils.ensureDirExist(os.path.join(outDirs[_outDir], _outDir))
        # check if fitsDir exist
        utils.ensureDirExist(os.path.join(
            outDirs[_outDir].partition(self.sysc.downloadDir)[0],
            self.sysc.fitsDir, _outDir))
    Logger.addMessage("Data will be downloaded into these directories: "+
                      repr(outDirs))
    return outDirs
1590 1591 #-------------------------------------------------------------------------- 1592
1593 - def _getDirsize(self, dirPath):
1594 """ 1595 Get the size of a directory. 1596 1597 @param dirPath: Full path to a directory. 1598 @type dirPath: str 1599 1600 @return: Size in kilobytes. 1601 @rtype: int 1602 1603 """ 1604 try: 1605 proc = Popen(["du", "-k" , "--max-depth=0", dirPath], stdout=PIPE) 1606 size, _direc = proc.stdout.read().split() 1607 except ValueError: 1608 size = 0 1609 1610 return int(size)
1611 1612 #-------------------------------------------------------------------------- 1613
def _getLatestAccess(self, diskList, subDir=SystemConstants.fitsDir):
    """
    Create a rating dict for the given disks containing date directories
    rated by last access time.

    For every disk the entries of <disk>/<subDir> are listed with
    "ls -ldct --full-time"; date and time of the first line of output
    are converted with _getTimeDiff. A disk for which this fails is
    rated (0, 0.0) and a warning is logged.

    @param diskList: Raid file system disks to rate.
    @type  diskList: list(str)
    @param subDir:   Subdirectory to check.
    @type  subDir:   str

    @return: Ratings dictionary of disks and their latest access time
             as (days, seconds) tuple.
    @rtype:  Ratings
    """
    latestAccess = Ratings()
    for disk in diskList:
        cmd = "ls -ldct --full-time " + os.path.join(disk, subDir, '*')
        if self.verbose > 1:
            print("_getLatestAccess: %s" % cmd)
        try:
            # fields 5 and 6 of the listing are taken as date and time
            timeTpl = extp.run(cmd, isVerbose=False)[0].split()[5:7]
            latestAccess[disk] = self._getTimeDiff(timeTpl)
        except Exception as error:
            # Still best-effort (e.g. an empty disk has nothing to list),
            # but no longer silent, and KeyboardInterrupt/SystemExit are
            # not swallowed any more.
            Logger.addMessage(
                "<Warning> Can't determine latest access for %s: %s"
                % (disk, error))
            latestAccess[disk] = (0, 0.0)
    return latestAccess
1640 1641 #-------------------------------------------------------------------------- 1642
def _listHomeDir(self, copyDict, lsNames, xferList=[]):
    """
    List home directory 'fitsDir'.

    The local counterparts of the transferable directories are listed
    recursively with "ls" into a temporary list file, which is parsed by
    _readDirList and removed afterwards.

    @param copyDict: Dictionary of transferable directories.
    @type  copyDict: dict
    @param lsNames:  Names of the locked and the successfully read file.
    @type  lsNames:  list(str, str, str)
    @param xferList: Optional list of files to be downloaded; if given,
                     only local files with the same base name are kept.
    @type  xferList: list

    @return: Dictionary of the files found at WFAU and a dictionary
             containing directory statistics; both are empty if none of
             the directories exists locally.
    @rtype:  dict, dict

    """
    # date directory name -> full path, over all RAID disks
    localDirs = {}
    for disk in self.sysc.availableRaidFileSystem():
        fitsPath = os.path.join(disk, self.sysc.fitsDir)
        localDirs.update((dateDir, os.path.join(fitsPath, dateDir))
                         for dateDir in dircache.listdir(fitsPath))

    listDirs = []
    for date in copyDict:
        dateVers = copyDict[date][1]
        if self.reproVersStr and self.newVersStr != self.oldVersStr:
            # reprocessed data: the local dir carries the old version
            oldDateVers = dateVers.replace(
                "_v%s" % self.newVersStr, "_v%s" % self.oldVersStr)
            listDirs.append(localDirs[oldDateVers])
        elif dateVers in localDirs:
            listDirs.append(localDirs[dateVers])

    if not listDirs:
        return {}, {}

    wfcamListFile = File(os.path.join(
        self.sysc.sysPath, "%s_wfcamlist_%s" % (
        self.sysc.loadDatabase, self.runDateStr)))
    cmd = ' '.join(["ls --indicator-style=none -oR",
                    "--time-style=full-iso ",
                    ' '.join(listDirs)])
    if self.verbose > 1:
        print("_listHomeDir: %s" % cmd)
    listing = Popen(cmd, shell=True, stderr=STDOUT, stdout=PIPE)
    wfcamListFile.wopen()
    # the complete output is read before it is written to the list file
    outputLines = [rawLine.rstrip() for rawLine in listing.stdout]
    for outputLine in outputLines:
        wfcamListFile.writetheline(outputLine)
    wfcamListFile.close()
    dirList, dirStats = self._readDirList(wfcamListFile, lsNames)
    if xferList:
        wantedNames = [os.path.basename(path) for path in xferList]
        dirList = [row for row in dirList if row[0] in wantedNames]

    dirDict = self._dictFromList(dirList)
    if wfcamListFile.exists():
        wfcamListFile.remove()

    return dirDict, dirStats
1704 1705 #-------------------------------------------------------------------------- 1706
def _mvDeprecatedFiles(self, fileList, deprMode="mv"):
    """
    Move files in fileList to the directory for deprecated files.

    In the modes 'mv' and 'ip' every file is copied to its deprecated
    name, the pairs of old and new names are written to a log file,
    UpdateFitsFileNames.py is run on that log file and the original
    files are removed. In mode 'rm' the files are just removed. During a
    test run no file is copied or removed and the helper script is not
    run.

    NOTE(review): the nesting below was reconstructed from a flattened
    listing; in particular the extent of the final "mv"/"ip" section
    should be confirmed against the repository.

    @param fileList: List of full paths of the files to deprecate.
    @type  fileList: list(str)
    @param deprMode: Mode for handling the deprecated files: move ('mv'),
                     remove ('rm'), leave in place ('ip'). 'mv' may be
                     followed by '_<suffix>', in which case the suffix
                     replaces the version number of the date directory
                     name in the deprecated path.
    @type  deprMode: str

    @return: List of the deprecated file names (empty in mode 'rm').
    @rtype:  list(str)

    """
    deprFileList = []
    Logger.addMessage("Deprecating files:")
    deprLogTime = utils.makeTimeStamp()
    if deprMode.partition('_')[0] in ["mv", "ip"]:
        if self.isTestRun and self.verbose > 4:
            print self.outPrefix
        # log file listing "<old name>,<deprecated name>" for every file
        deprecatedListFile = File(
            os.path.join(self.sysc.xferLogPath(), ''.join(
            [self.outPrefix, "_deprecated_", deprMode, '_',
             deprLogTime.replace(' ', '_'), ".log"])))
        deprecatedListFile.wopen()
    # date part of the time stamp without dashes
    deprTime = utils.makeTimeStamp().split()[0].replace('-','')
    for fileName in sorted(fileList):
        deprFile = File(fileName)
        if deprMode.partition('_')[0] == "mv":
            if deprMode == "mv":
                outDateDir = deprFile.subdir
            else:
                # "mv_<suffix>": replace everything after "_v" by suffix
                outDateDir = ''.join(
                    deprFile.subdir.partition("_v")[:2]) \
                    + deprMode.partition('_')[2]

            spacePerDisk = \
                utils.getDiskSpace(self.sysc.deprecatedDataStorage)

            deprPath = os.path.join(
                utils.getNextDisk(self.sysc, spacePerDisk),
                '%s_%s' % (self.sysc.deprecatedDir, deprTime),
                outDateDir)

            utils.ensureDirExist(deprPath)
            deprName = os.path.join(deprPath, deprFile.base)
        elif deprMode == "ip":
            deprName = "%s.deprecated_%s" % (deprFile.name, deprTime)
        elif deprMode == "rm":
            # placeholder, only used in the error message below
            deprName = "---"
        try:
            if deprMode.partition('_')[0] in ["mv", "ip"]:
                # the original is only copied here; it is removed after
                # UpdateFitsFileNames.py has been run
                if not self.isTestRun:
                    shutil.copy2(deprFile.name, deprName)
                deprFileList.append(deprName)
                Logger.addMessage(
                    ' '.join([deprMode, deprFile.name, deprName]))
                deprecatedListFile.writetheline(
                    ','.join([deprFile.name, deprName]))
            elif deprMode == "rm":
                if not self.isTestRun:
                    os.remove(fileName)
                Logger.addMessage("rm " + deprFile.name)
        except:
            # log which file failed, then pass the error on unchanged
            Logger.addMessage(' '.join(["Can't", deprMode, deprFile.name,
                                        "//", deprName]))
            raise

    if deprMode.partition('_')[0] in ["mv", "ip"]:
        deprecatedListFile.close()

        cmd = "%s/UpdateFitsFileNames.py -d %s -D -f %s %s" % \
              (self.sysc.helpersPath(), self.database, deprecatedListFile.name,
               self.sysc.loadDatabase)

        Logger.addMessage("Running '%s'" % cmd)
        if not self.isTestRun:
            status = os.system(cmd)
            if status:
                Logger.addMessage("<ERROR> UpdateFitsFileNames.py failed!")
            # the originals are removed even if the helper script failed
            for fileName in fileList:
                os.remove(fileName)

        Logger.addMessage(
            "Deprecated file names are listed in " + deprecatedListFile.name)
        Logger.addMessage("<INFO> Run 'helpers/SyncFlatFileLookUp.py'")
    return deprFileList
1791 1792 #-------------------------------------------------------------------------- 1793 # transfer 1794 #-------------------------------------------------------------------------- 1795
def _calcTransferRate(self, startTime, endTime, totalSize):
    """
    Calculate the transfer rate and write it, together with the
    transferred size and the elapsed time, to the log.

    @param startTime: Begin of download (seconds, as from time.time()).
    @type  startTime: float
    @param endTime:   End of download (seconds, as from time.time()).
    @type  endTime:   float
    @param totalSize: Total size from download scripts.
    @type  totalSize: int

    """
    elapsedTime = endTime - startTime
    # nothing downloaded can take "no time" at clock resolution; report
    # a rate of 0 instead of failing with a ZeroDivisionError
    if elapsedTime > 0:
        transferRate = totalSize/(elapsedTime*self.sysc.one_megabyte)
    else:
        transferRate = 0.0
    # write to log file
    sizemega = 1.*totalSize/self.sysc.one_megabyte
    # The ranges are contiguous: sizes of exactly 1.0 or 1000.0 MB used
    # to match no branch at all and left sizeText undefined.
    if sizemega < 1.0:
        sizeText = "%.6f kBytes"% (1.*totalSize/self.sysc.one_kilobyte)
    elif sizemega < 1000.0:
        sizeText = "%.6f MBytes"% (1.*totalSize/self.sysc.one_megabyte)
    else:
        sizeText = "%.6f GBytes"% (1.*totalSize/self.sysc.one_gigabyte)
    Logger.addMessage(''.join([
        sizeText, " transfered in %d m %.8f s" % (int(elapsedTime/60.),
                                                  (elapsedTime % 60))]))
    Logger.addMessage("Transfer rate: %.8f MB/s" % (transferRate))
1822 1823 #-------------------------------------------------------------------------- 1824
def _downloadFiles(self, fileList, outDirDict, totalSize):
    """
    Download files: remove old scp scripts, write new ones, run them and
    log the transfer rate.

    @param fileList:   File list of transferable files.
    @type  fileList:   list
    @param outDirDict: Dictionary of date-version strings and their
                       download directories.
    @type  outDirDict: dict(str: str)
    @param totalSize:  Total size of transfer.
    @type  totalSize:  int

    @return: Dictionary of transferred files.
    @rtype:  dict
    """
    self._removeScpScripts()
    scheduledFiles = self._writeScpScripts(fileList, outDirDict)
    # only the run of the scripts is timed
    began = time.time()
    self._threadScpScripts()
    finished = time.time()
    self._calcTransferRate(began, finished, totalSize)
    return scheduledFiles
1847 1848 #-------------------------------------------------------------------------- 1849
def _missingFileCheck(self, fileList):
    """
    Check for not transferred files, i.e. paths that do not exist.

    @param fileList: Paths of the files that should have been
                     transferred.
    @type  fileList: list(str)

    @return: List of missing files.
    @rtype:  list(str)

    """
    Logger.addMessage("checking for missing files, started at %s UTC" % \
                      (utils.makeTimeStamp()))
    absentFiles = [path for path in fileList if not os.path.exists(path)]
    Logger.addMessage("check for missing files finished at %s UTC" % \
                      (utils.makeTimeStamp()))
    return absentFiles
1871 1872 #-------------------------------------------------------------------------- 1873
1874 - def _removeScpScripts(self):
1875 """ 1876 Remove the previously made scp scripts. 1877 1878 """ 1879 for fileName in os.listdir(self.sysc.sysPath): 1880 if fileName.startswith("scpcam%s%s%s" % ( 1881 self.sysc.loadDatabase, self.scpSuffix, self.runDateStr)): 1882 os.remove(os.path.join(self.sysc.sysPath, fileName))
1883 1884 #-------------------------------------------------------------------------- 1885
def _threadScpScripts(self):
    """
    Run the scp scripts simultaneously, but wait until each has finished,
    then log the outcome of every script.
    """
    args_q = Queue.Queue()
    result_q = Queue.Queue()
    results = defaultdict(int)

    # Create the "thread pool"
    pool = []
    for _slot in xrange(self.scpThreads % 10):
        pool.append(Download(args_q=args_q, result_q=result_q))

    # Start all threads
    for worker in pool:
        worker.start()

    # Give the workers some work to do: one job per existing script
    pending = 0
    for scriptNo in xrange(self.scpThreads % 10):
        scpFileName = getScpScriptName(self.sysc, self.runDateStr,
                                       scriptNo, self.scpSuffix)
        if os.path.exists(scpFileName):
            pending += 1
            args_q.put((scpFileName, self.isTestRun, self.ffluOnly))

    # collect one result per job
    while pending > 0:
        # Blocking 'get' from a Queue.
        outcome = result_q.get()
        results[outcome[0]] = outcome
        pending -= 1

    # Ask threads to die and wait for them to do it
    for worker in pool:
        worker.join()

    theTime = utils.makeTimeStamp()
    sFlag = 0
    for key in results:
        # a result is indexed as (key, return value, script)
        res = results[key][1]
        script = results[key][2]
        sFlag += math.fabs(res)
        if res < 0:
            Logger.addMessage("%s failed at %s UTC, killed by signal %d" % (
                script, theTime, res))
        elif res > 0:
            Logger.addMessage("%s unsuccessful at %s UTC with signal %d" % (
                script, theTime, res))
        elif res == 0:
            Logger.addMessage("%s finished at %s UTC" % (script,theTime))
1936 1937 #-------------------------------------------------------------------------- 1938
def _writeScpScripts(self, fileList, outDirs):
    """
    Write one shell script per download thread, containing a transfer
    command for every file of the thread's sub-list that does not exist
    locally yet. The scripts are made executable.

    The command depends on CASUTransfer.transferMethod ("ftp", "axel")
    and on onlyHeaderTransfer (ssh/listhead); otherwise scp is used.

    @param fileList: List of transferable file names, one sub-list per
                     thread. The sub-lists are sorted in place.
    @type  fileList: list(list(str))
    @param outDirs:  Dictionary of date-version strings and their
                     download directories.
    @type  outDirs:  dict(str: str)

    @return: Dictionary of the file names written to the scripts and
             their local paths.
    @rtype:  dict(str: str)

    """
    fileDict = {}
    shell = "#! /usr/bin/tcsh"
    transferCmd = ''
    for thread in xrange(self.scpThreads % 10):
        script = File(getScpScriptName(self.sysc, self.runDateStr,
                                       thread, self.scpSuffix))
        script.wopen()
        script.writetheline(shell)
        isFtp = CASUTransfer.transferMethod == "ftp"
        if isFtp:
            # ftp commands are fed to the client via a here-document
            marker = "cmd"
            script.writetheline(' '.join([self.sysc.ftp_method,
                                          self.sysc.scpServer(self.UKLight),
                                          "<<", marker]))
            script.writetheline("bin")
            transferCmd = "get InTemplate OutTemplate"
        elif CASUTransfer.transferMethod == "axel":
            transferCmd = ' '.join(["axel -va -o OutTemplate ",
                                    self.sysc.scpServer(self.UKLight),
                                    "InTemplate"])
        elif self.onlyHeaderTransfer:
            transferCmd = ' '.join([
                CASUQueries.sshCmd(self.UKLight, SystemConstants("WSA")),
                "'/home/wfcam/bin/listhead InTemplate' > OutTemplate"])
        else:
            transferCmd = ':'.join([CASUQueries.scpCmd(self.UKLight),
                                    "InTemplate OutTemplate"])

        fileList[thread].sort()
        for fileName in fileList[thread]:
            xferFile = File(fileName)
            # remote dirs carry no version number, but possibly the
            # repro-mode tag as prefix ("p") or suffix ("s")
            dateStr = xferFile.subdir.partition('_v')[0]
            if self.reproMode.mode == "p":
                subdir = "%s%s" % (self.reproMode.replace, dateStr)
            elif self.reproMode.mode == "s":
                subdir = "%s%s" % (dateStr, self.reproMode.replace)
            else:
                subdir = dateStr
            inPath = os.path.join(self.scpMainDir, subdir, xferFile.base)

            if self.onlyHeaderTransfer:
                localName = xferFile.name.replace(xferFile.ext, ".head")
            else:
                localName = fileName
            outPath = os.path.join(outDirs[xferFile.subdir], localName)

            if os.path.exists(outPath):
                continue
            script.writetheline(transferCmd.replace(
                "InTemplate", inPath).replace("OutTemplate", outPath))
            fileDict[fileName] = outPath

        if isFtp:
            # end the ftp session and close the here-document
            script.writetheline("quit")
            script.writetheline(marker)
        script.close()

        # make file executable
        os.chmod(script.name, 0o764)
    return fileDict
2014 2015 #-------------------------------------------------------------------------- 2016 # post-transfer CASU 2017 #-------------------------------------------------------------------------- 2018
2019 - def _writeRemSuccess(self, dirStats):
2020 """ 2021 Write successful_downdload file to Cambridge dirs. 2022 2023 @param dirStats: Directory statistics. 2024 @type dirStats: dict(str:list(int)) 2025 2026 """ 2027 for key, value in dirStats.iteritems(): 2028 if value[0] == 0 and value[1] == 1 and value[2] == 0: 2029 dateStr = key.partition('_v')[0] 2030 if self.reproMode.mode == "p": 2031 subdir = "%s%s" % (self.reproMode.replace, dateStr) 2032 elif self.reproMode.mode == "s": 2033 subdir = "%s%s" % (dateStr, self.reproMode.replace) 2034 else: 2035 subdir = dateStr 2036 2037 outXferFile = File(os.path.join( 2038 self.sysc.sysPath, 2039 "XferredFiles_%s" % self.sysc.loadDatabase)) 2040 outXferFile.aopen() 2041 outText = "%s %s %s" % ( 2042 self.sysc.casuFileSystemVariables.name, dateStr, 2043 utils.makeTimeStamp().partition(' ')[0].replace('-','')) 2044 if subdir != dateStr: 2045 outText += " [%s]" % subdir 2046 outXferFile.writetheline(outText) 2047 outXferFile.close() 2048 2049 if self.verbose > 1: 2050 print "_writeRemSuccess:", outText
2051 2052 #-------------------------------------------------------------------------- 2053
2054 - def _writeRemSuccessToCasu(self, dirStats):
2055 """ 2056 Write successful_downdload file to Cambridge dirs. 2057 2058 @param dirStats: Directory statistics. 2059 @type dirStats: dict(str:list(int)) 2060 2061 """ 2062 for key, value in dirStats.iteritems(): 2063 if value[0] == 0 and value[1] == 1 and value[2] == 0: 2064 dateStr = key.partition('_v')[0] 2065 if self.reproMode.mode == "p": 2066 subdir = "%s%s" % (self.reproMode.replace, dateStr) 2067 elif self.reproMode.mode == "s": 2068 subdir = "%s%s" % (dateStr, self.reproMode.replace) 2069 else: 2070 subdir = dateStr 2071 outDirPath = os.path.join(self.scpMainDir, subdir) 2072 cmd = ' '.join([CASUQueries.sshCmd(self.UKLight), 2073 "'touch %s'" % os.path.join(outDirPath, 2074 self.sysc.rem_success_name)]) 2075 if self.verbose > 1: 2076 print "_writeRemSuccess:", cmd 2077 try: 2078 if not self.ffluOnly: 2079 extp.run(cmd) 2080 except extp.Error: 2081 Logger.addMessage("# Error writing success to %s@%s: %s" % 2082 (self.sysc.scpUser, 2083 self.sysc.scpServer(self.UKLight), 2084 dateStr))
2085 2086 #-------------------------------------------------------------------------- 2087 # post-transfer WFAU 2088 #-------------------------------------------------------------------------- 2089
2090 - def _mvToFin(self, fileList):
2091 """Move files in fileList to final directory. 2092 2093 @param fileList: List of full path file names 2094 @type fileList: list(str) 2095 2096 """ 2097 newFileList = [] 2098 for fileName in fileList: 2099 theFile = File(fileName) 2100 newDirPath = theFile.path.replace(self.sysc.downloadDir, 2101 (self.sysc.headerDir if self.onlyHeaderTransfer 2102 else self.sysc.fitsDir)) 2103 utils.ensureDirExist(newDirPath) 2104 newFile = os.path.join(newDirPath, theFile.base) 2105 if os.path.exists(newFile): 2106 Logger.addMessage("File %s already exists!" % newFile) 2107 self._mvDeprecatedFiles([newFile], deprMode="ip") 2108 os.rename(theFile.name, newFile) 2109 # make file non-executable 2110 os.chmod(newFile, 0664) 2111 newFileList.append(newFile) 2112 return newFileList
2113 2114 #-------------------------------------------------------------------------- 2115
def _writeLocSuccess(self, dirStats, outDirs):
    """
    Write the successful_download marker file to the local dirs and create
    the transfer list.

    A directory counts as successfully transferred if its statistics equal
    [0, 1, 0]. For these, all regular files whose names contain neither
    "md5" nor the marker file name are collected, and (unless this is a
    test run) the marker file is touched. Every other directory, including
    one without any statistics, is reported in the log.

    @param dirStats: Directory statistics, keyed by date-version string.
    @type  dirStats: dict
    @param outDirs:  Dictionary of date-version strings and their
                     download directories.
    @type  outDirs:  dict(str: str)

    @return: List of successfully transferred files (full paths).
    @rtype:  list

    """
    xferredFileList = []

    for dateVersStr in outDirs:
        # .get() instead of indexing: a directory without statistics must
        # be reported as a problem, not abort the run with a KeyError
        stats = dirStats.get(dateVersStr)
        if stats != [0, 1, 0]:
            Logger.addMessage("Problem in directory stats: %s %r" % (
                dateVersStr, stats))
            continue

        outDirPath = os.path.join(outDirs[dateVersStr], dateVersStr)
        for fileName in dircache.listdir(outDirPath):
            fileNamePath = os.path.join(outDirPath, fileName)
            if "md5" not in fileName and os.path.isfile(fileNamePath) \
              and self.sysc.rem_success_name not in fileName:
                xferredFileList.append(fileNamePath)

        if not self.isTestRun:
            fp = os.path.join(outDirPath, self.sysc.rem_success_name)
            try:
                extp.run("touch " + fp)
            except:
                # log which directory failed, then let the caller decide
                Logger.addMessage("# Error writing success to: %s" %
                                  dateVersStr)
                raise

    return xferredFileList
2154
2155 #------------------------------------------------------------------------------ 2156 # new stuff 2157 #------------------------------------------------------------------------------ 2158 2159 -class CASUQueries(object):
2160 """Query the CASU directories.""" 2161 camListFile = None 2162 re_digits = re.compile(r'(\d+)') 2163 re_date = re.compile(r'(\A(\d{8})\Z)') 2164 sysc = IngCuSession.sysc 2165 re_datevers = re.compile(r'(\A(\d{8})_v(\d)(.{0,1}\d*)\Z)') 2166 notAvailCASUDisks = [] 2167 2168 #-------------------------------------------------------------------------- 2169 2170 @staticmethod
2171 - def getCasuDataFile(command, casuDataFilePath, copyDict, UKLight=True):
2172 """Execute the given command to create a file at CASU 2173 and copy the file into the sys dir.""" 2174 appendFlag = ">!" 2175 for dirPath in copyDict: 2176 cmd = ' '.join([CASUQueries.sshCmd(UKLight), 2177 "'", command, copyDict[dirPath][0], 2178 appendFlag, casuDataFilePath, "'"]) 2179 if CASUTransfer.verbose > 1: 2180 print "getCasuDataFile:", cmd 2181 try: 2182 extp.run(cmd) 2183 except: 2184 Logger.addMessage("# Error accessing remote server to get " 2185 "data file: %s@%s" % 2186 (CASUQueries.sysc.scpUser, 2187 CASUQueries.sysc.scpServer(UKLight))) 2188 raise 2189 if appendFlag == ">!": 2190 appendFlag = ">>" 2191 return File(CASUQueries.copyFile(casuDataFilePath, UKLight=UKLight))
2192 2193 #-------------------------------------------------------------------------- 2194 2195 @staticmethod
2196 - def getCasuFileListing(commands, casuDataFile, copyDict, UKLight=True):
2197 """Execute the given command to create a file at CASU 2198 and copy the file into the sys dir.""" 2199 if casuDataFile.exists(): 2200 casuDataFile.remove() 2201 casuDataFile.aopen() 2202 for dirPath in copyDict: 2203 cmd = ["%s '%s'" % (CASUQueries.sshCmd(UKLight), 2204 commands[0] % copyDict[dirPath][0]), 2205 "%s '%s'" % (CASUQueries.sshCmd(UKLight), 2206 commands[1] % copyDict[dirPath][0])] 2207 if CASUTransfer.verbose > 1: 2208 print "getCasuFileListing:", cmd 2209 try: 2210 p = Popen(cmd[0], shell=True, stderr=STDOUT, stdout=PIPE) 2211 for entry in [x.rstrip() for x in p.stdout]: 2212 casuDataFile.writetheline(entry) 2213 except: 2214 Logger.addMessage("# Error accessing remote server to get " 2215 "file listing: %s@%s" % 2216 (CASUQueries.sysc.scpUser, 2217 CASUQueries.sysc.scpServer(UKLight))) 2218 raise 2219 try: 2220 p = Popen(cmd[1], shell=True, stderr=STDOUT, stdout=PIPE) 2221 for entry in [x.rstrip() for x in p.stdout]: 2222 casuDataFile.writetheline(entry) 2223 except: 2224 Logger.addMessage("# Error accessing remote server to get " 2225 "file listing: %s@%s" % 2226 (CASUQueries.sysc.scpUser, 2227 CASUQueries.sysc.scpServer(UKLight))) 2228 raise 2229 casuDataFile.close()
2230 2231 #-------------------------------------------------------------------------- 2232 2233 @staticmethod
def getCasuStagingDirs(UKLight=True,
                       casuSubDirPath="processed"):
    """
    Get a list of all main dirs at CASU.

    Every combination of the number and letter disk labels of the CASU
    file system variables is listed remotely, unless the disk is already
    known to be unavailable. A disk answering "Permission denied" is
    logged and remembered in CASUQueries.notAvailCASUDisks; lines
    reporting "No such file or directory" are dropped.

    @param UKLight:        Passed on to the directory listing.
    @type  UKLight:        bool
    @param casuSubDirPath: Sub-directory below the file system name.
    @type  casuSubDirPath: str

    @return: List of the staging directories found.
    @rtype:  list(str)

    """
    fsVars = CASUQueries.sysc.casuFileSystemVariables
    stagingDirs = []
    for num in fsVars.num:
        for alpha in fsVars.alpha:
            if (num, alpha) in CASUQueries.notAvailCASUDisks:
                continue
            diskPath = '/data/apm%d_%s/%s/%s' % (
                num, alpha, fsVars.name, casuSubDirPath)
            listing = CASUQueries.getDirListing(
                diskPath, UKLight, "-d", False)
            for line in listing:
                if "Permission denied" in line:
                    CASUQueries.notAvailCASUDisks.append((num, alpha))
                    Logger.addMessage("# Permission denied on: %s" %
                                      line.partition(':')[0])
                elif "No such file or directory" not in line:
                    stagingDirs.append(line.strip())

    return stagingDirs
2254 2255 #-------------------------------------------------------------------------- 2256 2257 @staticmethod
2258 - def getCasuDateDirs(dirList=None, UKLight=True, mode="STR", presuf=''):
2259 """ Get a dictionary of all dateDirs and their paths. 2260 """ 2261 if not dirList: 2262 dirList = CASUQueries.getCasuStagingDirs(UKLight=UKLight) 2263 2264 prefix = (presuf.partition("::")[2] if "p::" in presuf else '') 2265 suffix = (presuf.partition("::")[2] if "s::" in presuf else '') 2266 casuDateDict = {} 2267 for direc in dirList: 2268 cmd = ' '.join([CASUQueries.sshCmd(UKLight), 2269 "'ls -d %s/%s20??????%s'" % (direc, prefix, suffix)]) 2270 if CASUTransfer.verbose > 1: 2271 print "getCasuDateDirs:", cmd 2272 if mode == "MJD": 2273 casuDateDict[direc] = [int(mxTime.strptime(os.path.basename( 2274 os.path.normpath(path.rstrip())), "%Y%m%d").mjd) 2275 for path in os.popen(cmd) 2276 if not os.path.normpath(path.rstrip()).endswith('@')] 2277 else: 2278 casuDateDict[direc] = [os.path.basename(os.path.normpath( 2279 path.rstrip())) for path in os.popen(cmd)] 2280 casuDateDict.update(utils.invertDict(casuDateDict)) 2281 2282 return casuDateDict
2283 2284 #-------------------------------------------------------------------------- 2285 2286 @staticmethod
2287 - def getDirListing(casuDisk, UKLight=True, lsFlags='', longStyle=True):
2288 """ 2289 Get a listing of the given directory. 2290 """ 2291 lsLongArgs = ("-E" if "29" in CASUQueries.sysc.scpServer(UKLight) 2292 else "--time-style=full-iso --indicator-style=none") 2293 lsArgs = (lsLongArgs if longStyle else '') 2294 lsArgs = ' '.join([lsArgs, lsFlags]) 2295 cmd = ' '.join([CASUQueries.sshCmd(UKLight), 2296 "\"ls", lsArgs, casuDisk, "\""]) 2297 if CASUTransfer.verbose > 1: 2298 print "getDirListing:", cmd 2299 p = Popen(cmd, shell=True, stderr=STDOUT, stdout=PIPE) 2300 #dirs = [x.rstrip() for x in p.stdout] 2301 return p.stdout
2302 2303 #-------------------------------------------------------------------------- 2304 2305 @staticmethod
2306 - def getDirSizes(casuDisk, UKLight=True, returnList=False, dateRE="20*"):
2307 """ 2308 Dict of directory sizes. 2309 """ 2310 if CASUQueries.sysc.isWSA(): 2311 duArgs = ' '.join(["--max-depth=1", casuDisk]) 2312 else: 2313 duArgs = os.path.join(casuDisk, dateRE) 2314 2315 cmd = ' '.join([CASUQueries.sshCmd(UKLight), 2316 "\"du -h", duArgs, "\""]) 2317 if CASUTransfer.verbose > 1: 2318 print "getDirSizes:", cmd 2319 dirSizes = [x.rstrip().split('\t') for x in os.popen(cmd)] 2320 2321 if returnList: 2322 return dirSizes 2323 2324 return dict((os.path.basename(cDir), size) for size, cDir in dirSizes)
2325 2326 #-------------------------------------------------------------------------- 2327 2328 @staticmethod
2329 - def getDirVersions(casuDisk, reproVersion=None, forceTransfer=False, 2330 UKLight=True, returnList=False, dateStr = ''):
2331 """ 2332 Dict of directory version numbers (dv). 2333 """ 2334 if forceTransfer: 2335 if reproVersion and reproVersion == '4': 2336 findArg = "VERSION_*" 2337 else: 2338 findArg = ("UPDATED" if "reprocessed" in casuDisk 2339 else "VERSION_*") 2340 else: 2341 findArg = "VERSION_*" 2342 findPath = os.path.join(casuDisk, dateStr) 2343 cmd = "%s \"sh -c 'find %s -name %s -print 2> /dev/null'\"" % \ 2344 (CASUQueries.sshCmd(UKLight), findPath, findArg) 2345 if CASUTransfer.verbose > 1: 2346 print "getDirVersions:", cmd 2347 2348 dirVersions = sorted(x.replace('\n', '') for x in os.popen(cmd) 2349 if findPath in x) 2350 2351 if returnList: 2352 return dirVersions 2353 2354 dvDict = {} 2355 for fileName in dirVersions: 2356 versFile = File(fileName) 2357 versStr = (reproVersion if reproVersion 2358 else versFile.base.rpartition('_')[2]) 2359 dvDict[versFile.path] = '_v'.join([versFile.subdir, versStr]) 2360 return dvDict
2361 2362 #-------------------------------------------------------------------------- 2363 2364 @staticmethod
2365 - def getOTCDirs(startDate, endDate, srDirs, dirVersions, 2366 casuDisk, reproMode=None, reproVersion=None, 2367 forceTransfer=False, checkSR=True, UKLight=True, 2368 returnList=False):
2369 """ 2370 List of ok_to_copy (otc) directories. 2371 2372 @param startDate: Begin of transfer period. 2373 @type startDate: int 2374 @param endDate: End of transfer period. 2375 @type endDate: int 2376 @param srDirs: List of SUCCESSFULLY_READ directories. 2377 @type srDirs: list 2378 @param dirVersions: Dict of directory version numbers. 2379 @type dirVersions: dict 2380 @param checkSR: Check if the SUCCESSFULLY_READ tag is set. 2381 @type checkSR: bool 2382 2383 """ 2384 if forceTransfer: 2385 if reproVersion and reproVersion == '4': 2386 findArg = "VERSION_*" 2387 else: 2388 findArg = ("UPDATED" if "reprocessed" in casuDisk 2389 else "VERSION_*") 2390 else: 2391 findArg = "OK_TO_COPY" 2392 findPath = (os.path.join(casuDisk, str(startDate)) 2393 if startDate == endDate else casuDisk) 2394 cmd = "%s \"sh -c 'find %s -name %s -print 2> /dev/null'\"" % \ 2395 (CASUQueries.sshCmd(UKLight), findPath, findArg) 2396 if CASUTransfer.verbose > 1: 2397 print "getOTCDirs:", cmd 2398 2399 tmpOtcDirs = [x.replace('\n', '') for x in os.popen(cmd)] 2400 2401 # pick out only directories between startdate and enddate 2402 otcDirs = [] 2403 for fileName in tmpOtcDirs: 2404 otcFile = File(fileName) 2405 otcDate = (otcFile.subdir.replace(reproMode.replace, '') 2406 if reproMode else otcFile.subdir[:8]) 2407 if otcDate.isdigit() and startDate <= int(otcDate) <= endDate: 2408 otcDirs.append(fileName) 2409 2410 if returnList: 2411 return otcDirs 2412 2413 otcDict = {} 2414 for fileName in otcDirs: 2415 otcFile = File(fileName) 2416 fileDateStr = otcFile.subdir.replace(reproMode.replace, '') 2417 if otcFile.path not in dirVersions and not reproMode: 2418 text = "%s has no version number." 
% otcFile.path 2419 Logger.addMessage(text) 2420 raise NoDataError, text 2421 if otcFile.path not in srDirs or not checkSR: 2422 versionStr = (dirVersions[otcFile.path] if not reproVersion 2423 else '_v'.join([fileDateStr, reproVersion])) 2424 otcDict[otcFile.subdir.replace(reproMode.replace, '')] = \ 2425 [otcFile.path, versionStr] 2426 return otcDict
2427 2428 #-------------------------------------------------------------------------- 2429 2430 @staticmethod
2431 - def getSRDirs(casuDisk, UKLight=True, returnList=False, dateStr=''):
2432 """List of successfully_read (sr) directories.""" 2433 findArg = "SUCCESSFULLY_READ" 2434 findPath = os.path.join(casuDisk, dateStr) 2435 cmd = "%s \"sh -c 'find %s -name %s -print 2> /dev/null'\"" % \ 2436 (CASUQueries.sshCmd(UKLight), findPath, findArg) 2437 if CASUTransfer.verbose > 1: 2438 print "getSRDirs:", cmd 2439 2440 srDirs = sorted(x.replace('\n', '') for x in os.popen(cmd)) 2441 2442 if returnList: 2443 return srDirs 2444 2445 srDict = {} 2446 for fileName in srDirs: 2447 srFile = File(fileName) 2448 srDict[srFile.path] = srFile.base 2449 return srDict
2450 2451 #-------------------------------------------------------------------------- 2452 2453 @staticmethod
2454 - def copyFile(fileName, destDir=SystemConstants.sysPath, UKLight=True):
2455 """copy single File from CASU to local 'sys_dir'""" 2456 destFilePath = os.path.join(destDir, os.path.basename(fileName)) 2457 if os.path.exists(destFilePath): 2458 os.remove(destFilePath) 2459 2460 cmd = ':'.join([CASUQueries.scpCmd(UKLight), 2461 "%s %s" % (fileName, destDir)]) 2462 if CASUTransfer.verbose > 1: 2463 print "copyFile:", cmd 2464 2465 try: 2466 extp.run(cmd) 2467 except: 2468 Logger.addMessage( 2469 "# Error accessing remote file: " + fileName) 2470 raise 2471 2472 return destFilePath
2473 2474 #-------------------------------------------------------------------------- 2475 2476 @staticmethod
2477 - def scpCmd(UKLight=True):
2481 2482 #-------------------------------------------------------------------------- 2483 2484 @staticmethod
def sshCmd(UKLight=True, syscon=None):
    """
    Build the "<ssh method> <user>@<server>" prefix for remote commands.

    @param UKLight: Passed on to the ssh method and server look-ups.
    @type  UKLight: bool
    @param syscon:  System constants to use; defaults to CASUQueries.sysc.
    @type  syscon:  SystemConstants

    @return: The ssh command prefix.
    @rtype:  str

    """
    sysc = syscon if syscon else CASUQueries.sysc
    method = sysc.sshMethod(UKLight)
    user = sysc.scpUser
    server = sysc.scpServer(UKLight)
    return "%s %s@%s" % (method, user, server)
2491
#------------------------------------------------------------------------------

class Download(Thread):
    """
    Worker thread running the previously written scp scripts.

    Jobs are (script, isTestRun, ffluOnly) tuples taken from args_q; for
    every job a (thread name, exit status, script base name) tuple is put
    on result_q. The thread keeps polling until join() is called.
    """
    def __init__(self, args_q, result_q):
        """
        @param args_q:   Queue of (script, isTestRun, ffluOnly) jobs.
        @type  args_q:   Queue
        @param result_q: Queue receiving the job results.
        @type  result_q: Queue
        """
        super(Download, self).__init__()
        self.stoprequest = Event()
        self.args_q = args_q
        self.result_q = result_q

    def run(self):
        """Spawn the scp scripts."""
        while not self.stoprequest.isSet():
            try:
                # short timeout, so that a stop request is noticed quickly
                job = self.args_q.get(True, 0.05)
            except Queue.Empty:
                continue
            script, isTestRun, ffluOnly = job
            scriptName = os.path.basename(script)
            theTime = utils.makeTimeStamp()
            Logger.addMessage(' '.join([scriptName,
                                        "started at %s UTC" % (theTime)]))
            if isTestRun:
                print("No download due to test run.")
                status = 0
            elif ffluOnly:
                Logger.addMessage("No download due to FlatfileLookup"
                                  " update only.")
                status = 0
            else:
                status = call(script)
            self.result_q.put((self.name, status, scriptName))

    def join(self, timeout=None):
        """Request the polling loop to stop, then wait for the thread."""
        self.stoprequest.set()
        super(Download, self).join(timeout)
2526
#------------------------------------------------------------------------------

class NonSuccessException(Exception):
    """
    Exception if the transfer from CASU was not completely successful.

    Creating the exception also writes its message to the log.
    """
    def __init__(self, logFile):
        """
        @param logFile: Log file that may list successfully read files.
        @type  logFile: str
        """
        message = ("# Transfer from CASU not successful. But there may be "
                   "some successfully read files in %s." % logFile)
        # initialise the base class, so that str(exception) and
        # tracebacks show the message instead of an empty string
        super(NonSuccessException, self).__init__(message)
        self.logFile = logFile
        Logger.addMessage(message)
2537
#------------------------------------------------------------------------------

class NoDataError(Exception):
    """Raised if no data is available for the given date."""
2543
#------------------------------------------------------------------------------

class MD5Checker(object):
    """Class containing all MD5 related functions.
    """
    sysc = IngCuSession.sysc

    def md5Run(self, dirStats, tfDict, outDirs):
        """
        md5 checking.

        Calculates the checksums of all transferred files, reads the CASU
        checksum lists (md5sums_<date-version>) of every directory whose
        first two statistics are 0 and 1, and compares both.

        @param dirStats: Directory statistics, keyed by date-version string.
        @type  dirStats: dict
        @param tfDict:   Dictionary of transferred files; its values are
                         the local file paths.
        @type  tfDict:   dict
        @param outDirs:  Dictionary of date-version strings and their
                         download directories.
        @type  outDirs:  dict(str: str)

        @return: Status of md5 checks at CASU (1 if any checksum list is
                 missing) and here, and list of files with missing md5
                 checks.
        @rtype:  list(int, int), list

        """
        # create md5s here
        md5HereDict = self._md5Calc(tfDict.values())

        # read md5s from there
        md5CambList = []
        # start at "OK", so that the status is defined even if no
        # directory qualifies, and never lose a failure of an earlier
        # directory to the success of a later one
        md5CambStatus = 0
        for k in dirStats:
            stats = dirStats[k]
            if stats[0] == 0 and stats[1] == 1:
                md5sumsDirPath = os.path.join(outDirs[k], k)
                tmpList, status = self._readCamMd5(
                    os.path.join(md5sumsDirPath, "md5sums_" + k))
                md5CambList.extend(tmpList)
                md5CambStatus = max(md5CambStatus, status)

        # check md5s
        md5HereStatus, md5FilesMissing = \
            self._md5Compare(md5CambList, md5HereDict, [])

        return [md5CambStatus, md5HereStatus], md5FilesMissing

    #--------------------------------------------------------------------------

    def _md5Calc(self, fileList):
        """
        md5 checksum calculation.

        For every existing file the checksum is also written to a file of
        the same name with the extension replaced by ".md5".

        @param fileList: List of files to check.
        @type  fileList: list(str)

        @return: Dictionary mapping "<parent dir name>/<file name>" to
                 [checksum, full file name].
        @rtype:  dict(str: list(str))

        """
        Logger.addMessage("# md5 checksum calculation started at %s UTC" %
                          utils.makeTimeStamp())

        md5Dict = {}
        for fileName in fileList:
            if not os.path.exists(fileName):
                continue

            m = hashlib.md5()
            # read binary and in fixed size chunks: the data files need not
            # contain any line breaks, so iterating over "lines" could load
            # a whole file into memory
            with open(fileName, 'rb') as dataFile:
                for chunk in iter(lambda: dataFile.read(1 << 20), b''):
                    m.update(chunk)
            digest = m.hexdigest()

            outPath = os.path.splitext(fileName)[0] + ".md5"
            with open(outPath, 'w') as md5File:
                md5File.write(digest)

            inDir = os.path.split(os.path.dirname(fileName))[1]
            inPath = os.path.join(inDir, os.path.basename(fileName))
            md5Dict[inPath] = [digest, fileName]

        Logger.addMessage("# md5 checksum calculation finished at %s UTC" %
                          utils.makeTimeStamp())

        return md5Dict

    #--------------------------------------------------------------------------

    def _md5Compare(self, remList, locDict, missingFiles):
        """
        Test md5 checksums and move corrupted files to 'cor_dir'.

        @param remList:      [file name, checksum] pairs from CASU.
        @type  remList:      list
        @param locDict:      Local file dict of transfered files, as
                             returned by _md5Calc.
        @type  locDict:      dict
        @param missingFiles: Missing (not downloaded) files; extended in
                             place.
        @type  missingFiles: list

        @return: Status of md5 check (0 OK, 1 wrong checksum) and
                 missingFiles list updated with files with different
                 checksums.
        @rtype:  int, list

        """
        # NOTE(review): _md5Calc keys locDict by "<dir>/<file name>" while
        # _readCamMd5 returns bare file names, so the membership test below
        # presumably only matches files without a parent directory --
        # confirm the intended key format.
        status = 0
        for element in remList:
            if element[0] not in locDict:
                continue
            locSum, locFileName = locDict[element[0]][:2]
            # check md5sum CASU against WFAU,
            # but not for CASUs md5sum-files
            if element[1] != locSum and "md5sum" not in element[0]:
                status = 1
                corFile = os.path.join(
                    self.sysc.downloadPathList(), locFileName)
                shutil.copy2(corFile, self.sysc.corruptFilesPath)
                os.remove(corFile)
                missingFiles.append(locFileName)
                Logger.addMessage(
                    "# md5sum of %s doesn't match! File copied into %s" %
                    (element[0], self.sysc.corruptFilesPath))

        return status, missingFiles

    #--------------------------------------------------------------------------

    def _readCamMd5(self, fileName):
        """
        Read camb md5 list.

        Every line is expected to hold a checksum followed by a file
        path; lines with fewer than two fields are skipped.

        @param fileName: Local copy of the CASU file containing the md5
                         checksums there.
        @type  fileName: str

        @return: List of [file base name, checksum] pairs, and status
                 (0 OK, 1 checksum file missing).
        @rtype:  list(list(str, str)), int

        """
        if not os.path.exists(fileName):
            Logger.addMessage(
                "# No md5sum from CASU in " + os.path.dirname(fileName))
            return [], 1

        md5List = []
        with open(fileName) as md5File:
            for line in md5File:
                fields = line.split()
                if len(fields) < 2:
                    # blank or truncated line
                    continue
                md5List.append([os.path.basename(fields[1]), fields[0]])
        return md5List, 0
2682 2683 #------------------------------------------------------------------------------ 2684