1
2
3
4 """
5 Fills or updates the FlatFileLookUp table from Multiframe.
6
7 @author: E. Sutorius
8 @org: WFAU, IfA, University of Edinburgh
9
10 @newfield contributors: Contributors, Contributors (Alphabetical Order)
11 @contributors: R.S. Collins
12 """
13
14 from collections import defaultdict
15 import os
16
17 from wsatools.CLI import CLI
18 import wsatools.CSV as csv
19 from wsatools.DbConnect.DbSession import Ingester
20 from wsatools.DbConnect.IngCuSession import IngCuSession
21 import wsatools.DbConnect.Schema as schema
22 from wsatools.Logger import Logger
23
24
26 """ Fill or update the FlatFileLookUp table from Multiframe.
27 """
28
# Forwarded as the autoCommit argument of the base-class constructor call.
29 _autoCommit = True
# Schema file handed to schema.parseTables() when the ingest CSV is loaded.
30 _ingestSchema = "WSA_CurationLogsSchema.sql"
# Name of the table that is ingested into and updated.
31 _tableName = 'FlatFileLookUp'
# Sentinel value of checkIngestDate: the check-ingest comparison is only run
# when the requested date-version string differs from this value.
32 _checkIngestDateDef = str(IngCuSession.beginDateDef) + "_v1"
33
34
35
44 """
45 @param curator: Name of curator.
46 @type curator: str
47 @param database: Name of the database to connect to.
48 @type database: str
49 @param ingestOnly: If given, only ingest the existing file.
50 @type ingestOnly: bool
51 @param isTrialRun: If True, do not perform database modifications,
52 just print the SQL statement to the terminal.
53 @type isTrialRun: bool
54 @param writeOnly: If given only write to file, don't ingest.
55 @type writeOnly: bool
56 @param updateOnly: If given only update the DB.
57 @type updateOnly: bool
58 @param checkIngest: Check ingest for given date-version directory.
59 @type checkIngest: str
60
61 """
62
# Initialise the base session with a fixed CU number (0) and a fixed comment;
# no working directory is requested (reqWorkDir=False).
63 super(SyncFlatFileLookUp, self).__init__(
64 cuNum=0, curator=curator, comment="Updating FlatFileLookUp",
65 reqWorkDir=False, database=database,
66 autoCommit=SyncFlatFileLookUp._autoCommit,
67 isTrialRun=isTrialRun)
68
# Mode flags selecting which phases of the run are executed.
69 self.writeOnly = writeOnly
70 self.ingestOnly = ingestOnly
71 self.updateOnly = updateOnly
72
# No mode flag given: fall back to writing the CSV files and applying updates.
# NOTE(review): presumably both assignments below belong to this `if`; the
# ingest phase stays disabled in this default mode -- confirm that is intended.
73 if not any([self.writeOnly, self.ingestOnly, self.updateOnly]):
74 self.writeOnly = True
75 self.updateOnly = True
76
77 self.fullUpdate = fullUpdate
78 self.checkIngestDate = checkIngest
79
# Output file names are derived from the part of `database` after its last
# '.' (rpartition yields the whole string when there is no '.').
80 self.ingestFileName = self.sysc.dbSharePath(
81 "%s_fflu.csv" % database.rpartition('.')[2])
82 self.updateFileName = self.sysc.dbSharePath(
83 "%s_fflu.update.csv" % database.rpartition('.')[2])
# Python 2 print statement: echoes both file paths to stdout.
84 print self.ingestFileName,self.updateFileName
85
86
87
def getData(self, tableName, where=''):
    """
    Get multiframeID, cuEventID, fileName from table.

    @param tableName: The table from where the data is taken.
    @type  tableName: str
    @param where:     Data sample selection.
    @type  where:     str

    @return: A pair (dataDict, nameToMfID). dataDict maps each multiframeID
             to a tuple (cuEventID, dateVersStr, fileName) if
             self.fullUpdate is set, else (dateVersStr, fileName); rows
             sharing a multiframeID have their tuples concatenated.
             nameToMfID is a single dictionary keyed both by fileName
             (giving the list of multiframeIDs) and by multiframeID
             (giving the list of fileNames).
    @rtype:  tuple(defaultdict(tuple), defaultdict(list))

    """
    dataDict = defaultdict(tuple)
    nameToMfID = defaultdict(list)
    rows = self.archive.query("multiframeID, cuEventID, fileName",
                              tableName, where)
    for mfID, cuEventID, fileName in rows:
        # The date-version string is the name of the directory holding the
        # file; files without a directory component are tagged "NONE".
        dirName = os.path.dirname(fileName)
        dateVersStr = os.path.basename(dirName) if dirName else "NONE"
        if self.fullUpdate:
            dataDict[mfID] += (cuEventID, dateVersStr, fileName)
        else:
            dataDict[mfID] += (dateVersStr, fileName)

        # Both lookup directions share one dictionary so that callers can
        # test membership by either multiframeID or fileName.
        nameToMfID[fileName].append(mfID)
        nameToMfID[mfID].append(fileName)

    return dataDict, nameToMfID
