1
2
3 """
4 Python classes used in CU1.
5
6 @author: E. Sutorius
7 @org: WFAU, IfA, University of Edinburgh
8 """
9
10 from collections import defaultdict, namedtuple
11 import dircache
12 import hashlib
13 import inspect
14 import math
15 import Queue
16 import mx.DateTime as mxTime
17 import os
18 import operator
19 import re
20 import shutil
21 import StringIO
22 from subprocess import call, Popen, PIPE, STDOUT
23 from threading import Thread, Event
24 import time
25
26 import wsatools.ExternalProcess as extp
27 from wsatools.File import File
28 from wsatools.DbConnect.IngCuSession import IngCuSession
29 from wsatools.DbConnect.IngIngester import IngestLogger as Logger
30 from wsatools.Utilities import Ratings
31 from wsatools.SystemConstants import SystemConstants
32 import wsatools.Utilities as utils
36 """Initialize automatically instance variables from __init__ arguments.
37
38 @param argumentNames: List of arguments of the class to be initialized.
39 @type argumentNames: list[str]
40 @param argDict: Dictionary of arguments and their values.
41 @type argDict: dict{arg:value}
42 @param prefix: Prefix to be given to the argument names, eg. '_'
43 @type prefix: str
44 """
45 self = argDict.pop('self')
46 for arg in argumentNames[1:]:
47 setattr(self, prefix + arg, argDict[arg])
48
54
58 """
59 Transfer GES data.
60
61 """
62 sysc = SystemConstants()
63 gesPointings = ["m15", "mw", "ngc1851", "ngc2808", "ngc4372", "ngc4833",
64 "ngc6752", "solar", "trumpler20"]
65
def __init__(self, instrument='', mode='', casuDir='',
             xferList='', versStr='', forceTransfer=False,
             deprecationMode="mv",
             database=None, UKLight=True, outPrefix=None, isTestRun=False):
    """
    Store the GES transfer settings and open the transfer log file.

    Every constructor argument is copied onto the instance as an attribute
    of the same name (via _attributesFromArguments).

    @param instrument: GES instrument, i.e. giraffe or uves.
    @type instrument: str
    @param mode: GES mode, i.e. nightly, stacked or archived.
    @type mode: str
    @param casuDir: Directory at CASU where the FITS files are hosted.
    @type casuDir: str
    @param xferList: File containing the list of files to transfer.
    @type xferList: str
    @param versStr: Version number of the data.
    @type versStr: str
    @param forceTransfer: Force the data transfer.
    @type forceTransfer: bool
    @param deprecationMode: Mode for deprecating files; together with
                            forceTransfer, the file suffix used when
                            overwriting.
    @type deprecationMode: str
    @param database: The database.
    @type database: str
    @param UKLight: Use UKLight instead of JANET.
    @type UKLight: bool
    @param outPrefix: Prefix of the output file(s).
    @type outPrefix: str
    @param isTestRun: If True, do not perform database modifications.
    @type isTestRun: bool
    """
    # At this point locals() holds exactly self and the constructor
    # arguments, which are mirrored onto the instance.
    _attributesFromArguments(self,
                             inspect.getargspec(GESTransfer.__init__)[0],
                             locals())
    CASUTransferLog.createTransferLogfile(self.sysc, self.outPrefix)
100
101
102
104 """
105 Transfer data from CASU.
106
107 """
108 CASUQueries.sysc = self.sysc
109 Logger.addMessage("Creating script for %s downloads of %s data." % (
110 self.mode, self.instrument))
111
112
113 casuSubDir = ("uves/proc_%s" if self.instrument == "uves" else
114 "proccasu_%s")
115 if self.mode == "archived":
116 casuSubDir = os.path.join("archdata/%s", casuSubDir)
117 if self.mode == "stacked":
118 casuSubDir = os.path.join(casuSubDir, self.mode)
119
120 if self.mode == "archived":
121 casuDirs = []
122 for point in self.gesPointings:
123 casuDirs.extend(CASUQueries.getCasuStagingDirs(
124 UKLight=self.UKLight,
125 casuSubDirPath=casuSubDir % (point, self.versStr)))
126 else:
127 casuDirs = CASUQueries.getCasuStagingDirs(
128 UKLight=self.UKLight, casuSubDirPath=casuSubDir % self.versStr)
129
130 for cd in casuDirs:
131 Logger.addMessage("Reading CASU dir: %s ..." % cd)
132 casuDirDict, inDirs = self.getCASUDirs(cd, runDate)
133
134 Logger.addMessage("Checking WFAU outdirs ...")
135 outDirs = self.checkOutDirs(inDirs)
136
137 Logger.addMessage("Writing scp scripts ...")
138 xferDict = self.writeScpScripts(casuDirDict, outDirs)
139
140 if not self.isTestRun:
141 Logger.addMessage("Starting transfer.")
142 status = self.runScpScripts()
143 else:
144 status = -1
145 Logger.addMessage("Finished transfer.")
146
147
148 CASUTransferLog.updateTransferLog(list(utils.unpackList(
149 xferDict.values())))
150 CASUTransferLog.closeTransferLogfile()
151
152 return CASUTransferLog.logFile.name, status, outDirs.values()
153
154
155
157 """
158 Ensure the output directories are available.
159
160 """
161 wfauSubDir = os.path.join(self.instrument,
162 "%s_v%s" % (self.mode, self.versStr))
163
164 availMainDirs = [os.path.join(x, self.sysc.fitsDir, wfauSubDir)
165 for x in sorted(utils.unpackList(
166 self.sysc.serverRaidArray.values()))]
167
168 for direc in availMainDirs:
169 utils.ensureDirExist(direc)
170
171 outDirs = defaultdict()
172
173 for md in availMainDirs:
174 if self.mode == "archived":
175 for dirname, dirnames, filenames in os.walk(md):
176 for subdirname in dirnames:
177 outPath = os.path.join(dirname, subdirname)
178 outDirs[outPath.partition(
179 "%s_v%s/" % (self.mode, self.versStr))[2]] = outPath
180 else:
181 for sd in dircache.listdir(md):
182 outDirs[sd] = os.path.join(md, sd)
183
184
185 for cdir in inDirs:
186 if self.mode == "archived":
187
188 if cdir not in outDirs.values():
189 outPath = os.path.join(availMainDirs[-1], cdir)
190 utils.ensureDirExist(outPath)
191 outDirs[cdir] = outPath
192 else:
193 if cdir not in outDirs.values():
194 wdir = ("%s_v%s" % (cdir, self.versStr)
195 if self.mode == "nightly" else cdir)
196 outPath = os.path.join(availMainDirs[-1], wdir)
197 utils.ensureDirExist(outPath)
198 outDirs[cdir] = outPath
199 return outDirs
200
201
202
204 """
205 Get the listing for the given CASU directory.
206 """
207 inDirs = []
208 casuDirDict = defaultdict(list)
209 for line in CASUQueries.getDirListing(casuDir, self.UKLight, "-p"):
210 if line.strip().startswith('d'):
211 subDir = line.strip().rpartition(' ')[2].replace('/', '')
212 if self.mode == "stacked" or \
213 (self.mode == "nightly" \
214 and subDir.isdigit() and subDir == runDate) or \
215 self.mode == "archived":
216 casuPath = os.path.join(casuDir, subDir)
217 if self.mode == "archived":
218 if subDir.isdigit() or subDir == "stacked":
219 inDirs.append(casuPath.partition("archdata/")[2])
220 else:
221 inDirs.append(subDir)
222 for entry in CASUQueries.getDirListing(
223 casuPath, self.UKLight, "-p"):
224 getFile = False
225 if ".fit" in entry:
226 if self.instrument == "giraffe" \
227 and (self.mode == "nightly" or \
228 self.mode == "archived")\
229 and "_final" in entry:
230 getFile = True
231 if self.instrument == "uves" \
232 and self.mode == "nightly":
233 getFile = True
234 if self.mode == "stacked":
235 getFile = True
236 if self.mode == "archived" and subDir == "stacked":
237 getFile = True
238 if getFile:
239 casuDirDict[casuPath].append(
240 entry.strip().rpartition(' ')[2])
241 return casuDirDict, inDirs
242
243
244
246 """
247 Run the scp scripts.
248
249 """
250 scpFile = File(os.path.join(self.sysc.sysPath, "scpcam%s%s" % (
251 self.sysc.loadDatabase, self.mode[:1])))
252
253 try:
254 os.system(scpFile.name)
255 status = 1
256 except:
257 Logger.addMessage("Problem running %s" % copyScript.name)
258 status = -1
259
260 return status
261
262
263
265 """
266 Write shell scp script based on the file list comparison.
267
268 @param casuDirDict: Dict of transferable files.
269 @type casuDirDict: dict(list)
270 @param outDirs: Dictionary of date-version strings and their
271 download directories.
272 @type outDirs: dict(str: str)
273
274 @return: Dictionary of downloaded file names and their paths.
275 @rtype: dict(str: str)
276
277 """
278 fileDict = {}
279 shell = "#! /usr/bin/tcsh"
280 transferCmd = ''
281
282 outFile = File(os.path.join(self.sysc.sysPath, "scpcam%s%s" % (
283 self.sysc.loadDatabase, self.mode[:1])))
284 outFile.wopen()
285 outFile.writetheline(shell)
286
287 transferCmd = ':'.join([CASUQueries.scpCmd(self.UKLight),
288 "InTemplate OutTemplate"])
289
290 for inPath in sorted(casuDirDict):
291 for fileName in casuDirDict[inPath]:
292 xferFile = File(os.path.join(inPath, fileName))
293 if self.mode == "archived":
294 theSubdir = inPath.partition("archdata/")[2]
295 else:
296 theSubdir = xferFile.subdir
297 outPath = os.path.join(outDirs[theSubdir], fileName)
298 if not os.path.exists(outPath):
299 command = transferCmd.replace(
300 "InTemplate", xferFile.name).replace(
301 "OutTemplate", outPath)
302 outFile.writetheline(command)
303 fileDict[os.path.join(theSubdir, fileName)] = outPath
304
305 outFile.close()
306
307
308 os.chmod(outFile.name, 0764)
309 return fileDict
310
314 """
315 Transfer FITS files of mixed dates from a single directory and distribute
316 the data accordingly.
317 Data at CASU:
318 GPS: /data/apm22_e/wfcam/reprocessed/nebulosity
319
320 """
321 sysc = SystemConstants()
322
def __init__(self, fitsList=None, casuDir='',
             xferList='', forceTransfer=False, deprecationMode="mv",
             database=None, UKLight=True, outPrefix=None, isTestRun=False):
    """
    Store the transfer settings, decide where the CASU file list comes
    from, reset the bookkeeping containers and open the transfer log.

    @param fitsList: Initialised FitsList object.
    @type fitsList: FitsList
    @param casuDir: Directory at CASU where the FITS files are hosted.
    @type casuDir: str
    @param xferList: File containing the list of files to transfer; if
                     empty, the list is named after casuDir instead.
    @type xferList: str
    @param forceTransfer: Force the data transfer.
    @type forceTransfer: bool
    @param deprecationMode: Mode for deprecating files; together with
                            forceTransfer, the file suffix used when
                            overwriting.
    @type deprecationMode: str
    @param database: The database.
    @type database: str
    @param UKLight: Use UKLight instead of JANET.
    @type UKLight: bool
    @param outPrefix: Prefix of the output file(s).
    @type outPrefix: str
    @param isTestRun: If True, do not perform database modifications.
    @type isTestRun: bool
    """
    _attributesFromArguments(
        self, inspect.getargspec(TransferCASUList.__init__)[0], locals())

    # A user-supplied list file wins; otherwise the listing file is named
    # after the last component of the CASU directory.
    self.fromFile = bool(self.xferList)
    self.casuListFileName = (self.xferList if self.fromFile else
                             "%s.list" % os.path.basename(casuDir))

    self.fitsList.createFitsDateDict()

    # Accumulators filled while building the transfer script.
    self.transferList, self.deprFileList = [], []
    self.dateSet = set()

    CASUTransferLog.createTransferLogfile(self.sysc, self.outPrefix)
