1
2
3
4 """
5 Daemon for data transfer and ingest.
6
7 @author: Eckhard Sutorius
8 @org: WFAU, IfA, University of Edinburgh
9 """
10
11 from __future__ import print_function, division
12 from collections import defaultdict, namedtuple
13 import inspect
14 import mx.DateTime as mxTime
15 from numpy import array
16 import os
17 import re
18 from subprocess import Popen, PIPE
19 import time
20
21 from invocations.cu1.cu1Transfer import CASUQueries
22
23 from wsatools.CLI import CLI
24 from wsatools.DbConnect.DbSession import DbSession
25 from wsatools.DbConnect.IngIngester import CuIngest
26 from wsatools.File import File
27 import wsatools.FitsUtils as fits
28 from wsatools.DbConnect.IngCuSession import IngCuSession
29 from wsatools.Logger import Logger
30 import wsatools.DbConnect.CommonQueries as queries
31 from wsatools.SystemConstants import SystemConstants
32 import wsatools.Utilities as utils
33
34
36 """Top level function to invoke ingests.
37 """
38 _defServerNames = set(SystemConstants.curationServers)
39 _defCuNums = range(5)
40
41 _defDateRegExpStr = "20[01][90123].*"
42 builderloc = "DataBuilder.py"
43 ingesterloc = ["IngestCUFiles.py", "DataDaemon.py -I"]
44 otherculoc = "cu"
45 othercodeloc = ["AutoCurate.py", "FinaliseFlatFileIngest.py",
46 "DataDaemon.py -M", "NonSurveyRelease.py",
47 "UpdateTilePawPrintTables.py"]
48 casuDisk = None
49 maxLoad = 3.0
50 fullCommand = False
51 reqWorkDir = False
52 waitMins = 10
53 force = False
54 inclTestDb = False
55 omitObjIDUpdate = False
56
67 """
68 @param casuDisks: Disk at CASU where the data is stored.
69 @type casuDisks: str
70 @param comment: Descriptive comment as to why curation task is
71 being performed.
72 @type comment: str
73 @param cuNums: Curation task numbers.
74 @type cuNums: list[int]
75 @param curator: Name of curator.
76 @type curator: str
77 @param database: Name of the database to connect to.
78 @type database: str
79 @param info: Only print status of servers, don't calculate usage.
80 @type info: bool
81 @param janet: Use JANET instead of UKLight.
82 @type janet: bool
83 @param serverNames: List of servers to check, or exclude server with
84 preceding '-'.
85 @type serverNames: set(str)
86 @param dateRegExpStr: Regular expression for the dates.
87 @type dateRegExpStr: str
88
89 @todo: This class constructor does not respect its parent class's
90 constructor, so the class hierarchy needs reworking.
91 """
92 super(DataDaemon, self).__init__(cuNum=0,
93 curator=curator,
94 comment=comment,
95 reqWorkDir=False,
96 keepWorkDir=False,
97 database=database,
98 autoCommit=False,
99 isTrialRun=False)
100
101 typeTranslation = {"info":str,
102 "curator":str,
103 "serverNames":set,
104 "janet":bool,
105 "cuNums":list,
106 "casuDisks":list,
107 "dateRegExpStr":str}
108
109 super(DataDaemon, self).attributesFromArguments(
110 inspect.getargspec(DataDaemon.__init__)[0], locals(),
111 types=typeTranslation)
112
113 if any(serverName.startswith('-') for serverName in self.serverNames):
114 self.serverNames = DataDaemon._defServerNames \
115 - set(serverName.lstrip('-') for serverName in self.serverNames)
116
117 wrongServers = set()
118 for server in self.serverNames:
119 if server not in DataDaemon._defServerNames:
120 print("<Warning> Server " + server + " not found!")
121 wrongServers.add(server)
122 self.serverNames -= wrongServers
123
124 self.UKLight = not janet
125
126
127
129 """
130 Get the crontab information from all servers.
131
132 @param serverList: List containing servers.
133 @type serverList: list(str)
134
135 """
136 for serverName in sorted(serverList):
137 print('%s:' % serverName)
138 print('=' * (len(serverName) + 1))
139 try:
140 cmd = self.makeSysCmd(serverName, "crontab -l")
141 for line in self.runSysCmd(cmd)[0]:
142 print(line.strip())
143 print()
144
145 except Exception as error:
146 print(error)
147
148
149
151 """
152 Get the CUed information from all fits directories.
153
154 @return: Dictionary of CU numbers and associated fits dirs.
155 @rtype: dict(int:set(str))
156
157 """
158 disks = self.sysc.availableRaidFileSystem()
159 cuedDirs = defaultdict(set)
160 for dirName, dateDirs in fits.getAllPaths(disks):
161 for dateDir in dateDirs:
162 dirPath = os.path.join(dirName, dateDir)
163 for cuNum in range(1, 5):
164 cuedFileName = "CU%02dED_%s" % \
165 (cuNum, self.database.rpartition('.')[2])
166 if os.path.exists(os.path.join(dirPath, cuedFileName)):
167 cuedDirs[cuNum].add(dirPath)
168 return cuedDirs
169
170
171
173 """
174 Print a list of all DBs on given server group (dbload or dbpub).
175
176        @param dbDict: Dictionary of DB information.
177 @type dbDict: dict(str:list(str))
178
179 """
180 archive = archive or self.sysc.loadDatabase
181 self.availPubDBs = {}
182 serverList = []
183 getAll = self.force or archive == "EXT"
184 serverList = ["ramses%d" % i for i in range(1, 15) if i != 12]
185 serverList = utils.naturalSorted(set(serverList))
186
187 for hostname in serverList:
188 self.availPubDBs[hostname] = queries.getAllDBs(
189 hostname, includeTestDBs=self.inclTestDb,
190 archive=("ALL" if self.force else archive))
191
192 for serverName in serverList:
193 if self.availPubDBs[serverName]:
194 print(''.join([serverName, ':']))
195 print(', '.join(sorted(self.availPubDBs[serverName])))
196
197
198
200 """
201 Create a dateVersStr list.
202
203 @param begin: Begin date.
204 @type begin: int
205 @param end: End date.
206 @type end: int
207 @param version: Version number.
208 @type version: str
209 @return: List of dateVersStr.
210 @rtype: list(str)
211
212 """
213 startMJD = int(mxTime.strptime(str(begin), "%Y%m%d").mjd)
214 endMJD = int(mxTime.strptime(str(end), "%Y%m%d").mjd)
215 dateList = []
216 for mjd in xrange(startMJD, endMJD + 1):
217 date = mxTime.DateTimeFromMJD(mjd)
218 dateVersStr = "%04d%02d%02d_v%s" % (date.year, date.month,
219 date.day, version)
220 dateList.append(dateVersStr)
221 return dateList
222
223
224
226 """
227 Get the DateVersStr from the ingest log files.
228
229 @param cuNum: CU number.
230 @type cuNum: int
231
232 @return: Dictionary containing dates per cuID.
233 @rtype: dict(str:list(str))
234
235 """
236 pendingIngestDates = []
237 cmd = "grep -h -d skip -e 'Running CU0/' " \
238 + self.sysc.dbSharePath("cu0%sid*_%s.log*"
239 % (cuNum, self.sysc.loadDatabase))
240
241 result = self.runSysCmd(cmd, showErrors=False)[0]
242 for line in result:
243 words = line.rstrip("'").split(' ')
244 cuid = words[1].rpartition('/')[2]
245 if cuid == "CU%1u" % cuNum:
246 pendingIngestDates.append(words[2])
247 else:
248 print("CUid differs:", cuid , "expected, got", "CU%1u" % cuNum)
249 return pendingIngestDates
250
251
252
254 """
255 Get stats from ps command on given server.
256
257 @param searchTerms: List of jobs to search for.
258 @type searchTerms: list(str)
259 @return: List of (job, etime) pairs.
260 @rtype: list((st, str))
261
262 """
263 cmd = self.makeSysCmd(serverName, ''.join(
264 ['ps -u scos -o etime,pid,cmd | ',
265 'grep "[0-9] python" | ',
266 'grep -E "%s"' % '|'.join(searchTerms)]))
267 result = self.runSysCmd(cmd)[0]
268
269 resdict = defaultdict(dict)
270 for entry in result:
271 timepid, pycmd = entry.strip().partition("python ")[::2]
272 time, pypid = timepid.split()
273 pypc = "%s-%s:::%s" % (serverName, pypid, pycmd)
274 if int(pypid) != os.getpid() \
275 and self.database.partition('.')[2] in pycmd:
276 if time in resdict[pypc]:
277 resdict[pypc][time] += 1
278 else:
279 resdict[pypc][time] = 1
280 stats = []
281 for pypc in resdict:
282 threadlist = sorted([x.strip() + "[*%d]" % resdict[pypc][x]
283 for x in resdict[pypc]])
284 pypid, pycmd = pypc.partition(":::")[::2]
285 stats.append(
286 (pycmd.split(' '), pypid,
287 threadlist[0].split('[')[0], threadlist))
288 return stats
289
290
291
293 """
294 Get jobs ran by scos on given servers.
295
296 @param serverDict: Rated dictionary containing servers.
297 @type serverDict: dict
298 @return: Dictionary of dictionaries of CU statistics on each server.
299 @rtype: dict(str:dict(str:list))
300
301 """
302 dataBuilderJobs = {}
303 for serverName in serverDict:
304 dataBuilderStats = defaultdict(list)
305 try:
306 searchTerms = DataDaemon.othercodeloc + \
307 DataDaemon.ingesterloc + \
308 [DataDaemon.builderloc,
309 "%s[0-9]" % DataDaemon.otherculoc]
310 dbsStats = self.getPsStats(serverName, searchTerms)
311 dbsOther = 0
312 for entry, pid, etime, threads in dbsStats:
313 thrtxt = (' (' + ', '.join(threads) + ')'
314 if len(threads) > 1 else '')
315 if any(x in entry[0] for x in DataDaemon.ingesterloc):
316 dbsCU = 0
317 etime = ["00", "00", "00", "00"] \
318 + etime.replace('-', ':').split(':')
319 etime = ':'.join(etime[-4:])
320 if any(x == self.sysc.loadDatabase for x in entry) \
321 and ("-x" in entry or "-r" in entry):
322 if "-r" in entry:
323 cuid = int(entry[entry.index("-r") + 1][
324 entry[entry.index("-r") + 1].find("cu") + 2:
325 entry[entry.index("-r") + 1].find("id")])
326 evtid = '%s_' % self.sysc.loadDatabase + \
327 entry[entry.index("-r") + 1][
328 entry[entry.index("-r") + 1].find("id") - 2:
329 entry[entry.index("-r") + 1].find("_")]
330 elif "-x" in entry:
331 cuid = int(entry[entry.index("-x") + 1])
332 evtid = '%s_all' % self.sysc.loadDatabase
333 dataBuilderStats[dbsCU].extend(
334 [etime.replace('-', ':'), cuid, evtid])
335 elif DataDaemon.builderloc in entry[0]:
336 if "-v" in entry:
337 dbsVersion = entry[entry.index("-v") + 1]
338 else:
339 dbsVersion = '1'
340 if "-b" in entry and "-e" in entry and "-x" in entry:
341 dbsBegin = int(entry[entry.index("-b") + 1])
342 dbsEnd = int(entry[entry.index("-e") + 1])
343 dbsCU = int(entry[entry.index("-x") + 1])
344 dataBuilderStats[dbsCU].extend(
345 self.getDateList(dbsBegin, dbsEnd, dbsVersion))
346 dataBuilderStats[dbsCU] = \
347 sorted(set(dataBuilderStats[dbsCU]))
348 if "-l" in entry and "-x" in entry:
349 dbsCU = int(entry[entry.index("-x") + 1])
350 dataBuilderStats[dbsCU] = ' '.join(
351 entry[1:]) + thrtxt
352 elif DataDaemon.otherculoc in entry[0]:
353 dbsCU = int(os.path.splitext(entry[0])[0].rpartition(
354 DataDaemon.otherculoc)[2])
355 dataBuilderStats[dbsCU] = ' '.join(entry[1:]) + thrtxt
356 elif any(x.split()[0] in entry[0]
357 for x in DataDaemon.othercodeloc):
358 for prog in DataDaemon.othercodeloc:
359 if prog.split()[0] in entry[0]:
360 dbsOther -= 1
361 dataBuilderStats[dbsOther] = ' '.join(
362 entry[:]) + thrtxt
363 dataBuilderJobs[serverName] = dataBuilderStats
364 except Exception as error:
365 print("<ERROR>", error)
366
367 return dataBuilderJobs
368
369
370
372 """
373 Get the mean load average of all servers.
374
375        @return: Rated dictionary of servers by their load average.
376 @rtype: dict(str:float)
377
378 """
379 serverStatus = {}
380 missServers = set()
381 for serverName in sorted(self.serverNames):
382 try:
383 cmd = self.makeSysCmd(serverName, "uptime")
384 result = self.runSysCmd(cmd)[0]
385 if result:
386 tmpLA = result[0].strip().partition(
387 "load average:")[2].split(',')
388 else:
389 tmpLA = []
390 missServers.add(serverName)
391 print("<Warning> Can't connect to " + serverName)
392 loadAverage = [float(x) for x in tmpLA]
393 if serverName not in missServers:
394 serverStatus.setdefault(serverName, []).extend(loadAverage)
395 except Exception, details:
396 print(details)
397
398 self.serverNames -= missServers
399 ratedServers = utils.Ratings()
400 for serverName in serverStatus:
401 ratedServers[serverName] = array(serverStatus[serverName]).mean()
402 return ratedServers
403
404
405
407 """
408 Make a list of dateVersStrs from a list of date directories.
409
410        @param dirList: List containing directory paths.
411 @type dirList: list(str)
412 @param versDict: Dictionary of dates and their version number.
413 @type versDict: Dict(str:str)
414 @return: List of dateVersStrs.
415 @rtype: list(str)
416
417 """
418 dateVersStrList = []
419 for entry in dirList:
420 if versDict:
421 dateVersStr = '_v'.join([os.path.basename(entry),
422 versDict[entry]])
423 else:
424 dateVersStr = os.path.basename(entry)
425 dateVersStrList.append(dateVersStr)
426 dateVersStrList.sort()
427 return dateVersStrList
428
429
430
432 """
433 List successfully_read dirs at CASU.
434
435 @return: List of directories that are successfully read.
436 @rtype: list(str)
437
438 """
439 srDirs = []
440 dateRegExp = re.compile(r'%s' % self.dateRegExpStr)
441 for casuDisk in self.casuDisks:
442 srDict = CASUQueries.getSRDirs(casuDisk, self.UKLight)
443 srDirs.extend(sorted([path for path in srDict
444 if dateRegExp.search(path)]))
445 return srDirs
446
447
448
450 """
451 List OK_TO_COPY dirs at CASU.
452
453 @return: List of directories that are OK to copy.
454 @rtype: list(str)
455
456 """
457 otcDirs = []
458 for casuDisk in self.casuDisks:
459 cmd = ' '.join([CASUQueries.sshCmd(self.UKLight), "'ls -1 ",
460 os.path.join(casuDisk,
461 self.dateRegExpStr.replace('.', ''),
462 "OK_TO_COPY"), "'"])
463 otcDirs.extend(os.path.dirname(x.rstrip()) for x in os.popen(cmd))
464
465 return sorted(otcDirs)
466
467
468
470 """
471 List version numbers at CASU.
472
473 @return: Dictionary of directories and their version number.
474 @rtype: dict(str:str)
475
476 """
477 dateRegExp = re.compile(r'%s' % self.dateRegExpStr)
478 versDict = {}
479 for casuDisk in self.casuDisks:
480 versDict.update(CASUQueries.getDirVersions(casuDisk))
481
482 return dict((path, versDict[path].partition('_v')[2])
483 for path in versDict if dateRegExp.search(path))
484
485
486
488 archive = self.database.split('.')[-1]
489 if archive == self.sysc.loadDatabase \
490 and os.getenv('USER') != 'scos':
491 Logger.addMessage(
492 "<ERROR> Only scos is allowed to ingest into the %s" %
493 self.sysc.loadDatabase)
494 raise
495 searchTerms = DataDaemon.ingesterloc
496 cuNums = self._parseCuNums([cuNums])[0]
497 try:
498 ingCounter = 0
499 while 1:
500 ingRuns = 0
501 counter = 0
502 ingStats = []
503 while counter < 4:
504 for serverName in sorted(self.serverNames):
505 ingStatsTmp = self.getPsStats(serverName, searchTerms)
506 if ingStatsTmp and any(x == archive
507 for x in ingStatsTmp[0][0]):
508 ingRuns += 1
509 ingStats = ingStatsTmp[:]
510 time.sleep(1)
511 counter += 1
512
513
514 if ingStats:
515 self.purgeShare('d', archive, isIngest=True)
516
517 if ingRuns == 0:
518 ingestList = []
519 for fileName in os.listdir(self.dbMntPath):
520 if all(x in fileName for x in cuNums) \
521 and fileName.endswith("%s.log" % archive):
522 ingestList.append(os.path.join(
523 self.dbMntPath, fileName))
524
525 homeLockFile = File("/home/scos/dblock-ramses14.VSAVVV")
526 sysLockFile = File(
527 "/disk01/wsa/sys/ingest_ramses14.VSAVVV_lock")
528 ingestList.sort()
529 if ingestList:
530 for fileName in ingestList:
531 omitObjid = DataDaemon.omitObjIDUpdate
532 ingester = CuIngest(
533 curator=curator,
534 database=self.database,
535 isTrialRun=False,
536 logFileName=fileName,
537 ingestOneLog=True,
538 cuNums='',
539 forceIngest=DataDaemon.force,
540 autoCommit=False,
541 excludedTables='',
542 omitObjIDUpdate=omitObjid,
543 isParallelRun=False,
544 comment=comment)
545 ingester.run()
546 if not omitObjid:
547 ingCounter = 0
548 else:
549 ingCounter += 1
550 del ingester
551 time.sleep(20)
552 homeLockFile.remove()
553 sysLockFile.remove()
554 self.purgeShare('d', archive, isIngest=True)
555
556 else:
557 Logger.addMessage(
558 "No files for CU%s to ingest into %s on %s,"
559 " waiting %d mins." % (
560 cuNums, archive, self.dbMntPath,
561 DataDaemon.waitMins))
562 time.sleep(DataDaemon.waitMins * 60)
563 else:
564 server, pid = ingStats[0][1].split('-')
565 Logger.addMessage(
566 "Ingest into %s already running on %s, PID %s,"
567 " waiting %d mins." % (
568 self.database, server, pid, DataDaemon.waitMins))
569
570 time.sleep(DataDaemon.waitMins * 60)
571
572 except KeyboardInterrupt:
573 if ingCounter > 0:
574 Logger.addMessage("Not all objIDs updated, make sure to run "
575 "'FinaliseCatalogueIngest -O'!")
576 print('\n')
577 pass
578
579
580
581 - def purgeShare(self, mode, archive, isIngest=False):
582 """
583 Cleanup the samba file shares.
584
585 @param mode: Either check or delete ingested files.
586 @type mode: str
587 @param isIngest: Is this a check during continuous ingests?
588 @type isIngest: bool
589
590 """
591 thisHost = os.getenv('HOST')
592 thisSharePath = self.sysc.dbSharePath()
593 isCheck = (mode == 'c')
594 DfResult = namedtuple("DfResult",
595 "filesys, size, used, avail, perc, mount")
596 for serverName in sorted(self.serverNames):
597 serverSharePath = thisSharePath.replace(thisHost, serverName)
598 ingFilesPath = self.sysc.ingestedFilesPath(serverSharePath)
599
600 dfCmd = self.makeSysCmd(serverName,
601 "df -Ph %s" % serverSharePath)
602 for line in self.runSysCmd(dfCmd)[0]:
603 if serverName in line \
604 or ("home" in line and serverName in ["thoth"]):
605 dfResult = DfResult._make(line.split())
606 if isCheck:
607 Logger.addMessage("%s on %s used. %s available." % (
608 dfResult.perc, dfResult.mount, dfResult.avail))
609
610 if isIngest:
611 if int(dfResult.perc.replace('%', '')) > 90:
612 Logger.addMessage("<WARNING> %s of %s used!" % (
613 dfResult.perc, serverSharePath))
614 self.deleteFiles(serverName, ingFilesPath, isCheck,
615 archive, verbose=False)
616 else:
617 self.deleteFiles(serverName, ingFilesPath, isCheck,
618 archive, verbose=True)
619
620
621
622 - def deleteFiles(self, serverName, filesPath, isTest, archive,
623 verbose=False):
624 """
625 Remove files from given server and path.
626
627 @param archive: The archive for which the files are deleted.
628 @type archive: str
629 @param filesPath: Path to checked dir.
630 @type filesPath: str
631 @param isTest: If true, don't delete files.
632 @type isTest: bool
633 @param serverName: Name of server to be checked.
634 @type serverName: str
635 @param verbose: If true add additional log messages.
636 @type verbose: bool
637
638 """
639 duCmd = self.makeSysCmd(serverName,
640 "du -m --max-depth=1 %s" % filesPath)
641 duhCmd = self.makeSysCmd(serverName,
642 "du -h --max-depth=1 %s" % filesPath)
643 delCmd = self.makeSysCmd(serverName, "rm -f %s/%s" % (
644 filesPath, ('*' if archive == 'ALL' \
645 else "*_%s_*" % archive.upper())))
646
647 duResultBegin = int(self.runSysCmd(duCmd)[0][0].split('\t')[0])
648 if duResultBegin > 10 and not isTest:
649 self.runSysCmd(delCmd, False)[0]
650 duResultEnd = int(self.runSysCmd(duCmd)[0][0].split('\t')[0])
651 Logger.addMessage("%.1fG of %s deleted from %s" % (
652 (duResultBegin - duResultEnd) / 1000, archive, filesPath))
653 else:
654 if verbose:
655 Logger.addMessage("%.1fG used on %s" % (
656 duResultBegin / 1000, filesPath))
657
658
659
660 - def runMonitor(self, semester, excludedCUs, remStats, versions):
661 """
662 Run the createCalendar script to update the monitor pages.
663
664 @param semester: Semester to be monitored.
665 @type semester: str
666 @param excludedCUs: List of excluded CUs.
667 @type excludedCUs: list(str)
668 @param remStats: Remove stats for these CUs
669 @type remStats: str
670 @param versions: Versions to be monitored.
671 @type versions: str
672
673 """
674 try:
675 cmd = ' '.join([
676 os.path.join(self.sysc.monitorPath(), "createCalendar.py"),
677 "-s %s" % semester,
678 ("-R %s" % remStats if remStats else ''),
679 ("-x %s" % excludedCUs if excludedCUs else ''),
680 ("-v %s" % versions if versions else ''),
681 self.database])
682 os.system(cmd)
683 except Exception, details:
684 print(details)
685
686
687
689 """
690 Update sandboxes on servers given in the serverList.
691
692 @param updateDir: Directory to update.
693 @type updateDir: str
694 @param serverList: List of servers.
695 @type serverList: list(str)
696
697 """
698 for serverName in sorted(serverList):
699 print("Updating %s on %s..." % (updateDir, serverName))
700 try:
701 updatePath = os.path.join("/home", serverName, updateDir)
702 if serverName not in ["sneferu", "thoth"]:
703 updatePath = updatePath.replace("/home", '')
704
705 updCmd = self.makeSysCmd(serverName,
706 "svn update " + updatePath)
707
708 recompile = False
709 for line in self.runSysCmd(updCmd)[0]:
710 print(line)
711 cExts = (".c", ".cpp", ".h", ".hpp", ".hxx")
712 recompile = recompile or line.endswith(cExts)
713
714 if recompile:
715 print("<Warning> You need to recompile the code on %s!" %
716 serverName)
717 except Exception, details:
718 print(details)
719
720
721
723 """
724 Assert that the compilation of sandboxes on servers is up-to-date.
725
726 @param sandbox: Sandbox to check.
727 @type sandbox: str
728 @param serverList: List of servers.
729 @type serverList: list(str)
730
731 """
732 for serverName in sorted(serverList):
733 print("\nChecking %s on %s..." % (sandbox, serverName))
734 try:
735
736 binPath = "/home/%s/%s/bin" % (serverName, sandbox.replace(
737 "VDFS", "usr/local/wsa"))
738 codePath = "/home/%s/%s/src" % (serverName, sandbox)
739 if serverName not in ["sneferu", "thoth"]:
740 binPath = binPath.replace("/home", '')
741 codePath = codePath.replace("/home", '')
742
743
744 statCmd = self.makeSysCmd(serverName,
745 "stat --format \"%y\" " + binPath)
746 binTime = (self.runSysCmd(statCmd, False)[0] \
747 or ['0000-01-01 00:00:00.000000000 +0000'])
748 binTime = ''.join(binTime[0].rpartition(' ')[::2])
749
750 if DataDaemon.force:
751 print("%s - %s: %s" % (serverName, binPath.partition(
752 "usr/local/wsa")[2], binTime))
753
754
755 findCmd = self.makeSysCmd(serverName,
756 "find " + codePath + \
757 " -regex \".*\.\(cpp\|c\|h\|hpp\|hxx\)\" " + \
758 "| xargs stat --format \"%y\" | sort -nr | head -1")
759 codeTime = (self.runSysCmd(findCmd, False)[0] \
760 or ['0000-01-01 00:00:00.000000000 +0000'])
761 codeTime = ''.join(codeTime[0].rpartition(' ')[::2])
762
763 if DataDaemon.force:
764 print("%s - %s: %s" % (serverName, codePath.partition(
765 "VDFS")[2], codeTime))
766
767
768 deltaT = mxTime.ISO.ParseDateTimeGMT(binTime) - \
769 mxTime.ISO.ParseDateTimeGMT(codeTime)
770 if deltaT < 0:
771 print("You need to re-compile %s (%dd %0.2fh)" % (
772 serverName, deltaT.day, deltaT.hours - 24 * deltaT.day))
773 else:
774 print("OK")
775
776 except Exception, details:
777 print(details)
778
779
780
782 """
783 Compile code on servers given in the serverList.
784
785 @param branch: The branch to be compiled.
786 @type branch: str
787 @param serverList: List of servers.
788 @type serverList: list(str)
789
790 """
791 messages = defaultdict(dict)
792 for serverName in sorted(serverList):
793 print("Compiling %s on %s..." % (branch, serverName))
794 try:
795 cmd = self.makeSysCmd(serverName,
796 "%s/bin/vdfsCompile.sh %s" % (os.getenv("HOME"), branch))
797 messages[serverName]["err"] = self.runSysCmd(
798 cmd, isVerbose=True)[1]
799
800 except Exception, details:
801 messages[serverName]["exc"] = details
802 print(details)
803
804 for serverName in messages:
805 Logger.addMessage("ERRORS on %s:\n" % serverName)
806 print(''.join(messages[serverName]["err"]))
807 if "exc" in messages[serverName]:
808 Logger.addMessage("EXCEPTIONS on :\n" % serverName)
809 print(messages[serverName]["exc"])
810
811 cmd = self.makeSysCmd(serverName,
812 "ls -ldHG /%s/%s/usr/local/wsa/%s/bin" % (
813 serverName, os.getenv("USER"), branch))
814
815 for line in self.runSysCmd(cmd)[0]:
816 print(line)
817
818
819
821 """
822 Create a list of CU0 runs, ie. ingests.
823
824 @param dataBuilderJobs: Dictionary of jobs already running.
825        @type dataBuilderJobs: dict(str:str)
826 @return: List of ingest runs, list of cuNum/cueventIDs to ingest.
827 @rtype: list(str), list(str)
828
829 """
830 cmd = "ls -gG --full-time " + \
831 self.sysc.dbSharePath("cu0?id*_%s.log*" % self.sysc.loadDatabase)
832 result = self.runSysCmd(cmd)[0]
833 cu0Runs = []
834 toProcess = []
835 processing = []
836 elTimeDict = {}
837 now = mxTime.now()
838
839 for line in result:
840 words = line.split()
841 datetime = ' '.join([words[3].strip(),
842 words[4][:words[4].find('.')].strip()])
843 cuid = words[6][words[6].find('cu') + 2:words[6].find('id')]
844 evtid = words[6][words[6].find('id'):words[6].find('_')]
845 dbid = words[6][words[6].rfind('_') + 1:words[6].rfind('.log')]
846 if line.endswith(".pending"):
847 evtid += 'P'
848 etime = now - mxTime.strptime(datetime, "%Y-%m-%d %H:%M:%S")
849 etime = ["00", "00", "00", "00"] \
850 + str(etime).replace('-', ':').split(':')
851 etime = ':'.join(etime[-4:])
852 elTimeDict[dbid + '_' + cuid + evtid] = str(etime)
853
854 if 0 in utils.invertDict(dataBuilderJobs):
855 if isinstance(utils.invertDict(dataBuilderJobs)[0], str):
856 serverList = [utils.invertDict(dataBuilderJobs)[0]]
857 else:
858 serverList = utils.invertDict(dataBuilderJobs)[0]
859
860 for server in serverList:
861 ingTime, _ingMode, ingCuid = dataBuilderJobs[server][0]
862 for evt in elTimeDict:
863 if elTimeDict[evt] > ingTime and (
864 ingCuid in evt or 'all' in ingCuid):
865 processing.append(evt)
866 else:
867 toProcess.append(evt)
868
869 if dataBuilderJobs[server]:
870 print(' '.join(["Ingester is already running on",
871 server, "on",
872 ', '.join(sorted(processing))]))
873 if toProcess:
874 cmd = ' '.join([DataDaemon.ingesterloc, "on",
875 ', '.join(sorted(toProcess))])
876 cu0Runs.append(cmd)
877 else:
878 if elTimeDict:
879 for evt in elTimeDict:
880 toProcess.append(evt)
881 cuidSet = set(evt[evt.find('_') + 1:evt.find('id')])
882 for cuid in cuidSet:
883 cmd = "%s -i -x %d %s" % (DataDaemon.ingesterloc[0],
884 int(cuid), self.sysc.loadDatabase)
885 cu0Runs.append(cmd)
886 return cu0Runs, sorted(toProcess)
887
888
889
891 """
892 Create a list of CU1 runs, ie. transfers.
893
894 @param dataBuilderJobs: Dictionary of jobs already running.
895        @type dataBuilderJobs: dict(str:str)
896 @return: List of ingest runs, list of dates to process.
897 @rtype: list(str), list(str)
898
899 """
900 srDirs = self.readCasuSr()
901
902 otcDirs = self.readCasuOtc()
903
904 versDict = self.readCasuVers()
905
906 casuToCopy = list(set(otcDirs).difference(srDirs))
907
908 dateVersStrList = self.makeDateVersStrList(casuToCopy, versDict)
909
910 cu1Runs = []
911 inProcess = []
912 serverList = ["khafre"]
913 if 1 in utils.invertDict(dataBuilderJobs) \
914 and "khafre" in utils.invertDict(dataBuilderJobs)[1]:
915 for server in serverList:
916 inProcDates = []
917 for dateVersStr in dateVersStrList:
918 if dataBuilderJobs[server]:
919 if dateVersStr in dataBuilderJobs[server][1]:
920 inProcDates.append(dateVersStr)
921 inProcess.append(dateVersStr)
922 if inProcDates:
923 print(' '.join(["CU1: already in process on %s: %s" % (
924 server, ', '.join(inProcDates))]))
925 else:
926 for dateVersStr in dateVersStrList:
927
928 dateStr = dateVersStr.partition("_v")[0]
929 cmd = ' '.join([
930 DataDaemon.builderloc,
931 "-c", self.curator,
932 "-b", dateStr,
933 "-e", dateStr,
934 "-y 1 -x 1"])
935 cu1Runs.append(cmd)
936
937 return cu1Runs, sorted(set(dateVersStrList).difference(inProcess))
938
939
940
942 """
943 Create a list of CU2 runs, ie. JPG creation.
944
945 @param cuedDirs: Dictionary of already CUed directories.
946 @type cuedDirs: dict(int:list(str))
947 @param dataBuilderJobs: Dictionary of jobs already running.
948        @type dataBuilderJobs: dict(str:str)
949 @return: List of ingest runs, list of dates to process.
950 @rtype: list(str), list(str)
951
952 """
953 diffCu1Cu2 = list(set(cuedDirs[1]).difference(cuedDirs[2]))
954
955 dateVersStrList = self.makeDateVersStrList(diffCu1Cu2)
956 cu2Runs = []
957 inProcess = []
958 pendingIngests = self.getPendingIngestDates(cuNum=2)
959 dateVersStrList = \
960 sorted(set(dateVersStrList).difference(pendingIngests))
961 if 2 in utils.invertDict(dataBuilderJobs):
962 if isinstance(utils.invertDict(dataBuilderJobs)[2], str):
963 serverList = [utils.invertDict(dataBuilderJobs)[2]]
964 else:
965 serverList = utils.invertDict(dataBuilderJobs)[2]
966 for server in serverList:
967 inProcDates = []
968 for dateVersStr in dateVersStrList:
969 if dateVersStr in dataBuilderJobs[server][2]:
970 inProcDates.append(dateVersStr)
971 inProcess.append(dateVersStr)
972 if inProcDates:
973 print(' '.join(["CU2: already in process on %s: %s" % (
974 server, ', '.join(inProcDates))]))
975 for dateVersStr in dateVersStrList:
976 if dateVersStr not in inProcess:
977
978 dateStr, versStr = dateVersStr.partition("_v")[::2]
979 cmd = ' '.join([
980 DataDaemon.builderloc,
981 "-c", self.curator,
982 "-b", dateStr,
983 "-e", dateStr,
984 "-v", versStr,
985 "-x 2"])
986 cu2Runs.append(cmd)
987 else:
988 for dateVersStr in dateVersStrList:
989
990 dateStr, versStr = dateVersStr.partition("_v")[::2]
991 cmd = ' '.join([
992 DataDaemon.builderloc,
993 "-c", self.curator,
994 "-b", dateStr,
995 "-e", dateStr,
996 "-v", versStr,
997 "-x 2"])
998 cu2Runs.append(cmd)
999 return cu2Runs, sorted(set(dateVersStrList).difference(inProcess))
1000
1001
1002
1004 """
1005 Create a list of CU3 runs, ie. metadata ingest.
1006
1007 @param cuedDirs: Dictionary of already CUed directories.
1008 @type cuedDirs: dict(int:list(str))
1009 @param dataBuilderJobs: Dictionary of jobs already running.
1010        @type dataBuilderJobs: dict(str:str)
1011 @return: List of ingest runs, list of dates to process.
1012 @rtype: list(str), list(str)
1013
1014 """
1015 diffCu1Cu3 = list(set(cuedDirs[1]).difference(cuedDirs[3]))
1016
1017 dateVersStrList = self.makeDateVersStrList(diffCu1Cu3)
1018 cu3Runs = []
1019 inProcess = []
1020 pendingIngests = self.getPendingIngestDates(cuNum=3)
1021 dateVersStrList = \
1022 sorted(set(dateVersStrList).difference(pendingIngests))
1023 if 3 in utils.invertDict(dataBuilderJobs):
1024 if isinstance(utils.invertDict(dataBuilderJobs)[3], str):
1025 serverList = [utils.invertDict(dataBuilderJobs)[3]]
1026 else:
1027 serverList = utils.invertDict(dataBuilderJobs)[3]
1028 for server in serverList:
1029 inProcDates = []
1030 for dateVersStr in dateVersStrList:
1031 if dateVersStr in dataBuilderJobs[server][3]:
1032 inProcDates.append(dateVersStr)
1033 inProcess.append(dateVersStr)
1034 if inProcDates:
1035 print(' '.join(["CU3: already in process on %s: %s" % (
1036 server, ', '.join(inProcDates))]))
1037 for dateVersStr in dateVersStrList:
1038 if dateVersStr not in inProcess:
1039
1040 dateStr, versStr = dateVersStr.partition("_v")[::2]
1041 cmd = ' '.join([
1042 self.builderloc,
1043 "-c", self.curator,
1044 "-b", dateStr,
1045 "-e", dateStr,
1046 "-v", versStr,
1047 "-n", "-x 3"])
1048 cu3Runs.append(cmd)
1049 else:
1050 for dateVersStr in dateVersStrList:
1051
1052 dateStr, versStr = dateVersStr.partition("_v")[::2]
1053 cmd = ' '.join([
1054 DataDaemon.builderloc,
1055 "-c", self.curator,
1056 "-b", dateStr,
1057 "-e", dateStr,
1058 "-v", versStr,
1059 "-x 3"])
1060 cu3Runs.append(cmd)
1061 return cu3Runs, sorted(set(dateVersStrList).difference(inProcess))
1062
1063
1064
1066 """
1067 Create a list of CU4 runs, ie. catalogue data ingest.
1068
1069 @param cuedDirs: Dictionary of already CUed directories.
1070 @type cuedDirs: dict(int:list(str))
1071 @param dataBuilderJobs: Dictionary of jobs already running.
1072        @type dataBuilderJobs: dict(str:str)
1073 @return: List of ingest runs, list of dates to process.
1074 @rtype: list(str), list(str)
1075
1076 """
1077 diffCu3Cu4 = list(set(cuedDirs[3]).difference(cuedDirs[4]))
1078
1079 dateVersStrList = self.makeDateVersStrList(diffCu3Cu4)
1080 cu4Runs = []
1081 inProcess = []
1082 pendingIngests = self.getPendingIngestDates(cuNum=4)
1083 dateVersStrList = \
1084 sorted(set(dateVersStrList).difference(pendingIngests))
1085 if 4 in utils.invertDict(dataBuilderJobs):
1086 if isinstance(utils.invertDict(dataBuilderJobs)[4], str):
1087 serverList = [utils.invertDict(dataBuilderJobs)[4]]
1088 else:
1089 serverList = utils.invertDict(dataBuilderJobs)[4]
1090 for server in serverList:
1091 inProcDates = []
1092 for dateVersStr in dateVersStrList:
1093 if dateVersStr in dataBuilderJobs[server][4]:
1094 inProcDates.append(dateVersStr)
1095 inProcess.append(dateVersStr)
1096 if inProcDates:
1097 print(' '.join(["CU4: already in process on %s: %s" % (
1098 server, ', '.join(inProcDates))]))
1099 for dateVersStr in dateVersStrList:
1100 if dateVersStr not in inProcess:
1101
1102 dateStr, versStr = dateVersStr.partition("_v")[::2]
1103 cmd = ' '.join([
1104 DataDaemon.builderloc,
1105 "-c", self.curator,
1106 "-b", dateStr,
1107 "-e", dateStr,
1108 "-v", versStr,
1109 "-x 4"])
1110 cu4Runs.append(cmd)
1111 else:
1112 for dateVersStr in dateVersStrList:
1113
1114 dateStr, versStr = dateVersStr.partition("_v")[::2]
1115 cmd = ' '.join([
1116 DataDaemon.builderloc,
1117 "-c", self.curator,
1118 "-b", dateStr,
1119 "-e", dateStr,
1120 "-v", versStr,
1121 "-x 4"])
1122 cu4Runs.append(cmd)
1123 return cu4Runs, sorted(set(dateVersStrList).difference(inProcess))
1124
1125
1126
1127 - def runCUs(self, cuRunDict):
1128 """
1129 Print out commands to run the CUs on the servers.
1130
1131 @param cuRunDict: Dictionary containing commands for CUs.
1132 @type cuRunDict: dict(str:str)
1133
1134 """
1135 print("\nRun the following processes:")
1136 for server in cuRunDict:
1137
1138
1139
1140
1141
1142
1143 cmd = ''.join(["on ", server, ":\t ", cuRunDict[server][0]])
1144 print(cmd)
1145
1146
1147
1148
1150 """
1151 Do all the stuff needed.
1152 """
1153 cu0Runs = cu1Runs = cu2Runs = cu3Runs = cu4Runs = []
1154 cu0Dates = cu1Dates = cu2Dates = cu3Dates = cu4Dates = []
1155
1156 ratedServers = self.getServerLoad()
1157 dataBuilderJobs = self.getServerJobs(ratedServers)
1158 if self.info:
1159 print()
1160 print("Server load average: ")
1161 for server in ratedServers:
1162 print(" %-10s: % .3f" % (server, ratedServers[server]))
1163 print()
1164 print("Server joblist: ")
1165 for server in ratedServers:
1166 for cuNum in sorted(dataBuilderJobs[server]):
1167 cuNoStr = ("CU%2s:" % cuNum if cuNum >= 0 else '')
1168 print(" %-10s: %s %r\n" % (
1169 server, cuNoStr, dataBuilderJobs[server][cuNum]))
1170
1171 print()
1172 cuRunDict = {}
1173
1174 if not self.info:
1175 cuedDirs = self.getCUs()
1176 if 0 in self.cuNums:
1177 cu0Runs, cu0Dates = self.createCU0Run(dataBuilderJobs)
1178 if 1 in self.cuNums:
1179 cu1Runs, cu1Dates = self.createCU1Run(dataBuilderJobs)
1180 if 2 in self.cuNums:
1181 cu2Runs, cu2Dates = self.createCU2Run(cuedDirs,
1182 dataBuilderJobs)
1183 if 3 in self.cuNums:
1184 cu3Runs, cu3Dates = self.createCU3Run(cuedDirs,
1185 dataBuilderJobs)
1186 if 4 in self.cuNums:
1187 cu4Runs, cu4Dates = self.createCU4Run(cuedDirs,
1188 dataBuilderJobs)
1189
1190 if cu1Runs:
1191 cuRunDict.setdefault("khafre", []).extend(cu1Runs)
1192 del ratedServers["khafre"]
1193
1194 if cu0Runs:
1195 if ratedServers.getValueByRating(0) >= DataDaemon.maxLoad:
1196 del ratedServers[ratedServers.getKeyByRating(0)]
1197 if cu0Runs and ratedServers:
1198 if ratedServers.getValueByRating(0) < DataDaemon.maxLoad:
1199 cuRunDict.setdefault(
1200 ratedServers.getKeyByRating(0), []).append(
1201 cu0Runs[0])
1202 del cu0Runs[0]
1203 del ratedServers[ratedServers.getKeyByRating(0)]
1204
1205 if cu2Runs or cu3Runs:
1206 while ratedServers:
1207 if ratedServers.getValueByRating(0) >= DataDaemon.maxLoad:
1208 del ratedServers[ratedServers.getKeyByRating(0)]
1209
1210 if cu2Runs and ratedServers:
1211 if ratedServers.getValueByRating(0) < DataDaemon.maxLoad:
1212 cuRunDict.setdefault(
1213 ratedServers.getKeyByRating(0), []).append(
1214 cu2Runs[0])
1215 del cu2Runs[0]
1216 del ratedServers[ratedServers.getKeyByRating(0)]
1217 if cu3Runs and ratedServers:
1218 if ratedServers.getValueByRating(0) < DataDaemon.maxLoad:
1219 cuRunDict.setdefault(
1220 ratedServers.getKeyByRating(0), []).append(
1221 cu3Runs[0])
1222 del cu3Runs[0]
1223 del ratedServers[ratedServers.getKeyByRating(0)]
1224
1225 if not cu2Runs and not cu3Runs:
1226 break
1227
1228 if cu4Runs:
1229 while ratedServers:
1230 if ratedServers.getValueByRating(0) >= DataDaemon.maxLoad:
1231 del ratedServers[ratedServers.getKeyByRating(0)]
1232 if cu4Runs and ratedServers:
1233 if ratedServers.getValueByRating(0) < DataDaemon.maxLoad:
1234 cuRunDict.setdefault(
1235 ratedServers.getKeyByRating(0), []).append(
1236 cu4Runs[0])
1237 del cu4Runs[0]
1238 del ratedServers[ratedServers.getKeyByRating(0)]
1239 if not cu4Runs:
1240 break
1241
1242 print("\nThe following eventIDs need ingesting:")
1243 if not DataDaemon.fullCommand:
1244 print("CU0>", cu0Dates)
1245 else:
1246 print("CU0>", cu0Runs)
1247
1248
1249 print("\nThe following days need processing:")
1250 if not DataDaemon.fullCommand:
1251 print("CU1>", cu1Dates)
1252 print("CU2>", cu2Dates)
1253 print("CU3>", cu3Dates)
1254 print("CU4>", cu4Dates)
1255 else:
1256 print("CU1>", cu1Runs)
1257 print("CU2>", cu2Runs)
1258 print("CU3>", cu3Runs)
1259 print("CU4>", cu4Runs)
1260
1261 self.runCUs(cuRunDict)
1262
1263
1264
1266 """
1267 """
1268 anyRemoved = False
1269 for entry in os.listdir(self.dbMntPath):
1270 if all(phrase in entry for phrase in fileTags):
1271 ext = "." + suffix
1272 print("removing %s:" % ext, entry)
1273 oldPath = os.path.join(self.dbMntPath, entry)
1274 newPath = (oldPath.partition(ext)[0] if suffix != "rename" else
1275 oldPath.replace(ext, ''))
1276
1277 os.rename(oldPath, newPath)
1278 anyRemoved = True
1279
1280 if not anyRemoved:
1281 print("Nothing %s." % suffix)
1282
1283
1284
1286 """
1287 """
1288
1289 loadServer, archiveName = archive.rpartition('.')[::2]
1290 sysc = SystemConstants(archiveName)
1291 IngCuSession.sysc = sysc
1292 loadServer = loadServer or sysc.loadServer
1293 archiveName = (sysc.loadDatabase if 'test' in archiveName else archiveName)
1294 mainDb = DbSession(archive)
1295 programmeIDDict = dict(mainDb.query("dfsIDString,programmeID", "Programme"))
1296 daemon = DataDaemon(False, '',
1297 '.'.join([loadServer, archiveName]),
1298 ','.join(sorted(DataDaemon._defServerNames)), False,
1299 ','.join(str(x) for x in DataDaemon._defCuNums), '',
1300 DataDaemon._defDateRegExpStr, "Check databases")
1301
1302 dataBasesDef = {"dbload":["all"],
1303 "dbpub":["all"]}
1304 daemon.getDBOverview(dataBasesDef)
1305 availPubDBs = daemon.availPubDBs
1306 dbInfo = []
1307 programmeList = ['VVV', 'VMC', 'VIDEO', 'VIKING']
1308 earliestEsoDBs = {'VVV':'VVVDR1', 'VMC':'VMCv20121128',
1309 'VIDEO':'VIDEOv20120712', 'VIKING':'VIKINGv20121205'}
1310 for server in availPubDBs:
1311 dbList = availPubDBs[server]
1312 for dbName in dbList:
1313 programme = [progName for progName in programmeList
1314 if dbName.startswith(progName)]
1315 if programme:
1316
1317 db = DbSession("%s.%s" % (server, dbName), userName="ldservrw")
1318 relNum = db.queryAttrMax("releaseNum", "ProgrammeFrame",
1319 where="programmeID=%s" % programmeIDDict[programme[0]])
1320 dbInfo.append((dbName, server, programme[0], relNum))
1321 curRelNumDict = {}
1322 for programme in programmeList:
1323 curRelNum = max(relNum for _dbName, _server, progName, relNum in dbInfo
1324 if progName == programme)
1325 curRelNumDict[programme] = curRelNum
1326 minRelNumDict = {}
1327 for programme in programmeList:
1328 eedbName = earliestEsoDBs[programme]
1329 eeRelNumSet = set([relNum for dbName, _server, progName, relNum in dbInfo
1330 if progName == programme and eedbName == dbName])
1331 if len(eeRelNumSet) == 1:
1332 minRelNumDict[programme] = eeRelNumSet.pop()
1333 else:
1334 raise Exception("Inconsistency in %s databases for programme %s: relNums %s" %
1335 (eedbName, programme, eeRelNumSet))
1336
1337 fullDbInfo = []
1338 for dbName, server, progName, relNum in dbInfo:
1339 if relNum >= minRelNumDict[progName]:
1340 curRelNum = curRelNumDict[progName]
1341 fullDbInfo.append((dbName, server, progName, relNum, curRelNum))
1342 return fullDbInfo
1343
1344
1345
1346
1347
if __name__ == '__main__':
    # Replace the standard "database" positional argument with "archive"
    # (optionally prefixed by a load server, eg. "ramses9.WSA"), and drop
    # the default "test" option.
    CLI.progArgs.remove("database")
    CLI.progArgs.append(CLI.Argument("archive", "WSA"))
    CLI.progOpts.remove("test")
    # NOTE(review): several help strings contain typos ("preceeding",
    # "seperated", "full extend") -- left untouched here since they are
    # user-visible runtime strings.
    CLI.progOpts += [
        CLI.Option('i', "info",
                   "only print status of servers"),
        CLI.Option('j', "janet",
                   "use JANET instead of UKLight"),
        CLI.Option('s', "server",
                   "list of servers to check,"
                   " or exclude server with preceeding '-'",
                   "LIST", ','.join(sorted(DataDaemon._defServerNames))),
        CLI.Option('t', "testdbs", "with -D: include testDBs in DB listing"),
        CLI.Option('v', "versions",
                   "list of version numbers",
                   "LIST", ''),
        CLI.Option('w', "casudisks",
                   "data disks at CASU",
                   "LIST", ''),
        CLI.Option('x', "cunums",
                   "list of CUs to check", "LIST",
                   ','.join(str(x) for x in DataDaemon._defCuNums)),
        CLI.Option('A', "assertcomp",
                   "assert that all servers are compiled"),
        CLI.Option('C', "compile",
                   "compile the code in BRANCH on the given (-s)"
                   " server(s)",
                   "BRANCH", ''),
        CLI.Option('D', "dbinfo",
                   "list of all DBs available for given archive (archive='EXT'"
                   " for external databases)"),
        CLI.Option('F', "full",
                   "run an option to full extend:\n"
                   "\t\t\t\t with '-A': show timestamps for bin/ and src/;\n"
                   "\t\t\t\t with '-D': show DBs for all archives;\n"
                   "\t\t\t\t with '-I': set the force flag for continuous"
                   " ingests;\n"
                   "\t\t\t\t with '-P d': delete files for all archives;\n"
                   "\t\t\t\t with '-U': update including installation/ dir"),
        CLI.Option('I', "ingloop",
                   "run continuous ingests for this cu",
                   "INT", 0),
        CLI.Option('M', "monitor",
                   "update the monitor webpage (give one semester, "
                   "excluded CU(s), and CU [1/2] for which stats should be "
                   "re-calculated as comma seperated list)",
                   "LIST", "01C,4,0"),
        CLI.Option('O', "omitobjid",
                   "Omit updating objIDs. Use with care on the VVV!"),
        CLI.Option('P', "purgeshare",
                   "Check [c] or delete [d] the ingested files on the shares"
                   " on the given (-s) server(s)",
                   "STR", isValOK=lambda x: x.lower() in "cd"),
        CLI.Option('R', "timere",
                   "date regular expression",
                   "REGEXP", DataDaemon._defDateRegExpStr),
        CLI.Option('S', "ingeststatus",
                   "change status suffix of ingest files, use any"
                   " combination of:\n"
                   "\t\t\t\t p/P: remove pending status from CU2 ingests;\n"
                   "\t\t\t\t f/F: remove failed status from ingest logs;\n"
                   "\t\t\t\t r/R: remove rename status from ingest files",
                   "STR", ''),
        CLI.Option('T', "crontab",
                   "show crontabs on all servers"),
        CLI.Option('U', "svnup",
                   "svn update on given (-s) or all (trunk)"
                   " or current servers"),
        CLI.Option('V', "svnpath",
                   "svn update the given path relative to "
                   "/<server>/<user>/ in combination"
                   " with '-U'", "PATH", None)]
