Package invocations :: Package monitoring :: Module CheckTransfer
[hide private]

Source Code for Module invocations.monitoring.CheckTransfer

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  #$Id: CheckTransfer.py 9976 2013-07-31 16:45:24Z EckhardSutorius $ 
  4  """ 
  5     Checks file sizes of transferred files. 
  6   
  7     @author: E. Sutorius 
  8     @org:    WFAU, IfA, University of Edinburgh 
  9  """ 
 10  #------------------------------------------------------------------------------ 
 11  from   collections import defaultdict, namedtuple 
 12  import inspect 
 13  import mx.DateTime as mxTime 
 14  import os 
 15  import time 
 16   
 17  from   invocations.cu1.cu1Transfer  import CASUQueries 
 18  from   wsatools.CLI                 import CLI 
 19  from   wsatools.DbConnect.DbSession import DbSession 
 20  from   wsatools.File                import File 
 21  import wsatools.FitsUtils               as fits 
 22  from   wsatools.DbConnect.IngCuSession import IngCuSession 
 23  from   wsatools.Logger              import Logger 
 24  import wsatools.Utilities               as utils 
#------------------------------------------------------------------------------

class CheckTransfer(IngCuSession):
    """ Check if transfers were successful.

        Compares the FITS files listed in the CASU staging directories with
        the files held at WFAU (file size and modification time), reports
        missing or mismatched files and writes transfer scripts and ingest
        lists for the affected files.
    """
    # Define class parameters
    # CASU directory info collected per run: 'sr' holds the successfully
    # read CASU date directories and 'vers' all in-range CASU date
    # directories, both mapped to the value from CASUQueries.getDirVersions
    # (presumably the WFAU "<date>_v<version>" directory name - TODO
    # confirm); 'list' is initialised but not read in this class.
    CasuData = namedtuple('CasuData', "sr, vers, list")

    #--------------------------------------------------------------------------

    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 beginDate=CLI.getOptDef("begin"),
                 endDate=CLI.getOptDef("end"),
                 casuDirs=CLI.getOptDef("casudirs"),
                 outPath=CLI.getOptDef("outpath"),
                 writeStats=CLI.getOptDef("stats"),
                 onlyMissing=CLI.getOptDef("missing"),
                 onlyCasuNewer=CLI.getOptDef("casutime"),
                 fixCats=CLI.getOptDef("fixcats"),
                 fixPix=CLI.getOptDef("fixpix"),
                 versionStr=CLI.getOptDef("version"),
                 ignorePad=CLI.getOptDef("ignorepad"),
                 timeComp=CLI.getOptDef("timecmp"),
                 janet=CLI.getOptDef("janet"),
                 isTrialRun=DbSession.isTrialRun,
                 comment=CLI.getArgDef("comment")):
        """
        @param beginDate:     First date to process, eg. 20050101.
        @type  beginDate:     str
        @param comment:       Descriptive comment as to why curation task is
                              being performed.
        @type  comment:       str
        @param curator:       Name of curator.
        @type  curator:       str
        @param casuDirs:      Comma-separated directories at CASU where the
                              data is stored; if empty, the CASU staging
                              directories are queried.
        @type  casuDirs:      str
        @param database:      Name of the database to connect to.
        @type  database:      str
        @param endDate:       Last date to process, eg. 20050131.
        @type  endDate:       str
        @param ignorePad:     If True, ignore a file size difference of
                              exactly 2880 bytes (presumably one FITS
                              padding block).
        @type  ignorePad:     bool
        @param isTrialRun:    If True, do not perform database modifications.
        @type  isTrialRun:    bool
        @param fixCats:       Fix timestamp for catalogues from CASU.
        @type  fixCats:       str
        @param fixPix:        Fix timestamp for pixel files from CASU.
        @type  fixPix:        str
        @param janet:         Use JANET instead of UKLight.
        @type  janet:         bool
        @param onlyCasuNewer: Full message only when CASU time is newer.
        @type  onlyCasuNewer: bool
        @param onlyMissing:   Check only for missing files.
        @type  onlyMissing:   bool
        @param outPath:       Log file directory.
        @type  outPath:       str
        @param timeComp:      WFAU</!=/>CASU time comparison method
                              ('lt', 'ne' or 'gt').
        @type  timeComp:      str
        @param versionStr:    Version number.
        @type  versionStr:    str
        @param writeStats:    Write file statistics into a file ('f' for all
                              files, otherwise only mismatched files).
        @type  writeStats:    str
        """
        # Initialize parent class
        super(CheckTransfer, self).__init__(cuNum=0,
                                            curator=curator,
                                            comment=comment,
                                            database=database,
                                            autoCommit=False,
                                            isTrialRun=isTrialRun)

        beginDate, endDate = \
            self.sysc.obsCal.getDatesFromInput(beginDate, endDate)

        # Turn the constructor arguments into attributes (self.janet,
        # self.casuDirs, self.outPath, ... are read below).
        super(CheckTransfer, self).attributesFromArguments(
            inspect.getargspec(CheckTransfer.__init__)[0], locals())

        CASUQueries.sysc = self.sysc
        self.UKLight = not self.janet

        if self.casuDirs:
            self.scpCasuDirs = self.casuDirs.split(',')
        else:
            self.scpCasuDirs = CASUQueries.getCasuStagingDirs(self.UKLight)

        if self.outPath == "./":
            self.outPath = "./check%s" % self.sysc.loadDatabase
        utils.ensureDirExist(self.outPath)

        self.casuData = CheckTransfer.CasuData(
            sr=defaultdict(), vers=defaultdict(), list=defaultdict())

        # CASU files whose names start with / contain these are not checked
        self.exclPrefix = ("t_w", "tv", "chan", "lin_", "squish", "bpm",
                           "dqcdark", "dqcbias", "tile_", "_psf", "xsky",
                           "tmp")
        self.exclStr = ("orig", "_psf", "squish", "mos", "jrf", "list")

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Check the transfers for the requested date range.

            Reads the CASU and WFAU file information, compares every CASU
            FITS file with its WFAU counterpart, logs the findings and writes
            the optional statistics file, the transfer scripts/ingest lists
            for problem files and the log file into self.outPath.
        """
        Logger.addMessage("Checking Transfers from %s to %s" % (
            self.beginDate, self.endDate))
        Logger.addMessage("Reading CASU info...")
        self._readCasuInfo()

        fitsDirs = fits.FitsList(self.sysc, prefix="ct_")
        fitsDirs.createFitsDateDict(ingestDirectory=self.sysc.fitsDir,
                                    beginDateStr=self.beginDate,
                                    endDateStr=self.endDate)

        Logger.addMessage("getting file info at WFAU...")
        edinFileDict = self._getWfauFileInfo(fitsDirs)

        statsFile = None
        if self.writeStats:
            statsFile = File(os.path.join(
                self.outPath, "TransferStatistics_%s_%s_%s_%s.log" % (
                    self.sysc.loadDatabase, self.beginDate, self.endDate,
                    self.writeStats)))
            statsFile.wopen()

        misOverview = {}
        xferErr = False
        misFileSet = set()
        for casuDir in sorted(self.casuData.sr):
            counters = self._checkCasuDir(casuDir, edinFileDict, fitsDirs,
                                          misFileSet, statsFile)
            # Record the counters of every directory (not only of those
            # checked after the first error) so the overview shows the
            # correct number of CASU files for each date.
            misOverview[os.path.basename(casuDir)] = counters
            if any(counters[:3]):
                xferErr = True

        # versioned CASU dirs that have not been read successfully yet
        misDates = [self.casuData.vers[casuDir]
                    for casuDir in sorted(self.casuData.vers)
                    if casuDir not in self.casuData.sr]
        if misDates:
            xferErr = True
            Logger.addMessage("The following days haven't been transferred yet:")
            Logger.addMessage(','.join(misDates))

        if statsFile is not None:
            statsFile.close()
            Logger.addMessage("Statistics written to %s" % statsFile.name)

        if not xferErr:
            Logger.addMessage("Transfer for %s from %s to %s successful!" % (
                self.sysc.loadDatabase, self.beginDate, self.endDate))
        else:
            for dateStr in sorted(misOverview):
                fileMis, timeMis, sizeMis, allFiles = misOverview[dateStr]
                Logger.addMessage('\n '.join([
                    "\n %s: %d CASU files" % (dateStr, allFiles),
                    "%d files are missing," % fileMis,
                    "%d files show a wrong time," % timeMis,
                    "%d files have the wrong size.\n" % sizeMis]))

        # write the log
        logFileName = os.path.join(
            self.outPath, "checkTransfer_%s_%s_%s.log" % (
                self.sysc.loadDatabase, self.beginDate, self.endDate))
        Logger.addMessage("Output written to %s" % logFileName)

        # write scripts
        if misFileSet:
            self._writeScripts(misFileSet)
        else:
            Logger.addMessage("No files missing!")

        # the with-statement guarantees the log file handle is closed
        with open(logFileName, 'w') as logFile:
            Logger.dump(logFile)

    #--------------------------------------------------------------------------

    def _readCasuInfo(self):
        """ Fill self.casuData.vers and self.casuData.sr with the CASU date
            directories whose base name lies within [beginDate, endDate].
        """
        reproVers = self.versionStr or None
        for mainDir in self.scpCasuDirs:
            # get dir versions
            versDirs = CASUQueries.getDirVersions(
                mainDir, reproVersion=reproVers, UKLight=self.UKLight)
            for dateDir in versDirs:
                if self.beginDate <= os.path.basename(dateDir) <= self.endDate:
                    self.casuData.vers[dateDir] = versDirs[dateDir]

            # get successfully read dirs
            srDirs = CASUQueries.getSRDirs(mainDir, self.UKLight)
            for dateDir in srDirs:
                if self.beginDate <= os.path.basename(dateDir) <= self.endDate:
                    self.casuData.sr[dateDir] = self.casuData.vers[dateDir]

    #--------------------------------------------------------------------------

    @staticmethod
    def _getWfauFileInfo(fitsDirs):
        """ Collect modification time, size and path of all WFAU files.

            @param fitsDirs: FitsList whose invFitsDateDict maps each
                             "<date>_v<version>" directory to its parent dir.
            @return: Dictionary "<dateVersDir>/<fileName>" ->
                     (mtime "YYYY-MM-DD hh:mm:ss" in local time, size in
                     bytes, full path).
            @rtype:  dict
        """
        edinFileDict = {}
        for dateVersStr in sorted(fitsDirs.invFitsDateDict):
            dirPath = os.path.join(
                fitsDirs.invFitsDateDict[dateVersStr], dateVersStr)
            for fileName in os.listdir(dirPath):
                filePath = os.path.join(dirPath, fileName)
                edinFileDict[os.path.join(dateVersStr, fileName)] = (
                    time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(
                        os.path.getmtime(filePath))),
                    os.path.getsize(filePath),
                    filePath)
        return edinFileDict

    #--------------------------------------------------------------------------

    @staticmethod
    def _casuTimeStr(yyyymmdd, timeStamp):
        """ Convert the date and time columns of a CASU listing line into
            "YYYY-MM-DD hh:mm:ss".

            A time column without ':' is taken as midnight, so the result
            never depends on a previously parsed line.
        """
        yyyy, mon, dd = yyyymmdd.split('-')
        hh = mm = ss = '0'
        if ':' in timeStamp:
            hh, mm, ss = timeStamp[:2], timeStamp[3:5], timeStamp[6:8]
        return mxTime.DateTime(
            int(yyyy), int(mon), int(dd), int(hh), int(mm),
            int(ss)).strftime("%Y-%m-%d %H:%M:%S")

    #--------------------------------------------------------------------------

    @staticmethod
    def _logMismatch(casuName, wfauName, mismatchMsg, detailMsg):
        """ Log a warning block for a CASU/WFAU file mismatch.
        """
        Logger.addMessage("WARNING: %s" % casuName)
        Logger.addMessage(" (%s)" % wfauName)
        Logger.addMessage(mismatchMsg)
        Logger.addMessage(detailMsg)

    #--------------------------------------------------------------------------

    def _checkCasuDir(self, casuDir, edinFileDict, fitsDirs, misFileSet,
                      statsFile):
        """ Compare the FITS files of one CASU date directory with WFAU.

            Problem files are added to misFileSet as (CASU path, WFAU path).

            @param casuDir:      CASU date directory (key of casuData.sr).
            @param edinFileDict: WFAU file info from _getWfauFileInfo().
            @param fitsDirs:     FitsList used to place missing files.
            @param misFileSet:   Set collecting the problem files (updated).
            @param statsFile:    Open statistics File or None.
            @return: Counters [missing files, time mismatches,
                     size mismatches, all listed FITS files].
            @rtype:  list(int)
        """
        Logger.addMessage("checking %s..." % os.path.basename(casuDir))
        timeMisCounter = 0
        sizeMisCounter = 0
        fileMisCounter = 0
        allFilesCounter = 0
        for line in CASUQueries.getDirListing(casuDir, self.UKLight, "-o"):
            line = line.rstrip()
            if not line.endswith((".fit", ".fits")):
                continue
            (access, _nol, _uid, casuSize, yyyymmdd, timeStamp,
             _timezone, fileName) = line.split(None, 7)
            if "->" in fileName and access[0] == 'l':
                # symbolic link: keep the link name, drop "-> target"
                fileName = fileName.split(None, 1)[0]
            casuTime = self._casuTimeStr(yyyymmdd, timeStamp)
            casuSize = int(casuSize)
            casuName = os.path.join(casuDir, fileName)
            fileAcronym = os.path.join(self.casuData.sr[casuDir], fileName)
            allFilesCounter += 1
            if fileName.startswith(self.exclPrefix) \
                    or any(part in fileName for part in self.exclStr):
                continue

            if fileAcronym not in edinFileDict:
                Logger.addMessage("ERROR: %s missing!\n" % casuName)
                dateStr = os.path.basename(casuDir)
                dateVersStr = "%s_v%s" % (
                    dateStr, self.sysc.obsCal.maxVersOfDate(dateStr))
                # NOTE(review): raises KeyError if WFAU has no directory for
                # this date/version - TODO confirm this cannot happen.
                wfauName = os.path.join(
                    fitsDirs.invFitsDateDict[dateVersStr], dateVersStr,
                    os.path.basename(casuName))
                misFileSet.add((casuName, wfauName))
                fileMisCounter += 1
                continue

            wfauTime, wfauSize, wfauName = edinFileDict[fileAcronym]
            sizeDiff = casuSize - wfauSize
            # CASU files of 100 bytes or less are not compared
            sizeMismatch = (sizeDiff != 0 and casuSize > 100 and
                            not (abs(sizeDiff) == 2880 and self.ignorePad))
            timeMismatch = wfauTime != casuTime and casuSize > 100

            # size mismatch
            if sizeMismatch:
                if not self.onlyMissing:
                    self._logMismatch(
                        casuName, wfauName,
                        " Size mismatch: C: %d - W: %d (%d)" % (
                            casuSize, wfauSize, sizeDiff),
                        " Time: C: %r - W: %r\n" % (casuTime, wfauTime))
                misFileSet.add((casuName, wfauName))
                sizeMisCounter += 1

            # time mismatch
            if timeMismatch:
                if not self.onlyMissing and (
                        wfauTime < casuTime or (
                            not self.onlyCasuNewer and wfauTime > casuTime)):
                    self._logMismatch(
                        casuName, wfauName,
                        " Time mismatch: C:%r - W:%r" % (casuTime, wfauTime),
                        " Size: C: %d - W: %d\n" % (casuSize, wfauSize))
                if self.compare(wfauTime, casuTime, self.timeComp):
                    misFileSet.add((casuName, wfauName))
                    timeMisCounter += 1

                # reset cat timestamp and write it into cat file
                if (self.fixCats and self.sysc.catSuffix in fileName) \
                        or (self.fixPix
                            and fileName.endswith(self.sysc.mefType)):
                    self._fixTimestamp(casuDir, wfauName, casuTime, wfauTime)

            # write statistics
            if statsFile is not None and (
                    sizeMismatch or timeMismatch or self.writeStats == 'f'):
                statsFile.writetheline(fileAcronym)
                statsFile.writetheline(
                    "CASU: %r %d" % (casuTime, casuSize))
                statsFile.writetheline(
                    "WFAU: %r %d" % (wfauTime, wfauSize))
                statsFile.writetheline("----")

        return [fileMisCounter, timeMisCounter, sizeMisCounter,
                allFilesCounter]

    #--------------------------------------------------------------------------

    def _fixTimestamp(self, casuDir, filePath, casuTime, wfauTime):
        """ Set the WFAU file's time stamp to the earlier of the CASU and
            WFAU modification times.

            The stamp ("<dir date without century>YYMMDDhhmmss") is written
            into the primary header keyword "<loadDatabase>_TIME" and the
            file's modification time is set accordingly.
        """
        dirName = os.path.basename(casuDir)[2:]

        def toStamp(timeStr):
            # "YYYY-MM-DD hh:mm:ss" -> dirName + "YYMMDDhhmmss"
            return dirName + timeStr[2:].replace(
                '-', '').replace(' ', '').replace(':', '')

        newTimestamp = min(toStamp(casuTime), toStamp(wfauTime))

        # write vsaTimestamp; close the FITS file even if the write fails
        fimgptr = fits.open(filePath, "update")
        try:
            fits.writeToFitsHdu(
                fimgptr, (0,),
                "%s_TIME" % self.sysc.loadDatabase,
                newTimestamp,
                "%s time stamp" % self.sysc.loadDatabase,
                True)
        finally:
            fimgptr.close()

        # reset file timestamp (access time: now, modification: new stamp)
        timetpl = time.strptime(newTimestamp[len(dirName):], '%y%m%d%H%M%S')
        modtime = time.mktime(timetpl)
        os.utime(filePath, (time.time(), modtime))
        Logger.addMessage("Timestamp updated on %s" % filePath)

    #--------------------------------------------------------------------------

    def _writeScripts(self, misFileSet):
        """ Write scp transfer scripts and ingest lists for problem files.

            Files whose CASU base name starts with "o2" are grouped by
            (sdate, runno): groups with more than one file go into the
            science transfer script and ingest list, single-file groups only
            into the "_sci_fix" ingest list. All other files go into the
            "_fi" transfer script and ingest list.

            @param misFileSet: Set of (CASU path, WFAU path) tuples.
            @type  misFileSet: set(tuple(str, str))
        """
        fileStem = "%s_%s_%s" % (
            self.sysc.loadDatabase, self.beginDate, self.endDate)

        def openOutput(pattern):
            # create and open "<outPath>/<pattern % fileStem>" for writing
            outFile = File(os.path.join(self.outPath, pattern % fileStem))
            outFile.wopen()
            return outFile

        def scpLine(casuName, wfauName):
            # one scp command copying casuName from CASU to wfauName
            return "%s %s@%s:%s %s" % (
                self.sysc.scpMethod().replace("-pq", "-p"),
                self.sysc.scpUser, self.sysc.scpServer(),
                casuName, wfauName)

        # transfer science files
        xferSciFile = openOutput("xfer_%s_sci.sh")
        xferSciFile.writetheline("#! /usr/bin/tcsh")

        # transfer filter files
        xferFiFile = openOutput("xfer_%s_fi.sh")
        xferFiFile.writetheline("#! /usr/bin/tcsh")

        # ingest lists
        toingSciFile = openOutput("toing_%s_sci.list")
        toingSciFixFile = openOutput("toing_%s_sci_fix.list")
        toingFiFile = openOutput("toing_%s_fi.list")

        # write files to the scripts
        sciDict = defaultdict(list)
        for casuName, wfauName in sorted(misFileSet):
            casuFile = File(casuName)
            if casuFile.base.startswith("o2"):
                fileGroup = '_'.join([casuFile.sdate, casuFile.runno])
                sciDict[fileGroup].append((casuName, wfauName))
            else:
                xferFiFile.writetheline(scpLine(casuName, wfauName))
                toingFiFile.writetheline(wfauName)

        for fileGroup in sorted(sciDict):
            groupFiles = sciDict[fileGroup]
            if len(groupFiles) > 1:
                for casuName, wfauName in groupFiles:
                    xferSciFile.writetheline(scpLine(casuName, wfauName))
                    toingSciFile.writetheline(wfauName)
            else:
                toingSciFixFile.writetheline(groupFiles[0][1])

        # close files
        xferSciFile.close()
        xferSciFile.chmod(0o744)
        xferFiFile.close()
        xferFiFile.chmod(0o744)
        Logger.addMessage("Transfer scripts written to %s, %s" % (
            xferSciFile.name, xferFiFile.name))
        toingSciFile.close()
        toingSciFixFile.close()
        toingFiFile.close()
        Logger.addMessage("Ingest lists written to %s, %s, %s" % (
            toingSciFile.name, toingSciFixFile.name, toingFiFile.name))

    #--------------------------------------------------------------------------

    @staticmethod
    def compare(x, y, method):
        """ Compare a WFAU time x with a CASU time y.

            @param x:      WFAU time string.
            @param y:      CASU time string.
            @param method: 'lt' (x < y), 'ne' (x != y) or 'gt' (x > y);
                           case-insensitive, matching the CLI validation.
            @type  method: str
            @return: Result of the comparison; False for an unknown method.
            @rtype:  bool
        """
        method = (method or '').lower()
        if method == 'lt':
            return x < y
        if method == 'ne':
            return x != y
        if method == 'gt':
            return x > y
        return False
#------------------------------------------------------------------------------
# Entry point for script.

# Allow module to be imported as well as executed from the command line
if __name__ == "__main__":
    # Define command-line interface settings for CheckTransfer
    CLI.progOpts += [
        CLI.Option('C', "fixcats",
                   "fix WFAU timestamps from CASU for catalogues"),
        CLI.Option('P', "fixpix",
                   "fix WFAU timestamps from CASU for pixel files"),
        # file sizes are compared in bytes (os.path.getsize)
        CLI.Option('I', "ignorepad",
                   "ignore 2880 byte padding on WFAU FITS files"),
        CLI.Option('T', "timecmp", "use WFAU 'lt'/'ne'/'gt' CASU as time"
                   " comparison method for xfer output",
                   "MODE", 'lt', isValOK=lambda x: x.lower() in (
                       "lt", "ne", "gt")),
        CLI.Option('b', "begindate",
                   "first date to process, eg. 20050101",
                   "DATE", str(IngCuSession.beginDateDef)),
        CLI.Option('e', "enddate",
                   "last date to process, eg. 20050131",
                   "DATE", str(IngCuSession.endDateDef)),
        CLI.Option('j', "janet",
                   "use JANET instead of UKLight"),
        CLI.Option('m', "missing", "only report missing files"),
        CLI.Option('o', "outpath",
                   "directory where the output data is written to",
                   "PATH", './'),
        # accept exactly 'f', 'd' or the empty default (a substring test
        # against "fd" would also accept "fd")
        CLI.Option('s', "stats", "create a file containing f(ull) or"
                   " d(iff) stats",
                   "MODE", '', isValOK=lambda x: x.lower() in ("", "f", "d")),
        CLI.Option('v', "version",
                   "overwrite CASU version with this version",
                   "STR", ''),
        CLI.Option('w', "casudirs",
                   "main CASU data directories (depending on given archive)",
                   "PATH", ''),
        CLI.Option('z', "casutime", "only report files with newer CASU time"),
    ]

    cli = CLI(CheckTransfer, "$Revision: 9976 $")

    # keyword arguments keep the CLI-to-parameter mapping explicit
    checkTransfer = CheckTransfer(curator=cli.getOpt("curator"),
                                  database=cli.getArg("database"),
                                  beginDate=cli.getOpt("begindate"),
                                  endDate=cli.getOpt("enddate"),
                                  casuDirs=cli.getOpt("casudirs"),
                                  outPath=cli.getOpt("outpath"),
                                  writeStats=cli.getOpt("stats"),
                                  onlyMissing=cli.getOpt("missing"),
                                  onlyCasuNewer=cli.getOpt("casutime"),
                                  fixCats=cli.getOpt("fixcats"),
                                  fixPix=cli.getOpt("fixpix"),
                                  versionStr=cli.getOpt("version"),
                                  ignorePad=cli.getOpt("ignorepad"),
                                  timeComp=cli.getOpt("timecmp"),
                                  janet=cli.getOpt("janet"),
                                  isTrialRun=cli.getOpt("test"),
                                  comment=cli.getArg("comment"))
    checkTransfer.run()