1
2
3
4 """
5 Compares the entries in the database with the JPG paths and updates
6 them accordingly.
7
8 @author: E. Sutorius
9 @org: WFAU, IfA, University of Edinburgh
10 """
11
12 from __future__ import print_function
13 from collections import defaultdict
14 import mx.DateTime as mxTime
15 import os
16
17 from wsatools.CLI import CLI
18 from wsatools.DbConnect.DbSession import DbSession
19 from wsatools.File import File
20 import wsatools.FitsUtils as fits
21 from wsatools.Logger import Logger, ForLoopMonitor
22 import wsatools.DbConnect.CommonQueries as queries
23 from wsatools.SystemConstants import SystemConstants
24
26 """Checks JPG paths in DB and updates them.
27 """
28
29
30
31 isTrialRun = False
32 userName = DbSession.userName
33
34
35
36
37 _databases = []
38 _missingJpgs = set()
39 _updateEntries = defaultdict()
40
41
42
52 """
53 @param beginDate: First date to process, eg. 20050101.
54 @type beginDate: int
55 @param checkDB: Check the DB for wrong entries of compFile.
56 @type checkDB: bool
57 @param database: Name of the database to connect to.
58 @type database: str
59 @param endDate: Last date to process, eg. 20050131.
60 @type endDate: int
61 @param outPath: Path to directory for produced files
62 @type outPath: str
63 @param updateRelDBs: List of release DBs to update.
64 @type updateRelDBs: list
65 @param versionStr: Version number of the data.
66 @type versionStr: str
67
68 """
69 self.database = database
70 self.sysc = SystemConstants(self.database.rpartition('.')[2])
71 self.dateReq = (beginDate if beginDate == endDate
72 else '-'.join([beginDate, endDate]))
73 beginDate, endDate = \
74 self.sysc.obsCal.getDatesFromInput(beginDate, endDate)
75
76 self.startDateStr = beginDate
77 self.endDateStr = endDate
78 self.versionStr = versionStr
79 self.startMJD = int(mxTime.strptime(self.startDateStr, "%Y%m%d").mjd)
80 self.endMJD = int(mxTime.strptime(self.endDateStr, "%Y%m%d").mjd)
81 self.checkDB = checkDB
82 self.outPath = outPath
83 self.updateRelDBs = updateRelDBs
84
85 if self.updateRelDBs:
86 server, dbs = self.updateRelDBs.split("::")
87 databases = set(dbs.split(',') if dbs != "all" else [])
88 self._databases = getAvailDBs(self.database, server,
89 databases)
90
91 Logger.addMessage("Connecting to master DB: %s" %
92 self.database.rpartition('.')[2])
93 self.masterDb = DbSession(self.database, autoCommit=True,
94 isTrialRun=self.isTrialRun,
95 userName=self.userName)
96 Logger.addMessage("Getting jpg names...")
97 self.masterDbData = self.getMasterData()
98
99
100
102 """
103 """
104 outFileTimeStamp = ''.join(["_", self.startDateStr,
105 "_", self.endDateStr,
106 "_", self.versionStr])
107 outFileSuffix = ''.join(["_", self.database.rpartition('.')[2],
108 outFileTimeStamp])
109
110 if self.checkDB:
111 jpgDiskData = self.getJpegPaths()
112 Logger.addMessage("Checking master DB: %s..." %
113 self.database.rpartition('.')[2])
114 self.checkMasterDb(jpgDiskData)
115
116 if self._updateEntries:
117 Logger.addMessage("Updating %s..." %
118 self.database.rpartition('.')[2])
119 updateDBFileNames(self.masterDb, self._updateEntries,
120 isTest=self.isTrialRun)
121
122 if self._missingJpgs:
123 reprocJpgsFile = File(os.path.join(
124 self.outPath, "reprocJPGs%s.list" % outFileSuffix))
125 Logger.addMessage(' '.join([str(len(self._missingJpgs)),
126 "files with no JPGs are in",
127 reprocJpgsFile.name]))
128 reprocJpgsFile.wopen()
129 reprocJpgsFile.writelines(sorted(self._missingJpgs))
130 reprocJpgsFile.close()
131
132 if self.updateRelDBs:
133 for db in self._databases:
134 self.updater(db)
135
136
137
169
170
171
173 """ Create a dictionary of compressed image dates and their dirs.
174 """
175 self.raidDisks = self.sysc.availableRaidFileSystem() + \
176 self.sysc.deprecatedDataStorage
177
178 deprDirs = []
179 for direc in self.sysc.deprecatedDataStorage:
180 for subDir in os.listdir(os.path.join(direc,self.sysc.productsDir)):
181 if self.sysc.deprecatedComprImDir+"_20" in \
182 os.path.join(self.sysc.productsDir, subDir):
183 deprDirs.append(os.path.join(self.sysc.productsDir, subDir))
184
185 fitsDirs = [self.sysc.compressImDir] + deprDirs
186
187 jpgList = fits.FitsList(self.sysc, prefix="CJ_")
188 jpgList.createFitsDateDict(disklist=self.raidDisks,
189 ingestDirectory=fitsDirs,
190 beginDateStr=self.startDateStr,
191 endDateStr=self.endDateStr,
192 versionStr=self.versionStr,
193 forceLists=True)
194
195 jpgDataDict = defaultdict(dict)
196 for dateVersStr in jpgList.invFitsDateDict:
197 jpgDiskDict = defaultdict(list)
198 for jpgDir in jpgList.invFitsDateDict[dateVersStr]:
199 for jpgFile in os.listdir(os.path.join(jpgDir, dateVersStr)):
200 jpgDiskDict[jpgFile].append(os.path.join(
201 jpgDir, dateVersStr, jpgFile))
202 jpgDataDict[dateVersStr].update(jpgDiskDict)
203 return jpgDataDict
204
205
206
208 """ Read multiframeIDs, extNum, fileName, compFile from the master DB.
209 """
210 dbJpgsDict = defaultdict(dict)
211 for mjd in xrange(self.startMJD, self.endMJD+1):
212 date = mxTime.DateTimeFromMJD(mjd)
213 dateVersStr = "%04d%02d%02d_v%s" % (
214 date.year, date.month, date.day, self.versionStr)
215 querySelect = "F.dateVersStr, F.multiframeID, MFD.extNum," \
216 " MFD.compFile, F.fileName"
217 queryFrom = "MultiframeDetector as MFD, FlatFileLookUp as F"
218 queryWhere = "F.multiframeID=MFD.multiframeID AND " \
219 "dateVersStr='%s'" % dateVersStr
220 mfidlist = self.masterDb.query(querySelect, fromStr=queryFrom,
221 whereStr=queryWhere)
222 for entry in mfidlist:
223 dbJpgsDict[entry[0]].update({(entry[1], entry[2]):(
224 entry[3].rpartition(':'), entry[4].rpartition(':'))})
225 Logger.addMessage("%d files found." % len(mfidlist))
226 return dbJpgsDict
227
228
229
231 """ Read multiframeIDs, extNum, compFile from the given database.
232 """
233 dbJpgsDict = defaultdict(dict)
234 for mjd in xrange(self.startMJD, self.endMJD+1):
235 date = mxTime.DateTimeFromMJD(mjd)
236 dateVersStr = "%04d%02d%02d_v%s" % (
237 date.year, date.month, date.day, self.versionStr)
238 querySelect = "'%s', multiframeID, extNum, compFile" % \
239 dateVersStr
240 queryFrom = "MultiframeDetector"
241 queryWhere = "compFile like '%%%s%%'" % dateVersStr
242
243 mfidlist = archive.query(querySelect, fromStr=queryFrom,
244 whereStr=queryWhere)
245 for entry in mfidlist:
246 dbJpgsDict[entry[0]].update({(entry[1], entry[2]):(
247 entry[3].rpartition(':'))})
248 return dbJpgsDict
249
250
251
253 """
254 Update the given database.
255
256 @param database: Database to update.
257 @type database: str
258
259 """
260 Logger.addMessage("Connecting to %s" % database)
261 archive = DbSession(database, autoCommit=True,
262 isTrialRun=self.isTrialRun, userName=self.userName)
263
264
265 Logger.addMessage("Getting file names...")
266 dbJpgsDict = self.getDbData(archive)
267
268
269 newMfidDict = defaultdict()
270
271 Logger.addMessage("Checking %s files..." % len(dbJpgsDict))
272 progress = ForLoopMonitor(sorted(self.masterDbData))
273
274 for dateVersStr in sorted(dbJpgsDict):
275 for mfExt in dbJpgsDict[dateVersStr]:
276 if self.masterDbData[dateVersStr][mfExt][0][2] != dbJpgsDict[
277 dateVersStr][mfExt][2]:
278 newMfidDict[mfExt] = self.masterDbData[dateVersStr][mfExt]
279 progress.testForOutput()
280
281
282 if newMfidDict:
283 Logger.addMessage("Updating %s entries in %s..." %
284 (len(newMfidDict), database))
285
286 if archive.database.upper().startswith(("UKIDSS",
287 "WORLD", "TRANSIT")):
288 path = os.path.join(
289 "/mnt", archive.sysc.loadServer,
290 "SqlUpdates", "JPGS_PubDB_%s.%s_%sv%s_%s.sql"
291 % (archive.server, archive.database,
292 self.dateReq, self.versionStr,
293 mxTime.localtime().strftime('%Y%m%d_%H%M')))
294
295 updateFile = File(path)
296 updateFile.aopen()
297 else:
298 updateFile = None
299
300 updateDBFileNames(archive, newMfidDict, updateFile,
301 isTest=self.isTrialRun)
302 if archive.database.upper().startswith(("UKIDSS",
303 "WORLD", "TRANSIT")):
304 updateFile.close()
305
306 else:
307 Logger.addMessage(database + " is up-to-date.")
308
309
310
311
313 """
314 Get all or selected DBs from given server.
315
316 @param archive: The archive to be updated, e.g. WSA.
317 @type archive: str
318 @param mainServer: The server to query for databases, can be the name or
319 the synonym, e.g. dbload or dbpub
320 (for all public servers).
321 @type mainServer: str
322 @param databases: Optionally request specific database names else all are
323 returned.
324 @type databases: sequence(str)
325
326 @return: List of full paths to databases on selected server.
327 @rtype: list(str)
328
329 """
330 sysc = SystemConstants(archive)
331
332
333 if mainServer == "dbpub":
334 availDatabases = []
335 for server in sysc.publicServers:
336 pubDBs = set(queries.getAllDBs(server))
337 if sysc.isVSA():
338 pubDBs = set([db for db in pubDBs
339 if not db.startswith(("UKIDSS", "TRANSIT",
340 "WORLD"))])
341 else:
342 pubDBs = set([db for db in pubDBs if not (db.startswith("V")
343 or db.endswith("SIAP"))])
344 if databases:
345 pubDBs.intersection_update(databases)
346 availDatabases.extend(server+'.'+db for db in pubDBs)
347
348
349 elif mainServer == "dbload":
350 availDatabases = [sysc.loadServer+'.'+db for db in databases]
351
352
353 elif mainServer in sysc.publicServers + [sysc.loadServer]:
354 availDatabases = [mainServer+'.'+db for db in databases]
355 else:
356 raise SystemExit("<ERROR> %s not known." % mainServer)
357
358 return sorted(availDatabases)
359
360
362 """
363 Update the database entries for compFile.
364
365 @param archive: Connection to database to update.
366 @type archive: DbSession
367 @param mfDict: Dictionary giving compFile for every MfID, extNum tuple.
368 @type mfDict: dict(int:list(str))
369 @param updateFile: Write to this file for read only DBs.
370 @type updateFile: File obj
371
372 """
373 numRows = 0
374 for mfID, extNum in sorted(mfDict):
375
376 if archive.database.upper().startswith(("UKIDSS", "WORLD", "TRANSIT")):
377 numRows += 1
378 updateFile.writetheline(
379 "UPDATE MultiframeDetector SET %s WHERE multiframeID=%d "
380 "AND extNum=%d" % (
381 "compFile='%s'" % ''.join(mfDict[(mfID, extNum)][0]),
382 mfID, extNum))
383 else:
384 if isTest:
385 numRows += 1
386 print("UPDATE MultiframeDetector SET %s WHERE multiframeID=%d "
387 "AND extNum=%d" % (
388 "compFile='%s'" % ''.join(mfDict[(mfID, extNum)][0]),
389 mfID, extNum))
390 else:
391 numRows += archive.update(
392 "MultiframeDetector",
393 "compFile='%s'" % ''.join(mfDict[(mfID, extNum)][0]),
394 " AND ".join(["multiframeID=%s" % mfID, "extNum=%s" % \
395 extNum]))
396
397 Logger.addMessage("%s fileNames affected in MultiframeDetector." % numRows)
398
399
400
401
402
403
if __name__ == "__main__":
    # Script entry point: register the command-line interface, then run the
    # JPG path check/update for the requested date range and version.

    # Positional arguments: first and last observation date to process and
    # the data version. NOTE(review): "database" is read via cli.getArg()
    # below but is not registered here - presumably the CLI class supplies
    # it by default; TODO confirm.
    CLI.progArgs += [
        CLI.Argument("startdate", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("enddate", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("version", '1')
    ]
    CLI.progOpts += [
        # The value must contain "::" because CheckJpgs splits it on "::"
        # into a server name and a comma-separated database list.
        CLI.Option('U', 'updatedbs',
                   "release DBs to update; 'server::database' where server "
                   "can be 'dbload' or 'dbpub' and database a comma-separated "
                   "list of DBs or 'all'",
                   "LIST", isValOK=lambda x: "::" in x),
        CLI.Option('c', 'check',
                   "check and update the master DB for wrong entries of "
                   "compFile"),
        CLI.Option('o', 'outpath',
                   "new destination for produced files",
                   "DIR", os.curdir)
    ]

    cli = CLI(CheckJpgs.__name__, "$Revision: 8674 $",
              CheckJpgs.__doc__)
    Logger.isVerbose = False
    Logger.addMessage(cli.getProgDetails())

    # isTrialRun and userName are class attributes that CheckJpgs passes to
    # every DbSession it opens, so they must be set before instantiation.
    CheckJpgs.isTrialRun = cli.getOpt("test")
    CheckJpgs.userName = cli.getOpt("user")
    checkJpgs = CheckJpgs(
        cli.getArg("database"),
        cli.getArg("startdate"),
        cli.getArg("enddate"),
        cli.getArg("version"),
        cli.getOpt("check"),
        cli.getOpt("updatedbs"),
        cli.getOpt("outpath")
    )
    checkJpgs.run()
443