1
2
3
4 """
5 Checks file sizes of transferred files.
6
7 @author: E. Sutorius
8 @org: WFAU, IfA, University of Edinburgh
9 """
10
11 from collections import defaultdict, namedtuple
12 import inspect
13 import mx.DateTime as mxTime
14 import os
15 import time
16
17 from invocations.cu1.cu1Transfer import CASUQueries
18 from wsatools.CLI import CLI
19 from wsatools.DbConnect.DbSession import DbSession
20 from wsatools.File import File
21 import wsatools.FitsUtils as fits
22 from wsatools.DbConnect.IngCuSession import IngCuSession
23 from wsatools.Logger import Logger
24 import wsatools.Utilities as utils
28 """ Check if transfers were successful.
29 """
30
# Record bundling three look-up tables of CASU directory information.  The
# constructor in this file fills each field with a dict-like object; 'sr' and
# 'vers' are keyed by CASU date directory in the code that uses them.
CasuData = namedtuple(typename='CasuData', field_names="sr, vers, list")
32
33
34
def __init__(self,
             curator=CLI.getOptDef("curator"),
             database=DbSession.database,
             beginDate=CLI.getOptDef("begin"),
             endDate=CLI.getOptDef("end"),
             casuDirs=CLI.getOptDef("casudirs"),
             outPath=CLI.getOptDef("outpath"),
             writeStats=CLI.getOptDef("stats"),
             onlyMissing=CLI.getOptDef("missing"),
             onlyCasuNewer=CLI.getOptDef("casutime"),
             fixCats=CLI.getOptDef("fixcats"),
             fixPix=CLI.getOptDef("fixpix"),
             versionStr=CLI.getOptDef("version"),
             ignorePad=CLI.getOptDef("ignorepad"),
             timeComp=CLI.getOptDef("timecmp"),
             janet=CLI.getOptDef("janet"),
             isTrialRun=DbSession.isTrialRun,
             comment=CLI.getArgDef("comment")):
    """
    Prepares a transfer check: initialises the session, resolves the date
    range, stores all arguments on the instance and works out the CASU
    directories, the output directory and the file-name exclusion lists.

    @param curator:       Name of curator.
    @type  curator:       str
    @param database:      Name of the database to connect to.
    @type  database:      str
    @param beginDate:     First date to process, eg. 20050101.
    @type  beginDate:     str
    @param endDate:       Last date to process, eg. 20050131.
    @type  endDate:       str
    @param casuDirs:      Comma-separated directories at CASU where the
                          data is stored; if empty, the staging
                          directories reported by CASUQueries are used.
    @type  casuDirs:      str
    @param outPath:       Log file directory; the value "./" is replaced
                          by "./check<loadDatabase>".
    @type  outPath:       str
    @param writeStats:    Statistics mode; empty for no statistics file,
                          'f' to write every compared file, any other
                          value to write only files that differ.
    @type  writeStats:    str
    @param onlyMissing:   Check only for missing files.
    @type  onlyMissing:   bool
    @param onlyCasuNewer: Full message only when CASU time is newer.
    @type  onlyCasuNewer: bool
    @param fixCats:       Fix timestamp for catalogues from CASU.
    @type  fixCats:       bool
    @param fixPix:        Fix timestamp for pixel files from CASU.
    @type  fixPix:        bool
    @param versionStr:    Version number.
    @type  versionStr:    str
    @param ignorePad:     If True, ignore a file size difference of
                          exactly 2880 bytes.
    @type  ignorePad:     bool
    @param timeComp:      WFAU</!=/>CASU time comparison method.
    @type  timeComp:      str
    @param janet:         Use JANET instead of UKLight.
    @type  janet:         bool
    @param isTrialRun:    If True, do not perform database modifications.
    @type  isTrialRun:    bool
    @param comment:       Descriptive comment as to why curation task is
                          being performed.
    @type  comment:       str

    """
    super(CheckTransfer, self).__init__(
        cuNum=0, curator=curator, comment=comment, database=database,
        autoCommit=False, isTrialRun=isTrialRun)

    # The date arguments are rebound *before* locals() is captured below,
    # so the values stored on the instance are the resolved ones.
    beginDate, endDate = self.sysc.obsCal.getDatesFromInput(beginDate,
                                                            endDate)

    # Hands the constructor's argument names and their current values to
    # the base class; the rest of this class reads them back as
    # self.<argument name>.
    super(CheckTransfer, self).attributesFromArguments(
        inspect.getargspec(CheckTransfer.__init__)[0], locals())

    CASUQueries.sysc = self.sysc
    self.UKLight = not self.janet

    self.scpCasuDirs = (self.casuDirs.split(',') if self.casuDirs
                        else CASUQueries.getCasuStagingDirs(self.UKLight))

    if self.outPath == "./":
        self.outPath = "./check%s" % self.sysc.loadDatabase
    # NOTE(review): assumed to run for every outPath, not only for the
    # default one -- confirm against the original indentation.
    utils.ensureDirExist(self.outPath)

    # No default factory is given, so these behave like plain dicts.
    self.casuData = CheckTransfer.CasuData(
        sr=defaultdict(),
        vers=defaultdict(),
        list=defaultdict())

    # File names starting with one of these prefixes ...
    self.exclPrefix = (
        "t_w", "tv", "chan", "lin_", "squish", "bpm", "dqcdark", "dqcbias",
        "tile_", "_psf", "xsky", "tmp")
    # ... or containing one of these substrings are not compared.
    self.exclStr = ("orig", "_psf", "squish", "mos", "jrf", "list")
