Package invocations :: Package cu0 :: Module IngestCUFiles
[hide private]

Source Code for Module invocations.cu0.IngestCUFiles

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  #$Id: IngestCUFiles.py 10184 2014-01-10 15:36:45Z EckhardSutorius $ 
  4  """ 
  5     Ingests products of parallel CU runs by DataBuilder. Contents of all ingest 
  6     logs found in the dbShare are ingested. Can also be used to only view the 
  7     content of a specified ingest log. 
  8   
  9     @author: Eckhard Sutorius 
 10     @org:    WFAU, IfA, University of Edinburgh 
 11   
 12     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
 13     @contributors: R.S. Collins 
 14  """ 
 15  #------------------------------------------------------------------------------ 
 16  from   collections import defaultdict, OrderedDict 
 17  import dircache 
 18  import math 
 19  import multiprocessing 
 20  import os 
 21  import time 
 22   
 23  from   wsatools.CLI                   import CLI 
 24  from   wsatools.DbConnect.IngIngester import CuIngest, IngestLogger, \ 
 25                                               IngestLogFile 
 26  from   wsatools.SystemConstants       import SystemConstants 
 27  #------------------------------------------------------------------------------ 
 28   
def runIngest(logList):
    """
    Ingest the given log files one after another (worker for parallel runs).

    For every log file a fresh CuIngest is configured from the module-level
    C{cli} object (created in the script's main section) and run. Parallel
    workers always skip the object-ID update and flag the run as parallel.

    @param logList: Log files to ingest, processed in the order given.
    @type  logList: sequence of objects with a C{name} attribute
                    (IngestLogFile instances in this module)
    """
    for ingestLog in logList:
        # Options are re-read from the command line for every log file,
        # exactly as the single-file ingest in the main section does.
        ingestOptions = dict(
            curator=cli.getOpt("curator"),
            database=cli.getArg("database"),
            isTrialRun=cli.getOpt("test"),
            logFileName=ingestLog.name,
            ingestOneLog=cli.getOpt("ingestlog"),
            cuNums=cli.getOpt("cunums"),
            forceIngest=cli.getOpt("forceingest"),
            autoCommit=cli.getOpt("autocommit"),
            excludedTables=cli.getOpt("excluded"),
            omitObjIDUpdate=True,
            isParallelRun=True,
            comment=cli.getArg("comment"))
        CuIngest(**ingestOptions).run()
#------------------------------------------------------------------------------
# Entry point for Ingester

