Package invocations :: Package cu0 :: Module DataDaemon
[hide private]

Source Code for Module invocations.cu0.DataDaemon

   1  #! /usr/bin/env python 
   2  #------------------------------------------------------------------------------ 
   3  #$Id: DataDaemon.py 10227 2014-02-27 11:19:44Z EckhardSutorius $ 
   4  """ 
   5     Daemon for data transfer and ingest. 
   6   
   7     @author: Eckhard Sutorius 
   8     @org:    WFAU, IfA, University of Edinburgh 
   9  """ 
  10  #------------------------------------------------------------------------------ 
  11  from __future__    import print_function, division 
  12  from   collections import defaultdict, namedtuple 
  13  import inspect 
  14  import mx.DateTime     as mxTime 
  15  from   numpy       import array 
  16  import os 
  17  import re 
  18  from   subprocess  import Popen, PIPE 
  19  import time 
  20   
  21  from   invocations.cu1.cu1Transfer import CASUQueries 
  22   
  23  from   wsatools.CLI                    import CLI 
  24  from   wsatools.DbConnect.DbSession    import DbSession 
  25  from   wsatools.DbConnect.IngIngester  import CuIngest 
  26  from   wsatools.File                   import File 
  27  import wsatools.FitsUtils                  as fits 
  28  from   wsatools.DbConnect.IngCuSession import IngCuSession 
  29  from   wsatools.Logger                 import Logger 
  30  import wsatools.DbConnect.CommonQueries    as queries 
  31  from   wsatools.SystemConstants        import SystemConstants 
  32  import wsatools.Utilities                  as utils 
  33  #------------------------------------------------------------------------------ 
  34   
class DataDaemon(IngCuSession):
    """Top level function to invoke ingests.
    """
    # Default set of servers eligible for curation work; a run may
    # exclude members by prefixing a name with '-' (see __init__).
    _defServerNames = set(SystemConstants.curationServers)
    # Default curation task numbers (CU0..CU4).
    _defCuNums = range(5)

    # Default date filter: matches YYYYMMDD dates in 2009-2013.
    _defDateRegExpStr = "20[01][90123].*"
    # Script names used to recognise jobs in "ps" output.
    builderloc = "DataBuilder.py"
    ingesterloc = ["IngestCUFiles.py", "DataDaemon.py -I"]
    otherculoc = "cu"
    othercodeloc = ["AutoCurate.py", "FinaliseFlatFileIngest.py",
                    "DataDaemon.py -M", "NonSurveyRelease.py",
                    "UpdateTilePawPrintTables.py"]
    # Run-time options, typically overridden from the command line.
    casuDisk = None
    maxLoad = 3.0          # load-average ceiling for scheduling work
    fullCommand = False
    reqWorkDir = False
    waitMins = 10          # polling interval for continuous ingests
    force = False
    inclTestDb = False
    omitObjIDUpdate = False
    def __init__(self,
                 info=CLI.getOptDef("info"),
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 serverNames=_defServerNames,
                 janet=CLI.getOptDef("janet"),
                 cuNums=CLI.getOptDef("cunums"),
                 casuDisks=CLI.getOptDef("casudisks"),
                 dateRegExpStr=CLI.getOptDef("timere"),
                 comment=CLI.getArgDef("comment")):
        """
        @param casuDisks: Disk at CASU where the data is stored.
        @type casuDisks: str
        @param comment: Descriptive comment as to why curation task is
                        being performed.
        @type comment: str
        @param cuNums: Curation task numbers.
        @type cuNums: list[int]
        @param curator: Name of curator.
        @type curator: str
        @param database: Name of the database to connect to.
        @type database: str
        @param info: Only print status of servers, don't calculate usage.
        @type info: bool
        @param janet: Use JANET instead of UKLight.
        @type janet: bool
        @param serverNames: List of servers to check, or exclude server with
                            preceding '-'.
        @type serverNames: set(str)
        @param dateRegExpStr: Regular expression for the dates.
        @type dateRegExpStr: str

        @todo: This class constructor does not respect its parent class's
               constructor, so the class hierarchy needs reworking.
        """
        # CU0 session: no work dir, no commits, real (non-trial) run.
        super(DataDaemon, self).__init__(cuNum=0,
                                         curator=curator,
                                         comment=comment,
                                         reqWorkDir=False,
                                         keepWorkDir=False,
                                         database=database,
                                         autoCommit=False,
                                         isTrialRun=False)

        # Target types for the CLI-supplied argument values.
        # NOTE(review): "info" is documented as bool above but coerced to
        # str here -- confirm which is intended.
        typeTranslation = {"info":str,
                           "curator":str,
                           "serverNames":set,
                           "janet":bool,
                           "cuNums":list,
                           "casuDisks":list,
                           "dateRegExpStr":str}

        # Copy every constructor argument onto self with the above types.
        super(DataDaemon, self).attributesFromArguments(
            inspect.getargspec(DataDaemon.__init__)[0], locals(),
            types=typeTranslation)

        # A leading '-' means "all default servers except this one".
        if any(serverName.startswith('-') for serverName in self.serverNames):
            self.serverNames = DataDaemon._defServerNames \
                - set(serverName.lstrip('-') for serverName in self.serverNames)

        # Drop (with a warning) any server name not in the known set.
        wrongServers = set()
        for server in self.serverNames:
            if server not in DataDaemon._defServerNames:
                print("<Warning> Server " + server + " not found!")
                wrongServers.add(server)
        self.serverNames -= wrongServers

        # Transfers use UKLight unless JANET was requested.
        self.UKLight = not janet
