1   
  2   
  3   
  4  """ 
  5     Compares the entries in the database with the JPG paths and updates 
  6     them accordingly. 
  7   
  8     @author: E. Sutorius 
  9     @org:    WFAU, IfA, University of Edinburgh 
 10  """ 
 11   
 12  from __future__      import print_function 
 13  from   collections import defaultdict 
 14  import mx.DateTime     as mxTime 
 15  import os 
 16   
 17  from   wsatools.CLI                 import CLI 
 18  from   wsatools.DbConnect.DbSession import DbSession 
 19  from   wsatools.File                import File 
 20  import wsatools.FitsUtils               as fits 
 21  from   wsatools.Logger              import Logger, ForLoopMonitor 
 22  import wsatools.DbConnect.CommonQueries as queries 
 23  from   wsatools.SystemConstants     import SystemConstants 
 24   
 26      """Checks JPG paths in DB and updates them. 
 27      """ 
 28       
 29       
 30       
 31      isTrialRun = False                 
 32      userName = DbSession.userName      
 33   
 34       
 35       
 36   
 37      _databases = []        
 38      _missingJpgs = set()   
 39      _updateEntries = defaultdict()   
 40   
 41       
 42   
 52          """ 
 53          @param beginDate:   First date to process, eg. 20050101. 
 54          @type  beginDate:   int 
 55          @param checkDB:     Check the DB for wrong entries of compFile. 
 56          @type  checkDB:     bool 
 57          @param database:    Name of the database to connect to. 
 58          @type  database:    str 
 59          @param endDate:     Last date to process, eg. 20050131. 
 60          @type  endDate:     int 
 61          @param outPath:     Path to directory for produced files 
 62          @type  outPath:     str 
 63          @param updateRelDBs: List of release DBs to update. 
 64          @type  updateRelDBs: list 
 65          @param versionStr:  Version number of the data. 
 66          @type  versionStr:  str 
 67   
 68          """ 
 69          self.database = database 
 70          self.sysc = SystemConstants(self.database.rpartition('.')[2]) 
 71          self.dateReq = (beginDate if beginDate == endDate 
 72                          else '-'.join([beginDate, endDate])) 
 73          beginDate, endDate = \ 
 74              self.sysc.obsCal.getDatesFromInput(beginDate, endDate) 
 75   
 76          self.startDateStr = beginDate 
 77          self.endDateStr = endDate 
 78          self.versionStr = versionStr 
 79          self.startMJD = int(mxTime.strptime(self.startDateStr, "%Y%m%d").mjd) 
 80          self.endMJD = int(mxTime.strptime(self.endDateStr, "%Y%m%d").mjd) 
 81          self.checkDB = checkDB 
 82          self.outPath = outPath 
 83          self.updateRelDBs = updateRelDBs 
 84   
 85          if self.updateRelDBs: 
 86              server, dbs = self.updateRelDBs.split("::") 
 87              databases = set(dbs.split(',') if dbs != "all" else []) 
 88              self._databases = getAvailDBs(self.database, server, 
 89                                            databases) 
 90   
 91          Logger.addMessage("Connecting to master DB: %s" % 
 92                            self.database.rpartition('.')[2]) 
 93          self.masterDb = DbSession(self.database, autoCommit=True, 
 94                                    isTrialRun=self.isTrialRun, 
 95                                    userName=self.userName) 
 96          Logger.addMessage("Getting jpg names...") 
 97          self.masterDbData = self.getMasterData() 
  98   
 99       