125
126
# Body of the check that the script entry point invokes as
# `checkTransfer.run()`.
# NOTE(review): the `def` header of this method is not part of this listing.
# Outline: read the CASU directory information, stat the local (WFAU) FITS
# files, compare size and modification time file by file, optionally rewrite
# time stamps, then write statistics, a log file, transfer scripts and
# ingest lists.
128 Logger.addMessage("Checking Transfers from %s to %s" % (
129 self.beginDate, self.endDate))
130 Logger.addMessage("Reading CASU info...")
# For every CASU main directory keep the date directories whose base name
# lies inside [beginDate, endDate] (compared as strings).
131 for mainDir in self.scpCasuDirs:
132
# An empty version string is passed on as None.
133 reproVers = (self.versionStr if self.versionStr else None)
134 versDirs = CASUQueries.getDirVersions(
135 mainDir, reproVersion=reproVers, UKLight=self.UKLight)
136 for dateDir in versDirs:
137 if self.beginDate <= os.path.basename(dateDir) <= self.endDate:
138 self.casuData.vers[dateDir] = versDirs[dateDir]
139
140
141 srDirs = CASUQueries.getSRDirs(mainDir, self.UKLight)
142 for dateDir in srDirs:
143 if self.beginDate <= os.path.basename(dateDir) <= self.endDate:
# NOTE(review): casuData.vers is created without a default factory, so a
# date directory returned by getSRDirs that has no entry in vers raises
# KeyError here -- confirm that cannot happen.
144 self.casuData.sr[dateDir] = self.casuData.vers[dateDir]
145
# Look up the local FITS date directories below self.sysc.fitsDir for the
# same date range; invFitsDateDict is used below to map a date/version
# directory name to its parent path.
146 fitsDirs = fits.FitsList(self.sysc, prefix="ct_")
147 fitsDirs.createFitsDateDict(ingestDirectory=self.sysc.fitsDir,
148 beginDateStr=self.beginDate,
149 endDateStr=self.endDate)
150
151 Logger.addMessage("getting file info at WFAU...")
# edinFileDict: "<date/version dir>/<file name>" ->
#   (modification time as local "YYYY-MM-DD HH:MM:SS", size in bytes,
#    full path of the file)
152 edinFileDict = defaultdict()
153 for dateVersStr in sorted(fitsDirs.invFitsDateDict):
154 dirPath = os.path.join(
155 fitsDirs.invFitsDateDict[dateVersStr], dateVersStr)
156 for fileName in os.listdir(dirPath):
157 edinFileDict[os.path.join(dateVersStr, fileName)] = \
158 (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(
159 os.path.getmtime(os.path.join(dirPath, fileName)))),
160 os.path.getsize(os.path.join(dirPath, fileName)),
161 os.path.join(dirPath, fileName))
162
# The statistics file is only opened when a stats mode was given; the mode
# is part of the file name.
163 if self.writeStats:
164 statsFile = File(os.path.join(
165 self.outPath, "TransferStatistics_%s_%s_%s_%s.log" % (
166 self.sysc.loadDatabase, self.beginDate, self.endDate,
167 self.writeStats)))
168 statsFile.wopen()
169
# misOverview: date directory name ->
#   [missing files, time mismatches, size mismatches, all listed files]
# xferErr:     set as soon as any problem is found (never reset).
# misFileSet:  (CASU path, WFAU path) pairs to be transferred again.
170 misOverview = defaultdict(list)
171 xferErr = False
172 misFileSet = set()
173 for casuDir in sorted(self.casuData.sr):
174 Logger.addMessage("checking %s..." % os.path.basename(casuDir))
175 timeMisCounter = 0
176 sizeMisCounter = 0
177 fileMisCounter = 0
178 allFilesCounter = 0
179 misOverview[os.path.basename(casuDir)] = [
180 fileMisCounter, timeMisCounter, sizeMisCounter, allFilesCounter]
# Only listing lines that end in .fit/.fits are parsed; each must split
# into 8 whitespace-separated fields, the last one being the file name.
181 for line in CASUQueries.getDirListing(casuDir, self.UKLight, "-o"):
182 if line.rstrip().endswith((".fit", ".fits")):
183 (_access, _nol, _uid, casuSize, yyyymmdd, timeStamp,
184 _timezone, fileName) = line.rstrip().split(None, 7)
# A symbolic link is listed as "name -> target"; keep the link name only.
185 if "->" in fileName and _access[0] == 'l':
186 fileName = fileName.split(None, 1)[0]
187 yyyy, mon, dd = yyyymmdd.split('-')
# NOTE(review): hh/mm/ss are only assigned when the time field contains
# ':'; confirm the listing format always provides a time of day.
188 if ':' in timeStamp:
189 hh = timeStamp[:2]
190 mm = timeStamp[3:5]
191 ss = timeStamp[6:8]
# Same "%Y-%m-%d %H:%M:%S" layout as the WFAU time strings, so the two can
# be compared chronologically as plain strings.
192 casuTime = mxTime.DateTime(
193 int(yyyy), int(mon), int(dd), int(hh), int(mm),
194 int(ss)).strftime("%Y-%m-%d %H:%M:%S")
195 casuSize = int(casuSize)
196 casuName = os.path.join(casuDir, fileName)
# Key into edinFileDict: the value stored for this CASU directory in
# casuData.sr joined with the file name.
197 fileAcronym = os.path.join(self.casuData.sr[casuDir],
198 fileName)
199 allFilesCounter += 1
# Excluded files are counted but not compared.
200 if fileName.startswith(self.exclPrefix) \
201 or any(x in fileName for x in self.exclStr):
202 pass
203 elif fileAcronym in edinFileDict:
204 wfauTime, wfauSize, wfauName = edinFileDict[fileAcronym]
205
# Size check: skipped for CASU files of 100 bytes or less, and for a
# difference of exactly 2880 bytes when ignorePad is set.
206 sizeDiff = casuSize - wfauSize
207 if sizeDiff != 0 and \
208 casuSize > 100 and \
209 not (abs(sizeDiff) == 2880 and self.ignorePad):
210 if not self.onlyMissing:
211 Logger.addMessage(
212 "WARNING: %s" % os.path.join(
213 casuDir, fileName))
214 Logger.addMessage(
215 " (%s)" % os.path.join(wfauName))
216 Logger.addMessage(
217 " Size mismatch: C: %d - W: %d (%d)" % (
218 casuSize, wfauSize, sizeDiff))
219 Logger.addMessage(
220 " Time: C: %r - W: %r\n" % (
221 casuTime, wfauTime))
222 misFileSet.add((casuName, wfauName))
223 sizeMisCounter += 1
224 xferErr = True
225
226
# Time check: with onlyCasuNewer set, only a newer CASU time is reported;
# otherwise an older CASU time is reported as well.
227 if wfauTime != casuTime and casuSize > 100:
228 if not self.onlyMissing:
229 if wfauTime < casuTime or (
230 not self.onlyCasuNewer \
231 and wfauTime > casuTime):
232 Logger.addMessage(
233 "WARNING: %s" % os.path.join(
234 casuDir, fileName))
235 Logger.addMessage(
236 " (%s)" % os.path.join(wfauName))
237 Logger.addMessage(
238 " Time mismatch: C:%r - W:%r" % (
239 casuTime, wfauTime))
240 Logger.addMessage(
241 " Size: C: %d - W: %d\n" % (
242 casuSize, wfauSize))
# self.timeComp decides which time relation puts the file on the
# re-transfer list.
243 if self.compare(wfauTime, casuTime,
244 self.timeComp):
245 misFileSet.add((casuName, wfauName))
246 timeMisCounter += 1
247 xferErr = True
248
249
# Optional time stamp fix for catalogues (fixCats) and/or pixel files
# (fixPix): the smaller, i.e. earlier, of the two time stamps is written to
# the "<loadDatabase>_TIME" header keyword and set as the file's
# modification time.
250 if (self.fixCats and \
251 self.sysc.catSuffix in fileName) \
252 or (self.fixPix and \
253 fileName.endswith(self.sysc.mefType)):
254 filePath = edinFileDict[fileAcronym][2]
# Time stamp layout: the date directory name without its first two
# characters, followed by the time as yymmddHHMMSS.
255 dirName = os.path.basename(casuDir)[2:]
256 casuTimestamp = dirName + casuTime[2:].replace(
257 '-','').replace(' ','').replace(':','')
258 wfauTimestamp = dirName + wfauTime[2:].replace(
259 '-','').replace(' ','').replace(':','')
260 newTimestamp = (casuTimestamp
261 if casuTimestamp < wfauTimestamp
262 else wfauTimestamp)
263
264 fimgptr = fits.open(filePath, "update")
265 fits.writeToFitsHdu(
266 fimgptr, (0,),
267 "%s_TIME" % self.sysc.loadDatabase,
268 newTimestamp,
269 "%s time stamp" % self.sysc.loadDatabase,
270 True)
271 fimgptr.close()
272
# Strip the directory prefix again and apply the time as modification
# time; os.utime takes (access time, modification time).
273 timetpl = time.strptime(newTimestamp[
274 len(dirName):], '%y%m%d%H%M%S')
275 modtime = time.mktime(timetpl)
276 os.utime(filePath, (time.time(), modtime))
277 Logger.addMessage("Timestamp updated on %s" %
278 filePath)
279
280
# Statistics: mode 'f' writes every compared file, any other mode only the
# files whose size or time differs.
281 if self.writeStats:
282 if casuSize > 100 and ((sizeDiff != 0 and \
283 not (abs(sizeDiff) == 2880 and self.ignorePad)) \
284 or wfauTime != casuTime) \
285 or self.writeStats == 'f':
286 statsFile.writetheline(fileAcronym)
287 statsFile.writetheline(
288 "CASU: %r %d" % (casuTime, casuSize))
289 statsFile.writetheline(
290 "WFAU: %r %d" % (wfauTime, wfauSize))
291 statsFile.writetheline("----")
# File listed at CASU but not found locally: schedule it for transfer into
# the directory of the highest version known for that date.
292 else:
293 Logger.addMessage("ERROR: %s missing!\n" %
294 os.path.join(casuDir, fileName))
295 dateStr = os.path.basename(casuDir)
296 dateVersStr = "%s_v%s" % (
297 dateStr, self.sysc.obsCal.maxVersOfDate(dateStr))
298 wfauName = os.path.join(
299 fitsDirs.invFitsDateDict[dateVersStr], dateVersStr,
300 os.path.basename(casuName))
301 misFileSet.add((casuName, wfauName))
302 fileMisCounter += 1
303 xferErr = True
304
# Store the counters for this date once any error has been seen so far.
305 if xferErr:
306 misOverview[os.path.basename(casuDir)] = [
307 fileMisCounter, timeMisCounter, sizeMisCounter,
308 allFilesCounter]
309
# Directories that have a version entry but no entry in casuData.sr are
# reported as not yet transferred.
310 misDates = []
311 for casuDir in self.casuData.vers:
312 if casuDir not in self.casuData.sr:
313 misDates.append(self.casuData.vers[casuDir])
314 xferErr = True
315
316 if misDates:
317 Logger.addMessage("The following days haven't been transferred yet:")
318 Logger.addMessage(','.join(misDates))
319
320 if self.writeStats:
321 statsFile.close()
322 Logger.addMessage("Statistics written to %s" % statsFile.name)
# Final summary: either a success message or the per-date counters.
323 if not xferErr:
324 Logger.addMessage("Transfer for %s from %s to %s successful!" % (
325 self.sysc.loadDatabase, self.beginDate, self.endDate))
326 else:
327 for dateStr in sorted(misOverview):
328 Logger.addMessage('\n '.join([
329 "\n %s: %d CASU files" % (dateStr, misOverview[dateStr][3]),
330 "%d files are missing," % misOverview[dateStr][0],
331 "%d files show a wrong time," % misOverview[dateStr][1],
332 "%d files have the wrong size.\n" % misOverview[dateStr][2]]))
333
334
335 logFileName = os.path.join(
336 self.outPath, "checkTransfer_%s_%s_%s.log" % (
337 self.sysc.loadDatabase, self.beginDate, self.endDate))
338 Logger.addMessage("Output written to %s" % logFileName)
339
340
# For the affected files write two tcsh transfer scripts ("sci" and "fi")
# and three lists of local paths to ingest.
341 if misFileSet:
342
343 xferSciFile = File(os.path.join(
344 self.outPath, "xfer_%s_%s_%s_sci.sh" % (
345 self.sysc.loadDatabase, self.beginDate, self.endDate)))
346 xferSciFile.wopen()
347 xferSciFile.writetheline("#! /usr/bin/tcsh")
348
349
350 xferFiFile = File(os.path.join(
351 self.outPath, "xfer_%s_%s_%s_fi.sh" % (
352 self.sysc.loadDatabase, self.beginDate, self.endDate)))
353 xferFiFile.wopen()
354 xferFiFile.writetheline("#! /usr/bin/tcsh")
355
356
357 toingSciFile = File(os.path.join(
358 self.outPath, "toing_%s_%s_%s_sci.list" % (
359 self.sysc.loadDatabase, self.beginDate, self.endDate)))
360 toingSciFile.wopen()
361 toingSciFixFile = File(os.path.join(
362 self.outPath, "toing_%s_%s_%s_sci_fix.list" % (
363 self.sysc.loadDatabase, self.beginDate, self.endDate)))
364 toingSciFixFile.wopen()
365 toingFiFile = File(os.path.join(
366 self.outPath, "toing_%s_%s_%s_fi.list" % (
367 self.sysc.loadDatabase, self.beginDate, self.endDate)))
368 toingFiFile.wopen()
369
370
# Files whose base name starts with "o2" are grouped by date and run number
# for the "sci" outputs; every other file goes straight to the "fi" outputs.
371 sciDict = defaultdict(list)
372 for casuName, wfauName in sorted(misFileSet):
373 casuFile = File(casuName)
374 if casuFile.base.startswith("o2"):
375 fileGroup = '_'.join([casuFile.sdate, casuFile.runno])
376 sciDict[fileGroup].append((casuName, wfauName))
377 else:
378 xferFiFile.writetheline("%s %s@%s:%s %s" % (
379 self.sysc.scpMethod().replace("-pq", "-p"),
380 self.sysc.scpUser, self.sysc.scpServer(),
381 casuName, wfauName))
382 toingFiFile.writetheline(wfauName)
383 del casuFile
384
# A group with more than one affected file is transferred again; a group
# with a single file is only written to the "_sci_fix" list.
385 for fileGroup in sorted(sciDict):
386 if len(sciDict[fileGroup]) > 1:
387 for casuName, wfauName in sciDict[fileGroup]:
388 xferSciFile.writetheline("%s %s@%s:%s %s" % (
389 self.sysc.scpMethod().replace("-pq", "-p"),
390 self.sysc.scpUser, self.sysc.scpServer(),
391 casuName, wfauName))
392 toingSciFile.writetheline(wfauName)
393 else:
394 toingSciFixFile.writetheline(sciDict[fileGroup][0][1])
395
396
# 0744 is a Python 2 octal literal: rwx for the owner, read-only for others.
397 xferSciFile.close()
398 xferSciFile.chmod(0744)
399 xferFiFile.close()
400 xferFiFile.chmod(0744)
401 Logger.addMessage("Transfer scripts written to %s, %s" % (
402 xferSciFile.name, xferFiFile.name))
403 toingSciFile.close()
404 toingSciFixFile.close()
405 toingFiFile.close()
406 Logger.addMessage("Ingest lists written to %s, %s, %s" % (
407 toingSciFile.name, toingSciFixFile.name, toingFiFile.name))
408 else:
409 Logger.addMessage("No files missing!")
410
# Write all collected log messages to the log file; `file` is the Python 2
# built-in file constructor.
411 Logger.dump(file(logFileName, 'w'))
412
413
414
@staticmethod
def compare(x, y, method):
    """
    Compares two values using the relation named by method.

    In this file it is called with the WFAU time stamp as x, the CASU time
    stamp as y and the "timecmp" option value as method.

    @param x:      Left-hand value of the comparison.
    @type  x:      str
    @param y:      Right-hand value of the comparison.
    @type  y:      str
    @param method: Relation to test: 'lt' (x < y), 'ne' (x != y) or
                   'gt' (x > y). Letter case is ignored, matching the
                   case-insensitive validation of the command-line option.
    @type  method: str

    @return: Result of the selected comparison; False if method names no
             known relation.
    @rtype:  bool

    """
    # The option validator lower-cases before checking, so e.g. "GT" gets
    # through; normalise here instead of silently matching nothing.
    relation = method.lower() if hasattr(method, "lower") else method
    if relation == 'lt':
        return x < y
    if relation == 'ne':
        return x != y
    if relation == 'gt':
        # Offered by the command line but previously unhandled, which made
        # every 'gt' comparison come out false.
        return x > y
    return False