128 - def getCron(self, serverList):
129 """ 130 Get the crontab information from all servers. 131 132 @param serverList: List containing servers. 133 @type serverList: list(str) 134 135 """ 136 for serverName in sorted(serverList): 137 print('%s:' % serverName) 138 print('=' * (len(serverName) + 1)) 139 try: 140 cmd = self.makeSysCmd(serverName, "crontab -l") 141 for line in self.runSysCmd(cmd)[0]: 142 print(line.strip()) 143 print() 144 145 except Exception as error: 146 print(error)
147 148 #-------------------------------------------------------------------------- 149
150 - def getCUs(self):
151 """ 152 Get the CUed information from all fits directories. 153 154 @return: Dictionary of CU numbers and associated fits dirs. 155 @rtype: dict(int:set(str)) 156 157 """ 158 disks = self.sysc.availableRaidFileSystem() 159 cuedDirs = defaultdict(set) 160 for dirName, dateDirs in fits.getAllPaths(disks): 161 for dateDir in dateDirs: 162 dirPath = os.path.join(dirName, dateDir) 163 for cuNum in range(1, 5): 164 cuedFileName = "CU%02dED_%s" % \ 165 (cuNum, self.database.rpartition('.')[2]) 166 if os.path.exists(os.path.join(dirPath, cuedFileName)): 167 cuedDirs[cuNum].add(dirPath) 168 return cuedDirs
169 170 #-------------------------------------------------------------------------- 171
172 - def getDBOverview(self, dbDict, archive=None):
173 """ 174 Print a list of all DBs on given server group (dbload or dbpub). 175 176 @param dbDict: Dictionary of DB informations. 177 @type dbDict: dict(str:list(str)) 178 179 """ 180 archive = archive or self.sysc.loadDatabase 181 self.availPubDBs = {} 182 serverList = [] 183 getAll = self.force or archive == "EXT" 184 serverList = ["ramses%d" % i for i in range(1, 15) if i != 12] 185 serverList = utils.naturalSorted(set(serverList)) 186 187 for hostname in serverList: 188 self.availPubDBs[hostname] = queries.getAllDBs( 189 hostname, includeTestDBs=self.inclTestDb, 190 archive=("ALL" if self.force else archive)) 191 192 for serverName in serverList: 193 if self.availPubDBs[serverName]: 194 print(''.join([serverName, ':'])) 195 print(', '.join(sorted(self.availPubDBs[serverName])))
196 197 #-------------------------------------------------------------------------- 198
199 - def getDateList(self, begin, end, version):
200 """ 201 Create a dateVersStr list. 202 203 @param begin: Begin date. 204 @type begin: int 205 @param end: End date. 206 @type end: int 207 @param version: Version number. 208 @type version: str 209 @return: List of dateVersStr. 210 @rtype: list(str) 211 212 """ 213 startMJD = int(mxTime.strptime(str(begin), "%Y%m%d").mjd) 214 endMJD = int(mxTime.strptime(str(end), "%Y%m%d").mjd) 215 dateList = [] 216 for mjd in xrange(startMJD, endMJD + 1): 217 date = mxTime.DateTimeFromMJD(mjd) 218 dateVersStr = "%04d%02d%02d_v%s" % (date.year, date.month, 219 date.day, version) 220 dateList.append(dateVersStr) 221 return dateList
222 223 #-------------------------------------------------------------------------- 224
225 - def getPendingIngestDates(self, cuNum):
226 """ 227 Get the DateVersStr from the ingest log files. 228 229 @param cuNum: CU number. 230 @type cuNum: int 231 232 @return: Dictionary containing dates per cuID. 233 @rtype: dict(str:list(str)) 234 235 """ 236 pendingIngestDates = [] 237 cmd = "grep -h -d skip -e 'Running CU0/' " \ 238 + self.sysc.dbSharePath("cu0%sid*_%s.log*" 239 % (cuNum, self.sysc.loadDatabase)) 240 241 result = self.runSysCmd(cmd, showErrors=False)[0] 242 for line in result: 243 words = line.rstrip("'").split(' ') 244 cuid = words[1].rpartition('/')[2] 245 if cuid == "CU%1u" % cuNum: 246 pendingIngestDates.append(words[2]) 247 else: 248 print("CUid differs:", cuid , "expected, got", "CU%1u" % cuNum) 249 return pendingIngestDates
250 251 #-------------------------------------------------------------------------- 252
253 - def getPsStats(self, serverName, searchTerms):
254 """ 255 Get stats from ps command on given server. 256 257 @param searchTerms: List of jobs to search for. 258 @type searchTerms: list(str) 259 @return: List of (job, etime) pairs. 260 @rtype: list((st, str)) 261 262 """ 263 cmd = self.makeSysCmd(serverName, ''.join( 264 ['ps -u scos -o etime,pid,cmd | ', 265 'grep "[0-9] python" | ', 266 'grep -E "%s"' % '|'.join(searchTerms)])) 267 result = self.runSysCmd(cmd)[0] 268 269 resdict = defaultdict(dict) 270 for entry in result: 271 timepid, pycmd = entry.strip().partition("python ")[::2] 272 time, pypid = timepid.split() 273 pypc = "%s-%s:::%s" % (serverName, pypid, pycmd) 274 if int(pypid) != os.getpid() \ 275 and self.database.partition('.')[2] in pycmd: 276 if time in resdict[pypc]: 277 resdict[pypc][time] += 1 278 else: 279 resdict[pypc][time] = 1 280 stats = [] 281 for pypc in resdict: 282 threadlist = sorted([x.strip() + "[*%d]" % resdict[pypc][x] 283 for x in resdict[pypc]]) 284 pypid, pycmd = pypc.partition(":::")[::2] 285 stats.append( 286 (pycmd.split(' '), pypid, 287 threadlist[0].split('[')[0], threadlist)) 288 return stats
289 290 #-------------------------------------------------------------------------- 291
    def getServerJobs(self, serverDict):
        """
        Get jobs ran by scos on given servers.

        @param serverDict: Rated dictionary containing servers.
        @type serverDict: dict
        @return: Dictionary of dictionaries of CU statistics on each server.
        @rtype: dict(str:dict(str:list))

        """
        dataBuilderJobs = {}
        for serverName in serverDict:
            # Keys: CU number (0 = ingester, negative = other programs).
            dataBuilderStats = defaultdict(list)
            try:
                # All known curation programs to look for in ps output.
                searchTerms = DataDaemon.othercodeloc + \
                    DataDaemon.ingesterloc + \
                    [DataDaemon.builderloc,
                     "%s[0-9]" % DataDaemon.otherculoc]
                dbsStats = self.getPsStats(serverName, searchTerms)
                # Decrementing key for non-CU jobs (othercodeloc matches).
                dbsOther = 0
                for entry, pid, etime, threads in dbsStats:
                    thrtxt = (' (' + ', '.join(threads) + ')'
                              if len(threads) > 1 else '')
                    if any(x in entry[0] for x in DataDaemon.ingesterloc):
                        # Ingester jobs are filed under CU0.
                        dbsCU = 0
                        # Normalise elapsed time to dd:hh:mm:ss.
                        etime = ["00", "00", "00", "00"] \
                            + etime.replace('-', ':').split(':')
                        etime = ':'.join(etime[-4:])
                        if any(x == self.sysc.loadDatabase for x in entry) \
                                and ("-x" in entry or "-r" in entry):
                            if "-r" in entry:
                                # "-r cuNNidMM_..." : parse CU number and
                                # event id out of the log-file argument.
                                cuid = int(entry[entry.index("-r") + 1][
                                    entry[entry.index("-r") + 1].find("cu") + 2:
                                    entry[entry.index("-r") + 1].find("id")])
                                evtid = '%s_' % self.sysc.loadDatabase + \
                                    entry[entry.index("-r") + 1][
                                        entry[entry.index("-r") + 1].find("id") - 2:
                                        entry[entry.index("-r") + 1].find("_")]
                            elif "-x" in entry:
                                # "-x N" names the CU number directly.
                                cuid = int(entry[entry.index("-x") + 1])
                                evtid = '%s_all' % self.sysc.loadDatabase
                            dataBuilderStats[dbsCU].extend(
                                [etime.replace('-', ':'), cuid, evtid])
                    elif DataDaemon.builderloc in entry[0]:
                        # DataBuilder job: record the dates it processes.
                        if "-v" in entry:
                            dbsVersion = entry[entry.index("-v") + 1]
                        else:
                            dbsVersion = '1'
                        if "-b" in entry and "-e" in entry and "-x" in entry:
                            dbsBegin = int(entry[entry.index("-b") + 1])
                            dbsEnd = int(entry[entry.index("-e") + 1])
                            dbsCU = int(entry[entry.index("-x") + 1])
                            dataBuilderStats[dbsCU].extend(
                                self.getDateList(dbsBegin, dbsEnd, dbsVersion))
                            dataBuilderStats[dbsCU] = \
                                sorted(set(dataBuilderStats[dbsCU]))
                        if "-l" in entry and "-x" in entry:
                            # List-driven run: store the raw command line.
                            dbsCU = int(entry[entry.index("-x") + 1])
                            dataBuilderStats[dbsCU] = ' '.join(
                                entry[1:]) + thrtxt
                    elif DataDaemon.otherculoc in entry[0]:
                        # Standalone cuN.py script: CU number from its name.
                        dbsCU = int(os.path.splitext(entry[0])[0].rpartition(
                            DataDaemon.otherculoc)[2])
                        dataBuilderStats[dbsCU] = ' '.join(entry[1:]) + thrtxt
                    elif any(x.split()[0] in entry[0]
                             for x in DataDaemon.othercodeloc):
                        # Other curation programs get negative keys.
                        for prog in DataDaemon.othercodeloc:
                            if prog.split()[0] in entry[0]:
                                dbsOther -= 1
                                dataBuilderStats[dbsOther] = ' '.join(
                                    entry[:]) + thrtxt
                dataBuilderJobs[serverName] = dataBuilderStats
            except Exception as error:
                print("<ERROR>", error)

        return dataBuilderJobs