100   
102          """ 
103          """ 
104          outFileTimeStamp = ''.join(["_", self.startDateStr, 
105                                      "_", self.endDateStr, 
106                                      "_", self.versionStr]) 
107          outFileSuffix = ''.join(["_", self.database.rpartition('.')[2], 
108                                   outFileTimeStamp]) 
109   
110          if self.checkDB: 
111              jpgDiskData = self.getJpegPaths() 
112              Logger.addMessage("Checking master DB: %s..." % 
113                                self.database.rpartition('.')[2]) 
114              self.checkMasterDb(jpgDiskData) 
115   
116              if self._updateEntries: 
117                  Logger.addMessage("Updating %s..." % 
118                                    self.database.rpartition('.')[2]) 
119                  updateDBFileNames(self.masterDb, self._updateEntries, 
120                                    isTest=self.isTrialRun) 
121   
122              if self._missingJpgs: 
123                  reprocJpgsFile = File(os.path.join( 
124                      self.outPath, "reprocJPGs%s.list" % outFileSuffix)) 
125                  Logger.addMessage(' '.join([str(len(self._missingJpgs)), 
126                                              "files with no JPGs are in", 
127                                              reprocJpgsFile.name])) 
128                  reprocJpgsFile.wopen() 
129                  reprocJpgsFile.writelines(sorted(self._missingJpgs)) 
130                  reprocJpgsFile.close() 
131   
132          if self.updateRelDBs: 
133              for db in self._databases: 
134                  self.updater(db) 
 135   
136       
137   
169   
170       
171   
173          """ Create a dictionary of compressed image dates and their dirs. 
174          """ 
175          self.raidDisks = self.sysc.availableRaidFileSystem() + \ 
176                           self.sysc.deprecatedDataStorage 
177   
178          deprDirs = [] 
179          for direc in self.sysc.deprecatedDataStorage: 
180              for subDir in os.listdir(os.path.join(direc,self.sysc.productsDir)): 
181                  if self.sysc.deprecatedComprImDir+"_20" in \ 
182                         os.path.join(self.sysc.productsDir, subDir): 
183                      deprDirs.append(os.path.join(self.sysc.productsDir, subDir)) 
184   
185          fitsDirs = [self.sysc.compressImDir] + deprDirs 
186   
187          jpgList = fits.FitsList(self.sysc, prefix="CJ_") 
188          jpgList.createFitsDateDict(disklist=self.raidDisks, 
189                                     ingestDirectory=fitsDirs, 
190                                     beginDateStr=self.startDateStr, 
191                                     endDateStr=self.endDateStr, 
192                                     versionStr=self.versionStr, 
193                                     forceLists=True) 
194   
195          jpgDataDict = defaultdict(dict) 
196          for dateVersStr in jpgList.invFitsDateDict: 
197              jpgDiskDict = defaultdict(list) 
198              for jpgDir in jpgList.invFitsDateDict[dateVersStr]: 
199                  for jpgFile in os.listdir(os.path.join(jpgDir, dateVersStr)): 
200                      jpgDiskDict[jpgFile].append(os.path.join( 
201                          jpgDir, dateVersStr, jpgFile)) 
202              jpgDataDict[dateVersStr].update(jpgDiskDict) 
203          return jpgDataDict 
 204   
205       
206   
208          """ Read multiframeIDs, extNum, fileName, compFile from the master DB. 
209          """ 
210          dbJpgsDict = defaultdict(dict) 
211          for mjd in xrange(self.startMJD, self.endMJD+1): 
212              date = mxTime.DateTimeFromMJD(mjd) 
213              dateVersStr = "%04d%02d%02d_v%s" % ( 
214                  date.year, date.month, date.day, self.versionStr) 
215              querySelect = "F.dateVersStr, F.multiframeID, MFD.extNum," \ 
216                            " MFD.compFile, F.fileName" 
217              queryFrom = "MultiframeDetector as MFD, FlatFileLookUp as F" 
218              queryWhere = "F.multiframeID=MFD.multiframeID AND " \ 
219                           "dateVersStr='%s'" % dateVersStr 
220              mfidlist = self.masterDb.query(querySelect, fromStr=queryFrom, 
221                                             whereStr=queryWhere) 
222              for entry in mfidlist: 
223                  dbJpgsDict[entry[0]].update({(entry[1], entry[2]):( 
224                      entry[3].rpartition(':'), entry[4].rpartition(':'))}) 
225          Logger.addMessage("%d files found." % len(mfidlist)) 
226          return dbJpgsDict 
 227   