421
422
423
424
425
if __name__ == "__main__":
    # Command-line entry point: declare the options of this task, parse the
    # command line, build the CheckTransfer object and run it.

    def _oneOf(*allowed):
        """ Returns a validator that accepts exactly the given values,
            compared case-insensitively.
        """
        return lambda value: value.lower() in allowed

    CLI.progOpts += [
        CLI.Option('C', "fixcats",
                   "fix WFAU timestamps from CASU for catalogues"),
        CLI.Option('P', "fixpix",
                   "fix WFAU timestamps from CASU for pixel files"),
        # The sizes compared are byte counts, so the padding is 2880 bytes.
        CLI.Option('I', "ignorepad",
                   "ignore 2880 byte padding on WFAU FITS files"),
        CLI.Option('T', "timecmp", "use WFAU 'lt'/'ne'/'gt' CASU as time"
                   " comparison method for xfer output",
                   "MODE", 'lt', isValOK=_oneOf("lt", "ne", "gt")),
        CLI.Option('b', "begindate",
                   "first date to process, eg. 20050101",
                   "DATE", str(IngCuSession.beginDateDef)),
        CLI.Option('e', "enddate",
                   "last date to process, eg. 20050131",
                   "DATE", str(IngCuSession.endDateDef)),
        CLI.Option('j', "janet",
                   "use JANET instead of UKLight"),
        CLI.Option('m', "missing", "only report missing files"),
        CLI.Option('o', "outpath",
                   "directory where the output data is written to",
                   "PATH", './'),
        # A substring test against "fd" would also accept the value "fd";
        # list the valid modes (including the empty default) explicitly.
        CLI.Option('s', "stats", "create a file containing f(ull) or "
                   " d(iff) stats",
                   "MODE", '', isValOK=_oneOf("", "f", "d")),
        CLI.Option('v', "version",
                   "overwrite CASU version with this version",
                   "STR", ''),
        CLI.Option('w', "casudirs",
                   "main CASU data directories (depending on given archive)",
                   "PATH", ''),
        CLI.Option('z', "casutime", "only report files with newer CASU time"),
    ]

    cli = CLI(CheckTransfer, "$Revision: 9976 $")

    # The validators ignore letter case, but the class compares these two
    # values against lower-case literals ('f', 'lt', 'ne'), so normalise
    # them before handing them over.
    statsMode = (cli.getOpt("stats") or '').lower()
    timeCompMode = (cli.getOpt("timecmp") or 'lt').lower()

    checkTransfer = CheckTransfer(
        curator=cli.getOpt("curator"),
        database=cli.getArg("database"),
        beginDate=cli.getOpt("begindate"),
        endDate=cli.getOpt("enddate"),
        casuDirs=cli.getOpt("casudirs"),
        outPath=cli.getOpt("outpath"),
        writeStats=statsMode,
        onlyMissing=cli.getOpt("missing"),
        onlyCasuNewer=cli.getOpt("casutime"),
        fixCats=cli.getOpt("fixcats"),
        fixPix=cli.getOpt("fixpix"),
        versionStr=cli.getOpt("version"),
        ignorePad=cli.getOpt("ignorepad"),
        timeComp=timeCompMode,
        janet=cli.getOpt("janet"),
        isTrialRun=cli.getOpt("test"),
        comment=cli.getArg("comment"))
    checkTransfer.run()
483