1422 cli = CLI(DataDaemon, "$Revision: 10227 $", checkSVN=False)
1423 Logger.addMessage(cli.getProgDetails())
1424 loadServer, archiveName = cli.getArg("archive").rpartition('.')[::2]
1425 sysc = SystemConstants(archiveName)
1426 IngCuSession.sysc = sysc
1427 loadServer = loadServer or sysc.loadServer
1428 archiveName = (sysc.loadDatabase if 'test' in archiveName else archiveName)
1429 DataDaemon.casuDisk = (cli.getOpt("casudisk") if cli.getOpt("casudisk")
1430 else sysc.scpMainDir())
1431 DataDaemon.inclTestDb = cli.getOpt("testdbs")
1432 DataDaemon.force = cli.getOpt("full")
1433 DataDaemon.omitObjIDUpdate = cli.getOpt("omitobjid")
1434
    # Instantiate the daemon from the parsed options; the positional order
    # must match DataDaemon's constructor signature.
    daemon = DataDaemon(cli.getOpt("info"),
                        cli.getOpt("curator"),
                        '.'.join([loadServer, archiveName]),
                        cli.getOpt("server"),
                        cli.getOpt("janet"),
                        cli.getOpt("cunums"),
                        cli.getOpt("casudisks"),
                        cli.getOpt("timere"),
                        cli.getArg("comment"))


    # -U: svn-update the sandbox (and web/installation dirs) on the servers.
    if cli.getOpt("svnup"):
        # Working from a branch sandbox rather than trunk.
        if not 'trunk' in os.getenv("WSA_SANDBOX"):
            svnPath = (cli.getOpt("svnpath") if cli.getOpt("svnpath") else
                       "VDFS/%s" % os.path.split(os.getenv("WSA_SANDBOX"))[1])
            daemon.svnupdate("%s/%s" % (os.getenv("USER"), svnPath),
                             [os.getenv("HOST")])
            # Operator account with an explicit server selection: update
            # those servers too.
            if os.getenv("USER") == "scos" and \
                    cli.getOpt("server") != ','.join(DataDaemon._defServerNames):
                daemon.svnupdate("%s/%s" % (os.getenv("USER"), svnPath),
                                 daemon.serverNames)
        else:
            svnPath = (cli.getOpt("svnpath") if cli.getOpt("svnpath")
                       else "VDFS/trunk")
            serverNames = (daemon.serverNames if os.getenv("USER") == "scos"
                           else [os.getenv("HOST")])
            daemon.svnupdate("%s/%s" % (os.getenv("USER"), svnPath),
                             serverNames)
            daemon.svnupdate("%s/WFAUweb" % os.getenv("USER"), serverNames)
            # -F additionally updates the installation/ and GES trees.
            if cli.getOpt("full"):
                daemon.svnupdate("%s/VDFS/installation" % os.getenv("USER"),
                                 serverNames)
                daemon.svnupdate("%s/GES/trunk" % os.getenv("USER"), serverNames)


    # -A: assert the code is compiled on the servers (operator only).
    elif cli.getOpt("assertcomp") and os.getenv("USER") == "scos":
        if not 'trunk' in os.getenv("WSA_SANDBOX"):
            svnPath = "VDFS/%s" % os.path.split(os.getenv("WSA_SANDBOX"))[1]
            daemon.assertComp("%s/%s" % (os.getenv("USER"), svnPath),
                              [os.getenv("HOST")])
            if os.getenv("USER") == "scos" and \
                    cli.getOpt("server") != ','.join(DataDaemon._defServerNames):
                daemon.assertComp("%s/%s" % (os.getenv("USER"), svnPath),
                                  daemon.serverNames)
        else:
            svnPath = "VDFS/trunk"
            serverNames = (daemon.serverNames if os.getenv("USER") == "scos"
                           else [os.getenv("HOST")])
            daemon.assertComp("%s/%s" % (os.getenv("USER"), svnPath),
                              serverNames)


    # -C: compile the given branch on the selected servers (operator only).
    elif cli.getOpt("compile") and os.getenv("USER") == "scos" and \
            cli.getOpt("server") != ','.join(DataDaemon._defServerNames):
        daemon.compileServer(cli.getOpt("compile"), daemon.serverNames)


    # -P: check or delete ingested files on the shares (operator only);
    # -F widens the purge to all archives.
    elif cli.getOpt("purgeshare") and os.getenv("USER") == "scos":
        arch = ("ALL" if cli.getOpt("full") else archiveName)
        daemon.purgeShare(cli.getOpt("purgeshare"), arch, isIngest=False)


    # -T: show crontabs on all servers.
    elif cli.getOpt("crontab"):
        daemon.getCron(daemon.serverNames)


    # -I: run continuous ingests for the given CU.
    elif cli.getOpt("ingloop"):
        daemon.runContinuousIngests(cli.getOpt("ingloop"),
                                    cli.getOpt("curator"),
                                    cli.getArg("comment"))


    # -S: strip pending/failed/rename status suffixes from ingest files,
    # depending on which of p/f/r were requested.
    elif cli.getOpt("ingeststatus"):
        statusTag = cli.getOpt("ingeststatus").lower()
        for suffix in ["pending", "failed", "rename"]:
            if suffix[0] in statusTag:
                fileTags = \
                    [(".log" if suffix != "rename" else '') + '.' + suffix,
                     ("cu0" if suffix != "pending" else "cu02id"),
                     '_' + archiveName]

                daemon.removeFileSuffixes(suffix, fileTags)


    # -D: list the databases available for this archive.
    elif cli.getOpt("dbinfo"):
        dataBasesDef = {"dbload":["all"],
                        "dbpub":["all"]}
        daemon.getDBOverview(dataBasesDef, archiveName)


    # -M: regenerate the monitor webpage (operator only, non-default value).
    # Fewer than three values: mix of semesters and excluded CUs; exactly
    # three: semester, excluded CU, and the CU (1/2) whose stats to redo.
    elif cli.getOpt("monitor") and os.getenv("USER") == "scos" \
            and cli.getOpt("monitor") != cli.getOptDef("monitor"):
        exclCuList = []
        semesterList = []
        monOpts = cli.getOpt("monitor").split(',')
        if len(monOpts) < 3:
            for opt in monOpts:
                if opt.isdigit():
                    exclCuList.append(opt)
                else:
                    semesterList.append(opt)
            remStats = None
        else:
            semesterList.append(monOpts[0])
            if monOpts[1].isdigit():
                exclCuList.append(monOpts[1])
            if monOpts[2].isdigit() and monOpts[2] in "12":
                remStats = monOpts[2]
            else:
                print("Stats can only be removed for CU1 or 2!")
                remStats = None

        for semester in semesterList:
            print(' '.join(["Running Monitor for", semester, "excluding CU:",
                            (','.join(exclCuList) if exclCuList else '---'),
                            " Versions:",
                            (cli.getOpt("versions") if cli.getOpt("versions")
                             else 'all')]))
            daemon.runMonitor(semester, ','.join(exclCuList), remStats,
                              cli.getOpt("versions"))


    # Default: one scheduling pass over the servers.
    else:
        daemon.run()
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593