364
365
366
368 CASUQueries.sysc = self.sysc
369 DeprecateFiles.sysc = self.sysc
370 if any(x in self.casuDir for x in ["nebulosity"]) \
371 or any(x in self.casuListFileName
372 for x in ["nebulosity"]):
373 mode = "GPSneb"
374 elif any(x in self.casuDir for x in ["gps_astrom"]) \
375 or any(x in self.casuListFileName
376 for x in ["gps_astrom"]):
377 mode = "GPSastron"
378
379 Logger.addMessage("Creating script for %s downloads." % mode)
380
381
382 if not self.fromFile:
383 cmd = "ls -R"
384 copyDict = {self.casuDir: [self.casuDir]}
385 casuDataFilePath = os.path.join(self.sysc.rem_camlist_dir,
386 self.casuListFileName)
387 casuListFile = CASUQueries.getCasuDataFile(
388 cmd, casuDataFilePath, copyDict, False)
389 else:
390 casuListFile = File(os.path.join(self.sysc.sysPath,
391 self.casuListFileName))
392 casuListFile.ropen()
393 casuFileList = casuListFile.readlines()
394 casuListFile.close()
395
396 Logger.addMessage("Creating transfer script...")
397 parsedDict = self.parseList(casuFileList)
398 copyScript = File("./copy%s_%s.sh" % (
399 mode, utils.makeTimeStamp().replace(' ', '_')))
400 copyScript.wopen()
401 for topDir in parsedDict:
402 scpCmd = ':'.join([CASUQueries.scpCmd(self.UKLight), topDir])
403 for dateStr in sorted(parsedDict[topDir]):
404 deprecateList = []
405 for fileName in sorted(parsedDict[topDir][dateStr]):
406 if mode == "GPSneb":
407 copyCmd = os.path.join(scpCmd, fileName)
408
409 if "_two" in fileName:
410 newFileName = os.path.join(
411 self.getDestDir(dateStr), fileName)
412 elif "_conf" in fileName \
413 and "nebulosity" in self.casuDir:
414 newFileName = os.path.join(
415 self.getDestDir(dateStr),
416 "%s_two%s%s" % fileName.partition('_conf'))
417 else:
418 newFileName = ''
419 if newFileName:
420 if not os.path.exists(newFileName) \
421 or (self.forceTransfer
422 and self.deprecationMode in newFileName):
423 copyScript.writetheline(
424 '%s %s' % (copyCmd, newFileName))
425 self.transferList.append(newFileName)
426 else:
427 Logger.addMessage("File already exists: %s" % \
428 newFileName)
429 elif mode == "GPSastron":
430 copyCmd = os.path.join(scpCmd, fileName)
431 newFileName = os.path.join(
432 self.getDestDir(dateStr), fileName)
433 copyScript.writetheline(
434 '%s %s' % (copyCmd, newFileName))
435 self.transferList.append(newFileName)
436 if os.path.exists(newFileName):
437 deprecateList.append(newFileName)
438 if deprecateList and self.deprecationMode.lower() != "no":
439 self.deprFileList.extend(DeprecateFiles._mvDeprecatedFiles(
440 self.database, self.outPrefix, deprecateList,
441 self.deprecationMode, self.isTestRun))
442 Logger.addMessage(
443 "Nr. of deprecated files: %s" % len(deprecateList))
444
445 copyScript.close()
446 Logger.addMessage("Finished writing %s." % copyScript.name)
447
448 if not self.isTestRun:
449 Logger.addMessage("Starting transfer.")
450 try:
451 copyScript.chmod(0744)
452
453 status = 1
454 except:
455 Logger.addMessage("Problem running %s" % copyScript.name)
456 status = -1
457 Logger.addMessage("Finished transfer.")
458 else:
459 status = -1
460
461 CASUTransferLog.updateTransferLog(self.transferList)
462 CASUTransferLog.closeTransferLogfile()
463
464 return CASUTransferLog.logFile.name, status, \
465 sorted(list(self.dateSet)), self.deprFileList
466
467
468
470 """ Find the destination directory.
471 """
472 versNos = self.sysc.obsCal.versNums(self.sysc.obsCal.checkDate(dateStr))
473 for versNum in versNos.reverse():
474 try:
475 dateVersStr = ('%s_v%s') % (dateStr, versNum)
476 destDir = os.path.join(
477 self.fitsList.invFitsDateDict[dateVersStr], dateVersStr)
478 self.dateSet.add(destDir)
479 except KeyError:
480 pass
481 else:
482 return destDir
483
484 Logger.addMessage(
485 "<WARNING> Directory not found (versions %r): %s" % (
486 versNos, destDir))
487
488
489
491 dataDict = {}
492 for entry in fileList:
493 fileName = entry.split()[-1].replace('@', '')
494 if self.casuDir in entry:
495 topDir = fileName.replace(':', '')
496 dataDict[topDir] = defaultdict(list)
497 elif fileName.endswith((".fit",".fits")):
498 if fileName.startswith('w'):
499 dateStr = fileName.partition('_')[0].replace('w', '')
500 dataDict[topDir][dateStr].append(fileName)
501 return dataDict
502
506 """Deprecate files for CASU transfers.
507 """
508 sysc = IngCuSession.sysc
509
510
511
@staticmethod
def _mvDeprecatedFiles(database, outPrefix, fileList, deprMode="mv",
                       isTestRun=False):
    """
    Move files in fileList to the directory for deprecated files.

    For the 'mv' and 'ip' modes each file is first copied (shutil.copy2)
    to its deprecated name and the "old,new" name pairs are written to a
    log file. UpdateFitsFileNames.py is then run on that log, and finally
    the original files are removed. For 'rm' the files are removed
    directly.

    @param database: Database passed to UpdateFitsFileNames.py via -d.
    @type database: str
    @param outPrefix: Prefix of the deprecation log file name.
    @type outPrefix: str
    @param fileList: list of filenames(file, datedir, path(incl. datedir))
    @type fileList: list(str)
    @param deprMode: Mode for handling the deprecated files: move ('mv'),
                     remove ('rm'), leave in place ('ip', i.e. keep a copy
                     next to the original with a ".deprecated_<date>"
                     suffix). 'mv_<vers>' moves into a date directory whose
                     part after "_v" is replaced by <vers>.
    @type deprMode: str
    @param isTestRun: If True, only log: nothing is copied or removed and
                      UpdateFitsFileNames.py is not run.
    @type isTestRun: bool

    @return: The deprecated file names (empty for 'rm').
    @rtype: list(str)

    """
    deprFileList = []
    Logger.addMessage("Deprecating files:")
    deprLogTime = utils.makeTimeStamp()
    if deprMode.partition('_')[0] in ["mv", "ip"]:
        # Log of "original,deprecated" name pairs; it is also the input
        # file (-f) for UpdateFitsFileNames.py below.
        deprecatedListFile = File(
            os.path.join(DeprecateFiles.sysc.xferLogPath(), ''.join(
                [outPrefix, "_deprecated_", deprMode, '_',
                 deprLogTime.replace(' ', '_'), ".log"])))
        deprecatedListFile.wopen()
    # Date part of the time stamp without dashes (presumably YYYYMMDD).
    deprTime = utils.makeTimeStamp().split()[0].replace('-','')
    for fileName in fileList:
        deprFile = File(fileName)
        if deprMode.partition('_')[0] == "mv":
            if deprMode == "mv":
                outDateDir = deprFile.subdir
            else:
                # 'mv_<vers>': keep the date dir up to and including "_v"
                # and append the requested version.
                outDateDir = ''.join(
                    deprFile.subdir.partition("_v")[:2]) \
                    + deprMode.partition('_')[2]

            spacePerDisk = \
                utils.getDiskSpace(DeprecateFiles.sysc.deprecatedDataStorage)

            # <next disk>/<deprecatedDir>_<deprTime>/<outDateDir>
            deprPath = os.path.join(
                utils.getNextDisk(DeprecateFiles.sysc, spacePerDisk),
                '%s_%s' % (DeprecateFiles.sysc.deprecatedDir, deprTime),
                outDateDir)

            utils.ensureDirExist(deprPath)
            deprName = os.path.join(deprPath, deprFile.base)
        elif deprMode == "ip":
            deprName = "%s.deprecated_%s" % (deprFile.name, deprTime)
        elif deprMode == "rm":
            deprName = "---"
        try:
            if deprMode.partition('_')[0] in ["mv", "ip"]:
                # Copy only; the originals are removed after the
                # UpdateFitsFileNames.py run further down.
                if not isTestRun:
                    shutil.copy2(deprFile.name, deprName)
                deprFileList.append(deprName)
                Logger.addMessage(
                    ' '.join([deprMode, deprFile.name, deprName]))
                deprecatedListFile.writetheline(
                    ','.join([deprFile.name, deprName]))
            elif deprMode == "rm":
                if not isTestRun:
                    os.remove(fileName)
                Logger.addMessage("rm " + deprFile.name)
        except:
            # Log which file failed, then propagate the original error.
            Logger.addMessage(' '.join(["Can't", deprMode, deprFile.name,
                                        "//", deprName]))
            raise

    if deprMode.partition('_')[0] in ["mv", "ip"]:
        deprecatedListFile.close()

        # Presumably updates the file names recorded in the database from
        # the name-pair log -- confirm against helpers/UpdateFitsFileNames.py.
        cmd = "%s/UpdateFitsFileNames.py -d %s -D -f %s %s" % \
            (DeprecateFiles.sysc.helpersPath(), database,
             deprecatedListFile.name, DeprecateFiles.sysc.loadDatabase)

        Logger.addMessage("Running '%s'" % cmd)
        if not isTestRun:
            status = os.system(cmd)
            # NOTE(review): this override discards the command's exit
            # status, so the error message below is never logged and the
            # originals are removed even if the update failed. Confirm
            # whether this is intentional.
            status = 0
            if status:
                Logger.addMessage("<ERROR> UpdateFitsFileNames.py failed!")
            # Remove the originals; their deprecated copies exist already.
            for fileName in fileList:
                os.remove(fileName)

        Logger.addMessage(
            "Deprecated file names are listed in " + deprecatedListFile.name)
        Logger.addMessage("<INFO> Run 'helpers/SyncFlatFileLookUp.py'")
    return deprFileList