228       
229   
231          """ Read multiframeIDs, extNum, compFile from the given database. 
232          """ 
233          dbJpgsDict = defaultdict(dict) 
234          for mjd in xrange(self.startMJD, self.endMJD+1): 
235              date = mxTime.DateTimeFromMJD(mjd) 
236              dateVersStr = "%04d%02d%02d_v%s" % ( 
237                  date.year, date.month, date.day, self.versionStr) 
238              querySelect = "'%s', multiframeID, extNum, compFile" % \ 
239                            dateVersStr 
240              queryFrom = "MultiframeDetector" 
241              queryWhere = "compFile like '%%%s%%'" % dateVersStr 
242   
243              mfidlist = archive.query(querySelect, fromStr=queryFrom, 
244                                       whereStr=queryWhere) 
245              for entry in mfidlist: 
246                  dbJpgsDict[entry[0]].update({(entry[1], entry[2]):( 
247                      entry[3].rpartition(':'))}) 
248          return dbJpgsDict 
 249   
250       
251   
253          """ 
254          Update the given database. 
255   
256          @param database: Database to update. 
257          @type  database: str 
258   
259          """ 
260          Logger.addMessage("Connecting to %s" % database) 
261          archive = DbSession(database, autoCommit=True, 
262                      isTrialRun=self.isTrialRun, userName=self.userName) 
263   
264           
265          Logger.addMessage("Getting file names...") 
266          dbJpgsDict = self.getDbData(archive) 
267   
268           
269          newMfidDict = defaultdict() 
270   
271          Logger.addMessage("Checking %s files..." % len(dbJpgsDict)) 
272          progress = ForLoopMonitor(sorted(self.masterDbData)) 
273   
274          for dateVersStr in sorted(dbJpgsDict): 
275              for mfExt in dbJpgsDict[dateVersStr]: 
276                  if self.masterDbData[dateVersStr][mfExt][0][2] != dbJpgsDict[ 
277                          dateVersStr][mfExt][2]: 
278                      newMfidDict[mfExt] = self.masterDbData[dateVersStr][mfExt] 
279              progress.testForOutput() 
280   
281           
282          if newMfidDict: 
283              Logger.addMessage("Updating %s entries in %s..." % 
284                (len(newMfidDict), database)) 
285               
286              if archive.database.upper().startswith(("UKIDSS", 
287                                                      "WORLD", "TRANSIT")): 
288                  path = os.path.join( 
289                      "/mnt", archive.sysc.loadServer, 
290                      "SqlUpdates", "JPGS_PubDB_%s.%s_%sv%s_%s.sql" 
291                      % (archive.server, archive.database, 
292                         self.dateReq, self.versionStr, 
293                         mxTime.localtime().strftime('%Y%m%d_%H%M'))) 
294   
295                  updateFile = File(path) 
296                  updateFile.aopen() 
297              else: 
298                  updateFile = None 
299   
300              updateDBFileNames(archive, newMfidDict, updateFile, 
301                                isTest=self.isTrialRun) 
302              if archive.database.upper().startswith(("UKIDSS", 
303                                                      "WORLD", "TRANSIT")): 
304                  updateFile.close() 
305   
306          else: 
307              Logger.addMessage(database + " is up-to-date.") 
  308   
309   
310   
311   
313      """ 
314      Get all or selected DBs from given server. 
315   
316      @param archive:    The archive to be updated, e.g. WSA. 
317      @type  archive:    str 
318      @param mainServer: The server to query for databases, can be the name or 
319                         the synonym, e.g. dbload or dbpub 
320                         (for all public servers). 
321      @type  mainServer: str 
322      @param databases:  Optionally request specific database names else all are 
323                         returned. 
324      @type  databases:  sequence(str) 
325   
326      @return: List of full paths to databases on selected server. 
327      @rtype:  list(str) 
328   
329      """ 
330      sysc = SystemConstants(archive) 
331   
332       
333      if mainServer == "dbpub": 
334          availDatabases = [] 
335          for server in sysc.publicServers: 
336              pubDBs = set(queries.getAllDBs(server)) 
337              if sysc.isVSA(): 
338                  pubDBs = set([db for db in pubDBs 
339                                if not db.startswith(("UKIDSS", "TRANSIT", 
340                                                      "WORLD"))]) 
341              else: 
342                  pubDBs = set([db for db in pubDBs if not (db.startswith("V") 
343                                or db.endswith("SIAP"))]) 
344              if databases: 
345                  pubDBs.intersection_update(databases) 
346              availDatabases.extend(server+'.'+db for db in pubDBs) 
347   
348       
349      elif mainServer == "dbload": 
350          availDatabases = [sysc.loadServer+'.'+db for db in databases] 
351   
352       
353      elif mainServer in sysc.publicServers + [sysc.loadServer]: 
354          availDatabases = [mainServer+'.'+db for db in databases] 
355      else: 
356          raise SystemExit("<ERROR> %s not known." % mainServer) 
357   
358      return sorted(availDatabases) 
 359   