368 369 #-------------------------------------------------------------------------- 370
371 - def getServerLoad(self):
372 """ 373 Get the mean load average of all servers. 374 375 @return: Rated dictionary of servers by their laod average. 376 @rtype: dict(str:float) 377 378 """ 379 serverStatus = {} 380 missServers = set() 381 for serverName in sorted(self.serverNames): 382 try: 383 cmd = self.makeSysCmd(serverName, "uptime") 384 result = self.runSysCmd(cmd)[0] 385 if result: 386 tmpLA = result[0].strip().partition( 387 "load average:")[2].split(',') 388 else: 389 tmpLA = [] 390 missServers.add(serverName) 391 print("<Warning> Can't connect to " + serverName) 392 loadAverage = [float(x) for x in tmpLA] 393 if serverName not in missServers: 394 serverStatus.setdefault(serverName, []).extend(loadAverage) 395 except Exception, details: 396 print(details) 397 398 self.serverNames -= missServers 399 ratedServers = utils.Ratings() 400 for serverName in serverStatus: 401 ratedServers[serverName] = array(serverStatus[serverName]).mean() 402 return ratedServers
403 404 #-------------------------------------------------------------------------- 405
406 - def makeDateVersStrList(self, dirList, versDict=None):
407 """ 408 Make a list of dateVersStrs from a list of date directories. 409 410 @param dirList: List containig directory paths. 411 @type dirList: list(str) 412 @param versDict: Dictionary of dates and their version number. 413 @type versDict: Dict(str:str) 414 @return: List of dateVersStrs. 415 @rtype: list(str) 416 417 """ 418 dateVersStrList = [] 419 for entry in dirList: 420 if versDict: 421 dateVersStr = '_v'.join([os.path.basename(entry), 422 versDict[entry]]) 423 else: 424 dateVersStr = os.path.basename(entry) 425 dateVersStrList.append(dateVersStr) 426 dateVersStrList.sort() 427 return dateVersStrList
428 429 #-------------------------------------------------------------------------- 430
431 - def readCasuSr(self):
432 """ 433 List successfully_read dirs at CASU. 434 435 @return: List of directories that are successfully read. 436 @rtype: list(str) 437 438 """ 439 srDirs = [] 440 dateRegExp = re.compile(r'%s' % self.dateRegExpStr) 441 for casuDisk in self.casuDisks: 442 srDict = CASUQueries.getSRDirs(casuDisk, self.UKLight) 443 srDirs.extend(sorted([path for path in srDict 444 if dateRegExp.search(path)])) 445 return srDirs
446 447 #-------------------------------------------------------------------------- 448
449 - def readCasuOtc(self):
450 """ 451 List OK_TO_COPY dirs at CASU. 452 453 @return: List of directories that are OK to copy. 454 @rtype: list(str) 455 456 """ 457 otcDirs = [] 458 for casuDisk in self.casuDisks: 459 cmd = ' '.join([CASUQueries.sshCmd(self.UKLight), "'ls -1 ", 460 os.path.join(casuDisk, 461 self.dateRegExpStr.replace('.', ''), 462 "OK_TO_COPY"), "'"]) 463 otcDirs.extend(os.path.dirname(x.rstrip()) for x in os.popen(cmd)) 464 465 return sorted(otcDirs)
466 467 #-------------------------------------------------------------------------- 468
469 - def readCasuVers(self):
470 """ 471 List version numbers at CASU. 472 473 @return: Dictionary of directories and their version number. 474 @rtype: dict(str:str) 475 476 """ 477 dateRegExp = re.compile(r'%s' % self.dateRegExpStr) 478 versDict = {} 479 for casuDisk in self.casuDisks: 480 versDict.update(CASUQueries.getDirVersions(casuDisk)) 481 482 return dict((path, versDict[path].partition('_v')[2]) 483 for path in versDict if dateRegExp.search(path))
484 485 #-------------------------------------------------------------------------- 486
    def runContinuousIngests(self, cuNums, curator, comment):
        """
        Continuously poll the DB share and ingest pending CU log files.

        Loops until interrupted with Ctrl-C.  At most one ingester per
        archive is allowed across the known servers; if one is already
        running, this waits DataDaemon.waitMins minutes and re-checks.

        @param cuNums: Curation task numbers to ingest.
        @type  cuNums: list(int)
        @param curator: Name of curator.
        @type  curator: str
        @param comment: Descriptive comment as to why curation task is
                        being performed.
        @type  comment: str

        """
        archive = self.database.split('.')[-1]
        if archive == self.sysc.loadDatabase \
                and os.getenv('USER') != 'scos':
            Logger.addMessage(
                "<ERROR> Only scos is allowed to ingest into the %s" %
                self.sysc.loadDatabase)
            # NOTE(review): bare "raise" outside an except block does not
            # re-raise anything and itself errors at runtime -- presumably
            # a specific exception was intended; confirm.
            raise
        searchTerms = DataDaemon.ingesterloc
        cuNums = self._parseCuNums([cuNums])[0]
        try:
            # Counts ingests that skipped the objID update, so we can warn
            # on exit if FinaliseCatalogueIngest -O is still needed.
            ingCounter = 0
            while 1:
                ingRuns = 0
                counter = 0
                ingStats = []
                # Sample all servers four times, one second apart, looking
                # for an ingester already running on this archive.
                while counter < 4:
                    for serverName in sorted(self.serverNames):
                        ingStatsTmp = self.getPsStats(serverName, searchTerms)
                        if ingStatsTmp and any(x == archive
                                               for x in ingStatsTmp[0][0]):
                            ingRuns += 1
                            ingStats = ingStatsTmp[:]
                    time.sleep(1)
                    counter += 1

                # check share disk space
                if ingStats:
                    self.purgeShare('d', archive, isIngest=True)

                if ingRuns == 0:
                    # No ingester running: collect pending log files for
                    # the requested CUs and this archive.
                    ingestList = []
                    for fileName in os.listdir(self.dbMntPath):
                        if all(x in fileName for x in cuNums) \
                                and fileName.endswith("%s.log" % archive):
                            ingestList.append(os.path.join(
                                self.dbMntPath, fileName))

                    homeLockFile = File("/home/scos/dblock-ramses14.VSAVVV")
                    sysLockFile = File(
                        "/disk01/wsa/sys/ingest_ramses14.VSAVVV_lock")
                    ingestList.sort()
                    if ingestList:
                        # Ingest each log file in turn.
                        for fileName in ingestList:
                            omitObjid = DataDaemon.omitObjIDUpdate
                            ingester = CuIngest(
                                curator=curator,
                                database=self.database,
                                isTrialRun=False,
                                logFileName=fileName,
                                ingestOneLog=True,
                                cuNums='',
                                forceIngest=DataDaemon.force,
                                autoCommit=False,
                                excludedTables='',
                                omitObjIDUpdate=omitObjid,
                                isParallelRun=False,
                                comment=comment)
                            ingester.run()
                            if not omitObjid:
                                ingCounter = 0
                            else:
                                ingCounter += 1
                            del ingester
                            time.sleep(20)
                        # Clear stale lock files left by the ingester.
                        homeLockFile.remove()
                        sysLockFile.remove()
                        self.purgeShare('d', archive, isIngest=True)

                    else:
                        Logger.addMessage(
                            "No files for CU%s to ingest into %s on %s,"
                            " waiting %d mins." % (
                                cuNums, archive, self.dbMntPath,
                                DataDaemon.waitMins))
                        time.sleep(DataDaemon.waitMins * 60)
                else:
                    # Ingester already active elsewhere; wait and re-check.
                    server, pid = ingStats[0][1].split('-')
                    Logger.addMessage(
                        "Ingest into %s already running on %s, PID %s,"
                        " waiting %d mins." % (
                            self.database, server, pid, DataDaemon.waitMins))

                    time.sleep(DataDaemon.waitMins * 60)

        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the daemon.
            if ingCounter > 0:
                Logger.addMessage("Not all objIDs updated, make sure to run "
                                  "'FinaliseCatalogueIngest -O'!")
            print('\n')
            pass
578 579 #-------------------------------------------------------------------------- 580
    def purgeShare(self, mode, archive, isIngest=False):
        """
        Cleanup the samba file shares.

        @param mode: Either check ('c') or delete ingested files.
        @type  mode: str
        @param archive: The archive whose ingested files are purged.
        @type  archive: str
        @param isIngest: Is this a check during continuous ingests?
        @type  isIngest: bool

        """
        thisHost = os.getenv('HOST')
        thisSharePath = self.sysc.dbSharePath()
        isCheck = (mode == 'c')
        # Fields of one "df -P" output line.
        DfResult = namedtuple("DfResult",
                              "filesys, size, used, avail, perc, mount")
        for serverName in sorted(self.serverNames):
            serverSharePath = thisSharePath.replace(thisHost, serverName)
            ingFilesPath = self.sysc.ingestedFilesPath(serverSharePath)

            dfCmd = self.makeSysCmd(serverName,
                                    "df -Ph %s" % serverSharePath)
            for line in self.runSysCmd(dfCmd)[0]:
                # thoth serves its share from the home partition, hence
                # the special case.
                if serverName in line \
                        or ("home" in line and serverName in ["thoth"]):
                    dfResult = DfResult._make(line.split())
                    if isCheck:
                        Logger.addMessage("%s on %s used. %s available." % (
                            dfResult.perc, dfResult.mount, dfResult.avail))

                    if isIngest:
                        # During ingests only purge when the share is
                        # more than 90% full.
                        if int(dfResult.perc.replace('%', '')) > 90:
                            Logger.addMessage("<WARNING> %s of %s used!" % (
                                dfResult.perc, serverSharePath))
                            self.deleteFiles(serverName, ingFilesPath, isCheck,
                                             archive, verbose=False)
                    else:
                        self.deleteFiles(serverName, ingFilesPath, isCheck,
                                         archive, verbose=True)