597
601 """Transfer log for CASU transfers.
602 """
603
604 @staticmethod
613
614
615
616 @staticmethod
631
632
633
634 @staticmethod
636 """
637 Write new full file path names into transfer log.
638
639 @param fileList: List of transferred file names.
640 @type fileList: list(str)
641
642 """
643 for entry in fileList:
644 CASUTransferLog.logFile.writetheline(entry)
645
649 """
650 Contains methods and functions related to CU1 transfers.
651 """
652 maxRetries = 10
653 remLsOpt = ".fit"
654 remExclPattern = ["dqc", "xsky", "chan", "bpm", "tmp", "orig", "jrf",
655 "mos", "squish", "list", '~']
656 transferMethod = "scp"
657 sysc = SystemConstants()
658 verbose = 0
659
def __init__(self, scpMainDir=None, scpThreads=1,
             checkTimeFlag=False, checkMd5Flag=False,
             UKLight=True, forceTransfer=False,
             deprecationMode="mv", database=None, outPrefix=None,
             reproMethod='', reproVersStr=None, ffluOnly=False,
             onlyHeaderTransfer=False,
             isTestRun=False):
    """
    Store the CASU transfer options as attributes and derive the scp
    suffix, thread count and version strings from them.

    @param scpMainDir: Disk at CASU where the data is stored.
    @type scpMainDir: str
    @param scpThreads: Number of scp transfer threads.
    @type scpThreads: int
    @param checkTimeFlag: Determines if the timestamp of existing files
                          should be checked against the file at CASU.
    @type checkTimeFlag: bool
    @param checkMd5Flag: Check the md5 checksums at CASU and WFAU.
    @type checkMd5Flag: bool
    @param UKLight: Use UKLight instead of JANET.
    @type UKLight: bool
    @param forceTransfer: Force the data transfer.
    @type forceTransfer: bool
    @param deprecationMode: Mode for deprecating files.
    @type deprecationMode: str
    @param database: The database.
    @type database: str
    @param outPrefix: Prefix of the output file(s).
    @type outPrefix: str
    @param reproMethod: Method for reprocessed data: s::suffix (CASU
                        datedir suffix) or v::version.
    @type reproMethod: str
    @param reproVersStr: Version number of reprocessed data, given as
                         "old=>new", "old,new" or a single version; the
                         old part may itself be "vers,old".
    @type reproVersStr: str
    @param ffluOnly: Don't transfer but update FlatfileLookup table.
    @type ffluOnly: bool
    @param onlyHeaderTransfer: Transfer only FITS headers.
    @type onlyHeaderTransfer: bool
    @param isTestRun: If True, do not perform database modifications.
    @type isTestRun: bool

    """
    _attributesFromArguments(
        self, inspect.getargspec(CASUTransfer.__init__)[0], locals())

    # Header-only transfers use the "head" scp suffix and a single thread.
    if self.onlyHeaderTransfer:
        self.scpSuffix = "head"
        self.scpThreads = 1
    else:
        self.scpSuffix = ''

    if self.reproVersStr:
        # Split "old=>new" or "old,new"; otherwise old and new coincide.
        for separator in ("=>", ','):
            if separator in self.reproVersStr:
                self.oldVersStr, _sep, self.newVersStr = \
                    self.reproVersStr.partition(separator)
                break
        else:
            self.oldVersStr = self.newVersStr = self.reproVersStr

        # An old part of the form "vers,old" carries the version as well.
        if ',' not in self.oldVersStr:
            self.versStr = self.oldVersStr
        else:
            self.versStr, self.oldVersStr = self.oldVersStr.split(',')
723
724
725
760
761
762
def transfer(self, runDate, xferList=None):
    """
    Main script to transfer data from CASU.

    Locates the CASU directory for the date, compares the remote and local
    listings, downloads what is missing, updates the transfer log and
    returns a summary of the run.

    @param runDate: The date for the transfer.
    @type runDate: int
    @param xferList: Optional list containing files to be downloaded. If
                     given, the CASU disk is taken from its first entry and
                     the SUCCESSFULLY_READ tag is not checked.
    @type xferList: list(str)

    @return: Transfer log file name, download status, log text, list of
             transferred date directories, list of new file names and
             list of deprecated files.
    @rtype: tuple(str, bool, StringIO, list, list, list)

    @raise NoDataError: If the CASU directory for the date cannot be
                        determined uniquely.

    """
    # Avoid a shared mutable default; callers may still pass a list.
    if xferList is None:
        xferList = []

    Logger.addMessage("Initialising transfer: %s..." % runDate)
    self.initTransfer(runDate)

    CASUQueries.camListFile = File(os.path.join(self.sysc.sysPath,
        "%s_camlist_%s" % (self.sysc.loadDatabase, self.runDateStr)))

    if xferList:
        checkSRFlag = False
        self.scpMainDir = File(xferList[0]).topdir
    else:
        checkSRFlag = not self.forceTransfer
        if not self.scpMainDir:
            self.scpMainDir = CASUQueries.getCasuDateDirs(
                UKLight=self.UKLight)[self.runDateStr]

    if isinstance(self.scpMainDir, list):
        if len(self.scpMainDir) > 1:
            # The date exists on several CASU disks: keep only the disk(s)
            # for which CASUQueries.getOTCDirs returns entries.
            otcList = []
            for casuDir in self.scpMainDir:
                otcList.extend(CASUQueries.getOTCDirs(
                    runDate, runDate, [], {},
                    os.path.join(casuDir, self.runDateStr),
                    returnList=True))
            otcMainDirs = set(x.partition(self.runDateStr)[0]
                              for x in otcList)
            cpList = list(otcMainDirs.intersection(self.scpMainDir))
            if len(cpList) > 1:
                Logger.addMessage(
                    "There are multiple directories for %s: %r" % (
                        self.runDateStr, self.scpMainDir))
                raise NoDataError
            if not cpList:
                # No disk qualifies, so there is nothing to pick from.
                Logger.addMessage(
                    "No directory with data found for %s: %r" % (
                        self.runDateStr, self.scpMainDir))
                raise NoDataError
            self.scpMainDir = cpList[0]
        elif self.scpMainDir:
            self.scpMainDir = self.scpMainDir[0]
        else:
            Logger.addMessage(
                "No CASU directory found for %s." % self.runDateStr)
            raise NoDataError

    try:
        casuDict, hereDict, casuStats, hereStats, outDateDirs = \
            self.getListings(runDate, xferList, checkSR=checkSRFlag)
    except NoDataError:
        logText = self._writeLoggerMessages()
        return CASUTransferLog.logFile.name, False, logText, [], [], []

    fileList, totalDLSize, deprFileList = self._checkLocalExist(
        casuDict, hereDict, casuStats, hereStats)

    # Order the entries by their sixth field before distributing them.
    fileList.sort(key=operator.itemgetter(5))

    loadList = self._buildLoadList(fileList, totalDLSize)

    # Number of CASU directories whose first stats field (lock) is set.
    lockedDirs = sum(stats[0] for stats in casuStats.itervalues())

    if totalDLSize > 0:
        dlStatus = self.transferFiles(loadList, totalDLSize, casuStats,
                                      hereStats, casuDict, outDateDirs)
    else:
        lines = ["# No files downloaded."]
        if lockedDirs > 0:
            lines.append("# %d dir(s) read locked: " % lockedDirs)
            lines.extend("# %s" % k for k, v in casuStats.iteritems()
                         if v[0] > 0)
        else:
            lines.append("# Directories are identical.")
        Logger.addMessage("\n".join(lines))
        dlStatus = True

    tmpTransferList = self._writeLocSuccess(hereStats, outDateDirs)
    transferList = self._cleanupTransferList(tmpTransferList)
    newFileList = self._mvToFin(transferList)

    if self.ffluOnly:
        # FFLU-only runs report the files of the load list instead.
        # NOTE(review): only the last digit of scpThreads is used here.
        newFileList = []
        for thread in xrange(self.scpThreads % 10):
            loadList[thread].sort()
            newFileList.extend(loadList[thread])

    CASUTransferLog.updateTransferLog(newFileList)
    # Remote success tags are only written for real, non-forced,
    # non-FFLU-only runs; clean-up happens in every case.
    if not self.isTestRun and not self.forceTransfer and not self.ffluOnly:
        self._writeRemSuccess(casuStats)
    self._cleanUp()

    logText = self._writeLoggerMessages()

    # Final FITS location of every date directory of this run.
    dateList = []
    for oDir in outDateDirs:
        fDir = outDateDirs[oDir].replace(self.sysc.downloadDir,
                                         self.sysc.fitsDir)
        dateList.append(os.path.join(fDir, oDir))

    if self.ffluOnly:
        # os.rmdir only succeeds on empty directories.
        for dirPath in dateList:
            try:
                os.rmdir(dirPath)
            except OSError:
                Logger.addMessage("<Warning> Can't remove %s" % dirPath)

    return (CASUTransferLog.logFile.name, dlStatus, logText, dateList,
            newFileList, deprFileList)
898
899
900
def getListings(self, runDate, xferList=None, checkSR=True):
    """
    Get the remote and the local listing (remote first, so that the same
    directories can be created here).

    @param runDate: The date for the transfer.
    @type runDate: int
    @param xferList: Optional list containing files to be downloaded.
    @type xferList: list
    @param checkSR: Check if the SUCCESSFULLY_READ tag is set.
    @type checkSR: bool

    @return: Dictionaries containing data and stats for directories at
             CASU and WFAU, plus the output date directories:
             (casuDict, hereDict, casuStats, hereStats, outDateDirs).
    @rtype: tuple

    @raise extp.Error: If the remote or the local listing fails (after
                       cleaning up).
    @raise NoDataError: Propagated from the remote listing step (after
                        cleaning up).

    """
    # Avoid a shared mutable default; callers may still pass a list.
    if xferList is None:
        xferList = []

    # Bind before the try so the error handlers can always pass it to
    # _cleanUp, even if the failure happens before _checkNewDirs returns.
    outDateDirs = None
    try:
        transferableDict = self._getTransferableDirs(runDate, checkSR)
        remStats = [self.sysc.rem_lock_name, self.sysc.rem_success_name,
                    self.sysc.rem_ready2copy_name]
        casuDict, casuSizes, casuStats = self._listRemoteDir(
            transferableDict, remStats, CASUTransfer.remLsOpt, xferList)

        outDateDirs = self._checkNewDirs(transferableDict, casuSizes)
    except extp.Error:
        print('\n'.join(["\n The server you seek",
                         " cannot be located, but",
                         " endless others exist.\n"]))
        self._cleanUp(outDateDirs)
        raise
    except NoDataError:
        self._cleanUp()
        raise

    try:
        hereDict, hereStats = \
            self._listHomeDir(transferableDict,
                              ['', self.sysc.rem_success_name, ''], xferList)
    except extp.Error:
        print('\n'.join(["\n The list you seek",
                         " cannot be located, but",
                         " endless others exist.\n"]))
        self._cleanUp(outDateDirs)
        raise

    return casuDict, hereDict, casuStats, hereStats, outDateDirs
949
950
951
def transferFiles(self, fileList, totalDLSize, casuStats, hereStats,
                  casuDict, outDirs):
    """
    Transfer the files in the given load list, re-downloading missing or
    corrupt files for at most CASUTransfer.maxRetries further attempts.

    @param fileList: The load list of files to transfer (as produced by
                     _buildLoadList).
    @type fileList: list
    @param totalDLSize: The estimated total size to transfer.
    @type totalDLSize: int
    @param casuStats: Statistics of CASU directories; field 2 is set to 1
                      for directories that need a reload.
    @type casuStats: dict
    @param hereStats: Statistics of WFAU directories (same layout).
    @type hereStats: dict
    @param casuDict: Dictionary of CASU data keyed by "subdir/filename";
                     field 2 of each value is used as the file size.
    @type casuDict: dict
    @param outDirs: Dictionary of date-version strings and their
                    download directories.
    @type outDirs: dict(str: str)

    @return: True if the transfer completed successfully.
    @rtype: bool

    """
    retries = 0
    dlStatus = False
    while not dlStatus and retries <= CASUTransfer.maxRetries:
        # Clear the "needs reload" flags before each attempt.
        for k in casuStats:
            casuStats[k][2] = 0
        for k in hereStats:
            hereStats[k][2] = 0

        try:
            transferDict = self._downloadFiles(fileList, outDirs,
                                               totalDLSize)
        except extp.Error:
            print('\n'.join(["\n There is a chasm",
                             " of carbon and silicon",
                             " the software can't bridge.\n"]))
            self._cleanUp()
            raise

        missingFiles = self._missingFileCheck(transferDict.values())

        if self.checkMd5Flag:
            try:
                md5Status, md5FilesMissing = \
                    MD5Checker().md5Run(casuStats, transferDict, outDirs)
            except extp.Error:
                print('\n'.join(["\n Chaos reigns within.",
                                 " Reflect, repent, and reload.",
                                 " Order shall return.\n"]))
                self._cleanUp()
                raise

            if md5Status[0] > 0:
                Logger.addMessage(' '.join([
                    "# md5sum must be checked manually",
                    "in these directories."]))
            if sum(md5Status) == 0:
                Logger.addMessage("# md5sum successfully checked.")
            missingFiles.extend(md5FilesMissing)
        else:
            md5Status = [0, 0]

        retryList = []
        if (not self.isTestRun and not self.ffluOnly
                and (md5Status[1] > 0 or missingFiles)):
            Logger.addMessage("# The following files have to be reloaded:")
            # A file may be reported both as missing and as failing the
            # md5 check; schedule it only once.
            seen = set()
            for fileName in missingFiles:
                misFile = File(fileName)
                shortName = os.path.join(misFile.subdir, misFile.base)
                if shortName in seen:
                    continue
                seen.add(shortName)
                Logger.addMessage(misFile.name)
                transferDict.pop(shortName, None)
                retryList.append([misFile.base, misFile.subdir,
                                  casuDict[shortName][2], '-',
                                  shortName])
                # Flag the directory at both ends as needing a reload.
                casuStats[misFile.subdir][2] = 1
                hereStats[misFile.subdir][2] = 1
        else:
            Logger.addMessage("# Transfer completed successfully!")
            dlStatus = True

        if not dlStatus:
            if retryList:
                # Only the failed files are downloaded on the next attempt.
                totalDLSize = sum(entry[2] for entry in retryList)
                fileList = self._buildLoadList(retryList, totalDLSize)
            # Count every failed attempt so the loop always terminates,
            # even when a failure is reported without naming any file.
            retries += 1

    return dlStatus