360   
362      """ 
363      Update the database entries for compFile. 
364   
365      @param archive: Connection to database to update. 
366      @type  archive: DbSession 
367      @param mfDict:  Dictionary giving compFile for every MfID, extNum tuple. 
368      @type  mfDict:  dict(int:list(str)) 
369      @param updateFile: Write to this file for read only DBs. 
370      @type  updateFile: File obj 
371   
372      """ 
373      numRows = 0 
374      for mfID, extNum in sorted(mfDict): 
375           
376          if archive.database.upper().startswith(("UKIDSS", "WORLD", "TRANSIT")): 
377              numRows += 1 
378              updateFile.writetheline( 
379                  "UPDATE MultiframeDetector SET %s WHERE multiframeID=%d " 
380                  "AND extNum=%d" % ( 
381                      "compFile='%s'" % ''.join(mfDict[(mfID, extNum)][0]), 
382                      mfID, extNum)) 
383          else: 
384              if isTest: 
385                  numRows += 1 
386                  print("UPDATE MultiframeDetector SET %s WHERE multiframeID=%d " 
387                        "AND extNum=%d" % ( 
388                            "compFile='%s'" % ''.join(mfDict[(mfID, extNum)][0]), 
389                            mfID, extNum)) 
390              else: 
391                  numRows += archive.update( 
392                      "MultiframeDetector", 
393                      "compFile='%s'" % ''.join(mfDict[(mfID, extNum)][0]), 
394                      " AND ".join(["multiframeID=%s" % mfID, "extNum=%s" % \ 
395                                    extNum])) 
396   
397      Logger.addMessage("%s fileNames affected in MultiframeDetector." % numRows) 
 398   
399   
400   
401   
402   
403   
if __name__ == "__main__":

    # Positional arguments: date range and data version.
    CLI.progArgs += [
        CLI.Argument("startdate", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("enddate", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("version", '1'),
        ]
    # Options: release DBs to update, master DB check, output directory.
    CLI.progOpts += [
        CLI.Option('U', 'updatedbs',
                   "release DBs to update; 'server::database' where server "
                   "can be 'dbload' or 'dbpub' and database a comma-separated "
                   "list of DBs or 'all'",
                   "LIST", isValOK=lambda x: "::" in x),
        CLI.Option('c', 'check',
                   "check and update the master DB for wrong entries of "
                   "compFile"),
        CLI.Option('o', 'outpath',
                   "new destination for produced files",
                   "DIR", os.curdir),
        ]

    cli = CLI(CheckJpgs.__name__, "$Revision: 8674 $",
              CheckJpgs.__doc__)
    Logger.isVerbose = False
    Logger.addMessage(cli.getProgDetails())

    # Set on the class before construction: __init__ already opens the
    # master DB session using these values.
    CheckJpgs.isTrialRun = cli.getOpt("test")
    CheckJpgs.userName = cli.getOpt("user")
    checkJpgs = CheckJpgs(
        cli.getArg("database"),
        cli.getArg("startdate"),
        cli.getArg("enddate"),
        cli.getArg("version"),
        cli.getOpt("check"),
        cli.getOpt("updatedbs"),
        cli.getOpt("outpath")
        )
    checkJpgs.run()
443