619 620 #-------------------------------------------------------------------------- 621
622 - def deleteFiles(self, serverName, filesPath, isTest, archive, 623 verbose=False):
624 """ 625 Remove files from given server and path. 626 627 @param archive: The archive for which the files are deleted. 628 @type archive: str 629 @param filesPath: Path to checked dir. 630 @type filesPath: str 631 @param isTest: If true, don't delete files. 632 @type isTest: bool 633 @param serverName: Name of server to be checked. 634 @type serverName: str 635 @param verbose: If true add additional log messages. 636 @type verbose: bool 637 638 """ 639 duCmd = self.makeSysCmd(serverName, 640 "du -m --max-depth=1 %s" % filesPath) 641 duhCmd = self.makeSysCmd(serverName, 642 "du -h --max-depth=1 %s" % filesPath) 643 delCmd = self.makeSysCmd(serverName, "rm -f %s/%s" % ( 644 filesPath, ('*' if archive == 'ALL' \ 645 else "*_%s_*" % archive.upper()))) 646 647 duResultBegin = int(self.runSysCmd(duCmd)[0][0].split('\t')[0]) 648 if duResultBegin > 10 and not isTest: 649 self.runSysCmd(delCmd, False)[0] 650 duResultEnd = int(self.runSysCmd(duCmd)[0][0].split('\t')[0]) 651 Logger.addMessage("%.1fG of %s deleted from %s" % ( 652 (duResultBegin - duResultEnd) / 1000, archive, filesPath)) 653 else: 654 if verbose: 655 Logger.addMessage("%.1fG used on %s" % ( 656 duResultBegin / 1000, filesPath))
657 658 #-------------------------------------------------------------------------- 659
660 - def runMonitor(self, semester, excludedCUs, remStats, versions):
661 """ 662 Run the createCalendar script to update the monitor pages. 663 664 @param semester: Semester to be monitored. 665 @type semester: str 666 @param excludedCUs: List of excluded CUs. 667 @type excludedCUs: list(str) 668 @param remStats: Remove stats for these CUs 669 @type remStats: str 670 @param versions: Versions to be monitored. 671 @type versions: str 672 673 """ 674 try: 675 cmd = ' '.join([ 676 os.path.join(self.sysc.monitorPath(), "createCalendar.py"), 677 "-s %s" % semester, 678 ("-R %s" % remStats if remStats else ''), 679 ("-x %s" % excludedCUs if excludedCUs else ''), 680 ("-v %s" % versions if versions else ''), 681 self.database]) 682 os.system(cmd) 683 except Exception, details: 684 print(details)
685 686 #-------------------------------------------------------------------------- 687
688 - def svnupdate(self, updateDir, serverList):
689 """ 690 Update sandboxes on servers given in the serverList. 691 692 @param updateDir: Directory to update. 693 @type updateDir: str 694 @param serverList: List of servers. 695 @type serverList: list(str) 696 697 """ 698 for serverName in sorted(serverList): 699 print("Updating %s on %s..." % (updateDir, serverName)) 700 try: 701 updatePath = os.path.join("/home", serverName, updateDir) 702 if serverName not in ["sneferu", "thoth"]: 703 updatePath = updatePath.replace("/home", '') 704 705 updCmd = self.makeSysCmd(serverName, 706 "svn update " + updatePath) 707 708 recompile = False 709 for line in self.runSysCmd(updCmd)[0]: 710 print(line) 711 cExts = (".c", ".cpp", ".h", ".hpp", ".hxx") 712 recompile = recompile or line.endswith(cExts) 713 714 if recompile: 715 print("<Warning> You need to recompile the code on %s!" % 716 serverName) 717 except Exception, details: 718 print(details)
719 720 #-------------------------------------------------------------------------- 721
722 - def assertComp(self, sandbox, serverList):
723 """ 724 Assert that the compilation of sandboxes on servers is up-to-date. 725 726 @param sandbox: Sandbox to check. 727 @type sandbox: str 728 @param serverList: List of servers. 729 @type serverList: list(str) 730 731 """ 732 for serverName in sorted(serverList): 733 print("\nChecking %s on %s..." % (sandbox, serverName)) 734 try: 735 # set paths to code and binaries 736 binPath = "/home/%s/%s/bin" % (serverName, sandbox.replace( 737 "VDFS", "usr/local/wsa")) 738 codePath = "/home/%s/%s/src" % (serverName, sandbox) 739 if serverName not in ["sneferu", "thoth"]: 740 binPath = binPath.replace("/home", '') 741 codePath = codePath.replace("/home", '') 742 743 # get last compilation time 744 statCmd = self.makeSysCmd(serverName, 745 "stat --format \"%y\" " + binPath) 746 binTime = (self.runSysCmd(statCmd, False)[0] \ 747 or ['0000-01-01 00:00:00.000000000 +0000']) 748 binTime = ''.join(binTime[0].rpartition(' ')[::2]) 749 750 if DataDaemon.force: 751 print("%s - %s: %s" % (serverName, binPath.partition( 752 "usr/local/wsa")[2], binTime)) 753 754 # get latest file change time 755 findCmd = self.makeSysCmd(serverName, 756 "find " + codePath + \ 757 " -regex \".*\.\(cpp\|c\|h\|hpp\|hxx\)\" " + \ 758 "| xargs stat --format \"%y\" | sort -nr | head -1") 759 codeTime = (self.runSysCmd(findCmd, False)[0] \ 760 or ['0000-01-01 00:00:00.000000000 +0000']) 761 codeTime = ''.join(codeTime[0].rpartition(' ')[::2]) 762 763 if DataDaemon.force: 764 print("%s - %s: %s" % (serverName, codePath.partition( 765 "VDFS")[2], codeTime)) 766 767 # check time difference 768 deltaT = mxTime.ISO.ParseDateTimeGMT(binTime) - \ 769 mxTime.ISO.ParseDateTimeGMT(codeTime) 770 if deltaT < 0: 771 print("You need to re-compile %s (%dd %0.2fh)" % ( 772 serverName, deltaT.day, deltaT.hours - 24 * deltaT.day)) 773 else: 774 print("OK") 775 776 except Exception, details: 777 print(details)
778 779 #-------------------------------------------------------------------------- 780
781 - def compileServer(self, branch, serverList):
782 """ 783 Compile code on servers given in the serverList. 784 785 @param branch: The branch to be compiled. 786 @type branch: str 787 @param serverList: List of servers. 788 @type serverList: list(str) 789 790 """ 791 messages = defaultdict(dict) 792 for serverName in sorted(serverList): 793 print("Compiling %s on %s..." % (branch, serverName)) 794 try: 795 cmd = self.makeSysCmd(serverName, 796 "%s/bin/vdfsCompile.sh %s" % (os.getenv("HOME"), branch)) 797 messages[serverName]["err"] = self.runSysCmd( 798 cmd, isVerbose=True)[1] 799 800 except Exception, details: 801 messages[serverName]["exc"] = details 802 print(details) 803 804 for serverName in messages: 805 Logger.addMessage("ERRORS on %s:\n" % serverName) 806 print(''.join(messages[serverName]["err"])) 807 if "exc" in messages[serverName]: 808 Logger.addMessage("EXCEPTIONS on :\n" % serverName) 809 print(messages[serverName]["exc"]) 810 811 cmd = self.makeSysCmd(serverName, 812 "ls -ldHG /%s/%s/usr/local/wsa/%s/bin" % ( 813 serverName, os.getenv("USER"), branch)) 814 815 for line in self.runSysCmd(cmd)[0]: 816 print(line)
817 818 #-------------------------------------------------------------------------- 819
820 - def createCU0Run(self, dataBuilderJobs):
821 """ 822 Create a list of CU0 runs, ie. ingests. 823 824 @param dataBuilderJobs: Dictionary of jobs already running. 825 @type dataBuilderJobs: disct(str:str) 826 @return: List of ingest runs, list of cuNum/cueventIDs to ingest. 827 @rtype: list(str), list(str) 828 829 """ 830 cmd = "ls -gG --full-time " + \ 831 self.sysc.dbSharePath("cu0?id*_%s.log*" % self.sysc.loadDatabase) 832 result = self.runSysCmd(cmd)[0] 833 cu0Runs = [] 834 toProcess = [] 835 processing = [] 836 elTimeDict = {} 837 now = mxTime.now() 838 839 for line in result: 840 words = line.split() 841 datetime = ' '.join([words[3].strip(), 842 words[4][:words[4].find('.')].strip()]) 843 cuid = words[6][words[6].find('cu') + 2:words[6].find('id')] 844 evtid = words[6][words[6].find('id'):words[6].find('_')] 845 dbid = words[6][words[6].rfind('_') + 1:words[6].rfind('.log')] 846 if line.endswith(".pending"): 847 evtid += 'P' 848 etime = now - mxTime.strptime(datetime, "%Y-%m-%d %H:%M:%S") 849 etime = ["00", "00", "00", "00"] \ 850 + str(etime).replace('-', ':').split(':') 851 etime = ':'.join(etime[-4:]) 852 elTimeDict[dbid + '_' + cuid + evtid] = str(etime) 853 854 if 0 in utils.invertDict(dataBuilderJobs): 855 if isinstance(utils.invertDict(dataBuilderJobs)[0], str): 856 serverList = [utils.invertDict(dataBuilderJobs)[0]] 857 else: 858 serverList = utils.invertDict(dataBuilderJobs)[0] 859 860 for server in serverList: 861 ingTime, _ingMode, ingCuid = dataBuilderJobs[server][0] 862 for evt in elTimeDict: 863 if elTimeDict[evt] > ingTime and ( 864 ingCuid in evt or 'all' in ingCuid): 865 processing.append(evt) 866 else: 867 toProcess.append(evt) 868 869 if dataBuilderJobs[server]: 870 print(' '.join(["Ingester is already running on", 871 server, "on", 872 ', '.join(sorted(processing))])) 873 if toProcess: 874 cmd = ' '.join([DataDaemon.ingesterloc, "on", 875 ', '.join(sorted(toProcess))]) 876 cu0Runs.append(cmd) 877 else: 878 if elTimeDict: 879 for evt in elTimeDict: 880 toProcess.append(evt) 881 
cuidSet = set(evt[evt.find('_') + 1:evt.find('id')]) 882 for cuid in cuidSet: 883 cmd = "%s -i -x %d %s" % (DataDaemon.ingesterloc[0], 884 int(cuid), self.sysc.loadDatabase) 885 cu0Runs.append(cmd) 886 return cu0Runs, sorted(toProcess)
887 888 #-------------------------------------------------------------------------- 889
    def createCU1Run(self, dataBuilderJobs):
        """
        Create a list of CU1 runs, ie. transfers.

        @param dataBuilderJobs: Dictionary of jobs already running.
        @type dataBuilderJobs: disct(str:str)
        @return: List of ingest runs, list of dates to process.
        @rtype: list(str), list(str)

        """
        srDirs = self.readCasuSr()
        #print("SR>", sorted(srDirs))
        otcDirs = self.readCasuOtc()
        #print("OTC>", sorted(otcDirs))
        versDict = self.readCasuVers()
        #print("V>", versDict)
        # Dirs flagged OK_TO_COPY that were not yet successfully read.
        casuToCopy = list(set(otcDirs).difference(srDirs))
        #print("TC>", sorted(casuToCopy))
        dateVersStrList = self.makeDateVersStrList(casuToCopy, versDict)
        #print(">CU1> ", dateVersStrList)
        cu1Runs = []
        inProcess = []
        # CU1 transfers only run on khafre.
        serverList = ["khafre"]
        if 1 in utils.invertDict(dataBuilderJobs) \
                and "khafre" in utils.invertDict(dataBuilderJobs)[1]:
            # A CU1 job is already running: report its in-process dates.
            for server in serverList:
                inProcDates = []
                for dateVersStr in dateVersStrList:
                    if dataBuilderJobs[server]:
                        if dateVersStr in dataBuilderJobs[server][1]:
                            inProcDates.append(dateVersStr)
                            inProcess.append(dateVersStr)
                if inProcDates:
                    print(' '.join(["CU1: already in process on %s: %s" % (
                        server, ', '.join(inProcDates))]))
        else:
            # Build one DataBuilder CU1 command per pending date.
            for dateVersStr in dateVersStrList:
                # @@NOTE: this is equivalent of .split("_v", 1)[0]
                dateStr = dateVersStr.partition("_v")[0]
                cmd = ' '.join([
                    DataDaemon.builderloc,
                    "-c", self.curator,
                    "-b", dateStr,
                    "-e", dateStr,
                    "-y 1 -x 1"])
                cu1Runs.append(cmd)

        return cu1Runs, sorted(set(dateVersStrList).difference(inProcess))