1049
1050
1051
1052
1053
1055 """
1056 Write log messages to logText.
1057
1058 @return: Log text string object.
1059 @rtype: StringIO
1060
1061 """
1062 logText = StringIO.StringIO()
1063 Logger.dump(logText)
1064 Logger.reset()
1065 return logText
1066
1067
1068
1069
1070
1072 """
1073 Removes the lock file, closes the download log file.
1074
1075 @param rmDirs: List of directories that should be removed.
1076 @type rmDirs: list(str)
1077
1078 """
1079 if rmDirs:
1080 for directory in rmDirs:
1081 dlDir = os.path.join(rmDirs[directory], directory)
1082 fiDir = os.path.join(
1083 os.path.dirname(os.path.dirname(rmDirs[directory])),
1084 self.sysc.fitsDir, directory)
1085 try:
1086 os.rmdir(dlDir)
1087 except OSError:
1088 print "Directory not empty:", dlDir
1089 try:
1090 os.rmdir(fiDir)
1091 except OSError:
1092 print "Directory not empty:", fiDir
1093 CASUTransferLog.closeTransferLogfile()
1094
1095
1096
1098 """
1099 Build dict from file list.
1100
1101 @param list: File name list (fileName, path, values).
1102 @type list: list
1103
1104 @return: Dictionary of full file paths and values.
1105 @rtype: dict
1106
1107 """
1108 dictionary = {}
1109 for s in list:
1110 dictionary[os.path.join(s[1], s[0])] = s[:]
1111 return dictionary
1112
1113
1114
1116 """
1117 @param dateTimeTpl: Date time tuple: ('yyyy-mm-dd', 'hh:mm:ss(.ss)')
1118 @type dateTimeTpl: (str, str)
1119
1120 @return: Time difference tuple: (days, seconds)
1121 @rtype: tuple(int, float)
1122
1123 """
1124 dateTime0 = mxTime.mktime((2001, 1, 1, 0, 0 ,0 , 0, 0, 0))
1125 date = [int(x) for x in dateTimeTpl[0].split('-')]
1126 dtime = [int(float(x)) for x in dateTimeTpl[1].split(':')]
1127 dateTime1 = mxTime.mktime(date + dtime + [0, 0, 0])
1128 return (dateTime1 - dateTime0).absvalues()
1129
1130
1131
def _readDirList(self, aListFile, lsNames=['','',''], lsOpt=''):
    """
    Read directory listing from file and transform it into internal list.

    The listing consists of directory header lines (containing
    self.scpMainDir, or a "/<loaddatabase>/ingest/" path) followed by file
    lines. Each file line is split into eight whitespace-separated fields,
    unpacked as access, link count, uid, size, YYYY-MM-DD date,
    hh:mm:ss(.s) time, timezone and name.

    @param aListFile: File containing the file system listings.
    @type aListFile: File class object
    @param lsNames: Names of the lock file, the successfully-read file and
                    the ready-to-copy file. Finding either of the first two
                    sets the directory's first stats field to 1; finding
                    the third resets it to 0.
    @type lsNames: list(str, str, str)
    @param lsOpt: Substring a file name must contain to be listed.
    @type lsOpt: str

    @return: List of file entries [name, directory base name, size, first
             access character, file time (mx.DateTime jdn), directory
             path], and a dictionary of directory statistics,
             i.e. [locked,used,success], keyed by directory base name.
    @rtype: list(list), dict(str: list(int, int, int))

    """
    dirList = []
    dirStats = {}
    # Line prefixes that are skipped: directory entries and "total" lines,
    # plus symbolic links for non-OSA systems.
    lineType = (('d', "total") if self.sysc.isOSA() else ('d', 'l', "total"))
    if aListFile.exists():
        aListFile.ropen()
        casuListing = [l for l in aListFile.readlines(
            commentChar='#',
            findValues=[".fit", ".fits", "ingest",
                        self.scpMainDir] + lsNames)
                       if not l.startswith(lineType)]
        aListFile.close()

        for line in casuListing:
            # Processing stops at the first empty line.
            if line == '':
                break
            if self.scpMainDir in line:
                # Directory header line, e.g. "<scpMainDir>/<datedir>:".
                listedDir = File(os.path.normpath(
                    line.replace(':','').strip()))

                # NOTE(review): reproMode.replace is an attribute (presumably
                # the reprocessing suffix to strip), not str.replace.
                if CASUQueries.re_date.match(listedDir.base.replace(
                        self.reproMode.replace, '')):
                    dirStats[listedDir.base.replace(
                        self.reproMode.replace, '')] = [1,0,0]
            elif "/" + self.sysc.loadDatabase.lower() + "/ingest/" in line:
                # Header line of a local ingest directory.
                listedDir = File(line.replace(':',''))
                dirStats[listedDir.base] = [0,0,0]
            else:
                # File line; listedDir is the most recent directory header.
                access, _nol, _uid, size, yyyymmdd, \
                    ftime, _timezone, name = line.split(None, 7)
                # On OSA keep only the link name of "name -> target".
                if self.sysc.isOSA() and "->" in name and access[0] == 'l':
                    name = name.split(None, 1)[0]
                yyyy, mon, dd = yyyymmdd.split('-')
                name = name.replace('*', '')

                # Keep files matching lsOpt, not matching any exclusion
                # pattern, inside a date (or date-version) directory.
                if lsOpt in name \
                        and not any(ex in name
                                    for ex in CASUTransfer.remExclPattern) \
                        and (CASUQueries.re_date.match(
                            listedDir.base.replace(self.reproMode.replace, '')) \
                             or CASUQueries.re_datevers.match(listedDir.base)):
                    hh, mm, ss = ftime.partition('.')[0].split(':')
                    fileTime = mxTime.DateTime(int(yyyy), int(mon),
                                               int(dd), int(hh),
                                               int(mm), int(ss)).jdn
                    dirList.append([name, listedDir.base, int(size),
                                    access[:1], fileTime, listedDir.name])
                # NOTE(review): these lookups raise KeyError if the header
                # of this directory did not create a dirStats entry.
                if name == lsNames[0] or name == lsNames[1]:
                    # Lock or success tag present: flag the directory.
                    dirStats[listedDir.base.replace(
                        self.reproMode.replace, '')][0] = 1
                if name == lsNames[2]:
                    # Ready-to-copy tag present: clear the flag.
                    dirStats[listedDir.base.replace(
                        self.reproMode.replace, '')][0] = 0
    else:
        Logger.addMessage("%s doesn't exist!" % aListFile.name)
    return dirList, dirStats
