Package invocations :: Package cu1 :: Module cu1
[hide private]

Source Code for Module invocations.cu1.cu1

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  #$Id: cu1.py 9970 2013-07-30 14:59:14Z EckhardSutorius $ 
  4  """ 
  5     Invoke CU1. Fetch science data from CASU without connecting to the database; 
  6     to update the database run cu1updatedb.py afterwards. 
  7   
  8     @author: E. Sutorius 
  9     @org:    WFAU, IfA, University of Edinburgh 
 10   
 11     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
 12     @contributors: I. Bond, R.S. Collins 
 13  """ 
 14  #------------------------------------------------------------------------------ 
 15  from   collections import defaultdict 
 16  import inspect 
 17  import math 
 18  import multiprocessing 
 19  from collections import namedtuple 
 20  import os 
 21  import time 
 22   
 23  from cu1Transfer import CASUTransfer, TransferCASUList, GESTransfer, getScpScriptName 
 24   
 25  from   wsatools.CLI                    import CLI 
 26  import wsatools.DataFactory                as df 
 27  import wsatools.DbConnect.DbConstants      as dbc 
 28  from   wsatools.DbConnect.DbSession    import DbSession 
 29  from   wsatools.File                   import File 
 30  from   wsatools.FitsUtils              import FitsList 
 31  from   wsatools.DbConnect.IngCuSession import IngCuSession 
 32  from   wsatools.DbConnect.IngIngester  import IngestLogger as Logger, \ 
 33                                                IngestLogFile 
 34  from   wsatools.SystemConstants        import SystemConstants 
 35  import wsatools.Utilities                  as utils 
 36  #------------------------------------------------------------------------------ 
 37   
