Package invocations :: Package cu2 :: Module cu2
[hide private]

Source Code for Module invocations.cu2.cu2

   1  #! /usr/bin/env python 
   2  #------------------------------------------------------------------------------ 
   3  #$Id: cu2.py 10242 2014-03-11 12:20:21Z EckhardSutorius $ 
   4  """ 
   5     Invoke CU2. Make JPEGs of pixel data. This will make compressed images for 
   6     all pixel data in the input transfer log 
   7   
   8     @author: E. Sutorius 
   9     @org:    WFAU, IfA, University of Edinburgh 
  10   
  11     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
  12     @contributors: I. Bond, R.S. Collins 
  13  """ 
  14  #------------------------------------------------------------------------------ 
  15  from   collections import defaultdict 
  16  import dircache 
  17  import inspect 
  18  from itertools import islice 
  19  import os 
  20  import re 
  21  import math 
  22  import multiprocessing 
  23  import numpy 
  24  import shutil 
  25  import Image, ImageDraw, ImageEnhance, ImageFont, ImageOps 
  26  import pywcs 
  27   
  28  from   wsatools.CLI                    import CLI 
  29  import wsatools.DataFactory                as df 
  30  import wsatools.DbConnect.DbConstants      as dbc 
  31  from   wsatools.DbConnect.DbSession    import DbSession 
  32  from   wsatools.File                   import File 
  33  import wsatools.FitsUtils                  as fits 
  34  from   wsatools.DbConnect.IngCuSession import IngCuSession 
  35  from   wsatools.DbConnect.IngIngester  import IngestLogger as Logger, \ 
  36                                                IngestLogFile 
  37  import wsatools.jpeg                       as jpeg 
  38  from   wsatools.SystemConstants        import SystemConstants 
  39  import wsatools.Utilities                  as utils 