1208
1209
1210
1211
1212
1214 """
1215 Scans through the transfer list to see which files match the
1216 delete conditions and deletes them.
1217
1218 @param tflin: Transfer file list.
1219 @type tflin: list
1220
1221 @return: Updated list of transferable files.
1222 @rtype: list
1223
1224 """
1225 filesToDelete = ["t_w", "chan", "lin_", "squish", "bpm", "dqcdark",
1226 "_psf", "xsky", "dqcbias"]
1227 filesToIngest = [self.sysc.catSuffix, self.sysc.stackSuffix,
1228 self.sysc.skyPrefix, self.sysc.darkPrefix,
1229 self.sysc.flatSuffix, self.sysc.confSuffix,
1230 self.sysc.cpmSuffix, self.sysc.mosaicSuffix,
1231 self.sysc.tileSuffix]
1232 if self.sysc.isOSA():
1233 filesToIngest.extend([self.sysc.biasPrefix, self.sysc.fringeSuffix])
1234
1235 r1 = re.compile('|'.join(map(re.escape, filesToIngest)))
1236 r2 = re.compile('|'.join(map(re.escape, filesToDelete)))
1237 r3 = re.compile(r'[ovw]20')
1238 tflout = []
1239 for entry in tflin:
1240 fname = os.path.basename(entry)
1241 if r1.search(fname):
1242 tflout.append(entry)
1243 elif r2.search(fname):
1244 os.remove(entry)
1245 elif r3.match(fname,0,3):
1246 tflout.append(entry)
1247 else:
1248 Logger.addMessage(
1249 "<Warning> No match found in ingest list: " + entry)
1250 return tflout
1251
1252
1253
1255 """
1256 Create a listing of given directories into a file at CASU
1257 and copy the file into the sys dir.
1258
1259 @param copyDict: Dictionary of transferable dates and CASU paths.
1260 @type copyDict: dict
1261
1262 @return: File listing the directories at CASU.
1263 @rtype: fileObj
1264
1265 """
1266 lsFlags = ("-E" if "29" in self.sysc.scpServer(self.UKLight)
1267 else "--time-style=full-iso --indicator-style=none")
1268 cmds = ["ls -d %s", "ls -o %s %%s" % lsFlags]
1269 CASUQueries.getCasuFileListing(cmds, CASUQueries.camListFile,
1270 copyDict, self.UKLight)
1271
1272
1273
1275 """
1276 Create a dictionary of directories which are ready to copy and
1277 not yet successfully downloaded. It also includes the version number
1278 as a combination of <date>_v<#>.
1279
1280 @param runDate: Transfer date.
1281 @type runDate: int
1282 @param checkSR: Check if the SUCCESSFULLY_READ tag is set.
1283 @type checkSR: bool
1284
1285 @return: Dictionary of CASU path and date-version number strings
1286 for each date.
1287 @rtype: dict(str: list(str, str))
1288
1289 """
1290 Logger.addMessage("Getting transferable directories...")
1291
1292 srDirs = CASUQueries.getSRDirs(self.scpMainDir, self.UKLight,
1293 dateStr=str(runDate))
1294
1295 reproVers = (self.newVersStr if self.reproVersStr else None)
1296 dirVersions = CASUQueries.getDirVersions(
1297 self.scpMainDir, reproVers, self.forceTransfer,
1298 self.UKLight, dateStr=str(runDate))
1299
1300 otcDirs = CASUQueries.getOTCDirs(
1301 runDate, runDate, srDirs, dirVersions, self.scpMainDir,
1302 self.reproMode, reproVers, self.forceTransfer,
1303 checkSR, self.UKLight)
1304
1305 if not otcDirs and checkSR:
1306 text = "<Warning> There are no directories to copy."
1307 Logger.addMessage(text)
1308 raise NoDataError, text
1309 return otcDirs
1310
1311
1312
1314 """
1315 List remote directory 'scpMainDir'.
1316
1317 @param copyDict: Dictionary of transferable directories.
1318 @type copyDict: dict
1319 @param lsNames: Names of the locked and the successfully read file.
1320 @type lsNames: list(str, str, str)
1321 @param lsOpt: listing option (regexp for name.find).
1322 @type lsOpt: str
1323 @param xferList: Optional list containing files to be downloaded.
1324 @type xferList: list
1325
1326 @return: List of directories at WFAU, size of directories at WFAU,
1327 and a dictionary containing directory statistics.
1328 @rtype: list, int, dict
1329
1330 """
1331 Logger.addMessage("Listing CASU dir...")
1332 self._createCasuListFile(copyDict)
1333 dirList, dirStats = self._readDirList(CASUQueries.camListFile,
1334 lsNames, lsOpt)
1335
1336 tmpDirList = []
1337 totalSize = {}
1338 for entry in dirList:
1339 entry[1] = copyDict[entry[1].replace(self.reproMode.replace, '')][1]
1340 if xferList:
1341 if os.path.join(entry[-1],entry[0]) in xferList:
1342 tmpDirList.append(entry)
1343 if xferList:
1344 dirList = tmpDirList[:]
1345
1346 for entry in dirList:
1347 totalSize[entry[1]] = totalSize.get(entry[1], 0) \
1348 + entry[2]/self.sysc.one_kilobyte
1349 dirDict = self._dictFromList(dirList)
1350
1351
1352 versionStats = {}
1353 for key in dirStats:
1354 versionStats[copyDict[key][1]] = dirStats[key]
1355
1356 return dirDict, totalSize, versionStats
1357
1358
1359
1360
1361
1363 """
1364 Build a list of #-of-threads sublists.
1365
1366 The filelist is split into #-of-threads roughly equal bytesized
1367 sublists.
1368
1369 @param fileList: List of filenames.
1370 @type fileList: list(str)
1371 @param totalSize: Total size of files in filelist.
1372 @type totalSize: int
1373
1374 @return: List of sublists of files to download in different threads.
1375 @rtype: list(list(str))
1376 """
1377 loadList = []
1378 i = 0
1379 size = totalSize
1380 tts = 0
1381 subThreads = self.scpThreads % 10
1382 for x in xrange(subThreads):
1383 quart = int(size/(subThreads - x))
1384 sizeList = 0
1385 loadList.append([])
1386 while sizeList < quart and tts < totalSize:
1387 fileName = os.path.join(fileList[i][1], fileList[i][0])
1388 loadList[-1].append(fileName)
1389 sizeList += fileList[i][2]
1390 tts += fileList[i][2]
1391 i += 1
1392 size -= quart
1393 return loadList
1394
1395
1396
1398 """
1399 Check if remote file is in local dict. If so, deprecate.
1400
1401 @param casuDict: Dictionary of CASU data.
1402 @type casuDict: dict
1403 @param hereDict: Dictionary of WFAU data.
1404 @type hereDict: dict
1405 @param casuStats: Statistics of CASU directories.
1406 @type casuStats: int
1407 @param hereStats: Statistics of WFAU directories.
1408 @type hereStats: int
1409
1410 @return: file list of transferrable files and their total size.
1411 @rtype: list, int
1412 """
1413 fileList = []
1414 deprecateList = []
1415 totalSize = 0
1416 for casuPath in casuDict:
1417 if self.reproVersStr and self.newVersStr != self.oldVersStr:
1418 herePath = casuPath.replace(
1419 "_v%s" % self.newVersStr, "_v%s" % self.oldVersStr)
1420 if herePath in hereDict:
1421 hereStats[casuDict[casuPath][1]] = \
1422 hereStats[hereDict[herePath][1]][:]
1423 else:
1424 herePath = casuPath.replace(
1425 "_v%s" % self.versStr, "_v%s" % self.oldVersStr)
1426 if herePath in hereDict:
1427 hereStats[casuDict[casuPath][1]] = \
1428 hereStats[hereDict[herePath][1]][:]
1429
1430 else:
1431 herePath = casuPath
1432
1433 if self.isTestRun and self.verbose > 4:
1434 print casuPath,casuDict[casuPath]
1435 if herePath in hereDict:
1436 print ">>>>",hereDict[herePath]
1437 else:
1438 print ">>>> %s doesn't exist locally" % casuPath
1439 print "CS<<:",casuStats
1440 print "HS<<:",hereStats
1441
1442 if self.forceTransfer \
1443 or casuStats.get(casuDict[casuPath][1], 1)[0] == 0:
1444 if herePath not in hereDict:
1445 if not self.reproVersStr or (
1446 self.reproVersStr and not ',' in self.reproVersStr):
1447 fileList.append(casuDict[casuPath])
1448 casuStats[casuDict[casuPath][1]][1] = 1
1449 hereStats[casuDict[casuPath][1]][1] = 1
1450 totalSize += casuDict[casuPath][2]
1451 elif (self.forceTransfer or self.checkTimeFlag) \
1452 and hereDict[herePath][4] < casuDict[casuPath][4]:
1453 fileList.append(casuDict[casuPath])
1454 deprecateList.append(
1455 os.path.join(hereDict[herePath][5],
1456 casuDict[casuPath][0]))
1457 casuStats[casuDict[casuPath][1]][1] = 1
1458 hereStats[hereDict[herePath][1]][1] = 1
1459 totalSize += casuDict[casuPath][2]
1460
1461 if self.isTestRun and self.verbose > 4:
1462 print "CS>>:",casuStats
1463 print "HS>>:",hereStats
1464
1465
1466 if deprecateList and self.deprecationMode.lower() != "no" \
1467 and not self.ffluOnly:
1468 deprFileList = self._mvDeprecatedFiles(deprecateList,
1469 self.deprecationMode)
1470 Logger.addMessage(
1471 "Nr. of deprecated files: %s" % len(deprecateList))
1472 else:
1473 deprFileList = []
1474
1475 return fileList, totalSize, deprFileList
1476
1477
1478
1480 """
1481 Check if there is enough space to transfer the whole dir,
1482 otherwise change outdir to a free disk.
1483
1484 @param copyDict: Dictionary containing the CASU path and the
1485 date-version number string for every date.
1486 @type copyDict: dict(str:list(str, str))
1487 @param casuSizes: Dictionary of directory names and their sizes.
1488 @type casuSizes: dict(str: int)
1489
1490 @return: Dictionary of date-version strings and their download
1491 directory.
1492 @rtype: dict(str: str)
1493
1494 """
1495 transferableDates = sorted(copyDict[k][1] for k in copyDict)
1496
1497
1498 allFitsDirs = {}
1499 for disk in self.sysc.availableRaidFileSystem():
1500 fitsDirs = dircache.listdir(os.path.join(disk, self.sysc.fitsDir))
1501 for dateDir in set(transferableDates).intersection(fitsDirs):
1502 fitsPath = os.path.join(disk, self.sysc.fitsDir)
1503 size = self._getDirsize(os.path.join(fitsPath, dateDir))
1504 if dateDir in allFitsDirs:
1505 allFitsDirs[dateDir].append((fitsPath, size))
1506 Logger.addMessage(' '.join(["<Warning>",
1507 dateDir, "exists in more than one place:",
1508 repr(allFitsDirs[dateDir])]))
1509 else:
1510 allFitsDirs[dateDir] = [(fitsPath, size)]
1511
1512
1513 outDirs = {}
1514
1515 freePerc = self.sysc.leaveSpaceOnMSRFS()
1516
1517
1518
1519 freeDiskSpace = Ratings(dict((d, -self.spacePerDisk[d][1])
1520 for d in self.spacePerDisk
1521 if self.spacePerDisk[d][1] > freePerc[d]))
1522
1523 freeDisks = Ratings(dict((key, freeDiskSpace.rating(key))
1524 for key in freeDiskSpace))
1525 latestAccess = self._getLatestAccess(freeDisks)
1526
1527 if self.isTestRun and self.verbose > 3:
1528 print "freePerc:", freePerc
1529 print "spacePerDisk:", self.spacePerDisk
1530 print "freeDisks:", freeDisks.items()
1531 print "latestAccess:", latestAccess.items()
1532
1533 for xferDir in transferableDates:
1534 if xferDir in allFitsDirs:
1535
1536 outDisk = ''.join(allFitsDirs[xferDir][0][0].partition(
1537 self.sysc.loadDatabase.lower())[:2])
1538 outDirs[xferDir] = os.path.join(outDisk, self.sysc.downloadDir)
1539 self.spacePerDisk[outDisk][1] -= \
1540 casuSizes[xferDir] - allFitsDirs[xferDir][0][1]
1541 if self.isTestRun and self.verbose > 3:
1542 print "xferDir: ",xferDir
1543 print "outDirs: ",outDirs
1544 else:
1545
1546 isSpace = False
1547 for _outDisk in sorted(self.spacePerDisk):
1548
1549 if self.spacePerDisk[_outDisk][1] - freePerc[_outDisk] > \
1550 casuSizes[xferDir]:
1551 outDirs.setdefault(xferDir, []).append(_outDisk)
1552 isSpace = True
1553
1554 if self.isTestRun and self.verbose > 3:
1555 print "xferDir: ",xferDir
1556 print "outDirs: ",outDirs
1557
1558 if not isSpace:
1559 raise MemoryError(
1560 "There is no space left on the RAID array.")
1561
1562
1563 for _fitsDir in freeDisks:
1564 if _fitsDir in outDirs[xferDir]:
1565 finalDir = _fitsDir
1566 if _fitsDir != latestAccess.keys()[-1]:
1567 break
1568
1569 outDirs[xferDir] = \
1570 os.path.join(finalDir, self.sysc.downloadDir)
1571
1572 self.spacePerDisk[finalDir][1] -= casuSizes[xferDir]
1573 freeDisks[finalDir] += len(freeDisks)
1574
1575
1576 time.sleep(1)
1577 latestAccess[finalDir] = \
1578 self._getTimeDiff(utils.makeTimeStamp().split())
1579
1580 for _outDir in outDirs:
1581
1582 utils.ensureDirExist(os.path.join(outDirs[_outDir], _outDir))
1583
1584 utils.ensureDirExist(os.path.join(
1585 outDirs[_outDir].partition(self.sysc.downloadDir)[0],
1586 self.sysc.fitsDir, _outDir))
1587 Logger.addMessage("Data will be downloaded into these directories: "+
1588 repr(outDirs))
1589 return outDirs
1590
1591
1592
1594 """
1595 Get the size of a directory.
1596
1597 @param dirPath: Full path to a directory.
1598 @type dirPath: str
1599
1600 @return: Size in kilobytes.
1601 @rtype: int
1602
1603 """
1604 try:
1605 proc = Popen(["du", "-k" , "--max-depth=0", dirPath], stdout=PIPE)
1606 size, _direc = proc.stdout.read().split()
1607 except ValueError:
1608 size = 0
1609
1610 return int(size)
1611
1612
1613
1615 """
1616 Create a rating dict for the given disks containing date directories
1617 rated by last access time.
1618
1619 @param diskList: List of raid file system disks.
1620 @type diskList: list(str)
1621 @param subDir: Subdirectory to check.
1622 @type subDir: str
1623
1624 @return: Ratings dictionary of disks and their latest access time.
1625 @rtype: dict(str: str)
1626 """
1627 latestAccess = Ratings()
1628 for disk in diskList:
1629 cmd = "ls -ldct --full-time " + os.path.join(disk, subDir, '*')
1630 if self.verbose > 1:
1631 print "_getLatestAccess:", cmd
1632 try:
1633 timeTpl = extp.run(cmd, isVerbose=False)[0].split()[5:7]
1634 latestAccess[disk] = self._getTimeDiff(timeTpl)
1635 except:
1636
1637
1638 latestAccess[disk] = (0, 0.0)
1639 return latestAccess
1640
1641
1642
1644 """
1645 List home directory 'fitsDir'.
1646
1647 @param copyDict: Dictionary of transferable directories.
1648 @type copyDict: dict
1649 @param lsNames: Names of the locked and the successfully read file.
1650 @type lsNames: list(str, str, str)
1651
1652 @return: Dict of directories at WFAU, their size, and a
1653 dictionary containing directory statistics.
1654 @rtype: list, int, dict
1655
1656 """
1657 hdDict = {}
1658 for disk in self.sysc.availableRaidFileSystem():
1659 fitsPath = os.path.join(disk, self.sysc.fitsDir)
1660 for dateDir in dircache.listdir(fitsPath):
1661 hdDict[dateDir] = os.path.join(fitsPath, dateDir)
1662
1663 listDirs = []
1664 for date in copyDict:
1665 if self.reproVersStr and self.newVersStr != self.oldVersStr:
1666 dvStr = copyDict[date][1].replace(
1667 "_v%s" % self.newVersStr, "_v%s" % self.oldVersStr)
1668 listDirs.append(hdDict[dvStr])
1669 else:
1670 if copyDict[date][1] in hdDict:
1671 listDirs.append(hdDict[copyDict[date][1]])
1672
1673 if listDirs:
1674 wfcamListFile = File(os.path.join(
1675 self.sysc.sysPath, "%s_wfcamlist_%s" % (
1676 self.sysc.loadDatabase, self.runDateStr)))
1677 cmd = ' '.join(["ls --indicator-style=none -oR",
1678 "--time-style=full-iso ",
1679 ' '.join(listDirs)])
1680 if self.verbose > 1:
1681 print "_listHomeDir:", cmd
1682 p = Popen(cmd, shell=True, stderr=STDOUT, stdout=PIPE)
1683 wfcamListFile.wopen()
1684 for entry in [x.rstrip() for x in p.stdout]:
1685 wfcamListFile.writetheline(entry)
1686 wfcamListFile.close()
1687 dirList, dirStats = self._readDirList(wfcamListFile, lsNames)
1688 tmpDirList = []
1689 if xferList:
1690 fileNames = [os.path.basename(entry) for entry in xferList]
1691 for entry in dirList:
1692 if entry[0] in fileNames:
1693 tmpDirList.append(entry)
1694 dirList = tmpDirList[:]
1695
1696 dirDict = self._dictFromList(dirList)
1697 if wfcamListFile.exists():
1698 wfcamListFile.remove()
1699 else:
1700 dirDict = {}
1701 dirStats = {}
1702
1703 return dirDict, dirStats
1704
1705
1706
1708 """
1709 Move files in fileList to the directory for deprecated files.
1710
1711 @param fileList: list of filenames(file, datedir, path(incl. datedir))
1712 @type fileList: list(str)
1713 @param deprMode: Mode for handling the deprecated files: move ('mv'),
1714 remove ('rm'), leave in place ('ip')
1715 @type deprMode: str
1716
1717 """
1718 deprFileList = []
1719 Logger.addMessage("Deprecating files:")
1720 deprLogTime = utils.makeTimeStamp()
1721 if deprMode.partition('_')[0] in ["mv", "ip"]:
1722 if self.isTestRun and self.verbose > 4:
1723 print self.outPrefix
1724 deprecatedListFile = File(
1725 os.path.join(self.sysc.xferLogPath(), ''.join(
1726 [self.outPrefix, "_deprecated_", deprMode, '_',
1727 deprLogTime.replace(' ', '_'), ".log"])))
1728 deprecatedListFile.wopen()
1729 deprTime = utils.makeTimeStamp().split()[0].replace('-','')
1730 for fileName in sorted(fileList):
1731 deprFile = File(fileName)
1732 if deprMode.partition('_')[0] == "mv":
1733 if deprMode == "mv":
1734 outDateDir = deprFile.subdir
1735 else:
1736 outDateDir = ''.join(
1737 deprFile.subdir.partition("_v")[:2]) \
1738 + deprMode.partition('_')[2]
1739
1740 spacePerDisk = \
1741 utils.getDiskSpace(self.sysc.deprecatedDataStorage)
1742
1743 deprPath = os.path.join(
1744 utils.getNextDisk(self.sysc, spacePerDisk),
1745 '%s_%s' % (self.sysc.deprecatedDir, deprTime),
1746 outDateDir)
1747
1748 utils.ensureDirExist(deprPath)
1749 deprName = os.path.join(deprPath, deprFile.base)
1750 elif deprMode == "ip":
1751 deprName = "%s.deprecated_%s" % (deprFile.name, deprTime)
1752 elif deprMode == "rm":
1753 deprName = "---"
1754 try:
1755 if deprMode.partition('_')[0] in ["mv", "ip"]:
1756 if not self.isTestRun:
1757 shutil.copy2(deprFile.name, deprName)
1758 deprFileList.append(deprName)
1759 Logger.addMessage(
1760 ' '.join([deprMode, deprFile.name, deprName]))
1761 deprecatedListFile.writetheline(
1762 ','.join([deprFile.name, deprName]))
1763 elif deprMode == "rm":
1764 if not self.isTestRun:
1765 os.remove(fileName)
1766 Logger.addMessage("rm " + deprFile.name)
1767 except:
1768 Logger.addMessage(' '.join(["Can't", deprMode, deprFile.name,
1769 "//", deprName]))
1770 raise
1771
1772 if deprMode.partition('_')[0] in ["mv", "ip"]:
1773 deprecatedListFile.close()
1774
1775 cmd = "%s/UpdateFitsFileNames.py -d %s -D -f %s %s" % \
1776 (self.sysc.helpersPath(), self.database, deprecatedListFile.name,
1777 self.sysc.loadDatabase)
1778
1779 Logger.addMessage("Running '%s'" % cmd)
1780 if not self.isTestRun:
1781 status = os.system(cmd)
1782 if status:
1783 Logger.addMessage("<ERROR> UpdateFitsFileNames.py failed!")
1784 for fileName in fileList:
1785 os.remove(fileName)
1786
1787 Logger.addMessage(
1788 "Deprecated file names are listed in " + deprecatedListFile.name)
1789 Logger.addMessage("<INFO> Run 'helpers/SyncFlatFileLookUp.py'")
1790 return deprFileList
1791
1792
1793
1794
1795
1797 """
1798 Calculate the transfer rate.
1799
1800 @param startTime: Begin of download.
1801 @type startTime: time object
1802 @param endTime: End of download.
1803 @type endTime: time object
1804 @param totalSize: Total size from download scripts.
1805 @type totalSize: int
1806
1807 """
1808 elapsedTime = endTime - startTime
1809 transferRate = totalSize/(elapsedTime*self.sysc.one_megabyte)
1810
1811 sizemega = 1.*totalSize/self.sysc.one_megabyte
1812 if sizemega < 1.0:
1813 sizeText = "%.6f kBytes"% (1.*totalSize/self.sysc.one_kilobyte)
1814 elif sizemega > 1.0 and sizemega < 1000.0:
1815 sizeText = "%.6f MBytes"% (1.*totalSize/self.sysc.one_megabyte)
1816 elif sizemega > 1000.0:
1817 sizeText = "%.6f GBytes"% (1.*totalSize/self.sysc.one_gigabyte)
1818 Logger.addMessage(''.join([
1819 sizeText, " transfered in %d m %.8f s" % (int(elapsedTime/60.),
1820 (elapsedTime % 60))]))
1821 Logger.addMessage("Transfer rate: %.8f MB/s" % (transferRate))
1822
1823
1824
1826 """
1827 Download files.
1828
1829 @param fileList: File list of transferable files.
1830 @type fileList: list
1831 @param outDirDict: Dictionary of date-version strings and their
1832 download directories.
1833 @type outDirDict: dict(str: str)
1834 @param totalSize: Total size of transfer.
1835 @type totalSize: int
1836
1837 @return: Dictionary of transferred files.
1838 @rtype: dict
1839 """
1840 self._removeScpScripts()
1841 transferDict = self._writeScpScripts(fileList, outDirDict)
1842 startDownload = time.time()
1843 self._threadScpScripts()
1844 endDownload = time.time()
1845 self._calcTransferRate(startDownload, endDownload, totalSize)
1846 return transferDict
1847
1848
1849
1851 """
1852 Check for not transfered files.
1853
1854 @param fileList: list of filenames(file, datedir, path(incl. datedir))
1855 @type fileList: list
1856
1857 @return: List of missing files.
1858 @rtype: list(str)
1859
1860 """
1861 missingFiles = []
1862 Logger.addMessage("checking for missing files, started at %s UTC" % \
1863 (utils.makeTimeStamp()))
1864 for fileName in fileList:
1865 if not os.path.exists(fileName):
1866 missingFiles.append(fileName)
1867
1868 Logger.addMessage("check for missing files finished at %s UTC" % \
1869 (utils.makeTimeStamp()))
1870 return missingFiles
1871
1872
1873
1883
1884
1885
1887 """
1888 Run the scp scripts simultaneously, but wait until each has finished.
1889 """
1890 args_q = Queue.Queue()
1891 result_q = Queue.Queue()
1892 results = defaultdict(int)
1893
1894
1895 pool = [Download(args_q=args_q, result_q=result_q)
1896 for i in xrange(self.scpThreads % 10)]
1897
1898
1899 for thread in pool:
1900 thread.start()
1901
1902
1903 work_count = 0
1904
1905 for thread in xrange(self.scpThreads % 10):
1906 scpFileName = getScpScriptName(self.sysc, self.runDateStr,
1907 thread, self.scpSuffix)
1908 if os.path.exists(scpFileName):
1909 work_count += 1
1910 args_q.put((scpFileName, self.isTestRun, self.ffluOnly))
1911
1912 while work_count > 0:
1913
1914 status = result_q.get()
1915 results[status[0]] = status
1916 work_count -= 1
1917
1918
1919 for thread in pool:
1920 thread.join()
1921
1922 theTime = utils.makeTimeStamp()
1923 sFlag = 0
1924 for thread in results:
1925 res = results[thread][1]
1926 script = results[thread][2]
1927 sFlag += math.fabs(res)
1928 if res < 0:
1929 Logger.addMessage("%s failed at %s UTC, killed by signal %d" % (
1930 script, theTime, res))
1931 elif res > 0:
1932 Logger.addMessage("%s unsuccessful at %s UTC with signal %d" % (
1933 script, theTime, res))
1934 elif res == 0:
1935 Logger.addMessage("%s finished at %s UTC" % (script,theTime))
1936
1937
1938
1940 """
1941 Write shell scp script based on the file list comparison.
1942
1943 @param fileList: List of transferable file names.
1944 @type fileList: list(str)
1945 @param outDirs: Dictionary of date-version strings and their
1946 download directories.
1947 @type outDirs: dict(str: str)
1948
1949 @return: Dictionary of downloaded file names and their paths.
1950 @rtype: dict(str: str)
1951
1952 """
1953 fileDict = {}
1954 shell = "#! /usr/bin/tcsh"
1955 transferCmd = ''
1956 for thread in xrange(self.scpThreads % 10):
1957 outFile = File(getScpScriptName(self.sysc, self.runDateStr,
1958 thread, self.scpSuffix))
1959 outFile.wopen()
1960 outFile.writetheline(shell)
1961 if CASUTransfer.transferMethod == "ftp":
1962 marker = "cmd"
1963 initCmd = ' '.join([self.sysc.ftp_method,
1964 self.sysc.scpServer(self.UKLight),
1965 "<<", marker])
1966 outFile.writetheline(initCmd)
1967 outFile.writetheline("bin")
1968 transferCmd = "get InTemplate OutTemplate"
1969 elif CASUTransfer.transferMethod == "axel":
1970 transferCmd = ' '.join(["axel -va -o OutTemplate ",
1971 self.sysc.scpServer(self.UKLight),
1972 "InTemplate"])
1973 elif self.onlyHeaderTransfer:
1974 transferCmd = ' '.join([
1975 CASUQueries.sshCmd(self.UKLight, SystemConstants("WSA")),
1976 "'/home/wfcam/bin/listhead InTemplate' > OutTemplate"])
1977 else:
1978 transferCmd = ':'.join([CASUQueries.scpCmd(self.UKLight),
1979 "InTemplate OutTemplate"])
1980 fileList[thread].sort()
1981 for fileName in fileList[thread]:
1982 xferFile = File(fileName)
1983 if self.reproMode.mode == "p":
1984 subdir = "%s%s" % (self.reproMode.replace,
1985 xferFile.subdir.partition('_v')[0])
1986 elif self.reproMode.mode == "s":
1987 subdir = "%s%s" % (xferFile.subdir.partition('_v')[0],
1988 self.reproMode.replace)
1989 else:
1990 subdir = xferFile.subdir.partition('_v')[0]
1991 inPath = os.path.join(
1992 self.scpMainDir, subdir, xferFile.base)
1993 if self.onlyHeaderTransfer:
1994 outPath = os.path.join(outDirs[xferFile.subdir],
1995 xferFile.name.replace(xferFile.ext, ".head"))
1996 else:
1997 outPath = os.path.join(outDirs[xferFile.subdir], fileName)
1998
1999 if not os.path.exists(outPath):
2000 command = transferCmd.replace(
2001 "InTemplate", inPath).replace("OutTemplate", outPath)
2002 outFile.writetheline(command)
2003 fileDict[fileName] = outPath
2004
2005 if CASUTransfer.transferMethod == "ftp":
2006 closeCmd = "quit"
2007 outFile.writetheline(closeCmd)
2008 outFile.writetheline(marker)
2009 outFile.close()
2010
2011
2012 os.chmod(outFile.name, 0764)
2013 return fileDict
2014
2015
2016
2017
2018
2020 """
2021 Write successful_downdload file to Cambridge dirs.
2022
2023 @param dirStats: Directory statistics.
2024 @type dirStats: dict(str:list(int))
2025
2026 """
2027 for key, value in dirStats.iteritems():
2028 if value[0] == 0 and value[1] == 1 and value[2] == 0:
2029 dateStr = key.partition('_v')[0]
2030 if self.reproMode.mode == "p":
2031 subdir = "%s%s" % (self.reproMode.replace, dateStr)
2032 elif self.reproMode.mode == "s":
2033 subdir = "%s%s" % (dateStr, self.reproMode.replace)
2034 else:
2035 subdir = dateStr
2036
2037 outXferFile = File(os.path.join(
2038 self.sysc.sysPath,
2039 "XferredFiles_%s" % self.sysc.loadDatabase))
2040 outXferFile.aopen()
2041 outText = "%s %s %s" % (
2042 self.sysc.casuFileSystemVariables.name, dateStr,
2043 utils.makeTimeStamp().partition(' ')[0].replace('-',''))
2044 if subdir != dateStr:
2045 outText += " [%s]" % subdir
2046 outXferFile.writetheline(outText)
2047 outXferFile.close()
2048
2049 if self.verbose > 1:
2050 print "_writeRemSuccess:", outText
2051
2052
2053
2055 """
2056 Write successful_downdload file to Cambridge dirs.
2057
2058 @param dirStats: Directory statistics.
2059 @type dirStats: dict(str:list(int))
2060
2061 """
2062 for key, value in dirStats.iteritems():
2063 if value[0] == 0 and value[1] == 1 and value[2] == 0:
2064 dateStr = key.partition('_v')[0]
2065 if self.reproMode.mode == "p":
2066 subdir = "%s%s" % (self.reproMode.replace, dateStr)
2067 elif self.reproMode.mode == "s":
2068 subdir = "%s%s" % (dateStr, self.reproMode.replace)
2069 else:
2070 subdir = dateStr
2071 outDirPath = os.path.join(self.scpMainDir, subdir)
2072 cmd = ' '.join([CASUQueries.sshCmd(self.UKLight),
2073 "'touch %s'" % os.path.join(outDirPath,
2074 self.sysc.rem_success_name)])
2075 if self.verbose > 1:
2076 print "_writeRemSuccess:", cmd
2077 try:
2078 if not self.ffluOnly:
2079 extp.run(cmd)
2080 except extp.Error:
2081 Logger.addMessage("# Error writing success to %s@%s: %s" %
2082 (self.sysc.scpUser,
2083 self.sysc.scpServer(self.UKLight),
2084 dateStr))
2085
2086
2087
2088
2089
2113
2114
2115
2117 """
2118 Write successful_downdload file to local dirs and create transfer list.
2119
2120 @param dirStats: Directory statistics.
2121 @type dirStats: dict
2122 @param outDirs: Dictionary of date-version strings and their
2123 download directories.
2124 @type outDirs: dict(str: str)
2125
2126 @return: List of successfully transferred files.
2127 @rtype: list
2128
2129 """
2130 xferredFileList = []
2131
2132 for dateVersStr in outDirs:
2133 if dateVersStr in dirStats and dirStats[dateVersStr] == [0, 1, 0]:
2134 outDirPath = os.path.join(outDirs[dateVersStr], dateVersStr)
2135 for fileName in dircache.listdir(outDirPath):
2136 fileNamePath = os.path.join(outDirPath, fileName)
2137 if "md5" not in fileName and os.path.isfile(fileNamePath) \
2138 and self.sysc.rem_success_name not in fileName:
2139 xferredFileList.append(fileNamePath)
2140
2141 if not self.isTestRun:
2142 fp = os.path.join(outDirPath, self.sysc.rem_success_name)
2143 try:
2144 extp.run("touch " + fp)
2145 except:
2146 Logger.addMessage("# Error writing success to: %s" %
2147 dateVersStr)
2148 raise
2149 else:
2150 Logger.addMessage("Problem in directory stats: %s %r" % (
2151 dateVersStr, dirStats[dateVersStr]))
2152
2153 return xferredFileList
2154
2160 """Query the CASU directories."""
2161 camListFile = None
2162 re_digits = re.compile(r'(\d+)')
2163 re_date = re.compile(r'(\A(\d{8})\Z)')
2164 sysc = IngCuSession.sysc
2165 re_datevers = re.compile(r'(\A(\d{8})_v(\d)(.{0,1}\d*)\Z)')
2166 notAvailCASUDisks = []
2167
2168
2169
2170 @staticmethod
2192
2193
2194
2195 @staticmethod
2197 """Execute the given command to create a file at CASU
2198 and copy the file into the sys dir."""
2199 if casuDataFile.exists():
2200 casuDataFile.remove()
2201 casuDataFile.aopen()
2202 for dirPath in copyDict:
2203 cmd = ["%s '%s'" % (CASUQueries.sshCmd(UKLight),
2204 commands[0] % copyDict[dirPath][0]),
2205 "%s '%s'" % (CASUQueries.sshCmd(UKLight),
2206 commands[1] % copyDict[dirPath][0])]
2207 if CASUTransfer.verbose > 1:
2208 print "getCasuFileListing:", cmd
2209 try:
2210 p = Popen(cmd[0], shell=True, stderr=STDOUT, stdout=PIPE)
2211 for entry in [x.rstrip() for x in p.stdout]:
2212 casuDataFile.writetheline(entry)
2213 except:
2214 Logger.addMessage("# Error accessing remote server to get "
2215 "file listing: %s@%s" %
2216 (CASUQueries.sysc.scpUser,
2217 CASUQueries.sysc.scpServer(UKLight)))
2218 raise
2219 try:
2220 p = Popen(cmd[1], shell=True, stderr=STDOUT, stdout=PIPE)
2221 for entry in [x.rstrip() for x in p.stdout]:
2222 casuDataFile.writetheline(entry)
2223 except:
2224 Logger.addMessage("# Error accessing remote server to get "
2225 "file listing: %s@%s" %
2226 (CASUQueries.sysc.scpUser,
2227 CASUQueries.sysc.scpServer(UKLight)))
2228 raise
2229 casuDataFile.close()
2230
2231
2232
2233 @staticmethod
2254
2255
2256
2257 @staticmethod
2259 """ Get a dictionary of all dateDirs and their paths.
2260 """
2261 if not dirList:
2262 dirList = CASUQueries.getCasuStagingDirs(UKLight=UKLight)
2263
2264 prefix = (presuf.partition("::")[2] if "p::" in presuf else '')
2265 suffix = (presuf.partition("::")[2] if "s::" in presuf else '')
2266 casuDateDict = {}
2267 for direc in dirList:
2268 cmd = ' '.join([CASUQueries.sshCmd(UKLight),
2269 "'ls -d %s/%s20??????%s'" % (direc, prefix, suffix)])
2270 if CASUTransfer.verbose > 1:
2271 print "getCasuDateDirs:", cmd
2272 if mode == "MJD":
2273 casuDateDict[direc] = [int(mxTime.strptime(os.path.basename(
2274 os.path.normpath(path.rstrip())), "%Y%m%d").mjd)
2275 for path in os.popen(cmd)
2276 if not os.path.normpath(path.rstrip()).endswith('@')]
2277 else:
2278 casuDateDict[direc] = [os.path.basename(os.path.normpath(
2279 path.rstrip())) for path in os.popen(cmd)]
2280 casuDateDict.update(utils.invertDict(casuDateDict))
2281
2282 return casuDateDict
2283
2284
2285
2286 @staticmethod
2287 - def getDirListing(casuDisk, UKLight=True, lsFlags='', longStyle=True):
2288 """
2289 Get a listing of the given directory.
2290 """
2291 lsLongArgs = ("-E" if "29" in CASUQueries.sysc.scpServer(UKLight)
2292 else "--time-style=full-iso --indicator-style=none")
2293 lsArgs = (lsLongArgs if longStyle else '')
2294 lsArgs = ' '.join([lsArgs, lsFlags])
2295 cmd = ' '.join([CASUQueries.sshCmd(UKLight),
2296 "\"ls", lsArgs, casuDisk, "\""])
2297 if CASUTransfer.verbose > 1:
2298 print "getDirListing:", cmd
2299 p = Popen(cmd, shell=True, stderr=STDOUT, stdout=PIPE)
2300
2301 return p.stdout
2302
2303
2304
2305 @staticmethod
2306 - def getDirSizes(casuDisk, UKLight=True, returnList=False, dateRE="20*"):
2307 """
2308 Dict of directory sizes.
2309 """
2310 if CASUQueries.sysc.isWSA():
2311 duArgs = ' '.join(["--max-depth=1", casuDisk])
2312 else:
2313 duArgs = os.path.join(casuDisk, dateRE)
2314
2315 cmd = ' '.join([CASUQueries.sshCmd(UKLight),
2316 "\"du -h", duArgs, "\""])
2317 if CASUTransfer.verbose > 1:
2318 print "getDirSizes:", cmd
2319 dirSizes = [x.rstrip().split('\t') for x in os.popen(cmd)]
2320
2321 if returnList:
2322 return dirSizes
2323
2324 return dict((os.path.basename(cDir), size) for size, cDir in dirSizes)
2325
2326
2327
2328 @staticmethod
2329 - def getDirVersions(casuDisk, reproVersion=None, forceTransfer=False,
2330 UKLight=True, returnList=False, dateStr = ''):
2331 """
2332 Dict of directory version numbers (dv).
2333 """
2334 if forceTransfer:
2335 if reproVersion and reproVersion == '4':
2336 findArg = "VERSION_*"
2337 else:
2338 findArg = ("UPDATED" if "reprocessed" in casuDisk
2339 else "VERSION_*")
2340 else:
2341 findArg = "VERSION_*"
2342 findPath = os.path.join(casuDisk, dateStr)
2343 cmd = "%s \"sh -c 'find %s -name %s -print 2> /dev/null'\"" % \
2344 (CASUQueries.sshCmd(UKLight), findPath, findArg)
2345 if CASUTransfer.verbose > 1:
2346 print "getDirVersions:", cmd
2347
2348 dirVersions = sorted(x.replace('\n', '') for x in os.popen(cmd)
2349 if findPath in x)
2350
2351 if returnList:
2352 return dirVersions
2353
2354 dvDict = {}
2355 for fileName in dirVersions:
2356 versFile = File(fileName)
2357 versStr = (reproVersion if reproVersion
2358 else versFile.base.rpartition('_')[2])
2359 dvDict[versFile.path] = '_v'.join([versFile.subdir, versStr])
2360 return dvDict
2361
2362
2363
2364 @staticmethod
2365 - def getOTCDirs(startDate, endDate, srDirs, dirVersions,
2366 casuDisk, reproMode=None, reproVersion=None,
2367 forceTransfer=False, checkSR=True, UKLight=True,
2368 returnList=False):
2369 """
2370 List of ok_to_copy (otc) directories.
2371
2372 @param startDate: Begin of transfer period.
2373 @type startDate: int
2374 @param endDate: End of transfer period.
2375 @type endDate: int
2376 @param srDirs: List of SUCCESSFULLY_READ directories.
2377 @type srDirs: list
2378 @param dirVersions: Dict of directory version numbers.
2379 @type dirVersions: dict
2380 @param checkSR: Check if the SUCCESSFULLY_READ tag is set.
2381 @type checkSR: bool
2382
2383 """
2384 if forceTransfer:
2385 if reproVersion and reproVersion == '4':
2386 findArg = "VERSION_*"
2387 else:
2388 findArg = ("UPDATED" if "reprocessed" in casuDisk
2389 else "VERSION_*")
2390 else:
2391 findArg = "OK_TO_COPY"
2392 findPath = (os.path.join(casuDisk, str(startDate))
2393 if startDate == endDate else casuDisk)
2394 cmd = "%s \"sh -c 'find %s -name %s -print 2> /dev/null'\"" % \
2395 (CASUQueries.sshCmd(UKLight), findPath, findArg)
2396 if CASUTransfer.verbose > 1:
2397 print "getOTCDirs:", cmd
2398
2399 tmpOtcDirs = [x.replace('\n', '') for x in os.popen(cmd)]
2400
2401
2402 otcDirs = []
2403 for fileName in tmpOtcDirs:
2404 otcFile = File(fileName)
2405 otcDate = (otcFile.subdir.replace(reproMode.replace, '')
2406 if reproMode else otcFile.subdir[:8])
2407 if otcDate.isdigit() and startDate <= int(otcDate) <= endDate:
2408 otcDirs.append(fileName)
2409
2410 if returnList:
2411 return otcDirs
2412
2413 otcDict = {}
2414 for fileName in otcDirs:
2415 otcFile = File(fileName)
2416 fileDateStr = otcFile.subdir.replace(reproMode.replace, '')
2417 if otcFile.path not in dirVersions and not reproMode:
2418 text = "%s has no version number." % otcFile.path
2419 Logger.addMessage(text)
2420 raise NoDataError, text
2421 if otcFile.path not in srDirs or not checkSR:
2422 versionStr = (dirVersions[otcFile.path] if not reproVersion
2423 else '_v'.join([fileDateStr, reproVersion]))
2424 otcDict[otcFile.subdir.replace(reproMode.replace, '')] = \
2425 [otcFile.path, versionStr]
2426 return otcDict
2427
2428
2429
2430 @staticmethod
2431 - def getSRDirs(casuDisk, UKLight=True, returnList=False, dateStr=''):
2432 """List of successfully_read (sr) directories."""
2433 findArg = "SUCCESSFULLY_READ"
2434 findPath = os.path.join(casuDisk, dateStr)
2435 cmd = "%s \"sh -c 'find %s -name %s -print 2> /dev/null'\"" % \
2436 (CASUQueries.sshCmd(UKLight), findPath, findArg)
2437 if CASUTransfer.verbose > 1:
2438 print "getSRDirs:", cmd
2439
2440 srDirs = sorted(x.replace('\n', '') for x in os.popen(cmd))
2441
2442 if returnList:
2443 return srDirs
2444
2445 srDict = {}
2446 for fileName in srDirs:
2447 srFile = File(fileName)
2448 srDict[srFile.path] = srFile.base
2449 return srDict
2450
2451
2452
2453 @staticmethod
2473
2474
2475
2476 @staticmethod
2481
2482
2483
2484 @staticmethod
2485 - def sshCmd(UKLight=True, syscon=None):
2491
2495 """Run the previously made scp scripts as separate threads."""
2497 super(Download, self).__init__()
2498 self.args_q = args_q
2499 self.result_q = result_q
2500 self.stoprequest = Event()
2501
2503 """Spawn the scp scripts."""
2504 while not self.stoprequest.isSet():
2505 try:
2506 script, isTestRun, ffluOnly = self.args_q.get(True, 0.05)
2507 theTime = utils.makeTimeStamp()
2508 Logger.addMessage(' '.join([os.path.basename(script),
2509 "started at %s UTC" % (theTime)]))
2510 if isTestRun:
2511 status = 0
2512 print "No download due to test run."
2513 elif ffluOnly:
2514 status = 0
2515 Logger.addMessage("No download due to FlatfileLookup"
2516 " update only.")
2517 else:
2518 status = call(script)
2519 self.result_q.put((self.name, status, os.path.basename(script)))
2520 except Queue.Empty:
2521 continue
2522
def join(self, timeout=None):
    """
    Ask the download loop to stop, then wait for the thread to finish.

    @param timeout: Passed on to C{Thread.join}.
    @type timeout: float or None
    """
    # The worker loop checks this event before each queue read and exits
    # once it is set.
    stopRequest = self.stoprequest
    stopRequest.set()
    super(Download, self).join(timeout=timeout)