938 939 #-------------------------------------------------------------------------- 940
941 - def createCU2Run(self, cuedDirs, dataBuilderJobs):
942 """ 943 Create a list of CU2 runs, ie. JPG creation. 944 945 @param cuedDirs: Dictionary of already CUed directories. 946 @type cuedDirs: dict(int:list(str)) 947 @param dataBuilderJobs: Dictionary of jobs already running. 948 @type dataBuilderJobs: disct(str:str) 949 @return: List of ingest runs, list of dates to process. 950 @rtype: list(str), list(str) 951 952 """ 953 diffCu1Cu2 = list(set(cuedDirs[1]).difference(cuedDirs[2])) 954 #print(">CU2>",diffCu1Cu2) 955 dateVersStrList = self.makeDateVersStrList(diffCu1Cu2) 956 cu2Runs = [] 957 inProcess = [] 958 pendingIngests = self.getPendingIngestDates(cuNum=2) 959 dateVersStrList = \ 960 sorted(set(dateVersStrList).difference(pendingIngests)) 961 if 2 in utils.invertDict(dataBuilderJobs): 962 if isinstance(utils.invertDict(dataBuilderJobs)[2], str): 963 serverList = [utils.invertDict(dataBuilderJobs)[2]] 964 else: 965 serverList = utils.invertDict(dataBuilderJobs)[2] 966 for server in serverList: 967 inProcDates = [] 968 for dateVersStr in dateVersStrList: 969 if dateVersStr in dataBuilderJobs[server][2]: 970 inProcDates.append(dateVersStr) 971 inProcess.append(dateVersStr) 972 if inProcDates: 973 print(' '.join(["CU2: already in process on %s: %s" % ( 974 server, ', '.join(inProcDates))])) 975 for dateVersStr in dateVersStrList: 976 if dateVersStr not in inProcess: 977 # @@NOTE: this is equivalent of .split("_v", 1) 978 dateStr, versStr = dateVersStr.partition("_v")[::2] 979 cmd = ' '.join([ 980 DataDaemon.builderloc, 981 "-c", self.curator, 982 "-b", dateStr, 983 "-e", dateStr, 984 "-v", versStr, 985 "-x 2"]) 986 cu2Runs.append(cmd) 987 else: 988 for dateVersStr in dateVersStrList: 989 # @@NOTE: this is equivalent of .split("_v", 1) 990 dateStr, versStr = dateVersStr.partition("_v")[::2] 991 cmd = ' '.join([ 992 DataDaemon.builderloc, 993 "-c", self.curator, 994 "-b", dateStr, 995 "-e", dateStr, 996 "-v", versStr, 997 "-x 2"]) 998 cu2Runs.append(cmd) 999 return cu2Runs, 
sorted(set(dateVersStrList).difference(inProcess))
1000 1001 #-------------------------------------------------------------------------- 1002
1003 - def createCU3Run(self, cuedDirs, dataBuilderJobs):
1004 """ 1005 Create a list of CU3 runs, ie. metadata ingest. 1006 1007 @param cuedDirs: Dictionary of already CUed directories. 1008 @type cuedDirs: dict(int:list(str)) 1009 @param dataBuilderJobs: Dictionary of jobs already running. 1010 @type dataBuilderJobs: disct(str:str) 1011 @return: List of ingest runs, list of dates to process. 1012 @rtype: list(str), list(str) 1013 1014 """ 1015 diffCu1Cu3 = list(set(cuedDirs[1]).difference(cuedDirs[3])) 1016 #print(">CU3>",diffCu1Cu3) 1017 dateVersStrList = self.makeDateVersStrList(diffCu1Cu3) 1018 cu3Runs = [] 1019 inProcess = [] 1020 pendingIngests = self.getPendingIngestDates(cuNum=3) 1021 dateVersStrList = \ 1022 sorted(set(dateVersStrList).difference(pendingIngests)) 1023 if 3 in utils.invertDict(dataBuilderJobs): 1024 if isinstance(utils.invertDict(dataBuilderJobs)[3], str): 1025 serverList = [utils.invertDict(dataBuilderJobs)[3]] 1026 else: 1027 serverList = utils.invertDict(dataBuilderJobs)[3] 1028 for server in serverList: 1029 inProcDates = [] 1030 for dateVersStr in dateVersStrList: 1031 if dateVersStr in dataBuilderJobs[server][3]: 1032 inProcDates.append(dateVersStr) 1033 inProcess.append(dateVersStr) 1034 if inProcDates: 1035 print(' '.join(["CU3: already in process on %s: %s" % ( 1036 server, ', '.join(inProcDates))])) 1037 for dateVersStr in dateVersStrList: 1038 if dateVersStr not in inProcess: 1039 # @@NOTE: this is equivalent of .split("_v", 1) 1040 dateStr, versStr = dateVersStr.partition("_v")[::2] 1041 cmd = ' '.join([ 1042 self.builderloc, 1043 "-c", self.curator, 1044 "-b", dateStr, 1045 "-e", dateStr, 1046 "-v", versStr, 1047 "-n", "-x 3"]) 1048 cu3Runs.append(cmd) 1049 else: 1050 for dateVersStr in dateVersStrList: 1051 # @@NOTE: this is equivalent of .split("_v", 1) 1052 dateStr, versStr = dateVersStr.partition("_v")[::2] 1053 cmd = ' '.join([ 1054 DataDaemon.builderloc, 1055 "-c", self.curator, 1056 "-b", dateStr, 1057 "-e", dateStr, 1058 "-v", versStr, 1059 "-x 3"]) 1060 cu3Runs.append(cmd) 
1061 return cu3Runs, sorted(set(dateVersStrList).difference(inProcess))
1062 1063 #-------------------------------------------------------------------------- 1064
1065 - def createCU4Run(self, cuedDirs, dataBuilderJobs):
1066 """ 1067 Create a list of CU4 runs, ie. catalogue data ingest. 1068 1069 @param cuedDirs: Dictionary of already CUed directories. 1070 @type cuedDirs: dict(int:list(str)) 1071 @param dataBuilderJobs: Dictionary of jobs already running. 1072 @type dataBuilderJobs: disct(str:str) 1073 @return: List of ingest runs, list of dates to process. 1074 @rtype: list(str), list(str) 1075 1076 """ 1077 diffCu3Cu4 = list(set(cuedDirs[3]).difference(cuedDirs[4])) 1078 #print(">CU4>",diffCu3Cu4) 1079 dateVersStrList = self.makeDateVersStrList(diffCu3Cu4) 1080 cu4Runs = [] 1081 inProcess = [] 1082 pendingIngests = self.getPendingIngestDates(cuNum=4) 1083 dateVersStrList = \ 1084 sorted(set(dateVersStrList).difference(pendingIngests)) 1085 if 4 in utils.invertDict(dataBuilderJobs): 1086 if isinstance(utils.invertDict(dataBuilderJobs)[4], str): 1087 serverList = [utils.invertDict(dataBuilderJobs)[4]] 1088 else: 1089 serverList = utils.invertDict(dataBuilderJobs)[4] 1090 for server in serverList: 1091 inProcDates = [] 1092 for dateVersStr in dateVersStrList: 1093 if dateVersStr in dataBuilderJobs[server][4]: 1094 inProcDates.append(dateVersStr) 1095 inProcess.append(dateVersStr) 1096 if inProcDates: 1097 print(' '.join(["CU4: already in process on %s: %s" % ( 1098 server, ', '.join(inProcDates))])) 1099 for dateVersStr in dateVersStrList: 1100 if dateVersStr not in inProcess: 1101 # @@NOTE: this is equivalent of .split("_v", 1) 1102 dateStr, versStr = dateVersStr.partition("_v")[::2] 1103 cmd = ' '.join([ 1104 DataDaemon.builderloc, 1105 "-c", self.curator, 1106 "-b", dateStr, 1107 "-e", dateStr, 1108 "-v", versStr, 1109 "-x 4"]) 1110 cu4Runs.append(cmd) 1111 else: 1112 for dateVersStr in dateVersStrList: 1113 # @@NOTE: this is equivalent of .split("_v", 1) 1114 dateStr, versStr = dateVersStr.partition("_v")[::2] 1115 cmd = ' '.join([ 1116 DataDaemon.builderloc, 1117 "-c", self.curator, 1118 "-b", dateStr, 1119 "-e", dateStr, 1120 "-v", versStr, 1121 "-x 4"]) 1122 
cu4Runs.append(cmd) 1123 return cu4Runs, sorted(set(dateVersStrList).difference(inProcess))
1124 1125 #-------------------------------------------------------------------------- 1126
1127 - def runCUs(self, cuRunDict):
1128 """ 1129 Print out commands to run the CUs on the servers. 1130 1131 @param cuRunDict: Dictionary containing commands for CUs. 1132 @type cuRunDict: dict(str:str) 1133 1134 """ 1135 print("\nRun the following processes:") 1136 for server in cuRunDict: 1137 # runPath = os.path.join( 1138 # sysc.curationCodePath(), "invocations", 1139 # "cu0").replace("khafre", server) 1140 # cmd = ''.join(["ssh ", server, "-p ", "\"cd ",runPath, 1141 # "; ./remote.cmd ", "\"echo '' | nohup ", 1142 # cuRunDict[server][0],"\"\""]) 1143 cmd = ''.join(["on ", server, ":\t ", cuRunDict[server][0]]) 1144 print(cmd)
1145 # os.system(cmd) 1146 1147 #-------------------------------------------------------------------------- 1148
    def run(self):
        """
        Top-level driver: gather server load and running jobs, work out
        which CU0-CU4 runs are still outstanding, distribute them over the
        least-loaded servers and print the resulting commands.
        """
        # NOTE(review): chained assignment binds all five names to the SAME
        # list object; harmless only because every branch below rebinds the
        # names it later mutates -- confirm before reusing this pattern.
        cu0Runs = cu1Runs = cu2Runs = cu3Runs = cu4Runs = []
        cu0Dates = cu1Dates = cu2Dates = cu3Dates = cu4Dates = []

        # ratedServers maps server -> load and supports rank-based access
        # via getKeyByRating/getValueByRating (rating 0 = least loaded).
        ratedServers = self.getServerLoad()
        dataBuilderJobs = self.getServerJobs(ratedServers)
        if self.info:
            # Status-only mode: report load averages and running jobs.
            print()
            print("Server load average: ")
            for server in ratedServers:
                print(" %-10s: % .3f" % (server, ratedServers[server]))
            print()
            print("Server joblist: ")
            for server in ratedServers:
                for cuNum in sorted(dataBuilderJobs[server]):
                    cuNoStr = ("CU%2s:" % cuNum if cuNum >= 0 else '')
                    print(" %-10s: %s %r\n" % (
                        server, cuNoStr, dataBuilderJobs[server][cuNum]))

            print()
        cuRunDict = {}  # server name -> list of commands to run there

        if not self.info:
            cuedDirs = self.getCUs()
            # Build the outstanding run list for each requested CU.
            if 0 in self.cuNums:
                cu0Runs, cu0Dates = self.createCU0Run(dataBuilderJobs)
            if 1 in self.cuNums:
                cu1Runs, cu1Dates = self.createCU1Run(dataBuilderJobs)
            if 2 in self.cuNums:
                cu2Runs, cu2Dates = self.createCU2Run(cuedDirs,
                                                      dataBuilderJobs)
            if 3 in self.cuNums:
                cu3Runs, cu3Dates = self.createCU3Run(cuedDirs,
                                                      dataBuilderJobs)
            if 4 in self.cuNums:
                cu4Runs, cu4Dates = self.createCU4Run(cuedDirs,
                                                      dataBuilderJobs)

        # CU1 transfers always run on khafre, so take it out of the pool.
        if cu1Runs:
            cuRunDict.setdefault("khafre", []).extend(cu1Runs)
            del ratedServers["khafre"]

        # Assign the first CU0 run to the least-loaded server, provided it
        # is below the load ceiling; an overloaded best server is dropped.
        if cu0Runs:
            if ratedServers.getValueByRating(0) >= DataDaemon.maxLoad:
                del ratedServers[ratedServers.getKeyByRating(0)]
        if cu0Runs and ratedServers:
            if ratedServers.getValueByRating(0) < DataDaemon.maxLoad:
                cuRunDict.setdefault(
                    ratedServers.getKeyByRating(0), []).append(
                        cu0Runs[0])
                del cu0Runs[0]
                del ratedServers[ratedServers.getKeyByRating(0)]

        # Hand out CU2 and CU3 runs alternately, one per server, always to
        # the currently least-loaded server; each used or overloaded server
        # leaves the pool, so the loop terminates when servers or runs are
        # exhausted.
        if cu2Runs or cu3Runs:
            while ratedServers:
                if ratedServers.getValueByRating(0) >= DataDaemon.maxLoad:
                    del ratedServers[ratedServers.getKeyByRating(0)]

                if cu2Runs and ratedServers:
                    if ratedServers.getValueByRating(0) < DataDaemon.maxLoad:
                        cuRunDict.setdefault(
                            ratedServers.getKeyByRating(0), []).append(
                                cu2Runs[0])
                        del cu2Runs[0]
                        del ratedServers[ratedServers.getKeyByRating(0)]
                if cu3Runs and ratedServers:
                    if ratedServers.getValueByRating(0) < DataDaemon.maxLoad:
                        cuRunDict.setdefault(
                            ratedServers.getKeyByRating(0), []).append(
                                cu3Runs[0])
                        del cu3Runs[0]
                        del ratedServers[ratedServers.getKeyByRating(0)]

                if not cu2Runs and not cu3Runs:
                    break

        # Same scheme for CU4 with whatever servers remain.
        if cu4Runs:
            while ratedServers:
                if ratedServers.getValueByRating(0) >= DataDaemon.maxLoad:
                    del ratedServers[ratedServers.getKeyByRating(0)]
                if cu4Runs and ratedServers:
                    if ratedServers.getValueByRating(0) < DataDaemon.maxLoad:
                        cuRunDict.setdefault(
                            ratedServers.getKeyByRating(0), []).append(
                                cu4Runs[0])
                        del cu4Runs[0]
                        del ratedServers[ratedServers.getKeyByRating(0)]
                if not cu4Runs:
                    break

        # fullCommand switches between printing bare dates/eventIDs and the
        # complete command lines.
        print("\nThe following eventIDs need ingesting:")
        if not DataDaemon.fullCommand:
            print("CU0>", cu0Dates)
        else:
            print("CU0>", cu0Runs)


        print("\nThe following days need processing:")
        if not DataDaemon.fullCommand:
            print("CU1>", cu1Dates)
            print("CU2>", cu2Dates)
            print("CU3>", cu3Dates)
            print("CU4>", cu4Dates)
        else:
            print("CU1>", cu1Runs)
            print("CU2>", cu2Runs)
            print("CU3>", cu3Runs)
            print("CU4>", cu4Runs)

        self.runCUs(cuRunDict)
