1
2
3
4 """
5 Ingests products of parallel CU runs by DataBuilder. Contents of all ingest
6 logs found in the dbShare are ingested. Can also be used to only view the
7 content of a specified ingest log.
8
9 @author: Eckhard Sutorius
10 @org: WFAU, IfA, University of Edinburgh
11
12 @newfield contributors: Contributors, Contributors (Alphabetical Order)
13 @contributors: R.S. Collins
14 """
15
16 from collections import defaultdict, OrderedDict
17 import dircache
18 import math
19 import multiprocessing
20 import os
21 import time
22
23 from wsatools.CLI import CLI
24 from wsatools.DbConnect.IngIngester import CuIngest, IngestLogger, \
25 IngestLogFile
26 from wsatools.SystemConstants import SystemConstants
27
28
def runIngest(logList):
    """
    Ingests the contents of every ingest log in the given list, one log after
    the other. Used as the target of the worker processes started by the
    parallel branch of the script.

    All ingest settings are read from the module-level command-line parser
    "cli", which is only created when this file is run as a script.
    NOTE(review): worker processes therefore have to inherit "cli" from the
    parent (fork start method) -- confirm before running on a platform that
    spawns fresh interpreters.

    Unlike the serial ingest, objID updates are always omitted here
    (omitObjIDUpdate=True) regardless of the "omitobjid" option, and the
    ingester is told that it is part of a parallel run.

    @param logList: Ingest log files to process; each entry must provide a
                    "name" attribute holding the log file name.
    @type  logList: list(IngestLogFile)

    """
    for logFile in logList:
        ingester = CuIngest(
            curator=cli.getOpt("curator"),
            database=cli.getArg("database"),
            isTrialRun=cli.getOpt("test"),
            logFileName=logFile.name,
            ingestOneLog=cli.getOpt("ingestlog"),
            cuNums=cli.getOpt("cunums"),
            forceIngest=cli.getOpt("forceingest"),
            autoCommit=cli.getOpt("autocommit"),
            excludedTables=cli.getOpt("excluded"),
            omitObjIDUpdate=True,
            isParallelRun=True,
            comment=cli.getArg("comment"))
        ingester.run()
45
46
47
48
def chunkLogFiles(monthLists, maxProcs=6):
    """
    Splits per-month lists of ingest logs into work lists for the parallel
    ingest processes.

    Consecutive months are grouped into chunks of equal size (the last chunk
    may be shorter) so that no more than maxProcs chunks are produced. Each
    chunk is flattened into a single list of log files, which is the form
    the ingest worker iterates over.

    @param monthLists: Lists of log files, one list per ingest month, in the
                       order in which they should be ingested.
    @type  monthLists: list(list)
    @param maxProcs:   Maximum number of work lists (processes) to create.
    @type  maxProcs:   int

    @return: Flat, non-empty lists of log files, one per process. Empty if
             monthLists is empty.
    @rtype:  list(list)

    """
    if not monthLists:
        return []

    numProcs = max(1, min(maxProcs, len(monthLists)))
    chunkSize = int(math.ceil(len(monthLists) / float(numProcs)))

    # Stepping through the list by chunkSize never yields an empty chunk,
    # so no process is started without work.
    return [[logFile
             for month in monthLists[start:start + chunkSize]
             for logFile in month]
            for start in range(0, len(monthLists), chunkSize)]


if __name__ == '__main__':

    # Additional command-line options on top of the standard CLI ones
    CLI.progOpts += [
        CLI.Option('a', "autocommit",
                   "switch auto-commit on"),
        CLI.Option('i', "ingestlog",
                   "ingest log file given by 'readlog' option"),
        CLI.Option('r', "readlog",
                   "only read the given log file (cuXidY_host.log)",
                   "PATHNAME"),
        CLI.Option('x', "cunums",
                   "csv list of CUs to ingest; CUs can have the format:"
                   " <cuNum>[-<cuEventID>][-<serverName>].",
                   "LIST", ""),
        CLI.Option('F', "forceingest",
                   "try to ingest log file even when data from the same day "
                   "has been ingested before. Use only if you know what "
                   "you're doing!"),
        CLI.Option('O', "omitobjid",
                   "Omit updating objIDs. Use with care on the VVV!"),
        CLI.Option('P', "isparallel",
                   "Ingest monthly VVV data in parallel."),
        CLI.Option('X', "excluded",
                   "exclude the given tables from ingest ",
                   "LIST", "")]

    cli = CLI(CuIngest, "$Revision: 10184 $")
    IngestLogger.addMessage(cli.getProgDetails())

    archive = cli.getArg("database")
    dbName = archive.split('.')[-1]
    sysc = SystemConstants(dbName)

    # Only the scos user may ingest into the load database
    if dbName == sysc.loadDatabase and os.getenv('USER') != 'scos':
        print("Only scos is allowed to ingest into the %s"
              % sysc.loadDatabase)
        raise SystemExit

    if cli.getOpt("isparallel") and "VVV" in archive \
      and cli.getOpt("cunums") == '4':
        IngestLogger.addMessage("Running monthly VVV ingests in parallel...")
        try:
            dbMntPath = sysc.dbMntPath()
            logSuffix = "%s.log" % archive.partition('.')[2]
            logfiles = [IngestLogFile(os.path.join(dbMntPath, f))
                        for f in os.listdir(dbMntPath)
                        if f.startswith("cu04") and f.endswith(logSuffix)]
        except Exception as error:
            IngestLogger.addExceptionDetails(error)
            raise SystemExit

        # Group the logs by ingest month, months in ascending order
        ingFileDict = defaultdict(list)
        for logFile in logfiles:
            ingFileDict[logFile.ingMonth].append(logFile)
        ingFileLists = [ingFileDict[month] for month in sorted(ingFileDict)]

        for i, monthFiles in enumerate(ingFileLists):
            for logFile in monthFiles:
                print("%s %s" % (i, logFile.name))

        workLists = chunkLogFiles(ingFileLists, maxProcs=6)
        if not workLists:
            IngestLogger.addMessage(
                "No cu04 ingest logs found in %s" % dbMntPath)

        procs = []
        for i, workList in enumerate(workLists):
            if i > 0:
                # Stagger the start of the ingest processes.
                # NOTE(review): reason for the 20s delay is not visible
                # here -- presumably avoids start-up contention; confirm.
                time.sleep(20)

            # args has to be a tuple: the trailing comma makes the work list
            # the single positional argument of runIngest.
            p = multiprocessing.Process(target=runIngest, args=(workList,))
            procs.append(p)
            p.start()

        for p in procs:
            p.join()
        IngestLogger.addMessage("Finished parallel ingests...")
    else:
        ingester = CuIngest(
            curator=cli.getOpt("curator"),
            database=cli.getArg("database"),
            isTrialRun=cli.getOpt("test"),
            logFileName=cli.getOpt("readlog"),
            ingestOneLog=cli.getOpt("ingestlog"),
            cuNums=cli.getOpt("cunums"),
            forceIngest=cli.getOpt("forceingest"),
            autoCommit=cli.getOpt("autocommit"),
            excludedTables=cli.getOpt("excluded"),
            omitObjIDUpdate=cli.getOpt("omitobjid"),
            isParallelRun=False,
            comment=cli.getArg("comment"))
        ingester.run()
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156