2526
2530 """
2531 Exception if the transfer from CASU was not completely successful.
2532 """
2534 Logger.addMessage(
2535 "# Transfer from CASU not successful. But there may be some "
2536 "successfully read files in %s." % logFile)
2537
2541 """Throw exception if no data is available for the given date"""
2542 pass
2543
2547 """Class containing all MD5 related functions.
2548 """
2549 sysc = IngCuSession.sysc
def md5Run(self, dirStats, tfDict, outDirs):
    """
    md5 checking.

    Calculates the md5 checksums of the transferred files. For every
    directory k with dirStats[k][0] == 0 and dirStats[k][1] == 1, it reads
    the CASU checksum file outDirs[k]/k/md5sums_k. The two sets of
    checksums are then compared.

    @param dirStats: Directory statistics keyed by directory name.
    @type dirStats: dict
    @param tfDict: Dictionary of transferred files; its values are the file
                   paths that are checksummed.
    @type tfDict: dict
    @param outDirs: Local output directory for each checked directory.
    @type outDirs: dict

    @return: [CASU status, local status] (0 OK, non-zero on failure), and
             the list of files with missing or wrong md5 checks. The CASU
             status is 0 when no directory qualifies.
    @rtype: list(int, int), list
    """
    md5HereDict = self._md5Calc(tfDict.values())

    md5CambList = []
    # Starting from 0 keeps the status bound even when no directory
    # qualifies for checking.
    md5CambStatus = 0
    for dirName, stats in dirStats.items():
        if stats[0] == 0 and stats[1] == 1:
            md5sumsDirPath = os.path.join(outDirs[dirName], dirName)
            camList, dirStatus = self._readCamMd5(
                os.path.join(md5sumsDirPath, "md5sums_" + dirName))
            md5CambList.extend(camList)
            # Accumulate, so that a directory without a CASU md5sums file
            # is not masked by a later successful one.
            md5CambStatus = max(md5CambStatus, dirStatus)

    md5HereStatus, md5FilesMissing = \
        self._md5Compare(md5CambList, md5HereDict, [])

    return [md5CambStatus, md5HereStatus], md5FilesMissing