1262 1263 #-------------------------------------------------------------------------- 1264
1265 - def removeFileSuffixes(self, suffix, fileTags):
1266 """ 1267 """ 1268 anyRemoved = False 1269 for entry in os.listdir(self.dbMntPath): 1270 if all(phrase in entry for phrase in fileTags): 1271 ext = "." + suffix 1272 print("removing %s:" % ext, entry) 1273 oldPath = os.path.join(self.dbMntPath, entry) 1274 newPath = (oldPath.partition(ext)[0] if suffix != "rename" else 1275 oldPath.replace(ext, '')) 1276 1277 os.rename(oldPath, newPath) 1278 anyRemoved = True 1279 1280 if not anyRemoved: 1281 print("Nothing %s." % suffix)
1282 1283 1284 #------------------------------------------------------------------------------
def checkDataBases(archive):
    """
    Survey the publication servers for VISTA programme databases and
    collect their release numbers.

    @param archive: Archive designation, optionally prefixed "<server>.".
    @type archive: str
    @return: Tuples (dbName, server, programme, releaseNum,
             currentReleaseNum) for each database whose release number is
             at least that of its programme's earliest ESO release.
    @rtype: list(tuple)

    """
    loadServer, archiveName = archive.rpartition('.')[::2]
    sysc = SystemConstants(archiveName)
    IngCuSession.sysc = sysc
    loadServer = loadServer or sysc.loadServer
    if 'test' in archiveName:
        archiveName = sysc.loadDatabase
    mainDb = DbSession(archive)
    programmeIDDict = dict(mainDb.query("dfsIDString,programmeID", "Programme"))
    daemon = DataDaemon(False, '',
                        '.'.join([loadServer, archiveName]),
                        ','.join(sorted(DataDaemon._defServerNames)), False,
                        ','.join(str(x) for x in DataDaemon._defCuNums), '',
                        DataDaemon._defDateRegExpStr, "Check databases")

    daemon.getDBOverview({"dbload": ["all"], "dbpub": ["all"]})
    availPubDBs = daemon.availPubDBs
    programmeList = ['VVV', 'VMC', 'VIDEO', 'VIKING']
    earliestEsoDBs = {'VVV': 'VVVDR1', 'VMC': 'VMCv20121128',
                      'VIDEO': 'VIDEOv20120712', 'VIKING': 'VIKINGv20121205'}

    # Gather (dbName, server, programme, releaseNum) for every database
    # belonging to one of the surveyed programmes.
    dbInfo = []
    for server, dbList in availPubDBs.items():
        for dbName in dbList:
            matched = [progName for progName in programmeList
                       if dbName.startswith(progName)]
            if matched:
                # Open database check max releaseNum.
                db = DbSession("%s.%s" % (server, dbName), userName="ldservrw")
                relNum = db.queryAttrMax(
                    "releaseNum", "ProgrammeFrame",
                    where="programmeID=%s" % programmeIDDict[matched[0]])
                dbInfo.append((dbName, server, matched[0], relNum))

    # Highest release number seen for each programme.
    curRelNumDict = dict(
        (programme,
         max(relNum for _dbName, _server, progName, relNum in dbInfo
             if progName == programme))
        for programme in programmeList)

    # Release number of each programme's earliest ESO database; all copies
    # of that database must agree on a single value.
    minRelNumDict = {}
    for programme in programmeList:
        eedbName = earliestEsoDBs[programme]
        eeRelNums = set(relNum for dbName, _server, progName, relNum in dbInfo
                        if progName == programme and eedbName == dbName)
        if len(eeRelNums) != 1:
            raise Exception("Inconsistency in %s databases for programme %s: relNums %s" %
                            (eedbName, programme, eeRelNums))
        minRelNumDict[programme] = eeRelNums.pop()

    # Keep only databases at or above the earliest ESO release, annotated
    # with the programme's current (maximum) release number.
    fullDbInfo = []
    for dbName, server, progName, relNum in dbInfo:
        if relNum >= minRelNumDict[progName]:
            fullDbInfo.append(
                (dbName, server, progName, relNum, curRelNumDict[progName]))
    return fullDbInfo