40 #-------------------------------------------------------------------------- 41 42 -class FitsIOError(Exception): pass
43 -class KeyboardInterruptError(Exception): pass
44 -class PoolIndexError(Exception): pass
45
46 #-------------------------------------------------------------------------- 47 48 -class JpgCalc(object):
49 50 #-------------------------------------------------------------------------- 51 # Define class constants (access as JpgCalc.varName) 52 numprocessors = 1 53 54 #-------------------------------------------------------------------------- 55
56 - def pooling(self, inputs):
57 """ 58 Create a pool using all CPUs and farm out the calculations. 59 60 @param inputs: List containing argument tuples for JPG calculation. 61 @type inputs: list(tuple(str)) 62 63 """ 64 status = 0 65 numprocesses = (len(inputs) if len(inputs) <= JpgCalc.numprocessors else 66 JpgCalc.numprocessors) 67 chunksize = 1 + max(0, min(100, len(inputs) / numprocesses)) 68 pool = multiprocessing.Pool(processes=numprocesses, maxtasksperchild=1) 69 try: 70 pool_outputs = pool.imap(calculatestar, inputs, chunksize) 71 for entry in pool_outputs: 72 if "False" in entry: 73 Logger.addMessage(entry) 74 except KeyError, details: 75 print details 76 print inputs 77 status = 1 78 except IndexError: 79 status = 1 80 except KeyboardInterrupt: 81 Logger.addMessage( 82 "Got ^C while pool mapping, terminating the pool.") 83 pool.terminate() 84 Logger.addMessage("pool is terminated.") 85 except Exception as e: 86 print e 87 status = 1 88 if status == 1: 89 pool.terminate() 90 raise PoolIndexError 91 pool.close() 92 pool.join() 93 del pool
94 95 #-------------------------------------------------------------------------- 96
97 - def calculate(self, args):
98 """ 99 Calculate JPGs. 100 @param args: Arguments for createCompressedImage. 101 @type args: args 102 103 """ 104 result = self.createCompressedImage(*args) 105 return '%s(%s) = %s' % (self.createCompressedImage.__name__, args, 106 result)
107 108 #-------------------------------------------------------------------------- 109
110 - def createCompressedImage(self, fileName, dateVersStr, jpgPath):
111 """ 112 Creates a JPEG compressed image of the given FITS file. 113 114 @param fileName: Full path to the FITS file. 115 @type fileName: str 116 @param dateVersStr: Date-version string of the processed file batch. 117 @type dateVersStr: str 118 119 """ 120 self.sysc = IngCuSession.sysc 121 # Mosaic images split into parts don't receive a N-E overlay 122 isOverlay = not any(part in fileName 123 for part in ["_p1_", "_p2_", "_p3_"]) 124 125 try: 126 fitsFileHDUs = fits.open(fileName) 127 except IOError as error: 128 # corrupt FITS file 129 Logger.setEchoOn() 130 Logger.addExceptionMessage(error) 131 Logger.addMessage("in file: %s" % fileName) 132 return False 133 134 # Just want arrays with 2D image data 135 arrays = [hduNo for hduNo, hdu in enumerate(fitsFileHDUs) 136 if hdu.header["NAXIS"] is 2 and 137 (hdu.header.has_key("SIMPLE") 138 or hdu.header.get("XTENSION") == "IMAGE" 139 or hdu.header.get("XTENSION") == "BINTABLE" and 140 hdu.header.has_key("ZIMAGE"))] 141 142 try: 143 arrows = self.getArrowCoords(fitsFileHDUs, arrays) 144 if Cu2.verbose > 3: 145 Logger.addMessage("-> %r" % arrows) 146 except IOError as error: 147 # corrupt FITS file 148 Logger.setEchoOn() 149 Logger.addExceptionMessage(error) 150 Logger.addMessage("in file: %s" % fileName) 151 return False 152 except Exception as error: 153 if error.__class__.__name__ in ["SingularMatrixError"]: 154 isOverlay = False 155 Logger.addMessage("No WCS in file: %s." 156 " No overlay created." % fileName) 157 else: 158 Logger.addExceptionMessage(error) 159 Logger.addMessage("in file: %s" % fileName) 160 return False 161 162 # Dictionary mapping HDU number to compressed image filename 163 compMap = {} 164 for arrayNum in arrays: 165 hduNum = (arrayNum if self.sysc.tileSuffix in fileName 166 else fitsFileHDUs[arrayNum].header[self.sysc.hduNumKey]) 167 compMap[str(arrayNum + 1)] = "%s_%s.jpg" % \ 168 (self.getJpegName(fileName, jpgPath), hduNum) 169 170 fitsFileHDUs.close() 171 172 if Cu2.verbose == 0: 173 Logger.setEchoOff() 174 175 try: 176 if Cu2.verbose > 0: 177 Logger.addMessage("MEF = %s %r" % (fileName, arrays)) 178 # Make the JPEGs 179 info = jpeg.mef2jpeg(mef=fileName, 180 scrunch=self.sysc.scrunch, quality=Cu2.quality, 181 contrast=Cu2.contrast, hdumap=compMap) 182 if not info: 183 raise FitsIOError(fileName) 184 # Parse output info from the jpeg module 185 for hdu in info: 186 if isOverlay and arrows: 187 image = Image.open(compMap[hdu]) 188 if Cu2.verbose > 2: 189 Logger.addMessage("%s :: %r %r" % (hdu, arrows[hdu], 190 info[hdu][3])) 191 self.makeOverlay(image, arrows[hdu], info[hdu][3]) 192 image.save(compMap[hdu], "JPEG", filter=Image.BILINEAR) 193 del image 194 status = True 195 except Exception as error: 196 # Image or Overlay couldn't be created 197 Logger.setEchoOn() 198 Logger.addExceptionMessage(error) 199 status = False 200 finally: 201 pass 202 Logger.setEchoOn() 203 204 return status
205 206 #-------------------------------------------------------------------------- 207
208 - def getArrowCoords(self, fitsFileHDUs, arrays):
209 """Calculate unit coords for the N/E arrows. 210 @param fitsFileHDUs: The FITS file HDU list the coords should be 211 created for. 212 @type fitsFileHDUs: pyfits HDU list 213 @param arrays: List of available HDUs. 214 @type arrays: list 215 216 """ 217 wcsDict = defaultdict() 218 for hduNo, hdu in enumerate(fitsFileHDUs): 219 wcsDict[hduNo] = pywcs.WCS(hdu.header) 220 221 arrows = defaultdict(tuple) 222 for hduNo in arrays: 223 crpix = wcsDict[hduNo].wcs.crpix 224 crval = wcsDict[hduNo].wcs.crval 225 latType = wcsDict[hduNo].wcs.lattyp 226 longType = wcsDict[hduNo].wcs.lngtyp 227 raidx = decidx = -1 228 if latType == "RA" and longType == "DEC": 229 raidx = wcsDict[hduNo].wcs.lat 230 decidx = wcsDict[hduNo].wcs.lng 231 elif latType == "DEC" and longType == "RA": 232 raidx = wcsDict[hduNo].wcs.lng 233 decidx = wcsDict[hduNo].wcs.lat 234 235 if raidx >= 0 and decidx >= 0: 236 theN = [crval[raidx], crval[decidx] + 1.0] 237 theE = [crval[raidx] + 1.0, crval[decidx]] 238 239 skycrd = numpy.array([theN, theE], numpy.float_) 240 pixN, pixE = ( 241 wcsDict[hduNo].wcs_sky2pix(skycrd, 1) - crpix) 242 pixC = [0.0, 0.0] 243 lenN = math.sqrt(pixN[0]**2 + pixN[1]**2) 244 lenE = math.sqrt(pixE[0]**2 + pixE[1]**2) 245 arrows[str(hduNo + 1)] = numpy.round_(numpy.array( 246 [pixN/lenN, pixC, pixE/lenE], numpy.float_), 3) 247 elif not "dark" in fitsFileHDUs.filename(): 248 Logger.addMessage("WrongCoordSystem: %s[%d]" % ( 249 fitsFileHDUs.filename(), hduNo)) 250 return arrows
251 252 #-------------------------------------------------------------------------- 253
254 - def getJpegName(self, fileName, jpgPath):
255 """Create the full JPG file path. 256 """ 257 inDisk, outDir = os.path.split(os.path.dirname(fileName)) 258 if jpgPath: 259 outDisk = jpgPath 260 elif self.sysc.testOutputPath() in inDisk: 261 outDisk = self.sysc.testOutputPath() 262 else: 263 outDisk = utils.getNextDisk(self.sysc) 264 265 outPath = os.path.join(outDisk, self.sysc.compressImDir, outDir) 266 try: 267 utils.ensureDirExist(outPath) 268 except OSError: 269 pass 270 271 return os.path.join(outPath, 272 os.path.splitext(os.path.basename(fileName))[0])
273 274 #-------------------------------------------------------------------------- 275
276 - def makeOverlay(self, image, arrows, thetaRad=0.0):
277 """ 278 Make the N/E axes overlay. 279 280 @param image: The image for which the overlay is produced. 281 @type image: PIL image object 282 @param thetaRad: Rotation angle of the overlay in radians.thetaRad=0.0 283 @type thetaRad: double 284 285 @return: The updated image. 286 @rtype: PIL image object 287 288 """ 289 overFont = ImageFont.truetype( 290 '/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans.ttf', 291 (25 if self.sysc.isWSA() else 12)) 292 293 # create a brighter box 294 overSize = (120 if self.sysc.isWSA() else 60) 295 overBox = (10, 10, overSize + 10, overSize + 10) 296 #overBox = (10,10,130,130) 297 overlay = image.crop(overBox) 298 enhancer = ImageEnhance.Brightness(overlay) 299 overlay = enhancer.enhance(1.1) 300 del enhancer 301 image.paste(overlay, overBox) 302 del overlay 303 # create the labeled arrows 304 # box centre 305 xy0 = (overSize / 2, overSize / 2) 306 #xy0 = (60,60) 307 308 # Lines 309 lineWidth = overSize / 60 310 xyLine = map(tuple, arrows * (1, -1) * overSize / 2) 311 offSet = arrows.sum(0) * overSize * (-1, 1) / 3 312 xyLine = self.coordAdd(offSet, xyLine) 313 if Cu2.verbose > 2: 314 Logger.addMessage("XYL: %r" % (xyLine)) 315 316 # Arrows 317 arrLength, arrWidth = ((7, 5) if self.sysc.isWSA() else (3.5, 2.5)) 318 arrowTipN = self.coordAdd( 319 xyLine[0], 320 self.coordRot(((arrWidth, arrLength), (0, 0), 321 (-arrWidth, arrLength)), -thetaRad)) 322 arrowTipE = self.coordAdd(xyLine[2], 323 self.coordRot(((-arrLength, -arrWidth), (0, 0), 324 (-arrLength, arrWidth)), -thetaRad)) 325 326 # draw lines and arrows 327 overlay = Image.new('L', (overSize, overSize)) 328 draw = ImageDraw.Draw(overlay) 329 draw.line(self.coordAdd(xy0, xyLine), width=lineWidth, fill=255) 330 draw.line(self.coordAdd(xy0, arrowTipN) , width=lineWidth, fill=255) 331 draw.line(self.coordAdd(xy0, arrowTipE) , width=lineWidth, fill=255) 332 overlay = overlay.rotate(0, resample=Image.BILINEAR, expand=0) 333 del draw 334 335 # N/E label 336 draw = ImageDraw.Draw(overlay) 337 offsetN = numpy.divide(draw.textsize('N', font=overFont), 2.) 338 offsetE = numpy.divide(draw.textsize('E', font=overFont), 2.) 339 textCN = self.coordAdd(xyLine[0], 340 self.coordRot((0, -overSize / 8.5), -thetaRad)) 341 textCE = self.coordAdd(xyLine[2], 342 self.coordRot((overSize / 8.5, 0), -thetaRad)) 343 if Cu2.verbose > 2: 344 Logger.addMessage("OFF: %r %r [%r:%r]" %(offsetN,offsetE, 345 textCN, textCE)) 346 xyNlab = self.coordAdd(textCN, -offsetN) 347 xyElab = self.coordAdd(textCE, -offsetE) 348 349 # draw N/E label 350 draw.text(self.coordAdd(xy0, xyNlab), 'N', font=overFont, fill=255) 351 draw.text(self.coordAdd(xy0, xyElab), 'E', font=overFont, fill=255) 352 image.paste(ImageOps.colorize(overlay, (0, 0, 0), (0, 0, 0)), 353 (10, 10), overlay) 354 del draw 355 del overlay 356 return image
357 358 #-------------------------------------------------------------------------- 359
360 - def coordAdd(self, coord1, coord2):
361 """ 362 Routine to add one coordinate pair to another or a list of coordinates. 363 364 @param coord1: Single coordinate (x,y). 365 @type coord1: tuple 366 @param coord2: Single coordinate or list of coordinates. 367 @type coord2: tuple or list(tuples) 368 369 @return: Added coordinates. 370 @rtype: tuple or list(tuples) 371 372 """ 373 if type(coord2[0]) in [type([]), type(())]: 374 return list([tuple(numpy.add(coord1, xy)) for xy in coord2]) 375 else: 376 return tuple([tuple(xy) for xy in numpy.add(coord1, [coord2])][0])
377 378 #-------------------------------------------------------------------------- 379
380 - def coordRot(self, coord, thetaRad):
381 """ 382 Routine to rotate coordinate pairs (x,y). 383 384 @param coord: Single coordinate or list of coordinates. 385 @type coord: tuple or list(tuples) 386 @param thetaRad: Rotation angle in radians. 387 @type thetaRad: double 388 389 @return: Rotated coordinates. 390 @rtype: tuple or list(tuples) 391 392 """ 393 costheta = math.cos(thetaRad) 394 sintheta = math.sin(thetaRad) 395 # Rotation Matrix 396 rotMatrix = numpy.array([[costheta, -sintheta], [sintheta, costheta]]) 397 if type(coord[0]) in [type([]), type(())]: 398 return [tuple(numpy.dot(xy, rotMatrix)) for xy in coord] 399 else: 400 return tuple(numpy.dot(coord, rotMatrix))
401
402 #------------------------------------------------------------------------------ 403 404 -def calculatestar(args):
405 try: 406 return JpgCalc().calculate(args) 407 except KeyboardInterrupt: 408 raise KeyboardInterruptError()
409
410 #------------------------------------------------------------------------------ 411 412 -class Cu2(IngCuSession):
413 """ 414 Make JPEGS of pixel data. This will make compressed images for all pixel 415 data in the input transfer log. 416 417 """ 418 maxNum = 1000 # max Number of files in seperate file lists 419 contrast = 0.25 #: Greyscale contrast: structures: 0.25; smooth: 1.042 420 quality = 90 #: JPEG quality (0-100). 421 maxReTry = 3 #: max Number of retries. 422 verbose = 0 423 cu2Path = \ 424 os.path.join(SystemConstants.curationCodePath(), "invocations/cu2") 425
426 - def __init__(self, 427 curator=CLI.getOptDef("curator"), 428 database=DbSession.database, 429 beginDate=CLI.getOptDef("begin"), 430 endDate=CLI.getOptDef("end"), 431 subDir=CLI.getOptDef("subdir"), 432 versionStr=CLI.getOptDef("version"), 433 xferLog=CLI.getOptDef("xferlog"), 434 jpgPath=CLI.getOptDef("outpath"), 435 forceMosaic=CLI.getOptDef("mosaic"), 436 reDo=CLI.getOptDef("redo"), 437 numThreads=CLI.getOptDef("threads"), 438 deprecation=CLI.getOptDef("deprecation"), 439 isTrialRun=DbSession.isTrialRun, 440 comment=CLI.getArgDef("comment")):
441 """ 442 @param curator: Name of curator. 443 @type curator: str 444 @param comment: Descriptive comment as to why curation task is 445 being performed. 446 @type comment: str 447 @param beginDate: First date to process, eg. 20050101. 448 @type beginDate: int 449 @param deprecation: Modus for deprecating files and file containing 450 pairs of 'old path,new path' of deprecated 451 FITS files. 452 @type deprecation: str 453 @param endDate: Last date to process, eg. 20050131. 454 @type endDate: int 455 @param forceMosaic: Create jpg for mosaic. 456 @type forceMosaic: bool 457 @param isTrialRun: If True, do not perform database modifications. 458 @type isTrialRun: bool 459 @param jpgPath: Directory where 'products/jpgs' will reside. 460 If isTrialRun, the log will be written there 461 as well. 462 @type jpgPath: str 463 @param numThreads: Number of multiprocessing threads. 464 @type numThreads: int 465 @param reDo: If True, overwrite existing jpg (CU2). 466 @type reDo: bool 467 @param subDir: fits subdirectory, eg. 'products/stacks' 468 @type subDir: str 469 @param versionStr: Version number of the data. 470 @type versionStr: str 471 @param xferLog: Logfile containing files to be jpeged. 472 @type xferLog: str 473 474 """ 475 typeTranslation = {"curator":str, 476 "database":str, 477 "beginDate":type(IngCuSession.beginDateDef), 478 "endDate":type(IngCuSession.endDateDef), 479 "versionStr":str, 480 "xferLog":str, 481 "subDir":str, 482 "jpgPath":str, 483 "forceMosaic":bool, 484 "reDo":bool, 485 "numThreads":int, 486 "deprecation":str, 487 "isTrialRun":bool, 488 "comment":str} 489 490 super(Cu2, self).attributesFromArguments( 491 inspect.getargspec(Cu2.__init__)[0], locals(), 492 types=typeTranslation) 493 494 # Initialize parent class 495 super(Cu2, self).__init__(cuNum=2, 496 curator=self.curator, 497 comment=self.comment, 498 reqWorkDir=False, 499 database=self.database, 500 autoCommit=False, 501 isTrialRun=self.isTrialRun) 502 503 Cu2.cu2Path = os.path.join(self.sysc.curationCodePath(), 504 "invocations/cu2") 505 506 if self.isTrialRun and self.jpgPath != CLI.getOptDef("outpath"): 507 self.csvSharePath = self.jpgPath 508 else: 509 self.csvSharePath = self.sysc.dbSharePath() 510 511 # set number of multiprocessing threads 512 self.numprocessors = (multiprocessing.cpu_count() 513 if self.numThreads == 0 else self.numThreads) 514 515 # init DB entry 516 self._initFromDB() 517 518 # create the dictionary listing all dates for every fits dir. 519 self.fitsDirs = fits.FitsList(self.sysc, prefix="all_") 520 self.fitsDirs.createFitsDateDict( 521 self.sysc.availableRaidFileSystem(), self.subDir, 522 beginDateStr=str(self.beginDate), endDateStr=str(self.endDate), 523 versionStr=self.versionStr) 524 525 # create the dictionary listing all dates for every jpeg dir. 526 self.jpegDirs = fits.FitsList(self.sysc, 'jpg') 527 self.jpegDirs.createFitsDateDict( 528 ingestDirectory=self.sysc.compressImDir, 529 beginDateStr=str(self.beginDate), endDateStr=str(self.endDate), 530 versionStr=self.versionStr, forceLists=True) 531 532 # clean up existing logs and temp files 533 for dateVersStr in self.fitsDirs.invFitsDateDict: 534 self.cleanUp(dateVersStr) 535 536 # number of secondary headers 537 self.maxNumExts = self.sysc.maxHDUs - 1 538 self.numTileExt = {True: 1, False: self.maxNumExts}
539 540 #-------------------------------------------------------------------------- 541
542 - def _initFromDB(self):
543 """initialize constants from the DB 544 """ 545 # Connect to database 546 self._connectToDb() 547 548 try: 549 if not self.isTrialRun: 550 self._cuEventID = self._getNextCuEventID() 551 552 if self.cuNum != dbc.smallIntDefault(): 553 try: 554 self._cuTable = df.CurationTaskTable(self.archive, 555 self.cuNum) 556 Logger.addMessage("Initialising Curation Use case:") 557 Logger.addMessage(self._cuTable.getDescription()) 558 Logger.addMessage(' '.join(["from:", str(self.beginDate), 559 "to:", str(self.endDate)])) 560 except ValueError, details: 561 Logger.addMessage("<Warning> " + str(details)) 562 self._cuTable = None 563 564 # Create the translation dictionary of programme identification 565 # strings to programme ID numbers for all programmes: 566 self._createProgrammeTranslation() 567 finally: 568 self._disconnectFromDb()
569 570 #-------------------------------------------------------------------------- 571
572 - def _onRun(self):
573 """ Do CU2. """ 574 Logger.addMessage(''.join(["Started CU", str(self.cuNum), 575 " from: ", str(self.beginDate), 576 " to: ", str(self.endDate)])) 577 578 # Create a log file to hold the verbose record for this invocation 579 _logFile = Logger(self.createLogFileName()) 580 581 # output file prefix 582 self._outPrefix = self.createFilePrefix( 583 self.cuNum, self._cuEventID, self._hostName, 584 self.database.rpartition('.')[2]) 585 586 # Deprecate jpgs from given transferlog containing 'old fits path, 587 # new fits path' pairs. 588 if self.deprecation: 589 self.deprecationMode, self.deprecationListFile = \ 590 self.deprecation.split(',') 591 Logger.addMessage("Creating jpeg date dictionary...") 592 self.createJpegDateDict() 593 deprFitsDict = self.readDeprecationLog(self.deprecationListFile) 594 Logger.addMessage("") 595 self.deprecateJpgs(deprFitsDict, self.deprecationMode) 596 597 try: 598 indexError = True 599 reTry = Cu2.maxReTry 600 while indexError and reTry > 0: 601 indexError = False 602 603 # recreate the dictionary listing all dates for every jpeg dir 604 # and clean-up existing logs and temp files 605 if reTry != Cu2.maxReTry: 606 self.jpegDirs = fits.FitsList(self.sysc, 'jpg') 607 self.jpegDirs.createFitsDateDict( 608 ingestDirectory=self.sysc.compressImDir, 609 beginDateStr=str(self.beginDate), 610 endDateStr=str(self.endDate), 611 versionStr=self.versionStr, forceLists=True) 612 613 for dateVersStr in self.fitsDirs.invFitsDateDict: 614 self.cleanUp(dateVersStr) 615 616 # Set num of processes to lowest on last try. 617 JpgCalc.numprocessors = (self.numprocessors if reTry > 1 618 else 1) 619 620 # check for files to process 621 self.checkCu2() 622 try: 623 # create JPEGs 624 self.makeJPEGs() 625 except PoolIndexError: 626 Logger.addMessage("<WARNING> Index Error.") 627 indexError = True 628 reTry -= 1 629 if reTry == 0: 630 Logger.addMessage( 631 "<ERROR> Retried %d times, Index Error persists." % 632 Cu2.maxReTry) 633 634 # Create ingest log file 635 ingestLogFile = IngestLogFile(os.path.join( 636 self.dbMntPath, self._outPrefix + ".log")) 637 638 # write ingest info into the ingest log file 639 timestamp = utils.makeMssqlTimeStamp() 640 histories = {} 641 # include processed dates 642 dateList = [self.beginDate, self.endDate, 643 self.versionStr, self.subDir] 644 if self.jpgPath: 645 dateList.append(os.path.join(self.jpgPath, 646 self.sysc.compressImDir)) 647 histories.setdefault("DCH", []).extend(dateList) 648 # for the update of the curation event history 649 histories.setdefault("ACH", []).append( 650 [self._cuEventID, self.cuNum, _logFile.pathName, 651 dbc.charDefault(), timestamp, self.curator, 652 self.comment, dbc.yes()]) 653 654 # for the update of the programme curation history 655 histories.setdefault("PCH", []).append([]) 656 657 ingestLogFile.pickleWrite([], {}, histories) 658 Logger.addMessage("Curation History info written to " 659 + ingestLogFile.name) 660 Logger.addMessage("Making jpgs finished.") 661 Logger.addMessage("Please, run ./IngestCUFiles.py -i -r %s %s" 662 % (ingestLogFile.name, self.database)) 663 664 except Exception as error: 665 # Log the details of the exception: 666 Logger.addExceptionDetails(error) 667 668 # clean up existing logs and temp files 669 for dateVersStr in self.fitsDirs.invFitsDateDict: 670 self.cleanUp(dateVersStr) 671 672 Logger.addMessage("Log written to " + _logFile.pathName) 673 Logger.dump(file(_logFile.pathName, 'w')) 674 Logger.reset()
675 676 #-------------------------------------------------------------------------- 677
678 - def checkCu2(self):
679 """Checks if files are already processed by CU2. Creates lists of 680 files and completely non-processed directories. 681 682 """ 683 Logger.addMessage("%50s %6s %8s %11s" % ( 684 "date", "fits", "jpg/" + str(self.maxNumExts), 685 "sum(jpg)/" + str(self.maxNumExts))) 686 statusMessages = [] 687 duplicateList = [] 688 for dateVersStr in self.fitsDirs.invFitsDateDict: 689 notProcDirs = [] 690 notProcessedFiles = [] 691 processedFiles = [] 692 # list of all FITS files (no cats) 693 if not self.xferLog: 694 fitsList = list(self.fitsDirs.allFiles( 695 '*' + self.sysc.mefType, os.path.join( 696 self.fitsDirs.invFitsDateDict[dateVersStr], dateVersStr))) 697 else: 698 xferLogFile = File(self.xferLog) 699 xferLogFile.ropen() 700 fitsList = xferLogFile.readlines( 701 commentChar='#', omitValues=[self.sysc.catSuffix]) 702 xferLogFile.close() 703 704 # list of all JPGs 705 jpgSum = 0 706 if dateVersStr in self.jpegDirs.invFitsDateDict: 707 # set up a name dict for jpgs 708 jpegDict = {} 709 for jpgDir in self.jpegDirs.invFitsDateDict[dateVersStr]: 710 jpegList = (list(self.jpegDirs.allFiles( 711 '*' + self.sysc.jpgType, 712 os.path.join(jpgDir, dateVersStr)))) 713 tileList = (list(self.jpegDirs.allFiles( 714 '*' + self.sysc.tileSuffix + '*' + self.sysc.jpgType, 715 os.path.join(jpgDir, dateVersStr)))) 716 mosaicList = (list(self.jpegDirs.allFiles( 717 '*' + self.sysc.mosaicSuffix + '*' + self.sysc.jpgType, 718 os.path.join(jpgDir, dateVersStr)))) 719 numJpgs = (len(jpegList) - len(tileList) - len(mosaicList))\ 720 / self.maxNumExts + len(tileList) + len(mosaicList) 721 jpgSum += numJpgs 722 for elem in jpegList: 723 jpgName = os.path.split(elem)[1].rpartition('_')[0] 724 if jpgName in jpegDict: 725 jpegDict[jpgName] += 1 726 else: 727 jpegDict[jpgName] = 1 728 # print out how much fits files there are per date and how 729 # much jpgs (divided by maxNumExts (# of jpgs per fits)) 730 Logger.addMessage("%50s %6s %8s %11s" % ( 731 os.path.join(jpgDir, dateVersStr), 732 len(fitsList), numJpgs, jpgSum)) 733 734 # check with fits list 735 for item in fitsList: 736 fileName = os.path.splitext(os.path.split(item)[1])[0] 737 filePath = os.path.join( 738 self.fitsDirs.invFitsDateDict[dateVersStr], 739 dateVersStr, item) 740 fpnum = notProcessedFiles.count(filePath) 741 if self.sysc.catSuffix in fileName: 742 pass 743 elif self.reDo: 744 notProcessedFiles.append(filePath) 745 # is large mosaic file 746 elif self.sysc.mosaicSuffix in fileName and \ 747 fileName.split('_')[2] == "dp": 748 if self.forceMosaic: 749 notProcessedFiles.append(filePath) 750 else: 751 pass 752 # is not processed yet 753 elif fileName not in jpegDict and \ 754 self.sysc.catSuffix not in fileName and fpnum == 0: 755 notProcessedFiles.append(filePath) 756 # less than maxnum extensions processed 757 elif fileName in jpegDict and fpnum == 0 and \ 758 jpegDict[fileName] < self.numTileExt[ 759 (self.sysc.tileSuffix in fileName 760 or self.sysc.mosaicSuffix in fileName)]: 761 notProcessedFiles.append(filePath) 762 # duplicate JPEGs 763 elif fileName in jpegDict and \ 764 jpegDict[fileName] > self.numTileExt[ \ 765 (self.sysc.tileSuffix in fileName 766 or self.sysc.mosaicSuffix in fileName)]: 767 duplicateList.append( 768 (self.jpegDirs.invFitsDateDict[dateVersStr], 769 fileName, jpegDict[fileName])) 770 # processed 771 else: 772 processedFiles.append(filePath) 773 if fpnum > 0: 774 notProcessedFiles.remove(filePath) 775 else: 776 notProcDirs.append(os.path.join( 777 self.fitsDirs.invFitsDateDict[dateVersStr], dateVersStr)) 778 # print out how much fits files there are per date 779 Logger.addMessage("%50s %6s %8s %11s" % (os.path.join( 780 self.fitsDirs.invFitsDateDict[dateVersStr], dateVersStr), 781 len(fitsList), 0, jpgSum)) 782 783 # write not yet processed files to a log 784 if notProcessedFiles: 785 outFile = File(os.path.join( 786 Cu2.cu2Path, "reprocfiles_%s.log" % dateVersStr)) 787 statusMessages.append("%d files to be processed are in: %s" % 788 (len(notProcessedFiles), outFile.name)) 789 outFile.wopen() 790 outFile.writelines(notProcessedFiles) 791 outFile.close() 792 self.createPixLists(dateVersStr, notProcessedFiles, Cu2.maxNum) 793 794 # unprocessed dirs 795 if notProcDirs: 796 outFile = File(os.path.join( 797 Cu2.cu2Path, "notprocdirs_%s.log" % dateVersStr)) 798 statusMessages.append("%d dirs to be processed are in: %s" % 799 (len(notProcDirs), outFile.name)) 800 outFile.wopen() 801 outFile.writelines(notProcDirs) 802 outFile.close() 803 self.createPixLists(dateVersStr, fitsList, Cu2.maxNum) 804 805 # processed files 806 if processedFiles: 807 outFile = File(os.path.join( 808 Cu2.cu2Path, "procfiles_%s.log" % dateVersStr)) 809 statusMessages.append("%d processed files are in: %s" % 810 (len(processedFiles), outFile.name)) 811 outFile.wopen() 812 outFile.writelines(processedFiles) 813 outFile.close() 814 815 # duplicate files 816 if duplicateList: 817 log_time = utils.makeTimeStamp() 818 log_time = log_time[:log_time.find('.')].replace(' ', '_') 819 outFile = File("duplicatejpgs_" + log_time + ".log") 820 statusMessages.append("%s duplicate jpgs are in: %s" % 821 (len(duplicateList), outFile.name)) 822 print duplicateList 823 outFile.wopen() 824 outFile.writelines(duplicateList) 825 outFile.close() 826 827 for line in statusMessages: 828 Logger.addMessage(line)
829 830 #-------------------------------------------------------------------------- 831
832 - def cleanUp(self, datumVersStr):
833 """ 834 Cleanup the cu2 directory for the given datumVersStr. 835 836 @param datumVersStr: Date and version of search. 837 @type datumVersStr: str 838 839 """ 840 cmd = ' '.join(["rm -f", 841 os.path.join(Cu2.cu2Path, "*%s*.log" % datumVersStr), 842 os.path.join(Cu2.cu2Path, "*%s.tbl" % datumVersStr)]) 843 os.system(cmd)
844 845 #-------------------------------------------------------------------------- 846
847 - def createJpegDateDict(self, disks=None):
848 """ 849 Create the daily filelists. 850 851 @param disks: List of RAID file system disks. 852 @type disks: list(str) 853 854 """ 855 disks = disks or self.sysc.availableRaidFileSystem() 856 self.jpegDateDict = defaultdict(list) 857 for disk in disks: 858 jdir = os.path.join(disk, self.sysc.compressImDir) 859 if os.path.exists(jdir): 860 listdir = [elem for elem in dircache.listdir(jdir) 861 if elem[:2] == "20" and elem.find('_v') > 0] 862 if listdir: 863 self.jpegDateDict[jdir] = listdir 864 865 self.invJpegDateDict = utils.invertDict(self.jpegDateDict)
866 867 #-------------------------------------------------------------------------- 868
869 - def createPixLists(self, dateVersStr, fileList, maxNum):
870 """ 871 Create pixelfile lists from the given file list. 872 873 @param dateVersStr: Date and version of search. 874 @type dateVersStr: str 875 @param fileList: List of all files that need processing. 876 @type fileList: list(str) 877 @param maxNum: Maximum number of fits files per written list. 878 @type maxNum: int 879 880 """ 881 if len(fileList) > 0: 882 pixCounter = (1 if maxNum > 0 else 0) 883 fileCounter = 0 884 for fitsList in self.grouper(fileList, maxNum): 885 fileCounter += 1 886 pixFile = File(os.path.join( 887 Cu2.cu2Path, "pixlist%s_%03d.log" % ( 888 dateVersStr, fileCounter))) 889 pixFile.wopen() 890 for fitsName in fitsList: 891 pixFile.writetheline(fitsName) 892 pixFile.close()
893 894 #-------------------------------------------------------------------------- 895 896 @staticmethod
897 - def grouper(iterable, chunksize):
898 i = iter(iterable) 899 while True: 900 tmpr = [list(islice(i, int(chunksize)))] 901 if not tmpr[0]: 902 break 903 yield tmpr.pop()
904 905 #-------------------------------------------------------------------------- 906
907 - def makeJPEGs(self):
908 """ 909 Create the JPEGs from given pixlist* files. 910 """ 911 # create a list of log files for the actual making of jpegs 912 curPath = os.curdir 913 os.chdir(Cu2.cu2Path) 914 915 for dateVersStr in self.fitsDirs.invFitsDateDict: 916 pixLists = [x for x in os.listdir(Cu2.cu2Path) 917 if x.startswith("pixlist") and dateVersStr in x] 918 919 for pixListFileName in pixLists: 920 pixListFile = File(pixListFileName) 921 Logger.addMessage("Running Curation Use case %s on %s" % 922 (self.cuNum, pixListFile.name)) 923 pixListFile.ropen() 924 fitsFileList = pixListFile.readlines() 925 pixListFile.close() 926 927 # create list with input values for multitasking 928 inputs = [(fileName, dateVersStr, self.jpgPath) 929 for fileName in fitsFileList] 930 931 # pool the input list and create JPGs 932 c = JpgCalc() 933 c.pooling(inputs) 934 del c 935 os.chdir(curPath)
936 937 #-------------------------------------------------------------------------- 938
939 - def readDeprecationLog(self, deprLog):
940 """Read the transfer log containing pairs of 'old FITS filename, 941 new FITS filename' and create a dictionary {old: new} from it. 942 943 @param deprLog: Logfile containing fits file pairs. 944 @type deprLog: str 945 946 """ 947 xferLogFile = File(deprLog) 948 xferLogFile.ropen() 949 result = xferLogFile.readlines(commentChar='#') 950 xferLogFile.close() 951 return dict([r.split(',') for r in result 952 if not self.sysc.catSuffix in r])
953 954 955 #-------------------------------------------------------------------------- 956
957 - def deprecateJpgs(self, fitsFileDict, deprMode="mv"):
958 """ 959 Deprecate files in fitsFileDict to the directory for deprecated files. 960 961 @param fitsFileDict: Translation dict from old to new fits filenames. 962 @type fitsFileDict: dict(str: str) 963 @param deprMode: Mode for handling the deprecated files: move ('mv'), 964 remove ('rm'), leave in place ('ip') 965 @type deprMode: str 966 967 """ 968 re_date = re.compile(r'((\d{8})\Z)') 969 970 Logger.addMessage("Deprecating files:") 971 deprLogTime = utils.makeTimeStamp() 972 if deprMode in ["mv", "ip"]: 973 depracatedListFile = File( 974 os.path.join(self.sysc.xferLogPath(), ''.join( 975 [self._outPrefix, "_deprecatedjpgs_", deprMode, '_', 976 deprLogTime.replace(' ', '_'), ".log"]))) 977 depracatedListFile.wopen() 978 979 deprJpgList = [] 980 for fileName in fitsFileDict: 981 newFitsFile = File(fitsFileDict[fileName]) 982 deprTime = re_date.search(newFitsFile.commondir).group() 983 oldJpgNameRoot = os.path.join(newFitsFile.subdir, newFitsFile.root) 984 if deprMode == "mv": 985 spacePerDisk = \ 986 utils.getDiskSpace(self.sysc.deprecatedDataStorage) 987 988 deprPath = os.path.join( 989 utils.getNextDisk(self.sysc, spacePerDisk), 990 '%s_%s' % (self.sysc.deprecatedComprImDir, deprTime), 991 newFitsFile.subdir) 992 993 utils.ensureDirExist(deprPath) 994 deprName = os.path.join(deprPath, newFitsFile.root) 995 996 elif deprMode == "ip": 997 deprName = "deprecated_%s" % (deprTime) 998 999 elif deprMode == "rm": 1000 deprName = "---" 1001 1002 try: 1003 for i in range(1, self.sysc.maxHDUs): 1004 for path in self.invJpegDateDict[newFitsFile.subdir]: 1005 oldJpgName = os.path.join( 1006 path, "%s_%s.jpg" % (oldJpgNameRoot, i)) 1007 if os.path.exists(oldJpgName): 1008 break 1009 if deprMode in ["mv", "ip"]: 1010 if deprMode == "mv": 1011 deprJpgName = "%s_%s.jpg" % (deprName, i) 1012 else: 1013 deprJpgName = "%s_%s.%s.jpg" % (oldJpgName, 1014 i, deprName) 1015 if not self.isTrialRun: 1016 if os.path.exists(oldJpgName): 1017 shutil.copy2(oldJpgName, deprJpgName) 1018 deprJpgList.append(oldJpgName) 1019 else: 1020 Logger.addMessage("%s doesn't exist." % \ 1021 oldJpgName) 1022 Logger.addMessage(' '.join([deprMode, 1023 oldJpgName, deprJpgName])) 1024 depracatedListFile.writetheline( 1025 ','.join([oldJpgName, deprJpgName])) 1026 elif deprMode == "rm": 1027 if not self.isTrialRun: 1028 os.remove(oldJpgName) 1029 Logger.addMessage(' '.join([deprMode, oldJpgName])) 1030 1031 except: 1032 Logger.addMessage(' '.join(["Can't", deprMode, 1033 oldJpgNameRoot])) 1034 raise 1035 finally: 1036 del newFitsFile 1037 1038 if deprMode in ["mv", "ip"]: 1039 depracatedListFile.close() 1040 cmd1 = "%s/CreateMfIdJpegList.py -o %s -r %s -j %s -s %s %s %s %s %s" % ( 1041 self.sysc.monitorPath(), self.sysc.helpersPath(), 1042 ','.join(self.sysc.deprecatedDataStorage), 1043 '_'.join([self.sysc.deprecatedComprImDir, deprTime]), 1044 '_'.join([self.sysc.deprecatedDir, deprTime]), 1045 self.database, self.beginDate, self.endDate, self.versionStr) 1046 Logger.addMessage("Running '%s'" % cmd1) 1047 if not self.isTrialRun: 1048 status1 = os.system(cmd1) 1049 if status1: 1050 Logger.addMessage("<ERROR> CreateMfIdJpegList.py failed!") 1051 else: 1052 for updateDB in ["dbload::WSA", "dbpub::all"]: 1053 cmd2 = "%s/UpdateJpegs.py -M %s -j %s %s/deprjpgs_WSA_%s_%s_%s.log" % ( 1054 self.sysc.helpersPath(), updateDB, 1055 self.sysc.helpersPath(), self.database, 1056 self.beginDate, self.endDate, self.versionStr) 1057 Logger.addMessage("Running '%s'" % cmd2) 1058 status = os.system(cmd2) 1059 if status: 1060 Logger.addMessage( 1061 "<ERROR> UpdateJpegs.py %s failed!" % updateDB) 1062 1063 for fileName in deprJpgList: 1064 os.remove(fileName) 1065 1066 Logger.addMessage("Deprecated jpg file names are listed in %s" 1067 % depracatedListFile.name)
1068 1069 1070 #------------------------------------------------------------------------------ 1071 # 1072 # Entry point for CU2 1073 1074 if __name__ == '__main__': 1075 # check existing files for their timestamp against the same file at CASU 1076 CLI.progOpts += [ 1077 CLI.Option('b', "begin", "first date to process, eg. 20050101", 1078 "DATE", str(IngCuSession.beginDateDef)), 1079 CLI.Option('e', "end", "last date to process, eg. 20050131", 1080 "DATE", str(IngCuSession.endDateDef)), 1081 CLI.Option('s', "subdir", "subdirectory containing FITS files", 1082 "DIR", IngCuSession.sysc.fitsDir), 1083 CLI.Option('v', "version", "version number of the data", 1084 "STR", '1'), 1085 CLI.Option('l', "xferlog", "xferlog of files to be jpeged", 1086 "LOGFILE", ''), 1087 CLI.Option('M', "mosaic", "force making mosaics."), 1088 CLI.Option('r', "redo", "re-do jpegs."), 1089 CLI.Option('y', "threads", "number of processors to use", 1090 "INT", '0'), 1091 CLI.Option('Z', "deprecation", 1092 "modus for deprecating files: 'mv', 'rm', 'ip'" 1093 " (leave in place)", "STR[2]", ''), 1094 CLI.Option('o', "outpath", 1095 "directory where %s will reside. Generally this is created " 1096 "on the fly. If given, the log will be written there as " 1097 "well." % SystemConstants.compressImDir, 1098 "PATH", '')] 1099 1100 cli = CLI(Cu2, "$Revision: 10242 $") 1101 Logger.addMessage(cli.getProgDetails()) 1102 1103 cu2 = Cu2(cli.getOpt("curator"), 1104 cli.getArg("database"), 1105 cli.getOpt("begin"), 1106 cli.getOpt("end"), 1107 cli.getOpt("subdir"), 1108 cli.getOpt("version"), 1109 cli.getOpt("xferlog"), 1110 cli.getOpt("outpath"), 1111 cli.getOpt("mosaic"), 1112 cli.getOpt("redo"), 1113 cli.getOpt("threads"), 1114 cli.getOpt("deprecation"), 1115 cli.getOpt("test"), 1116 cli.getArg("comment")) 1117 cu2.run() 1118 1119 #------------------------------------------------------------------------------ 1120 # Change log: 1121 # 1122 # 11-Aug-2004, ETWS: Refactoring to implement latest changes to the 1123 # data model along the lines of CU3 1124 # 27-Jan-2005, ETWS: Refactored using pyfits instead of xheader 1125 # 16-Feb-2005, ETWS: changed jpg directory name to the same as the 1126 # input fits directory name (ie. observing date) 1127 # 11-Apr-2005, ETWS: Included default image for files with fits key 1128 # errors in their header. 1129 # fixed the NextCuEventId DB update 1130 # 6-May-2005, ETWS: Included os.umask setting to check_dir_exist, 1131 # so mode will be set correctly 1132 # 27-May-2005, ETWS: included cuNum in getNextCurationEventID 1133 # 29-Jul-2005, ETWS: refactored to run the stand alone cu2jpeg to have 1134 # database input as well. 1135 # 30-Apr-2007, ETWS: Refactored to use OO framework. 1136 # 3-May-2007, ETWS: Updated for use with CU0 1137 # 15-May-2007, ETWS: Removed sneferu's disk list restriction. 1138 # 10-Jul-2007, ETWS: Updated for use with test cases. 1139 # 28-Aug-2007, ETWS: Fixed output path problem. 1140 # 29-Jan-2008, ETWS: Included possibility to force mosaic jpeg creation 1141 # and to re-process existing jpegs. 1142 # 14-Feb-2008, RSC: Updated for new DbSession interface, and corrected log 1143 # file name setting. 1144 # 11-Aug-2008, ETWS: Enhanced for deprecation of data from a given deprecation 1145 # and transfer list. Existing jpegs will be copied to a 1146 # directory, then the database entries will be updated 1147 # before the original data is removed. 1148