2582
2583
2584
2586 """
2587 md5 checksum calculation.
2588
2589 @param fileList: List of files to check.
2590 @type fileList: list(str)
2591
2592 @return: Dictionary of files and their checksums.
2593 @rtype: dict(str: int)
2594
2595 """
2596 Logger.addMessage("# md5 checksum calculation started at %s UTC" %
2597 utils.makeTimeStamp())
2598
2599 md5Dict = {}
2600 for fileName in fileList:
2601 if os.path.exists(fileName):
2602 m = hashlib.md5()
2603 for line in open(fileName):
2604 m.update(line)
2605
2606 outPath = os.path.splitext(fileName)[0] + ".md5"
2607 open(outPath, 'w').write(m.hexdigest())
2608
2609 inDir = os.path.split(os.path.dirname(fileName))[1]
2610 inPath = os.path.join(inDir, os.path.basename(fileName))
2611 md5Dict[inPath] = [m.hexdigest(), fileName]
2612
2613 Logger.addMessage("# md5 checksum calculation finished at %s UTC" %
2614 utils.makeTimeStamp())
2615
2616 return md5Dict
2617
2618
2619
def _md5Compare(self, remList, locDict, missingFiles):
    """
    Test md5 checksums and quarantine corrupted files.

    For every CASU entry whose name is a key of locDict, the CASU checksum is
    compared with the local one. Names containing "md5sum" are exempt. On a
    mismatch, the local file is copied into sysc.corruptFilesPath, deleted
    from its download location and appended to missingFiles, and the failure
    is logged.

    NOTE(review): _readCamMd5 stores os.path.basename names, while _md5Calc
    keys its dict as "<dir>/<file>". If both feed this method unchanged, no
    name ever matches here. Confirm that the key formats agree.

    @param remList: [file name, checksum] entries from CASU.
    @type remList: list
    @param locDict: Local transferred files: name -> [checksum, file path].
    @type locDict: dict
    @param missingFiles: Missing (not downloaded) files; extended in place.
    @type missingFiles: list

    @return: Status of md5 check (0 OK, 1 at least one wrong checksum) and
             the same missingFiles list with the corrupted files added.
    @rtype: int, list
    """
    status = 0
    for element in remList:
        name = element[0]
        if name not in locDict:
            continue
        localEntry = locDict[name]
        if element[1] == localEntry[0] or "md5sum" in name:
            continue

        # Checksum mismatch on a data file: move it out of the way.
        status = 1
        corFile = os.path.join(self.sysc.downloadPathList(), localEntry[1])
        shutil.copy2(corFile, self.sysc.corruptFilesPath)
        os.remove(corFile)
        missingFiles.append(localEntry[1])
        Logger.addMessage(
            "# md5sum of %s doesn't match! File copied into %s" %
            (name, self.sysc.corruptFilesPath))

    return status, missingFiles
2657
2658
2659
2661 """
2662 Read camb md5 list.
2663
2664 @param fileName: File at CASU containing the md5 checksums there.
2665 @type fileName: str
2666
2667 @return: List of [file name, checksum] pairs, and status of md5 checks.
2668 @rtype: list(list(str, int)), int
2669
2670 """
2671 md5List = []
2672 status = 0
2673 if os.path.exists(fileName):
2674 for line in file(fileName):
2675 md5List.append([os.path.basename(line.split()[1]),
2676 line.split()[0]])
2677 else:
2678 Logger.addMessage(
2679 "# No md5sum from CASU in " + os.path.dirname(fileName))
2680 status = 1
2681 return md5List, status
2682
2683
2684