#------------------------------------------------------------------------------
# Entry point for DataDaemon

if __name__ == '__main__':
    # Swap the standard "database" argument for an "archive" argument and
    # drop the stock test option before registering this tool's options.
    CLI.progArgs.remove("database")
    CLI.progArgs.append(CLI.Argument("archive", "WSA"))
    CLI.progOpts.remove("test")
    CLI.progOpts += [
        CLI.Option('i', "info",
                   "only print status of servers"),
        CLI.Option('j', "janet",
                   "use JANET instead of UKLight"),
        CLI.Option('s', "server",
                   "list of servers to check,"
                   " or exclude server with preceeding '-'",
                   "LIST", ','.join(sorted(DataDaemon._defServerNames))),
        CLI.Option('t', "testdbs", "with -D: include testDBs in DB listing"),
        CLI.Option('v', "versions",
                   "list of version numbers",
                   "LIST", ''),
        CLI.Option('w', "casudisks",
                   "data disks at CASU",
                   "LIST", ''),
        CLI.Option('x', "cunums",
                   "list of CUs to check", "LIST",
                   ','.join(str(x) for x in DataDaemon._defCuNums)),
        CLI.Option('A', "assertcomp",
                   "assert that all servers are compiled"),
        CLI.Option('C', "compile",
                   "compile the code in BRANCH on the given (-s)"
                   " server(s)",
                   "BRANCH", ''),
        CLI.Option('D', "dbinfo",
                   "list of all DBs available for given archive (archive='EXT'"
                   " for external databases)"),
        CLI.Option('F', "full",
                   "run an option to full extend:\n"
                   "\t\t\t\t with '-A': show timestamps for bin/ and src/;\n"
                   "\t\t\t\t with '-D': show DBs for all archives;\n"
                   "\t\t\t\t with '-I': set the force flag for continuous"
                   " ingests;\n"
                   "\t\t\t\t with '-P d': delete files for all archives;\n"
                   "\t\t\t\t with '-U': update including installation/ dir"),
        CLI.Option('I', "ingloop",
                   "run continuous ingests for this cu",
                   "INT", 0),
        CLI.Option('M', "monitor",
                   "update the monitor webpage (give one semester, "
                   "excluded CU(s), and CU [1/2] for which stats should be "
                   "re-calculated as comma seperated list)",
                   "LIST", "01C,4,0"),
        CLI.Option('O', "omitobjid",
                   "Omit updating objIDs. Use with care on the VVV!"),
        CLI.Option('P', "purgeshare",
                   "Check [c] or delete [d] the ingested files on the shares"
                   " on the given (-s) server(s)",
                   "STR", isValOK=lambda x: x.lower() in "cd"),
        CLI.Option('R', "timere",
                   "date regular expression",
                   "REGEXP", DataDaemon._defDateRegExpStr),
        CLI.Option('S', "ingeststatus",
                   "change status suffix of ingest files, use any"
                   " combination of:\n"
                   "\t\t\t\t p/P: remove pending status from CU2 ingests;\n"
                   "\t\t\t\t f/F: remove failed status from ingest logs;\n"
                   "\t\t\t\t r/R: remove rename status from ingest files",
                   "STR", ''),
        CLI.Option('T', "crontab",
                   "show crontabs on all servers"),
        CLI.Option('U', "svnup",
                   "svn update on given (-s) or all (trunk)"
                   " or current servers"),
        CLI.Option('V', "svnpath",
                   "svn update the given path relative to "
                   "/<server>/<user>/ in combination"
                   " with '-U'", "PATH", None)]

    cli = CLI(DataDaemon, "$Revision: 10227 $", checkSVN=False)
    Logger.addMessage(cli.getProgDetails())
    # Split an optional "server." prefix off the archive argument.
    loadServer, archiveName = cli.getArg("archive").rpartition('.')[::2]
    sysc = SystemConstants(archiveName)
    IngCuSession.sysc = sysc
    loadServer = loadServer or sysc.loadServer
    archiveName = (sysc.loadDatabase if 'test' in archiveName else archiveName)
    # NOTE(review): the option is registered above as "casudisks" (plural)
    # but queried here as "casudisk" -- confirm CLI.getOpt accepts
    # abbreviated names, otherwise this always falls back to scpMainDir().
    DataDaemon.casuDisk = (cli.getOpt("casudisk") if cli.getOpt("casudisk")
                           else sysc.scpMainDir())
    DataDaemon.inclTestDb = cli.getOpt("testdbs")
    DataDaemon.force = cli.getOpt("full")
    DataDaemon.omitObjIDUpdate = cli.getOpt("omitobjid")

    daemon = DataDaemon(cli.getOpt("info"),
                        cli.getOpt("curator"),
                        '.'.join([loadServer, archiveName]),
                        cli.getOpt("server"),
                        cli.getOpt("janet"),
                        cli.getOpt("cunums"),
                        cli.getOpt("casudisks"),
                        cli.getOpt("timere"),
                        cli.getArg("comment"))

    # SVN update
    if cli.getOpt("svnup"):
        if not 'trunk' in os.getenv("WSA_SANDBOX"):
            # Sandbox checkout: update the current host, and optionally the
            # given servers when run as scos with a non-default server list.
            svnPath = (cli.getOpt("svnpath") if cli.getOpt("svnpath") else
                       "VDFS/%s" % os.path.split(os.getenv("WSA_SANDBOX"))[1])
            daemon.svnupdate("%s/%s" % (os.getenv("USER"), svnPath),
                             [os.getenv("HOST")])
            if os.getenv("USER") == "scos" and \
               cli.getOpt("server") != ','.join(DataDaemon._defServerNames):
                daemon.svnupdate("%s/%s" % (os.getenv("USER"), svnPath),
                                 daemon.serverNames)
        else:
            # Trunk checkout: update all curation servers for scos, else
            # just the current host; always refresh WFAUweb as well.
            svnPath = (cli.getOpt("svnpath") if cli.getOpt("svnpath")
                       else "VDFS/trunk")
            serverNames = (daemon.serverNames if os.getenv("USER") == "scos"
                           else [os.getenv("HOST")])
            daemon.svnupdate("%s/%s" % (os.getenv("USER"), svnPath),
                             serverNames)
            daemon.svnupdate("%s/WFAUweb" % os.getenv("USER"), serverNames)
            if cli.getOpt("full"):
                daemon.svnupdate("%s/VDFS/installation" % os.getenv("USER"),
                                 serverNames)
                daemon.svnupdate("%s/GES/trunk" % os.getenv("USER"),
                                 serverNames)

    # Assert that compilation on all servers is up-to-date
    elif cli.getOpt("assertcomp") and os.getenv("USER") == "scos":
        if not 'trunk' in os.getenv("WSA_SANDBOX"):
            svnPath = "VDFS/%s" % os.path.split(os.getenv("WSA_SANDBOX"))[1]
            daemon.assertComp("%s/%s" % (os.getenv("USER"), svnPath),
                              [os.getenv("HOST")])
            if os.getenv("USER") == "scos" and \
               cli.getOpt("server") != ','.join(DataDaemon._defServerNames):
                daemon.assertComp("%s/%s" % (os.getenv("USER"), svnPath),
                                  daemon.serverNames)
        else:
            svnPath = "VDFS/trunk"
            serverNames = (daemon.serverNames if os.getenv("USER") == "scos"
                           else [os.getenv("HOST")])
            daemon.assertComp("%s/%s" % (os.getenv("USER"), svnPath),
                              serverNames)

    # Compile source code
    elif cli.getOpt("compile") and os.getenv("USER") == "scos" and \
         cli.getOpt("server") != ','.join(DataDaemon._defServerNames):
        daemon.compileServer(cli.getOpt("compile"), daemon.serverNames)

    # Clean-up the data shares
    elif cli.getOpt("purgeshare") and os.getenv("USER") == "scos":
        arch = ("ALL" if cli.getOpt("full") else archiveName)
        daemon.purgeShare(cli.getOpt("purgeshare"), arch, isIngest=False)

    # Show crontab jobs
    elif cli.getOpt("crontab"):
        daemon.getCron(daemon.serverNames)

    # Run continuous ingests
    elif cli.getOpt("ingloop"):
        daemon.runContinuousIngests(cli.getOpt("ingloop"),
                                    cli.getOpt("curator"),
                                    cli.getArg("comment"))

    # Remove status suffixes
    elif cli.getOpt("ingeststatus"):
        statusTag = cli.getOpt("ingeststatus").lower()
        for suffix in ["pending", "failed", "rename"]:
            if suffix[0] in statusTag:
                # Tags a file name must contain to qualify for unsuffixing.
                fileTags = \
                    [(".log" if suffix != "rename" else '') + '.' + suffix,
                     ("cu0" if suffix != "pending" else "cu02id"),
                     '_' + archiveName]

                daemon.removeFileSuffixes(suffix, fileTags)

    # Get DB listing from all servers
    elif cli.getOpt("dbinfo"):
        dataBasesDef = {"dbload":["all"],
                        "dbpub":["all"]}
        daemon.getDBOverview(dataBasesDef, archiveName)

    # Create monitor web pages
    elif cli.getOpt("monitor") and os.getenv("USER") == "scos" \
            and cli.getOpt("monitor") != cli.getOptDef("monitor"):
        # Parse the -M value: fewer than three fields means a mix of
        # semesters and excluded CUs; exactly three fields is
        # "semester,excludedCU,statsCU".
        exclCuList = []
        semesterList = []
        monOpts = cli.getOpt("monitor").split(',')
        if len(monOpts) < 3:
            for opt in monOpts:
                if opt.isdigit():
                    exclCuList.append(opt)
                else:
                    semesterList.append(opt)
            remStats = None
        else:
            semesterList.append(monOpts[0])
            if monOpts[1].isdigit():
                exclCuList.append(monOpts[1])
            if monOpts[2].isdigit() and monOpts[2] in "12":
                remStats = monOpts[2]
            else:
                print("Stats can only be removed for CU1 or 2!")
                remStats = None

        for semester in semesterList:
            print(' '.join(["Running Monitor for", semester, "excluding CU:",
                            (','.join(exclCuList) if exclCuList else '---'),
                            " Versions:",
                            (cli.getOpt("versions") if cli.getOpt("versions")
                             else 'all')]))
            daemon.runMonitor(semester, ','.join(exclCuList), remStats,
                              cli.getOpt("versions"))

    # Create CU0-4 run info
    else:
        daemon.run()