class ThreadDL(object):
    """
    Run CASU data transfers for several dates in parallel worker
    processes, collecting the per-date results for Cu1.
    """
    #--------------------------------------------------------------------------
    # Define class constants (access as ThreadDL.varName)
    numprocessors = 2       # number of parallel download processes
    DLStatus = dbc.yes()    # cumulative download status of this instance

    #--------------------------------------------------------------------------

    def xferThreads(self, params, procDates, resultQueue):
        """
        Run the transfers.

        Worker entry point executed in a child process: transfers every
        date in procDates sequentially and puts a single {date: result}
        dict onto the result queue.

        @param params: Parameters for CASUTransfer class.
        @type params: namedtuple
        @param procDates: The dates to transfer in parallel.
        @type procDates: list
        @param resultQueue: The results queue.
        @type resultQueue: Queue object

        """
        # Plain dict is sufficient: every key is assigned exactly once.
        outDict = {}
        for date in procDates:
            transfer = CASUTransfer(
                scpMainDir=params.scpMainDir,
                scpThreads=params.scpThreads,
                checkTimeFlag=params.checkTimeFlag,
                checkMd5Flag=params.checkMd5Flag,
                UKLight=params.UKLight,
                forceTransfer=params.forceTransfer,
                deprecationMode=params.deprecationMode,
                database=params.database,
                outPrefix=params.outPrefix,
                reproMethod=params.reproMethod,
                reproVersStr=params.reproVersStr,
                ffluOnly=params.ffluOnly,
                onlyHeaderTransfer=params.onlyHeaderTransfer,
                isTestRun=params.isTestRun)
            outDict[int(date)] = transfer.transfer(int(date), [])
        resultQueue.put(outDict)

    #--------------------------------------------------------------------------

    def run(self, dateList, xferParams):
        """
        Thread the downloads.

        Splits dateList into numprocessors contiguous chunks, runs one
        xferThreads worker process per chunk, and merges the per-date
        results from the queue.

        @param dateList: The dates to transfer in parallel.
        @type dateList: list
        @param xferParams: Parameters for CASUTransfer class.
        @type xferParams: namedtuple

        @return: Dictionary containing metadata of the daily transfers.
        @rtype: dict
        """
        outQueue = multiprocessing.Queue()
        # Ceiling division so all dates are covered; the last chunk may be
        # shorter (or empty, in which case that worker puts an empty dict).
        chunksize = int(math.ceil(len(dateList) / float(self.numprocessors)))
        procs = []
        for i in xrange(self.numprocessors):
            if i > 0:
                # Stagger worker start-up to avoid hammering CASU at once.
                time.sleep(60)
            p = multiprocessing.Process(
                target=self.xferThreads,
                args=(xferParams, dateList[chunksize * i:chunksize * (i + 1)],
                      outQueue))
            procs.append(p)
            p.start()

        # Collect all results into a single result dict. We know how many
        # dicts with results to expect: exactly one per worker. Drain the
        # queue BEFORE joining, otherwise a worker blocked on a full queue
        # pipe could deadlock the join.
        resultDict = defaultdict(list)
        for i in xrange(self.numprocessors):
            resultDict.update(outQueue.get())

        # Wait for all worker processes to finish
        for p in procs:
            p.join()

        # Return the results
        return resultDict

    #--------------------------------------------------------------------------

    def finaliseTransfer(self, resultDict):
        """
        Process the results of the threaded transfers.

        @param resultDict: Per-date transfer results as returned by run().
        @type resultDict: dict

        @return: Dictionary containing metadata of the daily transfers,
                 the full list of transferred files, and the full list of
                 deprecated files.
        @rtype: tuple(dict, list, list)

        """
        allTransferLogs = defaultdict(list)
        fullTransferList = []
        fullDeprFileList = []

        for date in resultDict:
            # Default to error status; overwritten below once this date's
            # result parses successfully. (Previously ``status`` was left
            # unbound -- a NameError -- if the first date's result raised
            # before the status branches were reached.)
            status = -1
            try:
                # Parse the result
                # result = [self.transferLog, xferStatus, logText, dateList,
                #           self.transferList, self.deprFileList]
                transferLog, xferStatus, logText, dateList, \
                    transferList, deprFileList = resultDict[date]

                allTransferLogs[date].append(transferLog)
                fullTransferList.extend(transferList)
                fullDeprFileList.extend(deprFileList)
                # Set the status accordingly
                if xferStatus < 0:
                    status = -1
                elif xferStatus:
                    status = dbc.yes()
                else:
                    status = dbc.no()

                # Update the set of transferred dates
                Cu1.dateSet |= set(dateList)

                # Write transfer status and transfer logfilename to logfile
                Logger.addMessage("File transfer finished with status = "
                                  + str(Cu1.DLStatus))
                Logger.addMessage("Data Transfer messages:\n")
                Logger.addMessage(logText.getvalue())
                Logger.addMessage("Files downloaded are in: " + transferLog)
            except Exception as error:
                # Log the details of the exception:
                Logger.addExceptionDetails(error)

            # Fold this date's status into the per-instance status.
            # NOTE(review): two error dates multiply to (-1 * -1) = 1,
            # masking the failure -- confirm intended status semantics.
            self.DLStatus *= status

        # Set the overall status of the threaded transfers.
        Cu1.DLStatus *= self.DLStatus
        return allTransferLogs, fullTransferList, fullDeprFileList