if __name__ == '__main__':
    # @@TODO: These options should move into CuIngest in IngIngester if they
    # don't propagate there, but eventually they will return here anyway.
    CLI.progOpts += [
        CLI.Option('a', "autocommit",
                   "switch auto-commit on"),
        CLI.Option('i', "ingestlog",
                   "ingest log file given by 'readlog' option"),
        CLI.Option('r', "readlog",
                   "only read the given log file (cuXidY_host.log)",
                   "PATHNAME"),
        CLI.Option('x', "cunums",
                   "csv list of CUs to ingest; CUs can have the format:"
                   " <cuNum>[-<cuEventID>][-<serverName>].",
                   "LIST", ""),
        CLI.Option('F', "forceingest",
                   "try to ingest log file even when data from the same day "
                   "has been ingested before. Use only if you know what "
                   "you're doing!"),
        CLI.Option('O', "omitobjid",
                   "Omit updating objIDs. Use with care on the VVV!"),
        CLI.Option('P', "isparallel",
                   "Ingest monthly VVV data in parallel."),
        CLI.Option('X', "excluded",
                   "exclude the given tables from ingest ",
                   "LIST", "")]

    # NOTE: runIngest() reads this module-level cli object, so the parallel
    # workers rely on inheriting it from this process (fork start method).
    cli = CLI(CuIngest, "$Revision: 10184 $")
    IngestLogger.addMessage(cli.getProgDetails())

    archive = cli.getArg("database")
    sysc = SystemConstants(archive.split('.')[-1])

    # Only scos may ingest into the load database (directory write
    # permission is required, see change log entry of 14-Feb-2008).
    if archive.split('.')[-1] == sysc.loadDatabase \
      and os.getenv('USER') != 'scos':
        print("Only scos is allowed to ingest into the %s" % sysc.loadDatabase)
        # SystemExit instead of the site-provided exit() builtin; a refused
        # run now reports a non-zero exit status.
        raise SystemExit(1)

    if cli.getOpt("isparallel") and "VVV" in archive \
      and cli.getOpt("cunums") == '4':
        IngestLogger.addMessage("Running monthly VVV ingests in parallel...")
        try:
            dirList = os.listdir(sysc.dbMntPath())
            logfiles = [IngestLogFile(os.path.join(sysc.dbMntPath(), f))
                        for f in dirList if f.startswith("cu04")
                        and f.endswith("%s.log" % archive.partition('.')[2])]
        except Exception as error:
            IngestLogger.addExceptionDetails(error)
            raise SystemExit

        # Group the CU4 ingest logs by ingest month, months in sorted order.
        ingFileDict = defaultdict(list)
        for logFile in logfiles:
            ingFileDict[logFile.ingMonth].append(logFile)
        ingFileLists = [ingFileDict[month] for month in sorted(ingFileDict)]

        for monthIdx, monthLogs in enumerate(ingFileLists):
            for logFile in monthLogs:
                print("%s %s" % (monthIdx, logFile.name))

        if not ingFileLists:
            # Without this guard min(6, 0) == 0 would divide by zero below.
            IngestLogger.addMessage("No CU4 ingest logs found in %s"
                                    % sysc.dbMntPath())
        else:
            numprocessors = min(6, len(ingFileLists))
            chunksize = int(math.ceil(len(ingFileLists)
                                      / float(numprocessors)))
            procs = []
            for i in range(numprocessors):
                # Each worker gets a contiguous run of months, flattened into
                # one list of log files (month order preserved), because
                # runIngest() expects a single list of log files.
                chunk = [logFile
                         for monthLogs
                         in ingFileLists[chunksize * i:chunksize * (i + 1)]
                         for logFile in monthLogs]
                if not chunk:
                    # Ceil-based chunking can leave trailing workers idle.
                    break
                if procs:
                    # Stagger worker start-up.
                    time.sleep(20)
                # args must be a 1-tuple: Process unpacks args into the
                # target's positional parameters.
                p = multiprocessing.Process(target=runIngest, args=(chunk,))
                procs.append(p)
                p.start()

            # Wait for all worker processes to finish
            for p in procs:
                p.join()
        IngestLogger.addMessage("Finished parallel ingests...")
    else:
        ingester = CuIngest(
            curator=cli.getOpt("curator"),
            database=cli.getArg("database"),
            isTrialRun=cli.getOpt("test"),
            logFileName=cli.getOpt("readlog"),
            ingestOneLog=cli.getOpt("ingestlog"),
            cuNums=cli.getOpt("cunums"),
            forceIngest=cli.getOpt("forceingest"),
            autoCommit=cli.getOpt("autocommit"),
            excludedTables=cli.getOpt("excluded"),
            omitObjIDUpdate=cli.getOpt("omitobjid"),
            isParallelRun=False,
            comment=cli.getArg("comment"))
        ingester.run()

#------------------------------------------------------------------------------
# Change log:
#
# 30-Nov-2006, ETWS: First version.
#  9-Jan-2007, ETWS: Included IngestCU class.
# 30-Jan-2007, ETWS: Bug fixes.
#  7-Feb-2007, ETWS: Added specified file ingest option.
#  7-Mar-2007,  NJC: Replaced ingester with self in onRun().
# 15-Mar-2007, ETWS: Changed Logger to IngestLogger.
# 14-Feb-2008,  RSC: Updated for new DbSession interface, ensured only scos can
#                    ingest into the WSA, as directory write permission req.
#  4-Mar-2008, ETWS: Added possibility to force ingest.
#  5-Mar-2008, ETWS: Added a lock file to prevent running clashes.
# 17-Jun-2008, ETWS: Enhanced to only ingest for given CU numbers.