#------------------------------------------------------------------------------
# Change log:
#
# 23-May-2007, ETWS: First version
# 24-May-2007, ETWS: Extended output.
# 31-May-2007, JB:   Removed spaces infront of '-v's, fixed missing '.py's and
#                    modified part of 'getServerJobs'
# 05-Jul-2007, ETWS: Included options to run it as a checking tool only for a
#                    non-scos user.
# 09-Jul-2007, ETWS: Included JANET/UKLight switch.
# 23-Jul-2007, ETWS: Included CU number list switch.
# 11-Dec-2007, ETWS: Fixed small bug in load average parsing (only occuring
#                    if a machine is up for less than a day).
# 16-Jan-2008, ETWS: Included remote svn updates.
#  5-Feb-2008, ETWS: Updated CU1 script.
#  7-Feb-2008, ETWS: Fixed bug in time calculation of running ingests;
#                    fixed mix-up of already and not yet running processes.
#  8-Feb-2008, ETWS: Fixed empty ingest message and typo in svn update;
#                    included remote monitor run; updated documentation.
# 13-Feb-2008, ETWS: Included handling of pending CU2 ingests.
# 19-Feb-2008, ETWS: Fixed CU4 query; fixed svn update output.
# 20-Feb-2008, ETWS: Fixed missing day part in times of running ingests.
# 21-Feb-2008, ETWS: Included processed but not yet ingested CU3/4 runs.
#  4-Mar-2008, ETWS: Included processed but not yet ingested CU2 runs.
#  7-Apr-2008, ETWS: Upgraded to use new detection table layout.
# 18-Apr-2008, ETWS: Included check if code needs recompilation after
#                    an svn update.
#  2-May-2008, ETWS: Added curator to process run suggestions.
1588 # 2-Jun-2008, ETWS: Enhanced option for remote monitor runs. 1589 # 17-Jun-2008, ETWS: Included higher CU7 in job listing. 1590 # 3-Nov-2008, ETWS: Enhanced to unfail and unpend on all servers. 1591 # 18-Nov-2008, ETWS: Included svn update of WFAUweb. 1592 # 6-May-2009, ETWS: Fixed CASU queries and updated servers and date regexp. 1593