#------------------------------------------------------------------------------
class Cu1(IngCuSession):
    """Download data from CASU using the module cu1Transfer.
    """
    # Class-wide status of the download: set from the transfer result in
    # _onRun (dbc.yes()/dbc.no()/-1); also multiplied into by ThreadDL
    # for multi-day threaded transfers.
    DLStatus = 1 # the download status
    # Set of all date directory entries covered by any transfer so far;
    # also updated from ThreadDL.finaliseTransfer.
    dateSet = set()
    # Parameter bundle handed to CASUTransfer (directly or via ThreadDL).
    XferParams = namedtuple("XferParams", "scpMainDir, scpThreads, checkTimeFlag, checkMd5Flag, UKLight, forceTransfer, deprecationMode, database, outPrefix, reproMethod, reproVersStr, ffluOnly, onlyHeaderTransfer, isTestRun")

    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 runDate=CLI.getOptDef("date"),
                 versionStr=CLI.getOptDef("version"),
                 casuDisk=CLI.getOptDef("casudisk"),
                 timeCheck=CLI.getOptDef("timecheck"),
                 numThreads=CLI.getOptDef("scpthreads"),
                 janet=CLI.getOptDef("janet"),
                 deprecation=CLI.getOptDef("deprecation"),
                 xferList=CLI.getOptDef("xferlog"),
                 isTrialRun=DbSession.isTrialRun,
                 forceXfer=CLI.getOptDef("force"),
                 reproMode=CLI.getOptDef("repromode"),
                 ffluOnly=CLI.getOptDef("ffluonly"),
                 onlyHeader=CLI.getOptDef("getheader"),
                 gesOptions=CLI.getOptDef("gesoptions"),
                 comment=CLI.getArgDef("comment")):
        """
        @param casuDisk: Disk at CASU where the data is stored.
        @type casuDisk: str
        @param comment: Descriptive comment as to why curation task is
                        being performed.
        @type comment: str
        @param curator: Name of curator.
        @type curator: str
        @param deprecation: Modus for deprecating files.
        @type deprecation: str
        @param ffluOnly: Don't transfer but update FlatfileLookup table.
        @type ffluOnly: bool
        @param forceXfer: Force the data transfer.
        @type forceXfer: bool
        @param gesOptions: GES instrument and type.
        @type gesOptions: str
        @param isTrialRun: If True, do not perform database modifications.
        @type isTrialRun: bool
        @param janet: Use JANET instead of UKLight.
        @type janet: bool
        @param numThreads: Number of scp transfer threads.
        @type numThreads: int
        @param onlyHeader: Only transfer FITS headers.
        @type onlyHeader: bool
        @param reproMode: Mode for reprocessed data: otm (one-to-many dirs)
                          or s::suffix (CASU datedir suffix).
        @type reproMode: str
        @param runDate: Dates to process, eg. [20050101].
        @type runDate: list(int)
        @param timeCheck: Determines if existing files should be checked for
                          their timestamp against the same file at CASU;
        @type timeCheck: bool
        @param versionStr: Version number of the data.
        @type versionStr: str
        @param xferList: List containing files to be downloaded.
        @type xferList: list

        """
        # Copy every constructor argument onto self (IngCuSession helper).
        super(Cu1, self).attributesFromArguments(
            inspect.getargspec(Cu1.__init__)[0], locals())

        # Initialize parent class
        super(Cu1, self).__init__(cuNum=1,
                                  curator=self.curator,
                                  comment=self.comment,
                                  reqWorkDir=False,
                                  database=self.database,
                                  autoCommit=False,
                                  isTrialRun=self.isTrialRun)

        self.csvSharePath = self.sysc.dbSharePath()

        # init DB entry
        self._initFromDB()

        # md5 checksums?
        # set to False until we have md5s at CASU
        self._md5Check = False

        # No data available for given date?
        self._noData = False

        # Default deprecation modus: move files aside ("mv").
        if not self.deprecation:
            self.deprecation = "mv"

        # list of transferred and deprecated files
        self.transferList = []
        self.deprFileList = []
        self.transferLog = []
        self.allXferLogs = defaultdict(list)

        # Decode the GES option string: leading char selects the
        # instrument, trailing char selects the mode.
        if self.gesOptions:
            self.gesInstrument = ("giraffe" if self.gesOptions.startswith('g')
                                  else "uves")
            self.gesMode = {'s': "stacked", 'n': "nightly", 'a': "archived"}[
                self.gesOptions[-1]]

    #--------------------------------------------------------------------------

    def _initFromDB(self):
        """initialize constants from the DB
        """
        # Connect to database
        self._connectToDb()
        try:
            # Trial runs must not consume a curation event ID.
            if not self.isTrialRun:
                self._cuEventID = self._getNextCuEventID()

            if self.cuNum != dbc.smallIntDefault():
                try:
                    self._cuTable = \
                        df.CurationTaskTable(self.archive, self.cuNum)
                    Logger.addMessage(
                        "Initialising Curation Use case for %s:" %
                        (','.join([str(self.runDate), self.reproMode])
                         if self.reproMode else str(self.runDate)))
                    Logger.addMessage(self._cuTable.getDescription())
                except ValueError, details:
                    # Unknown CU number: warn and continue without a table.
                    Logger.addMessage("<Warning> " + str(details))
                    self._cuTable = None

            # Create the translation dictionary of programme identification
            # strings to programme ID numbers for all programmes:
            self._createProgrammeTranslation()
        finally:
            self._disconnectFromDb()

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Do CU1. """
        # create lock file
        self.lockFile = File(os.path.join(SystemConstants.sysPath,
                                          "lockCU1_%s" % self.sysc.loadDatabase))

        # check and create lock
        try:
            self._createLock()
        except:
            print '\n'.join(["\n This code will not run,",
                             " you don't have a key,",
                             " and a lock exists. \n"])
            raise

        Logger.addMessage("Started CU %d for: %s" % (
            self.cuNum, ','.join([str(self.runDate), self.reproMode])
            if self.reproMode else str(self.runDate)))

        # Create a log file to hold the verbose record for this invocation
        _logFile = Logger(self.createLogFileName())

        # Invoke methods for opening connection to CASU,
        # fetching the files, and producing a transfer log.
        multiThreaded = False
        try:
            # output file prefix
            self._outPrefix = self.createFilePrefix(
                self.cuNum, self._cuEventID, self._hostName,
                self.database.rpartition('.')[2])

            # Create ingest log file
            ingestLogFile = IngestLogFile(os.path.join(
                self.dbMntPath, self._outPrefix + ".log"))

            # Run transfers depending on re-processing, GPS, or standard.
            if 'otm' in self.reproMode:
                # one-to-many dirs transfer
                TransferCASUList.sysc = self.sysc
                casuTransferList = TransferCASUList(
                    fitsList=FitsList(self.sysc),
                    casuDir=self.casuDisk,
                    xferList=self.xferList,
                    forceTransfer=self.forceXfer,
                    deprecationMode=self.deprecation,
                    database='.'.join([self.sysc.loadServer,
                                       self.sysc.loadDatabase]),
                    UKLight=not self.janet,
                    outPrefix=self._outPrefix,
                    isTestRun=self.isTrialRun)

                self.transferLog, xferStatus, dateList, self.deprFileList = \
                    casuTransferList.transfer()
                self.dateSet |= set(dateList)
            elif self.sysc.isGES():
                # GES transfer
                GESTransfer.sysc = self.sysc

                casuGesTransferList = GESTransfer(
                    instrument=self.gesInstrument,
                    mode=self.gesMode,
                    casuDir=self.casuDisk,
                    xferList=self.xferList,
                    versStr=self.versionStr,
                    forceTransfer=self.forceXfer,
                    database='.'.join([self.sysc.loadServer,
                                       self.sysc.loadDatabase]),
                    UKLight=not self.janet,
                    outPrefix=self._outPrefix,
                    isTestRun=self.isTrialRun)
                self.transferLog, xferStatus, dateList = \
                    casuGesTransferList.transfer(self.runDate[0])
                self.dateSet |= set(dateList)
                self.allXferLogs[self.runDate[0]].append(self.transferLog)
            else:
                # Standard CASU transfer, either for an explicit file list
                # (single date) or threaded over all requested dates.
                CASUTransfer.sysc = self.sysc
                xferParams = self.XferParams(
                    self.casuDisk, self.numThreads, self.timeCheck,
                    self._md5Check, not self.janet, self.forceXfer,
                    self.deprecation,
                    '.'.join([self.sysc.loadServer, self.sysc.loadDatabase]),
                    self._outPrefix, self.reproMode,
                    ((','.join([self.versionStr,
                                self.reproMode.partition('::')[2]])
                      if "v::" in self.reproMode else self.versionStr)
                     if self.reproMode else None),
                    self.ffluOnly, self.onlyHeader, self.isTrialRun)
                # transfer for given filelist
                if self.xferList:
                    casuTransfer = CASUTransfer(
                        scpMainDir=xferParams.scpMainDir,
                        scpThreads=xferParams.scpThreads,
                        checkTimeFlag=xferParams.checkTimeFlag,
                        checkMd5Flag=xferParams.checkMd5Flag,
                        UKLight=xferParams.UKLight,
                        forceTransfer=xferParams.forceTransfer,
                        deprecationMode=xferParams.deprecationMode,
                        database=xferParams.database,
                        outPrefix=xferParams.outPrefix,
                        reproMethod=xferParams.reproMethod,
                        reproVersStr=xferParams.reproVersStr,
                        ffluOnly=xferParams.ffluOnly,
                        onlyHeaderTransfer=xferParams.onlyHeaderTransfer,
                        isTestRun=xferParams.isTestRun)

                    self.transferLog, xferStatus, logText, dateList, \
                        self.transferList, self.deprFileList = \
                        casuTransfer.transfer(int(self.runDate[0]),
                                              self.xferList)
                    self.dateSet |= set(dateList)
                    self.allXferLogs[self.runDate[0]].append(self.transferLog)
                else:
                    # multi-day threaded transfer
                    multiThreaded = True
                    threadDLs = ThreadDL()
                    resDict = threadDLs.run(self.runDate, xferParams)
                    self.allXferLogs, self.transferList, self.deprFileList = \
                        threadDLs.finaliseTransfer(resDict)
                    del threadDLs

                # keep the latest transfer's scp files if available
                keepScpName = os.path.join(self.sysc.sysPath,
                                           "scpcam%s_%01d%01d")
                for i, dateStr in enumerate(self.runDate):
                    for j in xrange(self.numThreads % 10):
                        try:
                            os.rename(
                                getScpScriptName(self.sysc, dateStr, j),
                                keepScpName % (self.sysc.loadDatabase,
                                               i, j))
                        except OSError:
                            # scp script absent for this date/thread; skip.
                            pass

            # write the CU01ED tag
            # NOTE(review): each entry in dateSet is used as a directory
            # path here -- presumably the date directory; verify against
            # the transfer classes' dateList contents.
            for entry in sorted(self.dateSet):
                cuedFile = File(os.path.join(
                    entry, "CU01ED_" + self.database.rpartition('.')[2]))
                if not self.isTrialRun and not self.ffluOnly:
                    cuedFile.wopen()
                    cuedFile.writetheline(utils.makeTimeStamp())
                    cuedFile.close()

            if not multiThreaded:
                # update transfer status
                if xferStatus < 0:
                    self.DLStatus = -1
                elif xferStatus:
                    self.DLStatus = dbc.yes()
                else:
                    self.DLStatus = dbc.no()

                # Write transfer status and transfer logfilename to logfile
                Logger.addMessage("File transfer finished with status = "
                                  + str(self.DLStatus))
                # logText is only produced by the standard CASUTransfer path.
                if 'otm' not in self.reproMode and not self.sysc.isGES():
                    Logger.addMessage("Data Transfer messages:\n")
                    Logger.addMessage(logText.getvalue())
                Logger.addMessage("Files downloaded are in: " \
                                  + self.transferLog)

            # write ingest info into the ingest log file
            timestamp = utils.makeMssqlTimeStamp()
            histories = {}
            # include processed dates
            histories.setdefault("DCH", []).extend(sorted(self.dateSet))
            # for the update of the curation event history
            histories.setdefault("ACH", []).append(
                [self._cuEventID, self.cuNum, _logFile.pathName,
                 dbc.charDefault(), timestamp, self.curator,
                 self.comment, dbc.yes()])

            # for the update of the programme curation history
            histories.setdefault("PCH", []).append([self.DLStatus])
            if self.isTrialRun:
                # Trial run: show what would have been written.
                for entry in histories:
                    print "%s: %r" % (entry, histories[entry])
            else:
                ingestLogFile.pickleWrite([], {}, histories)
                Logger.addMessage(''.join(["Curation History info written to ",
                                           ingestLogFile.name]))

        except Exception as error:
            # Log the details of the exception:
            Logger.addExceptionDetails(error)
        else:
            Logger.addMessage("Transfer finished.")
            Logger.addMessage("Please, run ./IngestCUFiles.py -i -r %s %s"
                              % (ingestLogFile.name, self.database))
        Logger.addMessage("Log written to " + _logFile.pathName)

        Logger.dump(file(_logFile.pathName, 'a'))
        Logger.reset()

        # clean up lock file
        self.lockFile.remove()

    #--------------------------------------------------------------------------

    def _createLock(self):
        """
        Create a lock, writing info to sysLog. If lock exists, abort.
        """
        lockTime = utils.makeTimeStamp()
        if self.lockFile.exists():
            # Another CU1 run holds the lock: log and abort this one.
            Logger.addMessage(''.join(
                ["# ", lockTime,
                 " UTC: Lock file %s exists. scp_camb aborted." % \
                 self.lockFile.name]))
            raise SystemExit
        else:
            # Record timestamp and PID of the lock holder.
            self.lockFile.wopen()
            self.lockFile.writetheline(
                "%s [%d] \n" % (lockTime, os.getpid()))
            self.lockFile.close()
#------------------------------------------------------------------------------
# Entry point for CU1

if __name__ == '__main__':
    # Register the CU1-specific command-line options on top of the
    # common IngCuSession/CLI option set.
    CLI.progOpts += [
        CLI.Option('b', "date",
                   "first date to process, eg. 20050101",
                   "DATE", IngCuSession.beginDateDef),
        CLI.Option('v', "version",
                   "version number of the data",
                   "STR", '1'),
        CLI.Option('l', "xferlog",
                   "xferlog of files to be transferred",
                   "LOGFILE", ''),
        CLI.Option('w', "casudisk",
                   "data disk at CASU",
                   "PATH", ''),
        CLI.Option('y', "scpthreads",
                   "number of scp threads",
                   "INT", SystemConstants.scpThreads),
        CLI.Option('j', "janet",
                   "if True, use JANET instead of UKLight"),
        CLI.Option('f', "force",
                   "if True, transfer regardless any checks"),
        CLI.Option('F', "ffluonly",
                   "CU1: don't transfer but update FlatfileLookup."),
        CLI.Option('G', "gesoptions",
                   "GES instrument and type, ie "
                   "g[iraffe]/u[ves],n[ightly],s[tacked]",
                   "STR", ''),
        CLI.Option('H', "getheader",
                   "Only transfer FITS headers."),
        CLI.Option('T', "timecheck",
                   "check files against CASU file timestamps"),
        CLI.Option('R', "repromode",
                   "mode for reprocessed data: 'otm' "
                   "(one-to-many dirs) or 's::suffix' (CASU "
                   "datedir suffix).",
                   "STR", 'none'),
        CLI.Option('Z', "deprecation",
                   "modus for deprecating files: 'mv', 'rm', 'ip'"
                   " (leave in place)",
                   "STR[2]", "mv")]

    cli = CLI(Cu1, "$Revision: 9970 $")
    Logger.addMessage(cli.getProgDetails())

    # Positional argument order must match the Cu1.__init__ signature.
    cu1 = Cu1(cli.getOpt("curator"),
              cli.getArg("database"),
              cli.getOpt("date"),
              cli.getOpt("version"),
              cli.getOpt("casudisk"),
              cli.getOpt("timecheck"),
              cli.getOpt("scpthreads"),
              cli.getOpt("janet"),
              cli.getOpt("deprecation"),
              cli.getOpt("xferlog"),
              cli.getOpt("test"),
              cli.getOpt("force"),
              cli.getOpt("repromode"),
              cli.getOpt("ffluonly"),
              cli.getOpt("getheader"),
              cli.getOpt("gesoptions"),
              cli.getArg("comment"))
    cu1.run()

#------------------------------------------------------------------------------
# Change log:
#
# 13-Jan-2006, ETWS: first version
#  5-Jul-2006,  RSC: * Upgraded to use new CSV.py module.
#                    * Obviated need for "from wsatools import *" statement
# 25-Apr-2007, ETWS: Refactored to use OO framework.
# 27-Apr-2007, ETWS: Fixed bugs.
#  3-May-2007, ETWS: Included into CU0.
# 11-May-2007, ETWS: Included UKLight switch.
# 16-Aug-2007, ETWS: Fixed missing log output.
# 23-Aug-2007, ETWS: Included transfer option switch.
# 23-Jan-2008, ETWS: Changed transfer default to 'scp'.
# 14-Feb-2008,  RSC: Updated for new DbSession interface, and corrected log
#                    file name setting.
# 11-Aug-2008, ETWS: Enhanced for deprecation of data from a given transfer
#                    list. The data will be copied, then the database entries
#                    will be updated before the original data is removed.