112
113
114
# Body of the run method: connect, optionally compare/write/ingest/update
# according to the mode flags, and always disconnect in the finally clause.
# NOTE(review): indentation is not recoverable in this listing, so the nesting
# described in the notes below is presumed from the statement order.
116 try:
117 self._connectToDb()
# Rows to be ingested as new entries / rows to be applied as updates, keyed
# by multiframeID.
118 ingestDataDict = defaultdict(tuple)
119 updateDataDict = defaultdict(tuple)
120
121
122
# Check-ingest mode: only entered when a non-default date-version string was
# requested.
123 if self.checkIngestDate != SyncFlatFileLookUp._checkIngestDateDef:
124
125 Logger.addMessage("Getting data from FlatFileLookUp...")
126 ffluDataDict, ffluNameToMfID = self.getData("FlatFileLookUp",
127 where="dateVersStr=%r" % self.checkIngestDate)
128 Logger.addMessage(
129 "%s entries in FlatFileLookUp" % len(ffluDataDict))
130
131
# NOTE(review): the LIKE pattern is just the repr of checkIngestDate; it
# contains a wildcard only if the caller supplies one -- confirm.
132 Logger.addMessage("Getting data from Multiframe...")
133 mfDataDict, mfNameToMfID = self.getData("Multiframe",
134 where="fileName LIKE %r" % self.checkIngestDate)
135 Logger.addMessage("%s entries in Multiframe" % len(mfDataDict))
136 Logger.addMessage(
137 "Comparing Multiframe with FlatFileLookUp...")
# Print every multiframeID found in FlatFileLookUp but not in Multiframe.
# NOTE(review): index [2] only exists in the three-element tuples getData
# builds when fullUpdate is set; two-element tuples would raise IndexError.
138 for mfID in sorted(set(ffluDataDict) - set(mfDataDict)):
139 print "%s: %s" % (mfID, ffluDataDict[mfID][2])
140
# Normal mode: load FlatFileLookUp unless this is a pure ingest-only run.
141 elif not self.ingestOnly or self.writeOnly:
142 Logger.addMessage("Getting data from FlatFileLookUp...")
143 ffluDataDict, ffluNameToMfID = self.getData("FlatFileLookUp")
# NOTE(review): if the Multiframe query is nested under this `if`, mfDataDict
# is never bound when updateOnly is set -- including the constructor's default
# mode (writeOnly and updateOnly both True) -- yet the write phase below reads
# it. Confirm the intended nesting.
144 if not self.updateOnly:
145 Logger.addMessage("Getting data from Multiframe...")
146 mfDataDict, mfNameToMfID = self.getData("Multiframe")
147
148
# Write phase: compare Multiframe against FlatFileLookUp and write CSV files.
149 if self.writeOnly:
150
# The "Getting data..." messages below are log output only; no query is
# issued at this point.
151 Logger.addMessage("Getting data from FlatFileLookUp...")
152 Logger.addMessage(
153 "%s files in FlatFileLookUp." % len(ffluDataDict))
154
155 Logger.addMessage("Getting data from Multiframe...")
156 Logger.addMessage("%s files in Multiframe." % len(mfDataDict))
157 Logger.addMessage("Comparing FlatFileLookUp with Multiframe...")
# Empty FlatFileLookUp: every Multiframe row is a new entry.
158 if not ffluDataDict:
159 ingestDataDict.update(mfDataDict)
160 else:
161 for mfID in sorted(mfDataDict):
# New entry: neither the multiframeID nor element [1] of its tuple is a key
# of the FlatFileLookUp lookup dictionary.
# NOTE(review): element [1] is the fileName only in the two-element
# (dateVersStr, fileName) layout; with fullUpdate it is dateVersStr.
162 if mfID not in ffluNameToMfID \
163 and mfDataDict[mfID][1] not in ffluNameToMfID:
164 Logger.addMessage(
165 "mfID: %s not in ffluDataDict" % mfID,
166 alwaysLog=False)
167 ingestDataDict[mfID] += mfDataDict[mfID]
# Existing entry whose stored tuple differs from Multiframe: schedule update.
168 elif mfDataDict[mfID] != ffluDataDict[mfID]:
169 updateDataDict[mfID] += mfDataDict[mfID]
# Same tuple but the two tables map the name to different multiframeID lists.
# NOTE(review): the value stored here is the tuple of file names for mfID,
# which has a different layout from the other updateDataDict entries -- confirm.
170 elif ffluNameToMfID[mfDataDict[mfID][1]] != \
171 mfNameToMfID[mfDataDict[mfID][1]]:
172 updateDataDict[
173 mfNameToMfID[mfDataDict[mfID][1]][0]] = \
174 tuple(mfNameToMfID[mfID])
175
176
# Delete any existing ingest/update CSV files.
177 if os.path.exists(self.ingestFileName):
178 os.remove(self.ingestFileName)
179 if os.path.exists(self.updateFileName):
180 os.remove(self.updateFileName)
181
# Each CSV row is the multiframeID followed by the stored tuple, written in
# ascending multiframeID order.
182 if ingestDataDict:
183 Logger.addMessage(
184 "Writing data into " + self.ingestFileName)
185
186 csv.File(self.ingestFileName, 'w').writelines(
187 (mfID,) + ingestDataDict[mfID]
188 for mfID in sorted(ingestDataDict))
189
190 if updateDataDict:
191 Logger.addMessage(
192 "Writing data into " + self.updateFileName)
193
194 csv.File(self.updateFileName, 'w').writelines(
195 (mfID,) + updateDataDict[mfID]
196 for mfID in sorted(updateDataDict))
197
198
# Ingest phase: load the ingest CSV into the table if the file exists;
# deleteFile=False is passed to ingestTable.
199 if self.ingestOnly:
200 if os.path.exists(self.ingestFileName):
201 Logger.addMessage("Ingesting into table FlatFileLookUp...")
202 try:
203 Ingester(self.archive,
204 schema.parseTables(SyncFlatFileLookUp._ingestSchema,
205 [SyncFlatFileLookUp._tableName])
206 ).ingestTable(SyncFlatFileLookUp._tableName,
207 self.ingestFileName,
208 isCsv=True, deleteFile=False)
209
# A schema mismatch is re-raised as the session's IngCuError.
210 except schema.MismatchError as error:
211 raise SyncFlatFileLookUp.IngCuError(error)
212 else:
213 Logger.addMessage("Ingest file not found: %s" % \
214 self.ingestFileName)
215
# Update phase: apply each row of the update CSV to the table.
# NOTE(review): ffluNameToMfID is only bound by the getData calls above; in a
# run where neither of those branches executed (e.g. ingestOnly together with
# updateOnly) it would be unbound here -- confirm.
216 if self.updateOnly and os.path.exists(self.updateFileName):
217 Logger.addMessage("Updating table FlatFileLookUp...")
218 counter = 0
219 for attributes in csv.File(self.updateFileName):
220
# Row matched by multiframeID: rewrite dateVersStr and fileName, plus
# cuEventID when the row has four fields.
221 if int(attributes[0]) in ffluNameToMfID:
222 updateAttributes = [
223 ("dateVersStr", repr(attributes[-2])),
224 ("fileName", repr(attributes[-1]))]
225 if len(attributes) == 4:
226 updateAttributes = [
227 ("cuEventID", str(attributes[-3]))] \
228 + updateAttributes
229 num = self.archive.updateEntries(
230 SyncFlatFileLookUp._tableName,
231 updateAttributes, [("multiframeID",
232 int(attributes[0]))])
233 counter += num
234
# Row matched by fileName instead: rewrite dateVersStr and multiframeID.
# NOTE(review): the fileName in the selection pair is passed without repr(),
# unlike the string values above -- confirm updateEntries quotes it.
235 elif attributes[-1] in ffluNameToMfID:
236 updateAttributes = [
237 ("dateVersStr", repr(attributes[-2])),
238 ("multiframeID", str(attributes[0]))]
239 if len(attributes) == 4:
240 updateAttributes = [
241 ("cuEventID", str(attributes[-3]))] \
242 + updateAttributes
243 num = self.archive.updateEntries(
244 SyncFlatFileLookUp._tableName,
245 updateAttributes, [("fileName",
246 attributes[-1])])
247 counter += num
248 Logger.addMessage("Updated %d entries in FlatFileLookUp." %\
249 counter)
250
# Always release the database connection, whether or not a phase failed.
251 finally:
252 self._disconnectFromDb()
253
254
255
256
257
if __name__ == '__main__':
    # Command-line entry point: define the program options, build the
    # curation session from them and run it.

    # Drop the 'comment' program argument from the CLI definition
    # (presumably because the session comment is fixed in the constructor).
    CLI.progArgs.remove('comment')
    CLI.progOpts += [
        CLI.Option('W', 'writeonly',
                   "only write to file"),
        CLI.Option('I', 'ingestonly',
                   "only ingest existing file '<DB>_fflu.csv'"),
        CLI.Option('U', 'updateonly',
                   "only update from file '<DB>_fflu.update.csv'"),
        CLI.Option('N', 'nonverbose',
                   "don't print missing multiframeIDs"),
        CLI.Option('f', 'fullupdate', "also check and update cuEventID"),
        # The default is the class sentinel, so that an unset option is
        # recognised as "no check requested" by the session.
        CLI.Option('C', 'checkingest', "check CU3 ingests",
                   "DATEVERSSTR", SyncFlatFileLookUp._checkIngestDateDef)]

    cli = CLI(SyncFlatFileLookUp, "$Revision: 9344 $")
    Logger.isVerbose = not cli.getOpt('nonverbose')
    Logger.addMessage(cli.getProgDetails())

    cu = SyncFlatFileLookUp(cli.getArg('database'),
                            cli.getOpt('curator'),
                            isTrialRun=cli.getOpt('test'),
                            writeOnly=cli.getOpt('writeonly'),
                            ingestOnly=cli.getOpt('ingestonly'),
                            updateOnly=cli.getOpt('updateonly'),
                            fullUpdate=cli.getOpt('fullupdate'),
                            checkIngest=cli.getOpt('checkingest'))
    cu.run()
288
289
290
291
292
293
294
295
296
297
298
299
300
301