1
2
3
4 """
5 Invoke CU2. Make JPEGs of pixel data. This will make compressed images for
6 all pixel data in the input transfer log
7
8 @author: E. Sutorius
9 @org: WFAU, IfA, University of Edinburgh
10
11 @newfield contributors: Contributors, Contributors (Alphabetical Order)
12 @contributors: I. Bond, R.S. Collins
13 """
14
15 from collections import defaultdict
16 import dircache
17 import inspect
18 from itertools import islice
19 import os
20 import re
21 import math
22 import multiprocessing
23 import numpy
24 import shutil
25 import Image, ImageDraw, ImageEnhance, ImageFont, ImageOps
26 import pywcs
27
28 from wsatools.CLI import CLI
29 import wsatools.DataFactory as df
30 import wsatools.DbConnect.DbConstants as dbc
31 from wsatools.DbConnect.DbSession import DbSession
32 from wsatools.File import File
33 import wsatools.FitsUtils as fits
34 from wsatools.DbConnect.IngCuSession import IngCuSession
35 from wsatools.DbConnect.IngIngester import IngestLogger as Logger, \
36 IngestLogFile
37 import wsatools.jpeg as jpeg
38 from wsatools.SystemConstants import SystemConstants
39 import wsatools.Utilities as utils
45
49
50
51
52 numprocessors = 1
53
54
55
def pooling(self, inputs):
    """
    Create a pool of worker processes and farm out the calculations.

    The pool uses at most C{JpgCalc.numprocessors} workers (fewer when there
    are fewer inputs than that). Every result string that contains "False"
    is written to the log.

    @param inputs: List containing argument tuples for JPG calculation.
    @type  inputs: list(tuple(str))

    @raise PoolIndexError: If mapping the inputs over the pool failed with
                           any exception other than a keyboard interrupt.

    """
    # Nothing to do. Without this guard the chunk size calculation divides
    # by zero, because the number of processes would be zero.
    if not inputs:
        return

    failed = False
    numprocesses = min(len(inputs), JpgCalc.numprocessors)
    # Floor division keeps the chunk size an integer.
    chunksize = 1 + max(0, min(100, len(inputs) // numprocesses))
    pool = multiprocessing.Pool(processes=numprocesses, maxtasksperchild=1)
    try:
        for entry in pool.imap(calculatestar, inputs, chunksize):
            if "False" in entry:
                Logger.addMessage(entry)
    except KeyError as details:
        print(details)
        print(inputs)
        failed = True
    except IndexError:
        failed = True
    except KeyboardInterrupt:
        Logger.addMessage(
            "Got ^C while pool mapping, terminating the pool.")
        pool.terminate()
        Logger.addMessage("pool is terminated.")
    except Exception as e:
        print(e)
        failed = True

    if failed:
        # Kill the workers before signalling the failure to the caller.
        pool.terminate()
        raise PoolIndexError

    pool.close()
    pool.join()
    del pool
94
95
96
98 """
99 Calculate JPGs.
100 @param args: Arguments for createCompressedImage.
101 @type args: args
102
103 """
104 result = self.createCompressedImage(*args)
105 return '%s(%s) = %s' % (self.createCompressedImage.__name__, args,
106 result)
107
108
109
def createCompressedImage(self, fileName, dateVersStr, jpgPath):
    """
    Creates a JPEG compressed image for every image HDU of the given FITS
    file and, unless suppressed, draws the N/E arrow overlay onto each one.

    @param fileName:    Full path to the FITS file.
    @type  fileName:    str
    @param dateVersStr: Date-version string of the processed file batch
                        (not used inside this method).
    @type  dateVersStr: str
    @param jpgPath:     Output path handed to getJpegName() when the JPEG
                        file names are built.
    @type  jpgPath:     str

    @return: True if the conversion succeeded, False if the file could not
             be read or the conversion raised an exception.
    @rtype:  bool

    """
    self.sysc = IngCuSession.sysc

    # Files whose name contains one of these parts get no overlay.
    isOverlay = not any(part in fileName
                        for part in ["_p1_", "_p2_", "_p3_"])

    try:
        fitsFileHDUs = fits.open(fileName)
    except IOError as error:
        Logger.setEchoOn()
        Logger.addExceptionMessage(error)
        Logger.addMessage("in file: %s" % fileName)
        return False

    # Stays None when no arrow coordinates could be calculated.
    arrows = None
    try:
        # HDU numbers of all two-dimensional image arrays; NAXIS is an
        # integer, so compare by value rather than by identity.
        arrays = [hduNo for hduNo, hdu in enumerate(fitsFileHDUs)
                  if hdu.header["NAXIS"] == 2 and
                  (hdu.header.has_key("SIMPLE")
                   or hdu.header.get("XTENSION") == "IMAGE"
                   or (hdu.header.get("XTENSION") == "BINTABLE" and
                       hdu.header.has_key("ZIMAGE")))]

        try:
            arrows = self.getArrowCoords(fitsFileHDUs, arrays)
            if Cu2.verbose > 3:
                Logger.addMessage("-> %r" % arrows)
        except IOError as error:
            Logger.setEchoOn()
            Logger.addExceptionMessage(error)
            Logger.addMessage("in file: %s" % fileName)
            return False
        except Exception as error:
            if error.__class__.__name__ in ["SingularMatrixError"]:
                # No usable WCS: still make the JPEG, just skip the overlay.
                isOverlay = False
                Logger.addMessage("No WCS in file: %s."
                                  " No overlay created." % fileName)
            else:
                Logger.addExceptionMessage(error)
                Logger.addMessage("in file: %s" % fileName)
                return False

        # Map of (1-based) array number to the JPEG file name to write.
        compMap = {}
        for arrayNum in arrays:
            hduNum = (arrayNum if self.sysc.tileSuffix in fileName
                      else fitsFileHDUs[arrayNum].header[self.sysc.hduNumKey])
            compMap[str(arrayNum + 1)] = "%s_%s.jpg" % \
                (self.getJpegName(fileName, jpgPath), hduNum)
    finally:
        # Close the FITS file on every path, including the early returns.
        fitsFileHDUs.close()

    if Cu2.verbose == 0:
        Logger.setEchoOff()

    try:
        if Cu2.verbose > 0:
            Logger.addMessage("MEF = %s %r" % (fileName, arrays))

        info = jpeg.mef2jpeg(mef=fileName,
                             scrunch=self.sysc.scrunch, quality=Cu2.quality,
                             contrast=Cu2.contrast, hdumap=compMap)
        if not info:
            raise FitsIOError(fileName)

        for hdu in info:
            if isOverlay and arrows:
                image = Image.open(compMap[hdu])
                if Cu2.verbose > 2:
                    Logger.addMessage("%s :: %r %r" % (hdu, arrows[hdu],
                                                       info[hdu][3]))
                self.makeOverlay(image, arrows[hdu], info[hdu][3])
                image.save(compMap[hdu], "JPEG", filter=Image.BILINEAR)
                del image
        status = True
    except Exception as error:
        Logger.setEchoOn()
        Logger.addExceptionMessage(error)
        status = False
    finally:
        # Echoing is always switched back on before returning.
        Logger.setEchoOn()

    return status
205
206
207
def getArrowCoords(self, fitsFileHDUs, arrays):
    """
    Calculate unit coords for the N/E arrows.

    For every HDU listed in C{arrays} the sky positions one unit north and
    one unit east of the reference value are converted to pixel offsets
    from the reference pixel, normalised to unit length and rounded to
    three decimals.

    @param fitsFileHDUs: The FITS file HDU list the coords should be
                         created for.
    @type  fitsFileHDUs: pyfits HDU list
    @param arrays:       List of available HDUs.
    @type  arrays:       list

    @return: Arrow coordinates keyed by str(hduNo + 1); each value holds
             the rows [north, centre, east]. HDUs without an RA/DEC axis
             pair get no entry.
    @rtype:  defaultdict(tuple)

    """
    # A WCS is built for every HDU of the file, not just the listed ones.
    wcsPerHdu = [pywcs.WCS(extension.header) for extension in fitsFileHDUs]

    arrows = defaultdict(tuple)
    for hduNo in arrays:
        hduWcs = wcsPerHdu[hduNo]
        crpix = hduWcs.wcs.crpix
        crval = hduWcs.wcs.crval
        axisTypes = (hduWcs.wcs.lattyp, hduWcs.wcs.lngtyp)

        if axisTypes == ("RA", "DEC"):
            raidx, decidx = hduWcs.wcs.lat, hduWcs.wcs.lng
        elif axisTypes == ("DEC", "RA"):
            raidx, decidx = hduWcs.wcs.lng, hduWcs.wcs.lat
        else:
            raidx = decidx = -1

        if raidx >= 0 and decidx >= 0:
            northSky = [crval[raidx], crval[decidx] + 1.0]
            eastSky = [crval[raidx] + 1.0, crval[decidx]]
            skycrd = numpy.array([northSky, eastSky], numpy.float_)

            # Pixel offsets of both sky positions from the reference pixel.
            pixN, pixE = hduWcs.wcs_sky2pix(skycrd, 1) - crpix
            lenN = math.sqrt(pixN[0]**2 + pixN[1]**2)
            lenE = math.sqrt(pixE[0]**2 + pixE[1]**2)
            unitVectors = numpy.array(
                [pixN/lenN, [0.0, 0.0], pixE/lenE], numpy.float_)
            arrows[str(hduNo + 1)] = numpy.round_(unitVectors, 3)
        elif "dark" not in fitsFileHDUs.filename():
            Logger.addMessage("WrongCoordSystem: %s[%d]" % (
                fitsFileHDUs.filename(), hduNo))

    return arrows
251
252
253
273
274
275
def makeOverlay(self, image, arrows, thetaRad):
    """
    Make the N/E axes overlay.

    The overlay is drawn into a square in the top left corner of the image
    (offset by 10 pixels), after that square has been brightened slightly.

    @param image:    The image for which the overlay is produced.
    @type  image:    PIL image object
    @param arrows:   Arrow coordinates with the rows [north, centre, east];
                     presumably one value of the dictionary returned by
                     getArrowCoords() -- confirm against the caller.
    @type  arrows:   numpy array
    @param thetaRad: Rotation angle of the overlay in radians.
    @type  thetaRad: double

    @return: The updated image.
    @rtype:  PIL image object

    """
    if self.sysc.isWSA():
        fontSize = 25
    else:
        fontSize = 12
    overFont = ImageFont.truetype(
        '/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans.ttf', fontSize)

    if self.sysc.isWSA():
        overSize = 120
    else:
        overSize = 60
    farEdge = overSize + 10
    overBox = (10, 10, farEdge, farEdge)

    # Brighten the area of the image the overlay is going to cover.
    brightened = ImageEnhance.Brightness(image.crop(overBox)).enhance(1.1)
    image.paste(brightened, overBox)
    del brightened

    # Centre of the overlay square (integer division).
    half = overSize // 2
    xy0 = (half, half)

    # Arrow lines: flip the y axis, scale to half the overlay size and
    # shift by an offset derived from the summed arrow vectors.
    lineWidth = overSize // 60
    scaled = arrows * (1, -1) * overSize / 2
    xyLine = [tuple(row) for row in scaled]
    offSet = arrows.sum(0) * overSize * (-1, 1) / 3
    xyLine = self.coordAdd(offSet, xyLine)
    if Cu2.verbose > 2:
        Logger.addMessage("XYL: %r" % (xyLine))

    # Arrow heads at the N and E ends of the lines.
    if self.sysc.isWSA():
        arrLength, arrWidth = (7, 5)
    else:
        arrLength, arrWidth = (3.5, 2.5)
    headN = ((arrWidth, arrLength), (0, 0), (-arrWidth, arrLength))
    headE = ((-arrLength, -arrWidth), (0, 0), (-arrLength, arrWidth))
    arrowTipN = self.coordAdd(xyLine[0], self.coordRot(headN, -thetaRad))
    arrowTipE = self.coordAdd(xyLine[2], self.coordRot(headE, -thetaRad))

    # Draw lines and arrow heads into a greyscale mask.
    overlay = Image.new('L', (overSize, overSize))
    draw = ImageDraw.Draw(overlay)
    for shape in (xyLine, arrowTipN, arrowTipE):
        draw.line(self.coordAdd(xy0, shape), width=lineWidth, fill=255)
    overlay = overlay.rotate(0, resample=Image.BILINEAR, expand=0)
    del draw

    # Position the labels just beyond the arrow tips, centred on the text.
    draw = ImageDraw.Draw(overlay)
    offsetN = numpy.divide(draw.textsize('N', font=overFont), 2.)
    offsetE = numpy.divide(draw.textsize('E', font=overFont), 2.)
    labelGap = overSize / 8.5
    textCN = self.coordAdd(xyLine[0],
                           self.coordRot((0, -labelGap), -thetaRad))
    textCE = self.coordAdd(xyLine[2],
                           self.coordRot((labelGap, 0), -thetaRad))
    if Cu2.verbose > 2:
        Logger.addMessage("OFF: %r %r [%r:%r]" %(offsetN,offsetE,
                                                  textCN, textCE))
    xyNlab = self.coordAdd(textCN, -offsetN)
    xyElab = self.coordAdd(textCE, -offsetE)

    draw.text(self.coordAdd(xy0, xyNlab), 'N', font=overFont, fill=255)
    draw.text(self.coordAdd(xy0, xyElab), 'E', font=overFont, fill=255)

    # Paste an all-black image through the mask onto the picture.
    image.paste(ImageOps.colorize(overlay, (0, 0, 0), (0, 0, 0)),
                (10, 10), overlay)
    del draw
    del overlay
    return image
357
358
359
def coordAdd(self, coord1, coord2):
    """
    Routine to add one coordinate pair to another or a list of coordinates.

    @param coord1: Single coordinate (x,y).
    @type  coord1: tuple
    @param coord2: Single coordinate or list of coordinates. Any list or
                   tuple subclass (e.g. a namedtuple point) counts as a
                   coordinate inside a list.
    @type  coord2: tuple or list(tuples)

    @return: Added coordinates: a list of tuples if coord2 is a sequence of
             coordinates, otherwise a single tuple.
    @rtype:  tuple or list(tuples)

    """
    # isinstance() also accepts list/tuple subclasses, which an exact
    # type() comparison would wrongly treat as a single coordinate.
    if isinstance(coord2[0], (list, tuple)):
        return [tuple(numpy.add(coord1, xy)) for xy in coord2]

    return tuple(numpy.add(coord1, coord2))
377
378
379
def coordRot(self, coord, thetaRad):
    """
    Routine to rotate coordinate pairs (x,y).

    Each pair is multiplied as a row vector with the matrix
    [[cos, -sin], [sin, cos]], i.e. (x, y) becomes
    (x*cos + y*sin, -x*sin + y*cos).

    @param coord:    Single coordinate or list of coordinates. Any list or
                     tuple subclass (e.g. a namedtuple point) counts as a
                     coordinate inside a list.
    @type  coord:    tuple or list(tuples)
    @param thetaRad: Rotation angle in radians.
    @type  thetaRad: double

    @return: Rotated coordinates: a list of tuples if coord is a sequence
             of coordinates, otherwise a single tuple.
    @rtype:  tuple or list(tuples)

    """
    costheta = math.cos(thetaRad)
    sintheta = math.sin(thetaRad)
    rotMatrix = numpy.array([[costheta, -sintheta], [sintheta, costheta]])

    # isinstance() also accepts list/tuple subclasses, which an exact
    # type() comparison would wrongly treat as a single coordinate.
    if isinstance(coord[0], (list, tuple)):
        return [tuple(numpy.dot(xy, rotMatrix)) for xy in coord]

    return tuple(numpy.dot(coord, rotMatrix))
401
409
410
411
class Cu2(IngCuSession):
    """
    Make JPEGS of pixel data. This will make compressed images for all pixel
    data in the input transfer log.

    """
    # Maximum number of FITS files written into one pixlist file.
    maxNum = 1000
    # Contrast and quality settings handed to the JPEG converter.
    contrast = 0.25
    quality = 90
    # Number of attempts made when the processing pool fails.
    maxReTry = 3
    # Verbosity of the diagnostic log messages.
    verbose = 0
    # Directory holding the working lists of this curation task.
    cu2Path = os.path.join(
        SystemConstants.curationCodePath(), "invocations/cu2")

    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 beginDate=CLI.getOptDef("begin"),
                 endDate=CLI.getOptDef("end"),
                 subDir=CLI.getOptDef("subdir"),
                 versionStr=CLI.getOptDef("version"),
                 xferLog=CLI.getOptDef("xferlog"),
                 jpgPath=CLI.getOptDef("outpath"),
                 forceMosaic=CLI.getOptDef("mosaic"),
                 reDo=CLI.getOptDef("redo"),
                 numThreads=CLI.getOptDef("threads"),
                 deprecation=CLI.getOptDef("deprecation"),
                 isTrialRun=DbSession.isTrialRun,
                 comment=CLI.getArgDef("comment")):
        """
        Set up the session, the FITS and JPEG directory dictionaries and
        remove the working lists left over for the selected dates.

        @param curator:     Name of curator.
        @type  curator:     str
        @param database:    Database handed on to the curation session.
        @type  database:    str
        @param beginDate:   First date to process, eg. 20050101.
        @type  beginDate:   int
        @param endDate:     Last date to process, eg. 20050131.
        @type  endDate:     int
        @param subDir:      fits subdirectory, eg. 'products/stacks'
        @type  subDir:      str
        @param versionStr:  Version number of the data.
        @type  versionStr:  str
        @param xferLog:     Logfile containing files to be jpeged.
        @type  xferLog:     str
        @param jpgPath:     Directory where 'products/jpgs' will reside.
                            If isTrialRun, the log will be written there
                            as well.
        @type  jpgPath:     str
        @param forceMosaic: Create jpg for mosaic.
        @type  forceMosaic: bool
        @param reDo:        If True, overwrite existing jpg (CU2).
        @type  reDo:        bool
        @param numThreads:  Number of multiprocessing threads; 0 uses all
                            available CPUs.
        @type  numThreads:  int
        @param deprecation: Modus for deprecating files and file containing
                            pairs of 'old path,new path' of deprecated
                            FITS files.
        @type  deprecation: str
        @param isTrialRun:  If True, do not perform database modifications.
        @type  isTrialRun:  bool
        @param comment:     Descriptive comment as to why curation task is
                            being performed.
        @type  comment:     str

        """
        typeTranslation = dict(
            curator=str,
            database=str,
            beginDate=type(IngCuSession.beginDateDef),
            endDate=type(IngCuSession.endDateDef),
            versionStr=str,
            xferLog=str,
            subDir=str,
            jpgPath=str,
            forceMosaic=bool,
            reDo=bool,
            numThreads=int,
            deprecation=str,
            isTrialRun=bool,
            comment=str)

        # Turn the constructor arguments into attributes of this object.
        super(Cu2, self).attributesFromArguments(
            inspect.getargspec(Cu2.__init__)[0], locals(),
            types=typeTranslation)

        super(Cu2, self).__init__(cuNum=2,
                                  curator=self.curator,
                                  comment=self.comment,
                                  reqWorkDir=False,
                                  database=self.database,
                                  autoCommit=False,
                                  isTrialRun=self.isTrialRun)

        Cu2.cu2Path = os.path.join(self.sysc.curationCodePath(),
                                   "invocations/cu2")

        # A trial run with an explicit output path keeps its files there.
        self.csvSharePath = (
            self.jpgPath
            if self.isTrialRun and self.jpgPath != CLI.getOptDef("outpath")
            else self.sysc.dbSharePath())

        if self.numThreads == 0:
            self.numprocessors = multiprocessing.cpu_count()
        else:
            self.numprocessors = self.numThreads

        self._initFromDB()

        # Dictionary of the FITS directories within the date range.
        self.fitsDirs = fits.FitsList(self.sysc, prefix="all_")
        self.fitsDirs.createFitsDateDict(
            self.sysc.availableRaidFileSystem(), self.subDir,
            beginDateStr=str(self.beginDate),
            endDateStr=str(self.endDate),
            versionStr=self.versionStr)

        # Dictionary of the JPEG directories within the date range.
        self.jpegDirs = fits.FitsList(self.sysc, 'jpg')
        self.jpegDirs.createFitsDateDict(
            ingestDirectory=self.sysc.compressImDir,
            beginDateStr=str(self.beginDate),
            endDateStr=str(self.endDate),
            versionStr=self.versionStr,
            forceLists=True)

        # Remove working lists from earlier runs.
        for dateVersStr in self.fitsDirs.invFitsDateDict:
            self.cleanUp(dateVersStr)

        # Expected number of JPEGs per file: one for tiles and mosaics
        # (key True), otherwise one per extension (key False).
        self.maxNumExts = self.sysc.maxHDUs - 1
        self.numTileExt = {True: 1, False: self.maxNumExts}
540
541
569
570
571
def run(self):
    """
    Do CU2.

    Optionally deprecates JPEGs first, then creates the JPEGs (retrying
    up to Cu2.maxReTry times when the processing pool fails, the last
    attempt with a single process), writes the curation history
    information and finally dumps the log to file.

    """
    Logger.addMessage(''.join(["Started CU", str(self.cuNum),
                               " from: ", str(self.beginDate),
                               " to: ", str(self.endDate)]))

    _logFile = Logger(self.createLogFileName())

    self._outPrefix = self.createFilePrefix(
        self.cuNum, self._cuEventID, self._hostName,
        self.database.rpartition('.')[2])

    if self.deprecation:
        # The option has the form "<mode>,<deprecation list file>".
        self.deprecationMode, self.deprecationListFile = \
            self.deprecation.split(',')
        Logger.addMessage("Creating jpeg date dictionary...")
        self.createJpegDateDict()
        deprFitsDict = self.readDeprecationLog(self.deprecationListFile)
        Logger.addMessage("")
        self.deprecateJpgs(deprFitsDict, self.deprecationMode)

    try:
        indexError = True
        reTry = Cu2.maxReTry
        while indexError and reTry > 0:
            indexError = False

            if reTry != Cu2.maxReTry:
                # On a retry the JPEG directory dictionary is rebuilt.
                self.jpegDirs = fits.FitsList(self.sysc, 'jpg')
                self.jpegDirs.createFitsDateDict(
                    ingestDirectory=self.sysc.compressImDir,
                    beginDateStr=str(self.beginDate),
                    endDateStr=str(self.endDate),
                    versionStr=self.versionStr, forceLists=True)

                # NOTE(review): assumed to belong to the retry branch;
                # the indentation of this loop was lost -- confirm.
                for dateVersStr in self.fitsDirs.invFitsDateDict:
                    self.cleanUp(dateVersStr)

            # The last attempt runs with a single process.
            JpgCalc.numprocessors = (self.numprocessors if reTry > 1
                                     else 1)

            self.checkCu2()
            try:
                self.makeJPEGs()
            except PoolIndexError:
                Logger.addMessage("<WARNING> Index Error.")
                indexError = True
                reTry -= 1
                if reTry == 0:
                    Logger.addMessage(
                        "<ERROR> Retried %d times, Index Error persists." %
                        Cu2.maxReTry)

        ingestLogFile = IngestLogFile(os.path.join(
            self.dbMntPath, self._outPrefix + ".log"))

        timestamp = utils.makeMssqlTimeStamp()
        histories = {}

        dateList = [self.beginDate, self.endDate,
                    self.versionStr, self.subDir]
        if self.jpgPath:
            dateList.append(os.path.join(self.jpgPath,
                                         self.sysc.compressImDir))
        histories.setdefault("DCH", []).extend(dateList)

        histories.setdefault("ACH", []).append(
            [self._cuEventID, self.cuNum, _logFile.pathName,
             dbc.charDefault(), timestamp, self.curator,
             self.comment, dbc.yes()])

        histories.setdefault("PCH", []).append([])

        ingestLogFile.pickleWrite([], {}, histories)
        Logger.addMessage("Curation History info written to "
                          + ingestLogFile.name)
        Logger.addMessage("Making jpgs finished.")
        Logger.addMessage("Please, run ./IngestCUFiles.py -i -r %s %s"
                          % (ingestLogFile.name, self.database))

    except Exception as error:
        # Any failure is logged rather than propagated, so that the log
        # is still written below.
        Logger.addExceptionDetails(error)

    # NOTE(review): assumed to run on success and failure alike; the
    # indentation of this loop was lost -- confirm.
    for dateVersStr in self.fitsDirs.invFitsDateDict:
        self.cleanUp(dateVersStr)

    Logger.addMessage("Log written to " + _logFile.pathName)
    # Close the log file handle explicitly instead of leaving it to the
    # garbage collector.
    with open(_logFile.pathName, 'w') as logDump:
        Logger.dump(logDump)
    Logger.reset()
675
676
677
679 """Checks if files are already processed by CU2. Creates lists of
680 files and completely non-processed directories.
681
682 """
683 Logger.addMessage("%50s %6s %8s %11s" % (
684 "date", "fits", "jpg/" + str(self.maxNumExts),
685 "sum(jpg)/" + str(self.maxNumExts)))
686 statusMessages = []
687 duplicateList = []
688 for dateVersStr in self.fitsDirs.invFitsDateDict:
689 notProcDirs = []
690 notProcessedFiles = []
691 processedFiles = []
692
693 if not self.xferLog:
694 fitsList = list(self.fitsDirs.allFiles(
695 '*' + self.sysc.mefType, os.path.join(
696 self.fitsDirs.invFitsDateDict[dateVersStr], dateVersStr)))
697 else:
698 xferLogFile = File(self.xferLog)
699 xferLogFile.ropen()
700 fitsList = xferLogFile.readlines(
701 commentChar='#', omitValues=[self.sysc.catSuffix])
702 xferLogFile.close()
703
704
705 jpgSum = 0
706 if dateVersStr in self.jpegDirs.invFitsDateDict:
707
708 jpegDict = {}
709 for jpgDir in self.jpegDirs.invFitsDateDict[dateVersStr]:
710 jpegList = (list(self.jpegDirs.allFiles(
711 '*' + self.sysc.jpgType,
712 os.path.join(jpgDir, dateVersStr))))
713 tileList = (list(self.jpegDirs.allFiles(
714 '*' + self.sysc.tileSuffix + '*' + self.sysc.jpgType,
715 os.path.join(jpgDir, dateVersStr))))
716 mosaicList = (list(self.jpegDirs.allFiles(
717 '*' + self.sysc.mosaicSuffix + '*' + self.sysc.jpgType,
718 os.path.join(jpgDir, dateVersStr))))
719 numJpgs = (len(jpegList) - len(tileList) - len(mosaicList))\
720 / self.maxNumExts + len(tileList) + len(mosaicList)
721 jpgSum += numJpgs
722 for elem in jpegList:
723 jpgName = os.path.split(elem)[1].rpartition('_')[0]
724 if jpgName in jpegDict:
725 jpegDict[jpgName] += 1
726 else:
727 jpegDict[jpgName] = 1
728
729
730 Logger.addMessage("%50s %6s %8s %11s" % (
731 os.path.join(jpgDir, dateVersStr),
732 len(fitsList), numJpgs, jpgSum))
733
734
735 for item in fitsList:
736 fileName = os.path.splitext(os.path.split(item)[1])[0]
737 filePath = os.path.join(
738 self.fitsDirs.invFitsDateDict[dateVersStr],
739 dateVersStr, item)
740 fpnum = notProcessedFiles.count(filePath)
741 if self.sysc.catSuffix in fileName:
742 pass
743 elif self.reDo:
744 notProcessedFiles.append(filePath)
745
746 elif self.sysc.mosaicSuffix in fileName and \
747 fileName.split('_')[2] == "dp":
748 if self.forceMosaic:
749 notProcessedFiles.append(filePath)
750 else:
751 pass
752
753 elif fileName not in jpegDict and \
754 self.sysc.catSuffix not in fileName and fpnum == 0:
755 notProcessedFiles.append(filePath)
756
757 elif fileName in jpegDict and fpnum == 0 and \
758 jpegDict[fileName] < self.numTileExt[
759 (self.sysc.tileSuffix in fileName
760 or self.sysc.mosaicSuffix in fileName)]:
761 notProcessedFiles.append(filePath)
762
763 elif fileName in jpegDict and \
764 jpegDict[fileName] > self.numTileExt[ \
765 (self.sysc.tileSuffix in fileName
766 or self.sysc.mosaicSuffix in fileName)]:
767 duplicateList.append(
768 (self.jpegDirs.invFitsDateDict[dateVersStr],
769 fileName, jpegDict[fileName]))
770
771 else:
772 processedFiles.append(filePath)
773 if fpnum > 0:
774 notProcessedFiles.remove(filePath)
775 else:
776 notProcDirs.append(os.path.join(
777 self.fitsDirs.invFitsDateDict[dateVersStr], dateVersStr))
778
779 Logger.addMessage("%50s %6s %8s %11s" % (os.path.join(
780 self.fitsDirs.invFitsDateDict[dateVersStr], dateVersStr),
781 len(fitsList), 0, jpgSum))
782
783
784 if notProcessedFiles:
785 outFile = File(os.path.join(
786 Cu2.cu2Path, "reprocfiles_%s.log" % dateVersStr))
787 statusMessages.append("%d files to be processed are in: %s" %
788 (len(notProcessedFiles), outFile.name))
789 outFile.wopen()
790 outFile.writelines(notProcessedFiles)
791 outFile.close()
792 self.createPixLists(dateVersStr, notProcessedFiles, Cu2.maxNum)
793
794
795 if notProcDirs:
796 outFile = File(os.path.join(
797 Cu2.cu2Path, "notprocdirs_%s.log" % dateVersStr))
798 statusMessages.append("%d dirs to be processed are in: %s" %
799 (len(notProcDirs), outFile.name))
800 outFile.wopen()
801 outFile.writelines(notProcDirs)
802 outFile.close()
803 self.createPixLists(dateVersStr, fitsList, Cu2.maxNum)
804
805
806 if processedFiles:
807 outFile = File(os.path.join(
808 Cu2.cu2Path, "procfiles_%s.log" % dateVersStr))
809 statusMessages.append("%d processed files are in: %s" %
810 (len(processedFiles), outFile.name))
811 outFile.wopen()
812 outFile.writelines(processedFiles)
813 outFile.close()
814
815
816 if duplicateList:
817 log_time = utils.makeTimeStamp()
818 log_time = log_time[:log_time.find('.')].replace(' ', '_')
819 outFile = File("duplicatejpgs_" + log_time + ".log")
820 statusMessages.append("%s duplicate jpgs are in: %s" %
821 (len(duplicateList), outFile.name))
822 print duplicateList
823 outFile.wopen()
824 outFile.writelines(duplicateList)
825 outFile.close()
826
827 for line in statusMessages:
828 Logger.addMessage(line)
829
830
831
def cleanUp(self, datumVersStr):
    """
    Cleanup the cu2 directory for the given datumVersStr.

    Removes all files in Cu2.cu2Path that match "*<datumVersStr>*.log" or
    "*<datumVersStr>.tbl". Files that cannot be removed are skipped.

    @param datumVersStr: Date and version of search.
    @type  datumVersStr: str

    """
    # Local import: glob is not part of this file's import block.
    import glob

    patterns = ("*%s*.log" % datumVersStr, "*%s.tbl" % datumVersStr)
    for pattern in patterns:
        # Expanding the pattern here avoids building a shell command, so
        # paths containing blanks or shell metacharacters are handled.
        for path in glob.glob(os.path.join(Cu2.cu2Path, pattern)):
            try:
                os.remove(path)
            except OSError:
                # Best effort, like "rm -f": a file that has vanished or
                # cannot be removed does not abort the cleanup.
                pass
844
845
846
def createJpegDateDict(self, disks=None):
    """
    Create the daily filelists.

    Fills self.jpegDateDict with, for every disk that has a compressed
    image directory, the sorted list of its sub-directories whose name
    starts with "20" and contains "_v" after the first character, and
    stores the inverted dictionary in self.invJpegDateDict.

    @param disks: List of RAID file system disks; defaults to all
                  available RAID file systems.
    @type  disks: list(str)

    """
    disks = disks or self.sysc.availableRaidFileSystem()
    self.jpegDateDict = defaultdict(list)
    for disk in disks:
        jdir = os.path.join(disk, self.sysc.compressImDir)
        if os.path.exists(jdir):
            # A sorted os.listdir() replaces the deprecated dircache
            # module, which also returned a sorted listing.
            dateDirs = [name for name in sorted(os.listdir(jdir))
                        if name.startswith("20") and name.find('_v') > 0]
            if dateDirs:
                self.jpegDateDict[jdir] = dateDirs

    self.invJpegDateDict = utils.invertDict(self.jpegDateDict)
866
867
868
def createPixLists(self, dateVersStr, fileList, maxNum):
    """
    Create pixelfile lists from the given file list.

    The files are written in groups of at most maxNum names to files
    called "pixlist<dateVersStr>_<NNN>.log" in Cu2.cu2Path, numbered from
    001 upwards.

    @param dateVersStr: Date and version of search.
    @type  dateVersStr: str
    @param fileList:    List of all files that need processing.
    @type  fileList:    list(str)
    @param maxNum:      Maximum number of fits files per written list.
    @type  maxNum:      int

    """
    if len(fileList) == 0:
        return

    for fileCounter, fitsGroup in enumerate(
            self.grouper(fileList, maxNum), 1):
        listName = "pixlist%s_%03d.log" % (dateVersStr, fileCounter)
        pixFile = File(os.path.join(Cu2.cu2Path, listName))
        pixFile.wopen()
        for fitsName in fitsGroup:
            pixFile.writetheline(fitsName)
        pixFile.close()
893
894
895
@staticmethod
def grouper(iterable, chunksize):
    """
    Split an iterable into consecutive lists of at most chunksize items.

    @param iterable:  The items to be grouped.
    @type  iterable:  iterable
    @param chunksize: Maximum number of items per group; converted with
                      int().
    @type  chunksize: int

    @return: Generator yielding the groups; the last one may be shorter.
    @rtype:  generator(list)

    """
    source = iter(iterable)
    group = list(islice(source, int(chunksize)))
    while group:
        yield group
        group = list(islice(source, int(chunksize)))
904
905
906
def makeJPEGs(self):
    """
    Create the JPEGs from given pixlist* files.

    Works inside Cu2.cu2Path and changes back to the directory the
    process was in before, also when the processing fails.

    @raise PoolIndexError: Passed on from JpgCalc.pooling().

    """
    # os.curdir is only the constant ".", so the real working directory
    # has to be recorded in order to get back to it afterwards.
    startDir = os.getcwd()
    os.chdir(Cu2.cu2Path)
    try:
        for dateVersStr in self.fitsDirs.invFitsDateDict:
            pixLists = [x for x in os.listdir(Cu2.cu2Path)
                        if x.startswith("pixlist") and dateVersStr in x]

            for pixListFileName in pixLists:
                pixListFile = File(pixListFileName)
                Logger.addMessage("Running Curation Use case %s on %s" %
                                  (self.cuNum, pixListFile.name))
                pixListFile.ropen()
                fitsFileList = pixListFile.readlines()
                pixListFile.close()

                # One argument tuple per FITS file.
                inputs = [(fileName, dateVersStr, self.jpgPath)
                          for fileName in fitsFileList]

                c = JpgCalc()
                c.pooling(inputs)
                del c
    finally:
        os.chdir(startDir)
936
937
938
def readDeprecationLog(self, deprLog):
    """
    Read the transfer log containing pairs of 'old FITS filename,
    new FITS filename' and create a dictionary {old: new} from it.
    Lines containing the catalogue suffix are left out.

    @param deprLog: Logfile containing fits file pairs.
    @type  deprLog: str

    @return: Translation dictionary from old to new FITS file names.
    @rtype:  dict(str: str)

    """
    logFile = File(deprLog)
    logFile.ropen()
    entries = logFile.readlines(commentChar='#')
    logFile.close()

    namePairs = []
    for entry in entries:
        if self.sysc.catSuffix in entry:
            continue
        namePairs.append(entry.split(','))

    return dict(namePairs)
953
954
955
956
958 """
959 Deprecate files in fitsFileDict to the directory for deprecated files.
960
961 @param fitsFileDict: Translation dict from old to new fits filenames.
962 @type fitsFileDict: dict(str: str)
963 @param deprMode: Mode for handling the deprecated files: move ('mv'),
964 remove ('rm'), leave in place ('ip')
965 @type deprMode: str
966
967 """
968 re_date = re.compile(r'((\d{8})\Z)')
969
970 Logger.addMessage("Deprecating files:")
971 deprLogTime = utils.makeTimeStamp()
972 if deprMode in ["mv", "ip"]:
973 depracatedListFile = File(
974 os.path.join(self.sysc.xferLogPath(), ''.join(
975 [self._outPrefix, "_deprecatedjpgs_", deprMode, '_',
976 deprLogTime.replace(' ', '_'), ".log"])))
977 depracatedListFile.wopen()
978
979 deprJpgList = []
980 for fileName in fitsFileDict:
981 newFitsFile = File(fitsFileDict[fileName])
982 deprTime = re_date.search(newFitsFile.commondir).group()
983 oldJpgNameRoot = os.path.join(newFitsFile.subdir, newFitsFile.root)
984 if deprMode == "mv":
985 spacePerDisk = \
986 utils.getDiskSpace(self.sysc.deprecatedDataStorage)
987
988 deprPath = os.path.join(
989 utils.getNextDisk(self.sysc, spacePerDisk),
990 '%s_%s' % (self.sysc.deprecatedComprImDir, deprTime),
991 newFitsFile.subdir)
992
993 utils.ensureDirExist(deprPath)
994 deprName = os.path.join(deprPath, newFitsFile.root)
995
996 elif deprMode == "ip":
997 deprName = "deprecated_%s" % (deprTime)
998
999 elif deprMode == "rm":
1000 deprName = "---"
1001
1002 try:
1003 for i in range(1, self.sysc.maxHDUs):
1004 for path in self.invJpegDateDict[newFitsFile.subdir]:
1005 oldJpgName = os.path.join(
1006 path, "%s_%s.jpg" % (oldJpgNameRoot, i))
1007 if os.path.exists(oldJpgName):
1008 break
1009 if deprMode in ["mv", "ip"]:
1010 if deprMode == "mv":
1011 deprJpgName = "%s_%s.jpg" % (deprName, i)
1012 else:
1013 deprJpgName = "%s_%s.%s.jpg" % (oldJpgName,
1014 i, deprName)
1015 if not self.isTrialRun:
1016 if os.path.exists(oldJpgName):
1017 shutil.copy2(oldJpgName, deprJpgName)
1018 deprJpgList.append(oldJpgName)
1019 else:
1020 Logger.addMessage("%s doesn't exist." % \
1021 oldJpgName)
1022 Logger.addMessage(' '.join([deprMode,
1023 oldJpgName, deprJpgName]))
1024 depracatedListFile.writetheline(
1025 ','.join([oldJpgName, deprJpgName]))
1026 elif deprMode == "rm":
1027 if not self.isTrialRun:
1028 os.remove(oldJpgName)
1029 Logger.addMessage(' '.join([deprMode, oldJpgName]))
1030
1031 except:
1032 Logger.addMessage(' '.join(["Can't", deprMode,
1033 oldJpgNameRoot]))
1034 raise
1035 finally:
1036 del newFitsFile
1037
1038 if deprMode in ["mv", "ip"]:
1039 depracatedListFile.close()
1040 cmd1 = "%s/CreateMfIdJpegList.py -o %s -r %s -j %s -s %s %s %s %s %s" % (
1041 self.sysc.monitorPath(), self.sysc.helpersPath(),
1042 ','.join(self.sysc.deprecatedDataStorage),
1043 '_'.join([self.sysc.deprecatedComprImDir, deprTime]),
1044 '_'.join([self.sysc.deprecatedDir, deprTime]),
1045 self.database, self.beginDate, self.endDate, self.versionStr)
1046 Logger.addMessage("Running '%s'" % cmd1)
1047 if not self.isTrialRun:
1048 status1 = os.system(cmd1)
1049 if status1:
1050 Logger.addMessage("<ERROR> CreateMfIdJpegList.py failed!")
1051 else:
1052 for updateDB in ["dbload::WSA", "dbpub::all"]:
1053 cmd2 = "%s/UpdateJpegs.py -M %s -j %s %s/deprjpgs_WSA_%s_%s_%s.log" % (
1054 self.sysc.helpersPath(), updateDB,
1055 self.sysc.helpersPath(), self.database,
1056 self.beginDate, self.endDate, self.versionStr)
1057 Logger.addMessage("Running '%s'" % cmd2)
1058 status = os.system(cmd2)
1059 if status:
1060 Logger.addMessage(
1061 "<ERROR> UpdateJpegs.py %s failed!" % updateDB)
1062
1063 for fileName in deprJpgList:
1064 os.remove(fileName)
1065
1066 Logger.addMessage("Deprecated jpg file names are listed in %s"
1067 % depracatedListFile.name)
1068
1069
1070
1071
1072
1073
if __name__ == '__main__':
    # Command-line options of CU2, added to the standard set.
    cu2Options = [
        CLI.Option('b', "begin", "first date to process, eg. 20050101",
                   "DATE", str(IngCuSession.beginDateDef)),
        CLI.Option('e', "end", "last date to process, eg. 20050131",
                   "DATE", str(IngCuSession.endDateDef)),
        CLI.Option('s', "subdir", "subdirectory containing FITS files",
                   "DIR", IngCuSession.sysc.fitsDir),
        CLI.Option('v', "version", "version number of the data",
                   "STR", '1'),
        CLI.Option('l', "xferlog", "xferlog of files to be jpeged",
                   "LOGFILE", ''),
        CLI.Option('M', "mosaic", "force making mosaics."),
        CLI.Option('r', "redo", "re-do jpegs."),
        CLI.Option('y', "threads", "number of processors to use",
                   "INT", '0'),
        CLI.Option('Z', "deprecation",
                   "modus for deprecating files: 'mv', 'rm', 'ip'"
                   " (leave in place)", "STR[2]", ''),
        CLI.Option('o', "outpath",
                   "directory where %s will reside. Generally this is created "
                   "on the fly. If given, the log will be written there as "
                   "well." % SystemConstants.compressImDir,
                   "PATH", ''),
    ]
    CLI.progOpts += cu2Options

    cli = CLI(Cu2, "$Revision: 10242 $")
    Logger.addMessage(cli.getProgDetails())

    # The "test" option switches the trial run on.
    cu2 = Cu2(curator=cli.getOpt("curator"),
              database=cli.getArg("database"),
              beginDate=cli.getOpt("begin"),
              endDate=cli.getOpt("end"),
              subDir=cli.getOpt("subdir"),
              versionStr=cli.getOpt("version"),
              xferLog=cli.getOpt("xferlog"),
              jpgPath=cli.getOpt("outpath"),
              forceMosaic=cli.getOpt("mosaic"),
              reDo=cli.getOpt("redo"),
              numThreads=cli.getOpt("threads"),
              deprecation=cli.getOpt("deprecation"),
              isTrialRun=cli.getOpt("test"),
              comment=cli.getArg("comment"))
    cu2.run()
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148